use crate::{
RetryPolicy,
metrics::SiftStreamMetrics,
stream::{
mode::ingestion_config::DataStream,
tasks::{CHECKPOINT_TIMEOUT, ControlMessage, DataMessage},
},
};
use sift_connect::SiftChannel;
use sift_error::prelude::*;
use sift_rs::{
CompressionEncoding, GrpcCode, GrpcStatus,
ingest::v1::ingest_service_client::IngestServiceClient,
};
use std::{
future::Future,
pin::Pin,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use tokio::{sync::broadcast, time::Instant};
use uuid::Uuid;
/// Timer used by the ingestion loop to decide when a checkpoint is due.
///
/// The `Disabled` variant lets callers `select!` on [`CheckpointTimer::tick`]
/// unconditionally: a disabled timer simply never becomes ready.
enum CheckpointTimer {
    /// Backed by a tokio interval; each completed tick signals a checkpoint.
    Active(tokio::time::Interval),
    /// No periodic checkpoints; `tick` stays pending forever.
    Disabled,
}

impl CheckpointTimer {
    /// Waits for the next tick of the wrapped interval.
    ///
    /// For a disabled timer this future never resolves.
    async fn tick(&mut self) {
        if let Self::Active(interval) = self {
            interval.tick().await;
            return;
        }
        std::future::pending::<()>().await
    }

    /// Makes the next tick of an active timer fire right away; a disabled
    /// timer is left untouched.
    fn reset_immediately(&mut self) {
        match self {
            Self::Active(interval) => interval.reset_immediately(),
            Self::Disabled => {}
        }
    }
}
/// Static configuration consumed by [`IngestionTask`].
pub(crate) struct IngestionTaskConfig {
    /// Session name. Not read by the ingestion task code in this module,
    /// hence the dead-code allowance.
    #[allow(dead_code)]
    pub(crate) session_name: String,
    /// Identifier attached to tracing events and handed to every `DataStream`
    /// the task creates.
    pub(crate) sift_stream_id: Uuid,
    /// Channel from which an `IngestServiceClient` is built for each new stream.
    pub(crate) ingestion_channel: SiftChannel,
    /// When `true`, the client both sends and accepts gzip-compressed messages.
    pub(crate) enable_compression_for_ingestion: bool,
    /// Shared metrics updated by the task (gRPC status counts, retry count,
    /// checkpoint counters).
    pub(crate) metrics: Arc<SiftStreamMetrics>,
    /// Supplies the backoff used between failed stream attempts; a stream that
    /// lived longer than twice `max_backoff` before failing restarts with no wait.
    pub(crate) retry_policy: RetryPolicy,
    /// Period of the checkpoint timer. `None` disables the timer, and
    /// `handle_failed_stream` then does not emit `CheckpointNeedsReingestion`.
    pub(crate) checkpoint_interval: Option<Duration>,
}
/// Task that streams queued data messages to Sift over gRPC and reports
/// checkpoint progress on the control channel.
pub(crate) struct IngestionTask {
    /// Sender for control messages emitted by this task; also cloned into each
    /// `DataStream` it creates.
    control_tx: broadcast::Sender<ControlMessage>,
    /// Receiver on which the task watches for `BackupFull` and `Shutdown`.
    control_rx: broadcast::Receiver<ControlMessage>,
    /// Source of data messages; cloned into each `DataStream`. Once it is
    /// closed, the run loop exits after the current stream finishes.
    data_rx: async_channel::Receiver<DataMessage>,
    /// Static settings for the task.
    config: IngestionTaskConfig,
}
impl IngestionTask {
pub(crate) fn new(
control_tx: broadcast::Sender<ControlMessage>,
control_rx: broadcast::Receiver<ControlMessage>,
data_rx: async_channel::Receiver<DataMessage>,
config: IngestionTaskConfig,
) -> Self {
Self {
control_tx,
control_rx,
data_rx,
config,
}
}
pub(crate) async fn run(&mut self) -> Result<()> {
let now = tokio::time::Instant::now();
let mut timer = match self.config.checkpoint_interval.as_ref() {
Some(c) => CheckpointTimer::Active(tokio::time::interval_at(now + *c, *c)),
None => CheckpointTimer::Disabled,
};
let mut stream_created_at = now;
let mut current_wait = Duration::ZERO;
let mut stream = None;
let first_message_id = Arc::new(AtomicU64::new(0));
let last_message_id = Arc::new(AtomicU64::new(0));
loop {
if stream.is_none() {
#[cfg(feature = "tracing")]
tracing::info!(
sift_stream_id = %self.config.sift_stream_id,
"creating new stream"
);
stream_created_at = tokio::time::Instant::now();
let mut client = IngestServiceClient::new(self.config.ingestion_channel.clone());
if self.config.enable_compression_for_ingestion {
client = client
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip);
}
let data_stream = DataStream::new(
self.data_rx.clone(),
self.control_tx.clone(),
self.config.sift_stream_id,
first_message_id.clone(),
last_message_id.clone(),
self.config.metrics.clone(),
);
stream = Some(Box::pin(async move {
tokio::time::sleep(current_wait).await;
let res = client.ingest_with_config_data_stream(data_stream).await;
res.map(|_| ())
}));
#[cfg(feature = "tracing")]
tracing::info!(
sift_stream_id = %self.config.sift_stream_id,
"successfully initialized a new stream to Sift"
);
}
tokio::select! {
res = stream.as_mut().unwrap() => {
match res {
Ok(_) => {
self.config.metrics.grpc_status_counts[0].increment();
self.config.metrics.cur_retry_count.set(0);
current_wait = Duration::ZERO;
}
Err(e) => {
current_wait = self.handle_failed_stream(&e, stream_created_at, current_wait, first_message_id.load(Ordering::Relaxed), last_message_id.load(Ordering::Relaxed))?;
self.control_tx.send(ControlMessage::CheckpointComplete { first_message_id: first_message_id.load(Ordering::Relaxed), last_message_id: last_message_id.load(Ordering::Relaxed) }).map_err(|e| Error::new(ErrorKind::StreamError, e))?;
}
}
stream = None;
if self.data_rx.is_closed() {
break;
}
}
_ = timer.tick() => {
#[cfg(feature = "tracing")]
tracing::info!(
sift_stream_id = %self.config.sift_stream_id,
"checkpoint expired"
);
self.control_tx.send(ControlMessage::SignalNextCheckpoint).map_err(|e| Error::new(ErrorKind::StreamError, e))?;
self.config.metrics.checkpoint.checkpoint_timer_reached_cnt.increment();
match tokio::time::timeout(CHECKPOINT_TIMEOUT, stream.as_mut().unwrap()).await {
Ok(Ok(_)) => {
#[cfg(feature = "tracing")]
tracing::info!(
sift_stream_id = %self.config.sift_stream_id,
"checkpoint succeeded - data streamed to Sift successfully"
);
self.config.metrics.grpc_status_counts[0].increment();
self.config.metrics.cur_retry_count.set(0);
}
Ok(Err(e)) => {
current_wait = self.handle_failed_stream(&e, stream_created_at, current_wait, first_message_id.load(Ordering::Relaxed), last_message_id.load(Ordering::Relaxed))?;
}
Err(_elapsed) => {
let timeout_status = GrpcStatus::deadline_exceeded("checkpoint timed out waiting for Sift");
current_wait = self.handle_failed_stream(&timeout_status, stream_created_at, current_wait, first_message_id.load(Ordering::Relaxed), last_message_id.load(Ordering::Relaxed))?;
}
}
self.config.metrics.checkpoint.next_checkpoint();
self.control_tx.send(ControlMessage::CheckpointComplete { first_message_id: first_message_id.load(Ordering::Relaxed), last_message_id: last_message_id.load(Ordering::Relaxed) }).map_err(|e| Error::new(ErrorKind::StreamError, e))?;
stream = None;
}
ctrl_msg = self.control_rx.recv() => {
match ctrl_msg {
Ok(ControlMessage::BackupFull) => {
#[cfg(feature = "tracing")]
tracing::trace!(
sift_stream_id = %self.config.sift_stream_id,
"backup full"
);
self.config.metrics.checkpoint.checkpoint_manually_reached_cnt.increment();
timer.reset_immediately();
}
Ok(ControlMessage::Shutdown) => {
break;
}
_ => continue,
}
}
}
}
self.shutdown(stream, first_message_id, last_message_id)
.await?;
Ok(())
}
fn handle_failed_stream(
&mut self,
status: &GrpcStatus,
stream_created_at: Instant,
current_wait: Duration,
first_message_id: u64,
last_message_id: u64,
) -> Result<Duration> {
let code = i32::from(status.code());
let code_idx = if (0..=16).contains(&code) {
code as usize
} else {
17
};
self.config.metrics.grpc_status_counts[code_idx].increment();
#[cfg(feature = "tracing")]
{
let msg = match status.code() {
GrpcCode::Cancelled => "stream connection went idle",
_ => "stream connection is being reset",
};
tracing::warn!(
sift_stream_id = %self.config.sift_stream_id,
retry_counter = self.config.metrics.cur_retry_count.get(),
grpc_status = ?status.code(),
msg
);
}
self.config
.metrics
.checkpoint
.failed_checkpoint_count
.increment();
if self.config.checkpoint_interval.is_some() {
self.control_tx
.send(ControlMessage::CheckpointNeedsReingestion {
first_message_id,
last_message_id,
})
.map_err(|e| Error::new(ErrorKind::StreamError, e))?;
}
let backoff = if stream_created_at.elapsed() > self.config.retry_policy.max_backoff * 2 {
self.config.metrics.cur_retry_count.set(0);
Duration::ZERO
} else {
self.config.metrics.cur_retry_count.add(1);
self.config.retry_policy.backoff(current_wait)
};
Ok(backoff)
}
async fn shutdown<T: Future<Output = std::result::Result<(), GrpcStatus>> + Send + 'static>(
&mut self,
mut stream: Option<Pin<Box<T>>>,
first_message_id: Arc<AtomicU64>,
last_message_id: Arc<AtomicU64>,
) -> Result<()> {
#[cfg(feature = "tracing")]
tracing::info!(
sift_stream_id = %self.config.sift_stream_id,
"ingestion task shutting down"
);
if let Some(stream) = stream.as_mut() {
match stream.await {
Ok(_) => {
#[cfg(feature = "tracing")]
tracing::info!(
sift_stream_id = %self.config.sift_stream_id,
"final stream completed successfully"
);
}
Err(e) => {
#[cfg(feature = "tracing")]
tracing::error!(
sift_stream_id = %self.config.sift_stream_id,
error = %e,
"final stream failed"
);
self.control_tx
.send(ControlMessage::CheckpointNeedsReingestion {
first_message_id: first_message_id.load(Ordering::Relaxed),
last_message_id: last_message_id.load(Ordering::Relaxed),
})
.map_err(|e| Error::new(ErrorKind::StreamError, e))?;
}
}
}
self.control_tx
.send(ControlMessage::CheckpointComplete {
first_message_id: first_message_id.load(Ordering::Relaxed),
last_message_id: last_message_id.load(Ordering::Relaxed),
})
.map_err(|e| Error::new(ErrorKind::StreamError, e))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use sift_rs::ingest::v1::{
        IngestWithConfigDataChannelValue, ingest_with_config_data_channel_value::Type,
    };
    use crate::{TimeValue, stream::retry::RetryPolicy};
    use super::*;

    /// Shared builder behind the two config helpers below; only the
    /// checkpoint interval differs between them.
    fn build_config(
        ingestion_channel: SiftChannel,
        metrics: Arc<SiftStreamMetrics>,
        checkpoint_interval: Option<Duration>,
    ) -> IngestionTaskConfig {
        IngestionTaskConfig {
            session_name: "test-session".to_string(),
            sift_stream_id: Uuid::new_v4(),
            ingestion_channel,
            enable_compression_for_ingestion: false,
            metrics,
            retry_policy: RetryPolicy::default(),
            checkpoint_interval,
        }
    }

    /// Config with checkpointing enabled at the given interval.
    fn make_ingestion_task_config(
        ingestion_channel: SiftChannel,
        metrics: Arc<SiftStreamMetrics>,
        checkpoint_interval: Duration,
    ) -> IngestionTaskConfig {
        build_config(ingestion_channel, metrics, Some(checkpoint_interval))
    }

    /// Config with no checkpoint interval (checkpoint timer disabled).
    fn make_live_only_ingestion_task_config(
        ingestion_channel: SiftChannel,
        metrics: Arc<SiftStreamMetrics>,
    ) -> IngestionTaskConfig {
        build_config(ingestion_channel, metrics, None)
    }

    /// Builds an ingestion task subscribed to `control_tx` and spawns its run loop.
    fn spawn_ingestion_task(
        control_tx: &broadcast::Sender<ControlMessage>,
        data_rx: async_channel::Receiver<DataMessage>,
        config: IngestionTaskConfig,
    ) -> tokio::task::JoinHandle<sift_error::prelude::Result<()>> {
        let task_control_rx = control_tx.subscribe();
        let mut task = IngestionTask::new(control_tx.clone(), task_control_rx, data_rx, config);
        tokio::spawn(async move { task.run().await })
    }

    /// Broadcasts `Shutdown` and asserts that the send succeeded.
    fn send_shutdown(control_tx: &broadcast::Sender<ControlMessage>) {
        let sent = control_tx.send(ControlMessage::Shutdown);
        assert!(
            sent.is_ok(),
            "failed to send shutdown message to ingestion task"
        );
    }

    /// Drains `control_rx` until the first receive error and counts the
    /// messages accepted by `wanted`.
    fn count_control_messages(
        control_rx: &mut broadcast::Receiver<ControlMessage>,
        wanted: impl Fn(&ControlMessage) -> bool,
    ) -> usize {
        std::iter::from_fn(|| control_rx.try_recv().ok())
            .filter(|msg| wanted(msg))
            .count()
    }

    /// Drains `control_rx` and counts `CheckpointComplete` messages.
    fn count_checkpoint_complete(control_rx: &mut broadcast::Receiver<ControlMessage>) -> usize {
        count_control_messages(control_rx, |msg| {
            matches!(msg, ControlMessage::CheckpointComplete { .. })
        })
    }

    /// Queues `count` data messages (ids `0..count`) and then waits up to
    /// roughly half a second for the channel to drain.
    async fn send_messages_for_ingestion(
        data_tx: &async_channel::Sender<DataMessage>,
        count: usize,
    ) {
        for i in 0..count {
            let request = sift_rs::ingest::v1::IngestWithConfigDataStreamRequest {
                ingestion_config_id: "test-0".to_string(),
                flow: "some_flow".to_string(),
                timestamp: Some(*TimeValue::now()),
                channel_values: vec![IngestWithConfigDataChannelValue {
                    r#type: Some(Type::Int32(i as i32)),
                }],
                run_id: "test-run-id".to_string(),
                end_stream_on_validation_error: false,
                organization_id: "test-organization-id".to_string(),
            };
            let queued = data_tx.try_send(DataMessage {
                message_id: i as u64,
                request: Arc::new(request),
                dropped_for_ingestion: false,
            });
            assert!(
                queued.is_ok(),
                "failed to send data message to ingestion task"
            );
        }

        // Poll at most five times, 100ms apart, for the consumer to catch up.
        let mut polls_left = 5;
        while polls_left > 0 && !data_tx.is_empty() {
            tokio::time::sleep(Duration::from_millis(100)).await;
            polls_left -= 1;
        }
        println!("data tx len: {}", data_tx.len());
    }

    /// Explicit shutdown after closing the data channel yields exactly one
    /// `CheckpointComplete`.
    #[tokio::test]
    async fn test_ingestion_task_shutdown() {
        let (ingestion_channel, _mock_service) =
            crate::test::create_mock_grpc_channel_with_service().await;
        let (control_tx, mut control_rx) = broadcast::channel(1024);
        let (data_tx, data_rx) = async_channel::bounded(1024);
        let metrics = Arc::new(SiftStreamMetrics::default());
        let config =
            make_ingestion_task_config(ingestion_channel, metrics.clone(), Duration::from_secs(60));
        let handle = spawn_ingestion_task(&control_tx, data_rx, config);

        send_messages_for_ingestion(&data_tx, 100).await;
        data_tx.close();
        send_shutdown(&control_tx);

        assert!(
            handle.await.is_ok(),
            "ingestion task should complete successfully"
        );
        assert!(data_tx.is_empty(), "data channel should be empty");
        let complete_count = count_checkpoint_complete(&mut control_rx);
        assert_eq!(complete_count, 1, "should have completed 1 checkpoint");
    }

    /// Closing the data channel without sending `Shutdown` still lets the task finish.
    #[tokio::test]
    async fn test_ingestion_task_shutdown_ungracefully() {
        let (ingestion_channel, _mock_service) =
            crate::test::create_mock_grpc_channel_with_service().await;
        let (control_tx, mut control_rx) = broadcast::channel(1024);
        let (data_tx, data_rx) = async_channel::bounded(1024);
        let metrics = Arc::new(SiftStreamMetrics::default());
        let config =
            make_ingestion_task_config(ingestion_channel, metrics.clone(), Duration::from_secs(60));
        let handle = spawn_ingestion_task(&control_tx, data_rx, config);

        send_messages_for_ingestion(&data_tx, 100).await;
        data_tx.close();

        let res = tokio::time::timeout(Duration::from_secs(10), handle).await;
        assert!(res.is_ok(), "ingestion task should complete successfully");
        assert!(data_tx.is_empty(), "data channel should be empty");
        let complete_count = count_checkpoint_complete(&mut control_rx);
        assert_eq!(complete_count, 1, "should have completed 1 checkpoint");
    }

    /// Two injected stream errors plus shutdown produce three `CheckpointComplete` messages.
    #[tokio::test]
    async fn test_ingestion_task_shutdown_errors() {
        let (ingestion_channel, mock_service) =
            crate::test::create_mock_grpc_channel_with_service().await;
        let (control_tx, mut control_rx) = broadcast::channel(1024);
        let (data_tx, data_rx) = async_channel::bounded(1024);
        let metrics = Arc::new(SiftStreamMetrics::default());
        let config =
            make_ingestion_task_config(ingestion_channel, metrics.clone(), Duration::from_secs(60));
        mock_service.set_num_errors_to_return(2);
        let handle = spawn_ingestion_task(&control_tx, data_rx, config);

        // Produce and shut down concurrently with the task itself.
        let producer = async {
            send_messages_for_ingestion(&data_tx, 100).await;
            data_tx.close();
            send_shutdown(&control_tx);
        };
        let (_, handle_result) = tokio::join!(producer, handle);

        assert!(
            handle_result.is_ok(),
            "ingestion task should complete successfully"
        );
        assert!(data_tx.is_empty(), "data channel should be empty");
        let complete_count = count_checkpoint_complete(&mut control_rx);
        assert_eq!(
            complete_count, 3,
            "should have completed 3 checkpoints (2 from stream failures + 1 from shutdown)"
        );
    }

    /// Messages arrive at the mock service in order and are reflected in the metrics.
    #[tokio::test]
    async fn test_ingestion_task_stream() {
        let (ingestion_channel, mock_service) =
            crate::test::create_mock_grpc_channel_with_service().await;
        let (control_tx, _control_rx) = broadcast::channel(1024);
        let (data_tx, data_rx) = async_channel::bounded(1024);
        let metrics = Arc::new(SiftStreamMetrics::default());
        let config =
            make_ingestion_task_config(ingestion_channel, metrics.clone(), Duration::from_secs(60));
        let handle = spawn_ingestion_task(&control_tx, data_rx, config);

        send_messages_for_ingestion(&data_tx, 10).await;
        data_tx.close();
        send_shutdown(&control_tx);
        assert!(
            handle.await.is_ok(),
            "ingestion task should complete successfully"
        );

        let captured = mock_service.get_captured_data();
        assert_eq!(captured.len(), 10, "should have captured 10 messages");
        for (i, message) in captured.iter().enumerate() {
            assert_eq!(
                message.ingestion_config_id, "test-0",
                "ingestion config id should be test-0"
            );
            assert_eq!(message.flow, "some_flow", "flow should be some_flow");
            assert_eq!(
                message.channel_values.len(),
                1,
                "should have one channel value"
            );
            assert_eq!(
                message.channel_values[0].r#type,
                Some(Type::Int32(i as i32)),
                "channel value should be int32({i})"
            );
        }
        assert_eq!(
            metrics.messages_sent.get(),
            10,
            "should have sent 10 messages"
        );
        assert!(
            metrics.bytes_sent.get() >= 10 * 70,
            "should have sent at least 10 * 70 bytes"
        );
    }

    /// Four injected stream errors are each counted and each trigger a
    /// `CheckpointNeedsReingestion`.
    #[tokio::test]
    async fn test_ingestion_task_stream_retries() {
        let (ingestion_channel, mock_service) =
            crate::test::create_mock_grpc_channel_with_service().await;
        let (control_tx, mut control_rx) = broadcast::channel(1024);
        let (data_tx, data_rx) = async_channel::bounded(1024);
        let metrics = Arc::new(SiftStreamMetrics::default());
        let checkpoint_interval = Duration::from_millis(100);
        let config = IngestionTaskConfig {
            session_name: "test-session".to_string(),
            sift_stream_id: Uuid::new_v4(),
            ingestion_channel,
            enable_compression_for_ingestion: false,
            metrics: metrics.clone(),
            retry_policy: RetryPolicy {
                max_attempts: 3,
                initial_backoff: Duration::from_millis(1),
                max_backoff: Duration::from_millis(100),
                backoff_multiplier: 5,
            },
            checkpoint_interval: Some(checkpoint_interval),
        };
        let max_attempts = config.retry_policy.max_attempts as usize;
        mock_service.set_num_errors_to_return(max_attempts + 1);
        let handle = spawn_ingestion_task(&control_tx, data_rx, config);

        send_messages_for_ingestion(&data_tx, 10).await;
        tokio::time::sleep(checkpoint_interval).await;
        data_tx.close();
        send_shutdown(&control_tx);

        let res = tokio::time::timeout(Duration::from_secs(10), handle).await;
        assert!(res.is_ok(), "ingestion task should complete successfully");
        assert_eq!(
            metrics.messages_sent.get(),
            10,
            "should have sent 10 messages"
        );
        assert!(
            metrics.bytes_sent.get() >= 10 * 70,
            "should have sent at least 10 * 70 bytes"
        );
        assert_eq!(
            metrics.checkpoint.failed_checkpoint_count.get(),
            4,
            "should have failed the checkpoint 4 times"
        );
        let needs_reingestion_count = count_control_messages(&mut control_rx, |msg| {
            matches!(msg, ControlMessage::CheckpointNeedsReingestion { .. })
        });
        assert_eq!(
            needs_reingestion_count, 4,
            "should have received 4 checkpoint needs reingestion messages"
        );
    }

    /// With a 100ms interval, at least three timer-driven checkpoints complete.
    #[tokio::test]
    async fn test_ingestion_task_checkpoints() {
        let (ingestion_channel, _mock_service) =
            crate::test::create_mock_grpc_channel_with_service().await;
        let (control_tx, mut control_rx) = broadcast::channel(1024);
        let (data_tx, data_rx) = async_channel::bounded(1024);
        let metrics = Arc::new(SiftStreamMetrics::default());
        let checkpoint_interval = Duration::from_millis(100);
        let config =
            make_ingestion_task_config(ingestion_channel, metrics.clone(), checkpoint_interval);
        let handle = spawn_ingestion_task(&control_tx, data_rx, config);

        send_messages_for_ingestion(&data_tx, 100).await;
        tokio::time::sleep(checkpoint_interval * 3).await;
        data_tx.close();
        send_shutdown(&control_tx);

        assert!(
            handle.await.is_ok(),
            "ingestion task should complete successfully"
        );
        assert!(
            metrics.checkpoint.checkpoint_timer_reached_cnt.get() >= 3,
            "should have reached the checkpoint timer at least 3 times"
        );
        assert!(
            metrics.checkpoint.checkpoint_count.get() >= 3,
            "should have completed at least 3 checkpoints"
        );
        let complete_count = count_checkpoint_complete(&mut control_rx);
        assert!(
            complete_count >= 3,
            "should have completed at least 3 checkpoints"
        );
    }

    /// A `BackupFull` message forces a checkpoint ahead of the 60s timer.
    #[tokio::test]
    async fn test_ingestion_task_backup_full() {
        let (ingestion_channel, _mock_service) =
            crate::test::create_mock_grpc_channel_with_service().await;
        let (control_tx, mut control_rx) = broadcast::channel(1024);
        let (data_tx, data_rx) = async_channel::bounded(1024);
        let metrics = Arc::new(SiftStreamMetrics::default());
        let config =
            make_ingestion_task_config(ingestion_channel, metrics.clone(), Duration::from_secs(60));
        let handle = spawn_ingestion_task(&control_tx, data_rx, config);

        send_messages_for_ingestion(&data_tx, 100).await;
        let backup_full_sent = control_tx.send(ControlMessage::BackupFull);
        assert!(
            backup_full_sent.is_ok(),
            "failed to send backup full message to ingestion task"
        );
        send_messages_for_ingestion(&data_tx, 100).await;
        data_tx.close();
        send_shutdown(&control_tx);

        assert!(
            handle.await.is_ok(),
            "ingestion task should complete successfully"
        );
        assert_eq!(
            metrics.checkpoint.checkpoint_manually_reached_cnt.get(),
            1,
            "should have reached the checkpoint manually 1 time"
        );
        assert!(
            metrics.checkpoint.checkpoint_count.get() >= 1,
            "should have completed at least 1 checkpoint"
        );
        let complete_count = count_checkpoint_complete(&mut control_rx);
        assert!(
            complete_count >= 2,
            "should have completed at least 2 checkpoints (1 for the final checkpoint)"
        );
    }

    /// Without a checkpoint interval the task shuts down cleanly and never
    /// emits `SignalNextCheckpoint`.
    #[tokio::test]
    async fn test_ingestion_task_live_only_shutdown() {
        let (ingestion_channel, _mock_service) =
            crate::test::create_mock_grpc_channel_with_service().await;
        let (control_tx, mut control_rx) = broadcast::channel(1024);
        let (data_tx, data_rx) = async_channel::bounded(1024);
        let metrics = Arc::new(SiftStreamMetrics::default());
        let config = make_live_only_ingestion_task_config(ingestion_channel, metrics.clone());
        let handle = spawn_ingestion_task(&control_tx, data_rx, config);

        send_messages_for_ingestion(&data_tx, 100).await;
        data_tx.close();
        send_shutdown(&control_tx);

        let res = tokio::time::timeout(Duration::from_secs(10), handle).await;
        assert!(
            res.is_ok(),
            "ingestion task should complete without panicking (no timer overflow)"
        );
        for msg in std::iter::from_fn(|| control_rx.try_recv().ok()) {
            assert!(
                !matches!(msg, ControlMessage::SignalNextCheckpoint),
                "live-only mode should not fire the checkpoint timer"
            );
        }
    }

    /// Without a checkpoint interval, data still reaches the mock service.
    #[tokio::test]
    async fn test_ingestion_task_live_only_stream() {
        let (ingestion_channel, mock_service) =
            crate::test::create_mock_grpc_channel_with_service().await;
        let (control_tx, _control_rx) = broadcast::channel(1024);
        let (data_tx, data_rx) = async_channel::bounded(1024);
        let metrics = Arc::new(SiftStreamMetrics::default());
        let config = make_live_only_ingestion_task_config(ingestion_channel, metrics.clone());
        let handle = spawn_ingestion_task(&control_tx, data_rx, config);

        send_messages_for_ingestion(&data_tx, 10).await;
        data_tx.close();
        send_shutdown(&control_tx);
        assert!(
            handle.await.is_ok(),
            "ingestion task should complete successfully"
        );

        let captured = mock_service.get_captured_data();
        assert_eq!(captured.len(), 10, "should have captured 10 messages");
        assert_eq!(
            metrics.messages_sent.get(),
            10,
            "should have sent 10 messages"
        );
    }
}