1use crate::metrics::SiftStreamMetrics;
2use crate::stream::flow::FlowDescriptor;
3use crate::stream::mode::ingestion_config::IngestionConfigEncoder;
4use crate::stream::send_error::{SendError, TrySendError};
5use crate::stream::tasks::{ControlMessage, DataMessage, LiveWithBackupsTaskConfig, TaskBuilder};
6use crate::stream::{SiftStream, Transport, private::Sealed};
7use async_trait::async_trait;
8use sift_error::prelude::*;
9use sift_rs::{
10 ingest::v1::IngestWithConfigDataStreamRequest, ingestion_configs::v2::IngestionConfig,
11 runs::v2::Run,
12};
13use std::collections::{HashMap, HashSet};
14use std::sync::Arc;
15use tokio::sync::broadcast;
16use tokio::task::JoinHandle;
17use uuid::Uuid;
18
/// Transport that hands every outgoing message to two bounded channels: a backup channel and
/// an ingestion channel, each drained by a background task owned by this struct.
///
/// The backup channel is always written first. The ingestion channel is written with
/// `force_send`, so when it is full its oldest queued message is evicted and forwarded to the
/// backup channel flagged as `dropped_for_ingestion`.
pub struct LiveStreamingWithBackups {
    /// Identifier assigned to the next prepared message; incremented once per message.
    message_id_counter: u64,
    /// Sender for the backup channel. Receives every message before ingestion is attempted,
    /// plus any message evicted from the ingestion channel.
    backup_tx: async_channel::Sender<DataMessage>,
    /// Sender for the ingestion channel; written with `force_send` (evicts the oldest entry
    /// when the channel is full).
    ingestion_tx: async_channel::Sender<DataMessage>,
    /// Broadcast channel used by `finish` to send `ControlMessage::Shutdown`.
    control_tx: broadcast::Sender<ControlMessage>,
    /// Handle of the ingestion task, awaited by `finish`.
    ingestion_task: JoinHandle<Result<()>>,
    /// Handle of the backup manager task, awaited by `finish`.
    backup_manager: JoinHandle<Result<()>>,
    /// Handle of the re-ingestion task, awaited by `finish`.
    reingestion_task: JoinHandle<Result<()>>,
    /// Optional handle of the metrics streaming task; awaited by `finish` when present.
    metrics_streaming: Option<JoinHandle<Result<()>>>,
    /// Flow names already observed by `prepare_message`, used to detect a flow's first
    /// appearance.
    flows_seen: HashSet<String>,
    /// Metrics shared with the rest of the stream (channel depths, message counters, ...).
    metrics: Arc<SiftStreamMetrics>,
}
48
// Marker impl with no methods. NOTE(review): presumably the sealed-trait pattern that lets
// this type implement `Transport` while keeping the trait closed to outside crates — confirm
// against `crate::stream::private`.
impl Sealed for LiveStreamingWithBackups {}
50
51impl LiveStreamingWithBackups {
52 fn prepare_message(
53 &mut self,
54 stream_id: &Uuid,
55 message: IngestWithConfigDataStreamRequest,
56 ) -> DataMessage {
57 #[cfg(feature = "tracing")]
58 {
59 if !self.flows_seen.contains(&message.flow) {
60 self.metrics.unique_flows_received.increment();
61 self.flows_seen.insert(message.flow.clone());
62 tracing::info!(sift_stream_id = %stream_id, "flow '{}' being ingested for the first time", &message.flow);
63 }
64 }
65
66 self.metrics
67 .ingestion_channel_depth
68 .set(self.ingestion_tx.len() as u64);
69 self.metrics
70 .backup_channel_depth
71 .set(self.backup_tx.len() as u64);
72 self.metrics.messages_received.increment();
73
74 let data_msg = DataMessage {
75 message_id: self.message_id_counter,
76 request: Arc::new(message),
77 dropped_for_ingestion: false,
78 };
79 self.message_id_counter += 1;
80 data_msg
81 }
82
83 async fn dispatch_to_ingestion(
87 &mut self,
88 stream_id: &Uuid,
89 data_msg: DataMessage,
90 ) -> Option<IngestWithConfigDataStreamRequest> {
91 match self.ingestion_tx.force_send(data_msg) {
92 Ok(None) => None,
93 Ok(Some(mut oldest)) => {
94 oldest.dropped_for_ingestion = true;
95 self.metrics.old_messages_dropped_for_ingestion.increment();
96 self.metrics.checkpoint.failed_checkpoint_count.increment();
97 match self.backup_tx.send(oldest).await {
98 Ok(()) => {
99 self.metrics.messages_sent_to_backup.increment();
100 None
101 }
102 Err(async_channel::SendError(dm)) => {
103 self.metrics
104 .old_messages_failed_adding_to_backup
105 .increment();
106 #[cfg(feature = "tracing")]
107 tracing::debug!(sift_stream_id = %stream_id, "backup channel closed while dispatching evicted message");
108 Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
109 }
110 }
111 }
112 Err(async_channel::SendError(dm)) => {
113 #[cfg(feature = "tracing")]
114 tracing::debug!(sift_stream_id = %stream_id, "ingestion channel closed");
115 Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
116 }
117 }
118 }
119
120 fn try_dispatch_to_ingestion(
124 &mut self,
125 stream_id: &Uuid,
126 data_msg: DataMessage,
127 ) -> Option<IngestWithConfigDataStreamRequest> {
128 match self.ingestion_tx.force_send(data_msg) {
129 Ok(None) => None,
130 Ok(Some(mut oldest)) => {
131 oldest.dropped_for_ingestion = true;
132 self.metrics.old_messages_dropped_for_ingestion.increment();
133 self.metrics.checkpoint.failed_checkpoint_count.increment();
134 match self.backup_tx.try_send(oldest) {
135 Ok(()) => {
136 self.metrics.messages_sent_to_backup.increment();
137 None
138 }
139 Err(async_channel::TrySendError::Full(dm)) => {
140 self.metrics
141 .old_messages_failed_adding_to_backup
142 .increment();
143 Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
144 }
145 Err(async_channel::TrySendError::Closed(dm)) => {
146 self.metrics
147 .old_messages_failed_adding_to_backup
148 .increment();
149 #[cfg(feature = "tracing")]
150 tracing::debug!(sift_stream_id = %stream_id, "backup channel closed while dispatching evicted message");
151 Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
152 }
153 }
154 }
155 Err(async_channel::SendError(dm)) => {
156 #[cfg(feature = "tracing")]
157 tracing::debug!(sift_stream_id = %stream_id, "ingestion channel closed");
158 Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
159 }
160 }
161 }
162}
163
164#[async_trait]
165impl Transport for LiveStreamingWithBackups {
166 type Encoder = IngestionConfigEncoder;
167 type Message = IngestWithConfigDataStreamRequest;
168
169 async fn send(
181 &mut self,
182 stream_id: &Uuid,
183 message: Self::Message,
184 ) -> std::result::Result<(), SendError<Self::Message>> {
185 let data_msg = self.prepare_message(stream_id, message);
186
187 self.backup_tx
188 .send(data_msg.clone())
189 .await
190 .map_err(|async_channel::SendError(dm)| {
191 SendError(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
192 })?;
193
194 self.metrics.messages_sent_to_backup.increment();
195 if let Some(displaced) = self.dispatch_to_ingestion(stream_id, data_msg).await {
196 return Err(SendError(displaced));
197 }
198 Ok(())
199 }
200
201 fn try_send(
214 &mut self,
215 stream_id: &Uuid,
216 message: Self::Message,
217 ) -> std::result::Result<(), TrySendError<Self::Message>> {
218 let data_msg = self.prepare_message(stream_id, message);
219
220 match self.backup_tx.try_send(data_msg.clone()) {
221 Ok(()) => {}
222 Err(async_channel::TrySendError::Full(dm)) => {
223 return Err(TrySendError::Full(
224 Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()),
225 ));
226 }
227 Err(async_channel::TrySendError::Closed(dm)) => {
228 return Err(TrySendError::Closed(
229 Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()),
230 ));
231 }
232 }
233
234 self.metrics.messages_sent_to_backup.increment();
235 if let Some(displaced) = self.try_dispatch_to_ingestion(stream_id, data_msg) {
236 return Err(TrySendError::Full(displaced));
237 }
238 Ok(())
239 }
240
241 async fn send_requests<I>(
243 &mut self,
244 stream_id: &Uuid,
245 requests: I,
246 ) -> std::result::Result<(), SendError<Vec<Self::Message>>>
247 where
248 I: IntoIterator<Item = Self::Message> + Send,
249 I::IntoIter: Send,
250 {
251 let mut iter = requests.into_iter();
252 while let Some(msg) = iter.next() {
253 if let Err(SendError(failed)) = self.send(stream_id, msg).await {
254 let mut undelivered = vec![failed];
255 undelivered.extend(iter);
256 return Err(SendError(undelivered));
257 }
258 }
259 Ok(())
260 }
261
262 fn try_send_requests<I>(
264 &mut self,
265 stream_id: &Uuid,
266 requests: I,
267 ) -> std::result::Result<(), TrySendError<Vec<Self::Message>>>
268 where
269 I: IntoIterator<Item = Self::Message> + Send,
270 I::IntoIter: Send,
271 {
272 let mut iter = requests.into_iter();
273 while let Some(msg) = iter.next() {
274 match self.try_send(stream_id, msg) {
275 Ok(()) => {}
276 Err(TrySendError::Full(failed)) => {
277 let mut undelivered = vec![failed];
278 undelivered.extend(iter);
279 return Err(TrySendError::Full(undelivered));
280 }
281 Err(TrySendError::Closed(failed)) => {
282 let mut undelivered = vec![failed];
283 undelivered.extend(iter);
284 return Err(TrySendError::Closed(undelivered));
285 }
286 }
287 }
288 Ok(())
289 }
290
291 async fn finish(self, stream_id: &Uuid) -> Result<()> {
301 drop(self.ingestion_tx);
302 drop(self.backup_tx);
303
304 self.control_tx
305 .send(ControlMessage::Shutdown)
306 .map_err(|e| Error::new(ErrorKind::StreamError, e))
307 .context("failed to send shutdown signal to task-based architecture")?;
308
309 let _ = tokio::try_join!(
310 self.ingestion_task,
311 self.backup_manager,
312 self.reingestion_task,
313 );
314
315 if let Some(metrics_streaming) = self.metrics_streaming {
316 let _ = metrics_streaming.await;
317 }
318
319 #[cfg(feature = "tracing")]
320 tracing::info!(
321 sift_stream_id = %stream_id,
322 "successfully shutdown streaming system"
323 );
324
325 Ok(())
326 }
327}
328
329impl SiftStream<IngestionConfigEncoder, LiveStreamingWithBackups> {
330 pub(crate) async fn new_live_with_backups(
331 ingestion_config: IngestionConfig,
332 flows_by_name: HashMap<String, FlowDescriptor<String>>,
333 run: Option<Run>,
334 task_config: LiveWithBackupsTaskConfig,
335 metrics: Arc<SiftStreamMetrics>,
336 ) -> Result<Self> {
337 #[cfg(feature = "metrics-unstable")]
338 {
339 let uuid = task_config.sift_stream_id.to_string();
340 let m = metrics.clone();
341 tokio::spawn(async move {
342 crate::metrics::register_metrics(uuid, m).await;
343 });
344 }
345
346 metrics.loaded_flows.add(flows_by_name.len() as u64);
347 let sift_stream_id = task_config.sift_stream_id;
348 let grpc_channel = task_config.setup_channel.clone();
349
350 let tasks = TaskBuilder::start_live_with_backups(task_config)
351 .await
352 .context("failed to start live-with-backups streaming tasks")?;
353
354 Ok(Self {
355 grpc_channel: grpc_channel.clone(),
356 encoder: IngestionConfigEncoder {
357 grpc_channel,
358 flows_by_name,
359 ingestion_config,
360 metrics: metrics.clone(),
361 },
362 transport: LiveStreamingWithBackups {
363 message_id_counter: 0,
364 backup_tx: tasks.backup_tx,
365 ingestion_tx: tasks.ingestion_tx,
366 control_tx: tasks.control_tx,
367 ingestion_task: tasks.ingestion,
368 backup_manager: tasks.backup_manager,
369 reingestion_task: tasks.reingestion,
370 metrics_streaming: tasks.metrics_streaming,
371 flows_seen: HashSet::new(),
372 metrics,
373 },
374 run,
375 sift_stream_id,
376 })
377 }
378}
379
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::tasks::{ControlMessage, DataMessage};
    use std::time::Duration;
    use tokio::sync::broadcast;

    /// Builds an otherwise empty request for the given flow name.
    fn request_with_flow(flow: &str) -> IngestWithConfigDataStreamRequest {
        IngestWithConfigDataStreamRequest {
            ingestion_config_id: uuid::Uuid::new_v4().to_string(),
            flow: flow.to_string(),
            timestamp: None,
            channel_values: vec![],
            run_id: String::new(),
            end_stream_on_validation_error: false,
            organization_id: String::new(),
        }
    }

    /// Builds the default request used by most tests (flow `test_flow`).
    fn make_request() -> IngestWithConfigDataStreamRequest {
        request_with_flow("test_flow")
    }

    /// Builds a not-yet-dropped `DataMessage` with the given id and flow name.
    fn data_message(message_id: u64, flow: &str) -> DataMessage {
        DataMessage {
            message_id,
            request: Arc::new(request_with_flow(flow)),
            dropped_for_ingestion: false,
        }
    }

    /// Placeholder message used to pre-fill the backup channel.
    fn filler_message() -> DataMessage {
        data_message(0, "test_flow")
    }

    /// Message with id 99 on flow `old_flow`, used to pre-fill the ingestion channel so that
    /// the next send evicts it.
    fn stale_message() -> DataMessage {
        data_message(99, "old_flow")
    }

    /// Creates a transport backed by fresh bounded channels of the given capacities and by
    /// no-op background tasks. Returns the transport together with the ingestion and backup
    /// receivers (in that order).
    fn make_live_streaming_with_backups(
        ingestion_capacity: usize,
        backup_capacity: usize,
    ) -> (
        LiveStreamingWithBackups,
        async_channel::Receiver<DataMessage>,
        async_channel::Receiver<DataMessage>,
    ) {
        let (backup_tx, backup_rx) = async_channel::bounded(backup_capacity);
        let (ingestion_tx, ingestion_rx) = async_channel::bounded(ingestion_capacity);
        let (control_tx, _) = broadcast::channel(10);

        let transport = LiveStreamingWithBackups {
            message_id_counter: 0,
            backup_tx,
            ingestion_tx,
            control_tx,
            ingestion_task: tokio::spawn(async { Ok(()) }),
            backup_manager: tokio::spawn(async { Ok(()) }),
            reingestion_task: tokio::spawn(async { Ok(()) }),
            metrics_streaming: None,
            flows_seen: HashSet::new(),
            metrics: Arc::new(crate::metrics::SiftStreamMetrics::default()),
        };

        (transport, ingestion_rx, backup_rx)
    }

    #[tokio::test]
    async fn test_try_send_backup_closed_returns_closed() {
        let (mut transport, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(10, 10);
        drop(backup_rx);

        let stream_id = uuid::Uuid::new_v4();
        let request = make_request();
        let expected_flow = request.flow.clone();

        let err = transport.try_send(&stream_id, request).unwrap_err();
        assert!(err.is_closed(), "expected Closed, got {err}");
        assert_eq!(err.into_inner().flow, expected_flow);
    }

    #[tokio::test]
    async fn test_try_send_backup_full_returns_full() {
        let (mut transport, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(10, 1);
        transport.backup_tx.try_send(filler_message()).unwrap();

        let stream_id = uuid::Uuid::new_v4();
        let request = make_request();
        let expected_flow = request.flow.clone();

        let err = transport.try_send(&stream_id, request).unwrap_err();
        assert!(err.is_full(), "expected Full, got {err}");
        assert_eq!(err.into_inner().flow, expected_flow);

        // The receiver must outlive the send so the channel reports Full rather than Closed.
        drop(backup_rx);
    }

    #[tokio::test]
    async fn test_send_backup_closed_returns_send_error() {
        let (mut transport, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(10, 10);
        drop(backup_rx);

        let stream_id = uuid::Uuid::new_v4();
        let request = make_request();
        let expected_flow = request.flow.clone();

        let err = transport.send(&stream_id, request).await.unwrap_err();
        assert_eq!(err.into_inner().flow, expected_flow);
    }

    #[tokio::test]
    async fn test_send_blocks_until_backup_space_available() {
        let (mut transport, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(10, 1);
        transport.backup_tx.try_send(filler_message()).unwrap();

        // Free one backup slot after a short delay, then keep the receiver alive long enough
        // for the pending send to complete.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(20)).await;
            let _ = backup_rx.recv().await;
            tokio::time::sleep(Duration::from_millis(100)).await;
        });

        let stream_id = uuid::Uuid::new_v4();
        transport.send(&stream_id, make_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_try_send_requests_returns_undelivered_on_full() {
        let (mut transport, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(10, 1);
        transport.backup_tx.try_send(filler_message()).unwrap();

        let stream_id = uuid::Uuid::new_v4();
        let batch = vec![make_request(), make_request(), make_request()];

        let err = transport.try_send_requests(&stream_id, batch).unwrap_err();
        assert!(err.is_full(), "expected Full, got {err}");
        assert_eq!(err.into_inner().len(), 3);

        // The receiver must outlive the send so the channel reports Full rather than Closed.
        drop(backup_rx);
    }

    #[tokio::test]
    async fn test_send_requests_returns_undelivered_on_closed() {
        let (mut transport, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(10, 10);
        drop(backup_rx);

        let stream_id = uuid::Uuid::new_v4();
        let batch = vec![make_request(), make_request(), make_request()];

        let err = transport
            .send_requests(&stream_id, batch)
            .await
            .unwrap_err();
        assert_eq!(err.into_inner().len(), 3);
    }

    #[tokio::test]
    async fn test_send_evicts_oldest_when_ingestion_full() {
        let (mut transport, ingestion_rx, backup_rx) = make_live_streaming_with_backups(1, 10);
        transport.ingestion_tx.try_send(stale_message()).unwrap();

        let stream_id = uuid::Uuid::new_v4();
        let new_req = make_request();
        transport.send(&stream_id, new_req).await.unwrap();

        // The ingestion channel now holds only the new message.
        let ingested = ingestion_rx.try_recv().unwrap();
        assert_eq!(ingested.message_id, 0);
        assert!(!ingested.dropped_for_ingestion);

        // The backup channel received the new message first ...
        let backed_up_new = backup_rx.try_recv().unwrap();
        assert_eq!(backed_up_new.message_id, 0);
        assert!(!backed_up_new.dropped_for_ingestion);

        // ... followed by the evicted message, flagged as dropped.
        let backed_up_evicted = backup_rx.try_recv().unwrap();
        assert_eq!(backed_up_evicted.message_id, 99);
        assert!(backed_up_evicted.dropped_for_ingestion);
        assert_eq!(backed_up_evicted.request.flow, "old_flow");
    }

    #[tokio::test]
    async fn test_try_send_evicts_oldest_to_backup_when_ingestion_full() {
        let (mut transport, ingestion_rx, backup_rx) = make_live_streaming_with_backups(1, 10);
        transport.ingestion_tx.try_send(stale_message()).unwrap();

        let stream_id = uuid::Uuid::new_v4();
        transport.try_send(&stream_id, make_request()).unwrap();

        let ingested = ingestion_rx.try_recv().unwrap();
        assert_eq!(ingested.message_id, 0);
        assert!(!ingested.dropped_for_ingestion);

        let backed_up_new = backup_rx.try_recv().unwrap();
        assert_eq!(backed_up_new.message_id, 0);

        let backed_up_evicted = backup_rx.try_recv().unwrap();
        assert_eq!(backed_up_evicted.message_id, 99);
        assert!(backed_up_evicted.dropped_for_ingestion);
        assert_eq!(backed_up_evicted.request.flow, "old_flow");
    }

    #[tokio::test]
    async fn test_send_returns_err_when_ingestion_closed() {
        let (mut transport, ingestion_rx, _backup_rx) = make_live_streaming_with_backups(10, 10);
        drop(ingestion_rx);

        let stream_id = uuid::Uuid::new_v4();
        let request = make_request();
        let expected_flow = request.flow.clone();

        let err = transport.send(&stream_id, request).await.unwrap_err();
        assert_eq!(err.into_inner().flow, expected_flow);
    }

    #[tokio::test]
    async fn test_try_send_returns_full_when_evicted_and_backup_full() {
        // Backup capacity 1: the new message fills it, leaving no room for the evicted one.
        let (mut transport, _ingestion_rx, backup_rx) = make_live_streaming_with_backups(1, 1);
        transport.ingestion_tx.try_send(stale_message()).unwrap();

        let stream_id = uuid::Uuid::new_v4();
        let err = transport
            .try_send(&stream_id, make_request())
            .unwrap_err();
        assert!(err.is_full(), "expected Full, got {err}");
        assert_eq!(err.into_inner().flow, "old_flow");

        drop(backup_rx);
    }

    #[tokio::test]
    async fn test_message_id_counter_increments_monotonically() {
        let (mut transport, _ingestion_rx, _backup_rx) = make_live_streaming_with_backups(10, 10);
        let stream_id = uuid::Uuid::new_v4();

        let mut sent = 0;
        while sent < 5 {
            transport.send(&stream_id, make_request()).await.unwrap();
            sent += 1;
        }

        assert_eq!(transport.message_id_counter, 5);
    }

    #[tokio::test]
    async fn test_finish_awaits_all_three_tasks() {
        use std::sync::atomic::{AtomicU32, Ordering};

        let completed = Arc::new(AtomicU32::new(0));
        // The control receiver is kept alive so the shutdown broadcast succeeds.
        let (control_tx, _ctrl_rx) = broadcast::channel::<ControlMessage>(10);
        let (ingestion_tx, _) = async_channel::bounded::<DataMessage>(10);
        let (backup_tx, _) = async_channel::bounded::<DataMessage>(10);

        // Spawns a task that bumps the shared counter after a short delay.
        let spawn_counting_task = |counter: &Arc<AtomicU32>| -> JoinHandle<Result<()>> {
            let counter = counter.clone();
            tokio::spawn(async move {
                tokio::time::sleep(Duration::from_millis(5)).await;
                counter.fetch_add(1, Ordering::Relaxed);
                Ok(())
            })
        };

        let transport = LiveStreamingWithBackups {
            message_id_counter: 0,
            backup_tx,
            ingestion_tx,
            control_tx,
            ingestion_task: spawn_counting_task(&completed),
            backup_manager: spawn_counting_task(&completed),
            reingestion_task: spawn_counting_task(&completed),
            metrics_streaming: None,
            flows_seen: std::collections::HashSet::new(),
            metrics: Arc::new(crate::metrics::SiftStreamMetrics::default()),
        };

        let stream_id = uuid::Uuid::new_v4();
        transport.finish(&stream_id).await.unwrap();

        assert_eq!(
            completed.load(Ordering::Relaxed),
            3,
            "finish() must await all three internal tasks before returning"
        );
    }
}