Skip to main content

sift_stream/stream/mode/
live_with_backups.rs

1use crate::metrics::SiftStreamMetrics;
2use crate::stream::flow::FlowDescriptor;
3use crate::stream::mode::ingestion_config::IngestionConfigEncoder;
4use crate::stream::send_error::{SendError, TrySendError};
5use crate::stream::tasks::{ControlMessage, DataMessage, LiveWithBackupsTaskConfig, TaskBuilder};
6use crate::stream::{SiftStream, Transport, private::Sealed};
7use async_trait::async_trait;
8use sift_error::prelude::*;
9use sift_rs::{
10    ingest::v1::IngestWithConfigDataStreamRequest, ingestion_configs::v2::IngestionConfig,
11    runs::v2::Run,
12};
13use std::collections::{HashMap, HashSet};
14use std::sync::Arc;
15use tokio::sync::broadcast;
16use tokio::task::JoinHandle;
17use uuid::Uuid;
18
/// Transport for real-time streaming with periodic checkpointing and disk backups.
///
/// Maintains two internal bounded channels:
///
/// - **backup channel** — the primary durability path and the sole source of backpressure.
///   [`send`](crate::SiftStream::send) awaits when this channel is full. Capacity is set via
///   [`LiveWithBackupsBuilder::backup_data_channel_capacity`](crate::LiveWithBackupsBuilder::backup_data_channel_capacity)
///   (default: [`DATA_CHANNEL_CAPACITY`](crate::stream::tasks::DATA_CHANNEL_CAPACITY)).
/// - **ingestion channel** — forwards messages to the gRPC task using force-send. When full,
///   the **oldest buffered message is evicted** rather than blocking the caller. Evicted
///   messages are redirected to the backup channel. Capacity is set via
///   [`LiveWithBackupsBuilder::ingestion_data_channel_capacity`](crate::LiveWithBackupsBuilder::ingestion_data_channel_capacity).
///
/// Because of force-send eviction, the message inside a [`SendError`](crate::SendError) or
/// [`TrySendError`](crate::TrySendError) returned by [`send`](crate::SiftStream::send) /
/// [`try_send`](crate::SiftStream::try_send) may be an **older displaced message**, not
/// necessarily the one passed to the current call.
pub struct LiveStreamingWithBackups {
    /// Identifier given to the next outgoing `DataMessage`; advanced by one for every
    /// message prepared for sending.
    message_id_counter: u64,
    /// Sending half of the backup channel. Every message is written here first, and
    /// messages evicted from the ingestion channel are redirected here as well.
    backup_tx: async_channel::Sender<DataMessage>,
    /// Sending half of the ingestion channel; written with force-send, so a full channel
    /// evicts its oldest entry instead of blocking.
    ingestion_tx: async_channel::Sender<DataMessage>,
    /// Broadcast channel used by `finish` to deliver `ControlMessage::Shutdown`.
    control_tx: broadcast::Sender<ControlMessage>,
    /// Handle of the ingestion task; awaited by `finish`.
    ingestion_task: JoinHandle<Result<()>>,
    /// Handle of the backup manager task; awaited by `finish`.
    backup_manager: JoinHandle<Result<()>>,
    /// Handle of the re-ingestion task; awaited by `finish`.
    reingestion_task: JoinHandle<Result<()>>,
    /// Handle of the optional metrics streaming task; awaited by `finish` when present.
    metrics_streaming: Option<JoinHandle<Result<()>>>,
    /// Flow names that have already been observed, used to detect the first message of
    /// each flow.
    flows_seen: HashSet<String>,
    /// Shared metrics updated on the send paths (channel depths, message counters,
    /// eviction counters).
    metrics: Arc<SiftStreamMetrics>,
}
48
// Marker impl of the crate-private `Sealed` trait for this transport.
// NOTE(review): presumably a supertrait requirement of `Transport` that prevents
// implementations outside this crate — confirm against the trait definition.
impl Sealed for LiveStreamingWithBackups {}
50
51impl LiveStreamingWithBackups {
52    fn prepare_message(
53        &mut self,
54        stream_id: &Uuid,
55        message: IngestWithConfigDataStreamRequest,
56    ) -> DataMessage {
57        #[cfg(feature = "tracing")]
58        {
59            if !self.flows_seen.contains(&message.flow) {
60                self.metrics.unique_flows_received.increment();
61                self.flows_seen.insert(message.flow.clone());
62                tracing::info!(sift_stream_id = %stream_id, "flow '{}' being ingested for the first time", &message.flow);
63            }
64        }
65
66        self.metrics
67            .ingestion_channel_depth
68            .set(self.ingestion_tx.len() as u64);
69        self.metrics
70            .backup_channel_depth
71            .set(self.backup_tx.len() as u64);
72        self.metrics.messages_received.increment();
73
74        let data_msg = DataMessage {
75            message_id: self.message_id_counter,
76            request: Arc::new(message),
77            dropped_for_ingestion: false,
78        };
79        self.message_id_counter += 1;
80        data_msg
81    }
82
83    /// Used by `async fn send`. If an oldest message is evicted from the ingestion
84    /// channel, awaits until backup has space to accept it. Returns the undeliverable
85    /// message on backup channel close or ingestion channel close.
86    async fn dispatch_to_ingestion(
87        &mut self,
88        stream_id: &Uuid,
89        data_msg: DataMessage,
90    ) -> Option<IngestWithConfigDataStreamRequest> {
91        match self.ingestion_tx.force_send(data_msg) {
92            Ok(None) => None,
93            Ok(Some(mut oldest)) => {
94                oldest.dropped_for_ingestion = true;
95                self.metrics.old_messages_dropped_for_ingestion.increment();
96                self.metrics.checkpoint.failed_checkpoint_count.increment();
97                match self.backup_tx.send(oldest).await {
98                    Ok(()) => {
99                        self.metrics.messages_sent_to_backup.increment();
100                        None
101                    }
102                    Err(async_channel::SendError(dm)) => {
103                        self.metrics
104                            .old_messages_failed_adding_to_backup
105                            .increment();
106                        #[cfg(feature = "tracing")]
107                        tracing::debug!(sift_stream_id = %stream_id, "backup channel closed while dispatching evicted message");
108                        Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
109                    }
110                }
111            }
112            Err(async_channel::SendError(dm)) => {
113                #[cfg(feature = "tracing")]
114                tracing::debug!(sift_stream_id = %stream_id, "ingestion channel closed");
115                Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
116            }
117        }
118    }
119
120    /// Used by `fn try_send`. If an oldest message is evicted from the ingestion
121    /// channel and backup is full or closed, returns the evicted message to the
122    /// caller. Also returns the message when the ingestion channel itself is closed.
123    fn try_dispatch_to_ingestion(
124        &mut self,
125        stream_id: &Uuid,
126        data_msg: DataMessage,
127    ) -> Option<IngestWithConfigDataStreamRequest> {
128        match self.ingestion_tx.force_send(data_msg) {
129            Ok(None) => None,
130            Ok(Some(mut oldest)) => {
131                oldest.dropped_for_ingestion = true;
132                self.metrics.old_messages_dropped_for_ingestion.increment();
133                self.metrics.checkpoint.failed_checkpoint_count.increment();
134                match self.backup_tx.try_send(oldest) {
135                    Ok(()) => {
136                        self.metrics.messages_sent_to_backup.increment();
137                        None
138                    }
139                    Err(async_channel::TrySendError::Full(dm)) => {
140                        self.metrics
141                            .old_messages_failed_adding_to_backup
142                            .increment();
143                        Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
144                    }
145                    Err(async_channel::TrySendError::Closed(dm)) => {
146                        self.metrics
147                            .old_messages_failed_adding_to_backup
148                            .increment();
149                        #[cfg(feature = "tracing")]
150                        tracing::debug!(sift_stream_id = %stream_id, "backup channel closed while dispatching evicted message");
151                        Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
152                    }
153                }
154            }
155            Err(async_channel::SendError(dm)) => {
156                #[cfg(feature = "tracing")]
157                tracing::debug!(sift_stream_id = %stream_id, "ingestion channel closed");
158                Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
159            }
160        }
161    }
162}
163
164#[async_trait]
165impl Transport for LiveStreamingWithBackups {
166    type Encoder = IngestionConfigEncoder;
167    type Message = IngestWithConfigDataStreamRequest;
168
169    /// Sends a message, awaiting capacity on the **backup channel** if it is full.
170    ///
171    /// Backpressure comes exclusively from the bounded backup channel. Once the backup
172    /// channel accepts the message, the message is dispatched to the ingestion channel via
173    /// force-send: if the ingestion channel is full, the **oldest buffered message in the
174    /// ingestion channel** is evicted and redirected to the backup channel, again awaiting
175    /// for capacity for the displaced message.
176    ///
177    /// An error is returned only when a channel has closed (stream shutdown). Because of
178    /// force-send eviction, **the message returned inside `Err` may be an older displaced
179    /// message**, not necessarily the one passed to this call.
180    async fn send(
181        &mut self,
182        stream_id: &Uuid,
183        message: Self::Message,
184    ) -> std::result::Result<(), SendError<Self::Message>> {
185        let data_msg = self.prepare_message(stream_id, message);
186
187        self.backup_tx
188            .send(data_msg.clone())
189            .await
190            .map_err(|async_channel::SendError(dm)| {
191                SendError(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
192            })?;
193
194        self.metrics.messages_sent_to_backup.increment();
195        if let Some(displaced) = self.dispatch_to_ingestion(stream_id, data_msg).await {
196            return Err(SendError(displaced));
197        }
198        Ok(())
199    }
200
201    /// Attempts to send a message without blocking.
202    ///
203    /// Returns immediately with `TrySendError::Full` if the **backup channel** is at
204    /// capacity, or `TrySendError::Closed` if the backup channel has been closed. If the
205    /// backup channel accepts the message, force-send dispatch to the ingestion channel
206    /// proceeds: if the ingestion channel is full, the oldest buffered message is evicted and
207    /// a non-blocking attempt is made to redirect it to the backup channel. If that
208    /// redirection also fails (backup full or closed), the evicted message is returned as
209    /// `TrySendError::Full`.
210    ///
211    /// Because of force-send eviction, **the message returned inside `Err` may be an older
212    /// displaced message**, not necessarily the one passed to this call.
213    fn try_send(
214        &mut self,
215        stream_id: &Uuid,
216        message: Self::Message,
217    ) -> std::result::Result<(), TrySendError<Self::Message>> {
218        let data_msg = self.prepare_message(stream_id, message);
219
220        match self.backup_tx.try_send(data_msg.clone()) {
221            Ok(()) => {}
222            Err(async_channel::TrySendError::Full(dm)) => {
223                return Err(TrySendError::Full(
224                    Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()),
225                ));
226            }
227            Err(async_channel::TrySendError::Closed(dm)) => {
228                return Err(TrySendError::Closed(
229                    Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()),
230                ));
231            }
232        }
233
234        self.metrics.messages_sent_to_backup.increment();
235        if let Some(displaced) = self.try_dispatch_to_ingestion(stream_id, data_msg) {
236            return Err(TrySendError::Full(displaced));
237        }
238        Ok(())
239    }
240
241    /// Sends a batch of messages in order, awaiting capacity for each one.
242    async fn send_requests<I>(
243        &mut self,
244        stream_id: &Uuid,
245        requests: I,
246    ) -> std::result::Result<(), SendError<Vec<Self::Message>>>
247    where
248        I: IntoIterator<Item = Self::Message> + Send,
249        I::IntoIter: Send,
250    {
251        let mut iter = requests.into_iter();
252        while let Some(msg) = iter.next() {
253            if let Err(SendError(failed)) = self.send(stream_id, msg).await {
254                let mut undelivered = vec![failed];
255                undelivered.extend(iter);
256                return Err(SendError(undelivered));
257            }
258        }
259        Ok(())
260    }
261
262    /// Attempts to send a batch of messages in order without blocking.
263    fn try_send_requests<I>(
264        &mut self,
265        stream_id: &Uuid,
266        requests: I,
267    ) -> std::result::Result<(), TrySendError<Vec<Self::Message>>>
268    where
269        I: IntoIterator<Item = Self::Message> + Send,
270        I::IntoIter: Send,
271    {
272        let mut iter = requests.into_iter();
273        while let Some(msg) = iter.next() {
274            match self.try_send(stream_id, msg) {
275                Ok(()) => {}
276                Err(TrySendError::Full(failed)) => {
277                    let mut undelivered = vec![failed];
278                    undelivered.extend(iter);
279                    return Err(TrySendError::Full(undelivered));
280                }
281                Err(TrySendError::Closed(failed)) => {
282                    let mut undelivered = vec![failed];
283                    undelivered.extend(iter);
284                    return Err(TrySendError::Closed(undelivered));
285                }
286            }
287        }
288        Ok(())
289    }
290
291    /// Closes both internal channels, signals shutdown, and awaits all background tasks.
292    ///
293    /// Dropping both channels causes the ingestion task to drain any already-queued messages
294    /// before acting on the shutdown signal, so all messages sent before `finish` is called
295    /// will be processed. This method also triggers the final checkpoint, which requests
296    /// delivery confirmation from Sift for all data up to this point.
297    ///
298    /// Always call `finish` when done sending — dropping a [`SiftStream`](crate::SiftStream)
299    /// without calling it may result in tail-end data not reaching Sift.
300    async fn finish(self, stream_id: &Uuid) -> Result<()> {
301        drop(self.ingestion_tx);
302        drop(self.backup_tx);
303
304        self.control_tx
305            .send(ControlMessage::Shutdown)
306            .map_err(|e| Error::new(ErrorKind::StreamError, e))
307            .context("failed to send shutdown signal to task-based architecture")?;
308
309        let _ = tokio::try_join!(
310            self.ingestion_task,
311            self.backup_manager,
312            self.reingestion_task,
313        );
314
315        if let Some(metrics_streaming) = self.metrics_streaming {
316            let _ = metrics_streaming.await;
317        }
318
319        #[cfg(feature = "tracing")]
320        tracing::info!(
321            sift_stream_id = %stream_id,
322            "successfully shutdown streaming system"
323        );
324
325        Ok(())
326    }
327}
328
329impl SiftStream<IngestionConfigEncoder, LiveStreamingWithBackups> {
330    pub(crate) async fn new_live_with_backups(
331        ingestion_config: IngestionConfig,
332        flows_by_name: HashMap<String, FlowDescriptor<String>>,
333        run: Option<Run>,
334        task_config: LiveWithBackupsTaskConfig,
335        metrics: Arc<SiftStreamMetrics>,
336    ) -> Result<Self> {
337        #[cfg(feature = "metrics-unstable")]
338        {
339            let uuid = task_config.sift_stream_id.to_string();
340            let m = metrics.clone();
341            tokio::spawn(async move {
342                crate::metrics::register_metrics(uuid, m).await;
343            });
344        }
345
346        metrics.loaded_flows.add(flows_by_name.len() as u64);
347        let sift_stream_id = task_config.sift_stream_id;
348        let grpc_channel = task_config.setup_channel.clone();
349
350        let tasks = TaskBuilder::start_live_with_backups(task_config)
351            .await
352            .context("failed to start live-with-backups streaming tasks")?;
353
354        Ok(Self {
355            grpc_channel: grpc_channel.clone(),
356            encoder: IngestionConfigEncoder {
357                grpc_channel,
358                flows_by_name,
359                ingestion_config,
360                metrics: metrics.clone(),
361            },
362            transport: LiveStreamingWithBackups {
363                message_id_counter: 0,
364                backup_tx: tasks.backup_tx,
365                ingestion_tx: tasks.ingestion_tx,
366                control_tx: tasks.control_tx,
367                ingestion_task: tasks.ingestion,
368                backup_manager: tasks.backup_manager,
369                reingestion_task: tasks.reingestion,
370                metrics_streaming: tasks.metrics_streaming,
371                flows_seen: HashSet::new(),
372                metrics,
373            },
374            run,
375            sift_stream_id,
376        })
377    }
378}
379
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::tasks::ControlMessage;
    use crate::stream::tasks::DataMessage;
    use tokio::sync::broadcast;

    /// Builds a minimal request that belongs to the given flow.
    fn make_request_for_flow(flow: &str) -> IngestWithConfigDataStreamRequest {
        IngestWithConfigDataStreamRequest {
            ingestion_config_id: uuid::Uuid::new_v4().to_string(),
            flow: flow.to_string(),
            timestamp: None,
            channel_values: vec![],
            run_id: String::new(),
            end_stream_on_validation_error: false,
            organization_id: String::new(),
        }
    }

    /// Builds a minimal request for the default `"test_flow"` flow.
    fn make_request() -> IngestWithConfigDataStreamRequest {
        make_request_for_flow("test_flow")
    }

    /// Builds a queued message with an explicit id, used to pre-fill a channel.
    fn make_data_message(message_id: u64, flow: &str) -> DataMessage {
        DataMessage {
            message_id,
            request: Arc::new(make_request_for_flow(flow)),
            dropped_for_ingestion: false,
        }
    }

    /// Creates a transport with the given channel capacities and no-op background tasks,
    /// returning it together with the ingestion and backup receivers (in that order).
    fn make_live_streaming_with_backups(
        ingestion_capacity: usize,
        backup_capacity: usize,
    ) -> (
        LiveStreamingWithBackups,
        async_channel::Receiver<DataMessage>,
        async_channel::Receiver<DataMessage>,
    ) {
        let (control_tx, _) = broadcast::channel(10);
        let (ingestion_tx, ingestion_rx) = async_channel::bounded(ingestion_capacity);
        let (backup_tx, backup_rx) = async_channel::bounded(backup_capacity);

        let transport = LiveStreamingWithBackups {
            message_id_counter: 0,
            backup_tx,
            ingestion_tx,
            control_tx,
            ingestion_task: tokio::spawn(async { Ok(()) }),
            backup_manager: tokio::spawn(async { Ok(()) }),
            reingestion_task: tokio::spawn(async { Ok(()) }),
            metrics_streaming: None,
            flows_seen: HashSet::new(),
            metrics: Arc::new(crate::metrics::SiftStreamMetrics::default()),
        };

        (transport, ingestion_rx, backup_rx)
    }

    #[tokio::test]
    async fn test_try_send_backup_closed_returns_closed() {
        let (mut live, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(10, 10);
        drop(backup_rx);
        let stream_id = uuid::Uuid::new_v4();
        let req = make_request();
        let flow = req.flow.clone();
        let err = live.try_send(&stream_id, req).unwrap_err();
        assert!(err.is_closed(), "expected Closed, got {err}");
        assert_eq!(err.into_inner().flow, flow);
    }

    #[tokio::test]
    async fn test_try_send_backup_full_returns_full() {
        let (mut live, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(10, 1);
        // Occupy the single backup slot.
        live.backup_tx
            .try_send(make_data_message(0, "test_flow"))
            .unwrap();

        let stream_id = uuid::Uuid::new_v4();
        let req = make_request();
        let flow = req.flow.clone();
        let err = live.try_send(&stream_id, req).unwrap_err();
        assert!(err.is_full(), "expected Full, got {err}");
        assert_eq!(err.into_inner().flow, flow);
        drop(backup_rx);
    }

    #[tokio::test]
    async fn test_send_backup_closed_returns_send_error() {
        let (mut live, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(10, 10);
        drop(backup_rx);
        let stream_id = uuid::Uuid::new_v4();
        let req = make_request();
        let flow = req.flow.clone();
        let err = live.send(&stream_id, req).await.unwrap_err();
        assert_eq!(err.into_inner().flow, flow);
    }

    #[tokio::test]
    async fn test_send_blocks_until_backup_space_available() {
        let (mut live, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(10, 1);
        // Occupy the single backup slot so `send` has to wait.
        live.backup_tx
            .try_send(make_data_message(0, "test_flow"))
            .unwrap();

        // The test keeps `backup_rx` alive so the channel stays open for the whole `send`;
        // the spawned task drains one slot through a cloned receiver after a short delay.
        let drain_rx = backup_rx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
            let _ = drain_rx.recv().await;
        });

        let stream_id = uuid::Uuid::new_v4();
        live.send(&stream_id, make_request()).await.unwrap();
        drop(backup_rx);
    }

    #[tokio::test]
    async fn test_try_send_requests_returns_undelivered_on_full() {
        let (mut live, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(10, 1);
        // Occupy the single backup slot.
        live.backup_tx
            .try_send(make_data_message(0, "test_flow"))
            .unwrap();

        let stream_id = uuid::Uuid::new_v4();
        let reqs = vec![make_request(), make_request(), make_request()];
        let err = live.try_send_requests(&stream_id, reqs).unwrap_err();
        assert!(err.is_full(), "expected Full, got {err}");
        assert_eq!(err.into_inner().len(), 3);
        drop(backup_rx);
    }

    #[tokio::test]
    async fn test_send_requests_returns_undelivered_on_closed() {
        let (mut live, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(10, 10);
        drop(backup_rx);

        let stream_id = uuid::Uuid::new_v4();
        let reqs = vec![make_request(), make_request(), make_request()];
        let err = live.send_requests(&stream_id, reqs).await.unwrap_err();
        assert_eq!(err.into_inner().len(), 3);
    }

    #[tokio::test]
    async fn test_send_evicts_oldest_when_ingestion_full() {
        // ingestion capacity=1 pre-filled; backup has plenty of room.
        let (mut transport, ingestion_rx, backup_rx) = make_live_streaming_with_backups(1, 10);

        // Pre-fill ingestion with a sentinel message.
        transport
            .ingestion_tx
            .try_send(make_data_message(99, "old_flow"))
            .unwrap();

        // Sending a new message should:
        //   1. send it to backup (backup_rx[0])
        //   2. force-send it into ingestion, evicting the old message
        //   3. send the evicted old message to backup (backup_rx[1])
        let stream_id = uuid::Uuid::new_v4();
        let new_req = make_request(); // flow = "test_flow"
        transport.send(&stream_id, new_req).await.unwrap();

        // Ingestion should contain the new message.
        let in_msg = ingestion_rx.try_recv().unwrap();
        assert_eq!(in_msg.message_id, 0);
        assert!(!in_msg.dropped_for_ingestion);

        // Backup first receives the new message (sent before dispatch_to_ingestion)…
        let backup_first = backup_rx.try_recv().unwrap();
        assert_eq!(backup_first.message_id, 0);
        assert!(!backup_first.dropped_for_ingestion);

        // …then the evicted old message.
        let backup_evicted = backup_rx.try_recv().unwrap();
        assert_eq!(backup_evicted.message_id, 99);
        assert!(backup_evicted.dropped_for_ingestion);
        assert_eq!(backup_evicted.request.flow, "old_flow");
    }

    #[tokio::test]
    async fn test_try_send_evicts_oldest_to_backup_when_ingestion_full() {
        let (mut transport, ingestion_rx, backup_rx) = make_live_streaming_with_backups(1, 10);

        // Pre-fill ingestion with a sentinel message.
        transport
            .ingestion_tx
            .try_send(make_data_message(99, "old_flow"))
            .unwrap();

        let stream_id = uuid::Uuid::new_v4();
        transport.try_send(&stream_id, make_request()).unwrap();

        let in_msg = ingestion_rx.try_recv().unwrap();
        assert_eq!(in_msg.message_id, 0);
        assert!(!in_msg.dropped_for_ingestion);

        let backup_first = backup_rx.try_recv().unwrap();
        assert_eq!(backup_first.message_id, 0);

        let backup_evicted = backup_rx.try_recv().unwrap();
        assert_eq!(backup_evicted.message_id, 99);
        assert!(backup_evicted.dropped_for_ingestion);
        assert_eq!(backup_evicted.request.flow, "old_flow");
    }

    #[tokio::test]
    async fn test_send_returns_err_when_ingestion_closed() {
        let (mut transport, ingestion_rx, _backup_rx) = make_live_streaming_with_backups(10, 10);
        // Close the ingestion channel.
        drop(ingestion_rx);

        let stream_id = uuid::Uuid::new_v4();
        let req = make_request();
        let flow = req.flow.clone();
        // send() first succeeds writing to backup, then fails when dispatching to ingestion.
        let err = transport.send(&stream_id, req).await.unwrap_err();
        assert_eq!(err.into_inner().flow, flow);
    }

    #[tokio::test]
    async fn test_try_send_returns_full_when_evicted_and_backup_full() {
        // backup capacity=1, ingestion capacity=1 (pre-filled).
        // When try_send is called:
        //   backup accepts the new msg  (now full)
        //   force_send evicts old_msg from ingestion
        //   backup.try_send(old_msg) → Full (backup already has one item)
        //   try_send returns Full containing the evicted OLD message.
        let (mut transport, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(1, 1);

        transport
            .ingestion_tx
            .try_send(make_data_message(99, "old_flow"))
            .unwrap();

        let stream_id = uuid::Uuid::new_v4();
        let err = transport.try_send(&stream_id, make_request()).unwrap_err();
        // The returned message is the evicted (older) one.
        assert!(err.is_full(), "expected Full, got {err}");
        assert_eq!(err.into_inner().flow, "old_flow");

        drop(backup_rx);
    }

    #[tokio::test]
    async fn test_message_id_counter_increments_monotonically() {
        let (mut transport, _ingestion_rx, _backup_rx) = make_live_streaming_with_backups(10, 10);
        let stream_id = uuid::Uuid::new_v4();

        for _ in 0..5 {
            transport.send(&stream_id, make_request()).await.unwrap();
        }

        assert_eq!(transport.message_id_counter, 5);
    }

    #[tokio::test]
    async fn test_finish_awaits_all_three_tasks() {
        use std::sync::atomic::{AtomicU32, Ordering};

        let completed = Arc::new(AtomicU32::new(0));
        // Keep a broadcast receiver alive so control_tx.send() doesn't fail.
        let (control_tx, _ctrl_rx) = broadcast::channel::<ControlMessage>(10);
        let (ingestion_tx, _) = async_channel::bounded::<DataMessage>(10);
        let (backup_tx, _) = async_channel::bounded::<DataMessage>(10);

        // Spawns a task that bumps the shared counter after a short delay.
        macro_rules! counting_task {
            ($counter:expr) => {{
                let c = $counter.clone();
                tokio::spawn(async move {
                    tokio::time::sleep(std::time::Duration::from_millis(5)).await;
                    c.fetch_add(1, Ordering::Relaxed);
                    Ok(())
                })
            }};
        }

        let transport = LiveStreamingWithBackups {
            message_id_counter: 0,
            backup_tx,
            ingestion_tx,
            control_tx,
            ingestion_task: counting_task!(completed),
            backup_manager: counting_task!(completed),
            reingestion_task: counting_task!(completed),
            metrics_streaming: None,
            flows_seen: HashSet::new(),
            metrics: Arc::new(crate::metrics::SiftStreamMetrics::default()),
        };

        let stream_id = uuid::Uuid::new_v4();
        transport.finish(&stream_id).await.unwrap();

        assert_eq!(
            completed.load(Ordering::Relaxed),
            3,
            "finish() must await all three internal tasks before returning"
        );
    }
}