Skip to main content

sift_stream/stream/mode/
live_only.rs

1use crate::metrics::SiftStreamMetrics;
2use crate::stream::flow::FlowDescriptor;
3use crate::stream::mode::ingestion_config::IngestionConfigEncoder;
4use crate::stream::send_error::{SendError, TrySendError};
5use crate::stream::tasks::{ControlMessage, DataMessage, LiveOnlyTaskConfig, TaskBuilder};
6use crate::stream::{SiftStream, Transport, private::Sealed};
7use async_trait::async_trait;
8use sift_error::prelude::*;
9use sift_rs::{
10    ingest::v1::IngestWithConfigDataStreamRequest, ingestion_configs::v2::IngestionConfig,
11    runs::v2::Run,
12};
13use std::collections::{HashMap, HashSet};
14use std::sync::Arc;
15use tokio::sync::broadcast;
16use tokio::task::JoinHandle;
17use uuid::Uuid;
18
19/// Transport for real-time streaming over a single bounded ingestion channel.
20///
21/// Messages are delivered directly to the gRPC ingestion task. The caller blocks
22/// until the ingestion task drains capacity.
23///
24/// **Backpressure**: [`send`](crate::SiftStream::send) awaits when the **ingestion channel**
25/// is full. The channel capacity is set via
26/// [`LiveOnlyBuilder::ingestion_data_channel_capacity`](crate::LiveOnlyBuilder::ingestion_data_channel_capacity)
27/// (default: [`DATA_CHANNEL_CAPACITY`](crate::stream::tasks::DATA_CHANNEL_CAPACITY)).
28pub struct LiveStreamingOnly {
29    message_id_counter: u64,
30    ingestion_tx: async_channel::Sender<DataMessage>,
31    control_tx: broadcast::Sender<ControlMessage>,
32    ingestion_task: JoinHandle<Result<()>>,
33    metrics_streaming: Option<JoinHandle<Result<()>>>,
34    flows_seen: HashSet<String>,
35    metrics: Arc<SiftStreamMetrics>,
36}
37
38impl Sealed for LiveStreamingOnly {}
39
40impl LiveStreamingOnly {
41    fn prepare_message(
42        &mut self,
43        stream_id: &Uuid,
44        message: IngestWithConfigDataStreamRequest,
45    ) -> DataMessage {
46        #[cfg(feature = "tracing")]
47        {
48            if !self.flows_seen.contains(&message.flow) {
49                self.metrics.unique_flows_received.increment();
50                self.flows_seen.insert(message.flow.clone());
51                tracing::info!(sift_stream_id = %stream_id, "flow '{}' being ingested for the first time", &message.flow);
52            }
53        }
54
55        self.metrics
56            .ingestion_channel_depth
57            .set(self.ingestion_tx.len() as u64);
58        self.metrics.messages_received.increment();
59
60        let data_msg = DataMessage {
61            message_id: self.message_id_counter,
62            request: Arc::new(message),
63            dropped_for_ingestion: false,
64        };
65        self.message_id_counter += 1;
66        data_msg
67    }
68}
69
70#[async_trait]
71impl Transport for LiveStreamingOnly {
72    type Encoder = IngestionConfigEncoder;
73    type Message = IngestWithConfigDataStreamRequest;
74
75    /// Sends a message, awaiting capacity on the **ingestion channel** if it is full.
76    ///
77    /// Backpressure comes from the bounded ingestion channel. The caller blocks until
78    /// the ingestion task drains capacity. Returns an error only if the channel is
79    /// closed (i.e. the stream is shutting down).
80    async fn send(
81        &mut self,
82        stream_id: &Uuid,
83        message: Self::Message,
84    ) -> std::result::Result<(), SendError<Self::Message>> {
85        let data_msg = self.prepare_message(stream_id, message);
86        self.ingestion_tx
87            .send(data_msg)
88            .await
89            .map_err(|async_channel::SendError(dm)| {
90                SendError(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
91            })
92    }
93
94    /// Attempts to send a message without blocking.
95    ///
96    /// Returns immediately with `TrySendError::Full` or `TrySendError::Closed` if the
97    /// channel cannot accept data right now.
98    fn try_send(
99        &mut self,
100        stream_id: &Uuid,
101        message: Self::Message,
102    ) -> std::result::Result<(), TrySendError<Self::Message>> {
103        let data_msg = self.prepare_message(stream_id, message);
104        self.ingestion_tx.try_send(data_msg).map_err(|e| match e {
105            async_channel::TrySendError::Full(dm) => {
106                TrySendError::Full(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
107            }
108            async_channel::TrySendError::Closed(dm) => TrySendError::Closed(
109                Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()),
110            ),
111        })
112    }
113
114    /// Sends a batch of messages in order, awaiting capacity for each one.
115    async fn send_requests<I>(
116        &mut self,
117        stream_id: &Uuid,
118        requests: I,
119    ) -> std::result::Result<(), SendError<Vec<Self::Message>>>
120    where
121        I: IntoIterator<Item = Self::Message> + Send,
122        I::IntoIter: Send,
123    {
124        let mut iter = requests.into_iter();
125        while let Some(msg) = iter.next() {
126            if let Err(SendError(failed)) = self.send(stream_id, msg).await {
127                let mut undelivered = vec![failed];
128                undelivered.extend(iter);
129                return Err(SendError(undelivered));
130            }
131        }
132        Ok(())
133    }
134
135    /// Attempts to send a batch of messages in order without blocking.
136    fn try_send_requests<I>(
137        &mut self,
138        stream_id: &Uuid,
139        requests: I,
140    ) -> std::result::Result<(), TrySendError<Vec<Self::Message>>>
141    where
142        I: IntoIterator<Item = Self::Message> + Send,
143        I::IntoIter: Send,
144    {
145        let mut iter = requests.into_iter();
146        while let Some(msg) = iter.next() {
147            match self.try_send(stream_id, msg) {
148                Ok(()) => {}
149                Err(TrySendError::Full(failed)) => {
150                    let mut undelivered = vec![failed];
151                    undelivered.extend(iter);
152                    return Err(TrySendError::Full(undelivered));
153                }
154                Err(TrySendError::Closed(failed)) => {
155                    let mut undelivered = vec![failed];
156                    undelivered.extend(iter);
157                    return Err(TrySendError::Closed(undelivered));
158                }
159            }
160        }
161        Ok(())
162    }
163
164    /// Closes the ingestion channel, sends the shutdown signal, and awaits task completion.
165    ///
166    /// The ingestion task drains any messages already queued before acting on the shutdown
167    /// signal, so all messages sent before `finish` is called will be delivered.
168    async fn finish(self, stream_id: &Uuid) -> Result<()> {
169        self.ingestion_tx.close();
170        let _ = self.control_tx.send(ControlMessage::Shutdown);
171        let _ = self.ingestion_task.await;
172        if let Some(t) = self.metrics_streaming {
173            let _ = t.await;
174        }
175
176        #[cfg(feature = "tracing")]
177        tracing::info!(
178            sift_stream_id = %stream_id,
179            "successfully shutdown live-only streaming system"
180        );
181
182        Ok(())
183    }
184}
185
186impl SiftStream<IngestionConfigEncoder, LiveStreamingOnly> {
187    pub(crate) async fn new_live_only(
188        ingestion_config: IngestionConfig,
189        flows_by_name: HashMap<String, FlowDescriptor<String>>,
190        run: Option<Run>,
191        task_config: LiveOnlyTaskConfig,
192        metrics: Arc<SiftStreamMetrics>,
193    ) -> Result<Self> {
194        #[cfg(feature = "metrics-unstable")]
195        {
196            let uuid = task_config.sift_stream_id.to_string();
197            let m = metrics.clone();
198            tokio::spawn(async move {
199                crate::metrics::register_metrics(uuid, m).await;
200            });
201        }
202
203        metrics.loaded_flows.add(flows_by_name.len() as u64);
204        let sift_stream_id = task_config.sift_stream_id;
205        let grpc_channel = task_config.setup_channel.clone();
206
207        let tasks = TaskBuilder::start_live_only(task_config)
208            .await
209            .context("failed to start live-only streaming tasks")?;
210
211        Ok(Self {
212            grpc_channel: grpc_channel.clone(),
213            encoder: IngestionConfigEncoder {
214                grpc_channel,
215                flows_by_name,
216                ingestion_config,
217                metrics: metrics.clone(),
218            },
219            transport: LiveStreamingOnly {
220                message_id_counter: 0,
221                ingestion_tx: tasks.ingestion_tx,
222                control_tx: tasks.control_tx,
223                ingestion_task: tasks.ingestion,
224                metrics_streaming: tasks.metrics_streaming,
225                flows_seen: HashSet::new(),
226                metrics,
227            },
228            run,
229            sift_stream_id,
230        })
231    }
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::tasks::DataMessage;
    use std::sync::atomic::{AtomicBool, Ordering};
    use tokio::sync::broadcast;

    /// Builds a minimal request on flow `"test_flow"` with no channel values.
    fn make_request() -> IngestWithConfigDataStreamRequest {
        IngestWithConfigDataStreamRequest {
            ingestion_config_id: uuid::Uuid::new_v4().to_string(),
            flow: "test_flow".to_string(),
            timestamp: None,
            channel_values: vec![],
            run_id: String::new(),
            end_stream_on_validation_error: false,
            organization_id: String::new(),
        }
    }

    /// Builds a queue entry with the given id, used to pre-fill the ingestion channel.
    fn make_data_message(message_id: u64) -> DataMessage {
        DataMessage {
            message_id,
            request: Arc::new(make_request()),
            dropped_for_ingestion: false,
        }
    }

    /// Assembles a transport around the supplied channel halves and ingestion task handle.
    fn build_transport(
        ingestion_tx: async_channel::Sender<DataMessage>,
        control_tx: broadcast::Sender<ControlMessage>,
        ingestion_task: JoinHandle<Result<()>>,
    ) -> LiveStreamingOnly {
        LiveStreamingOnly {
            message_id_counter: 0,
            ingestion_tx,
            control_tx,
            ingestion_task,
            metrics_streaming: None,
            flows_seen: HashSet::new(),
            metrics: Arc::new(crate::metrics::SiftStreamMetrics::default()),
        }
    }

    /// Creates a transport with a no-op ingestion task plus the receiving half of its
    /// ingestion channel. Must be called from within a Tokio runtime.
    fn make_live_streaming_only(
        ingestion_capacity: usize,
    ) -> (LiveStreamingOnly, async_channel::Receiver<DataMessage>) {
        let (control_tx, _) = broadcast::channel(10);
        let (ingestion_tx, ingestion_rx) = async_channel::bounded(ingestion_capacity);
        let transport = build_transport(ingestion_tx, control_tx, tokio::spawn(async { Ok(()) }));
        (transport, ingestion_rx)
    }

    #[tokio::test]
    async fn test_try_send_returns_full_when_channel_at_capacity() {
        let (mut transport, _ingestion_rx) = make_live_streaming_only(1);
        transport
            .ingestion_tx
            .try_send(make_data_message(0))
            .unwrap();

        let stream_id = uuid::Uuid::new_v4();
        let req = make_request();
        let flow = req.flow.clone();
        let err = transport.try_send(&stream_id, req).unwrap_err();
        assert!(err.is_full(), "expected Full, got {err}");
        assert_eq!(err.into_inner().flow, flow);
    }

    #[tokio::test]
    async fn test_try_send_closed_returns_closed() {
        let (mut transport, ingestion_rx) = make_live_streaming_only(10);
        drop(ingestion_rx);

        let stream_id = uuid::Uuid::new_v4();
        let req = make_request();
        let flow = req.flow.clone();
        let err = transport.try_send(&stream_id, req).unwrap_err();
        assert!(err.is_closed(), "expected Closed, got {err}");
        assert_eq!(err.into_inner().flow, flow);
    }

    #[tokio::test]
    async fn test_send_blocks_until_ingestion_space_available() {
        let (mut transport, ingestion_rx) = make_live_streaming_only(1);
        transport
            .ingestion_tx
            .try_send(make_data_message(0))
            .unwrap();

        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
            let _ = ingestion_rx.recv().await;
            // Keep the receiver alive so the channel is still open when the send completes.
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        });

        let stream_id = uuid::Uuid::new_v4();
        transport.send(&stream_id, make_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_finish_drains_queued_messages_before_exit() {
        let (control_tx, _) = broadcast::channel(10);
        let (ingestion_tx, ingestion_rx) = async_channel::bounded::<DataMessage>(5);

        for _ in 0..3 {
            ingestion_tx.try_send(make_data_message(0)).unwrap();
        }

        // Counts everything still readable until the channel reports closed-and-empty.
        let consumer = tokio::spawn(async move {
            let mut count = 0;
            while ingestion_rx.recv().await.is_ok() {
                count += 1;
            }
            count
        });

        let mut transport =
            build_transport(ingestion_tx, control_tx, tokio::spawn(async { Ok(()) }));
        transport.message_id_counter = 3;

        let stream_id = uuid::Uuid::new_v4();
        transport.finish(&stream_id).await.unwrap();

        let count = consumer.await.unwrap();
        assert_eq!(count, 3);
    }

    #[tokio::test]
    async fn test_finish_shuts_down_ingestion_task() {
        let (control_tx, mut control_rx) = broadcast::channel(10);
        let (ingestion_tx, _ingestion_rx) = async_channel::bounded::<DataMessage>(10);

        let saw_shutdown = Arc::new(AtomicBool::new(false));
        let shutdown_flag = Arc::clone(&saw_shutdown);
        let shutdown_task = tokio::spawn(async move {
            loop {
                match control_rx.recv().await {
                    Ok(ControlMessage::Shutdown) => {
                        shutdown_flag.store(true, Ordering::SeqCst);
                        break;
                    }
                    // Sender gone without a shutdown signal: stop instead of spinning on
                    // `Closed` forever; the assertion below then fails the test.
                    Err(broadcast::error::RecvError::Closed) => break,
                    // Anything else (e.g. a lagged receiver): keep waiting.
                    _ => {}
                }
            }
            Ok(())
        });

        let transport = build_transport(ingestion_tx, control_tx, shutdown_task);

        let stream_id = uuid::Uuid::new_v4();
        transport.finish(&stream_id).await.unwrap();
        assert!(
            saw_shutdown.load(Ordering::SeqCst),
            "ingestion task never observed the shutdown signal"
        );
    }

    #[tokio::test]
    async fn test_message_id_increments_on_each_send() {
        let (mut transport, ingestion_rx) = make_live_streaming_only(10);
        let stream_id = uuid::Uuid::new_v4();

        for _ in 0..5 {
            transport.send(&stream_id, make_request()).await.unwrap();
        }

        let mut ids: Vec<u64> = Vec::new();
        while let Ok(msg) = ingestion_rx.try_recv() {
            ids.push(msg.message_id);
        }
        assert_eq!(ids, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_try_send_requests_stops_at_first_full() {
        let (mut transport, _ingestion_rx) = make_live_streaming_only(1);

        // Pre-fill the channel to capacity.
        transport
            .ingestion_tx
            .try_send(make_data_message(99))
            .unwrap();

        let stream_id = uuid::Uuid::new_v4();
        let reqs = vec![make_request(), make_request(), make_request()];
        let err = transport.try_send_requests(&stream_id, reqs).unwrap_err();
        assert!(err.is_full(), "expected Full, got {err}");
        assert_eq!(err.into_inner().len(), 3);
    }

    #[tokio::test]
    async fn test_try_send_requests_stops_at_first_closed() {
        let (mut transport, ingestion_rx) = make_live_streaming_only(10);
        drop(ingestion_rx);

        let stream_id = uuid::Uuid::new_v4();
        let reqs = vec![make_request(), make_request(), make_request()];
        let err = transport.try_send_requests(&stream_id, reqs).unwrap_err();
        assert!(err.is_closed(), "expected Closed, got {err}");
        assert_eq!(err.into_inner().len(), 3);
    }

    #[tokio::test]
    async fn test_send_requests_stops_at_first_closed() {
        let (mut transport, ingestion_rx) = make_live_streaming_only(10);
        drop(ingestion_rx);

        let stream_id = uuid::Uuid::new_v4();
        let reqs = vec![make_request(), make_request(), make_request()];
        let err = transport.send_requests(&stream_id, reqs).await.unwrap_err();
        // All three returned: the one that failed plus the remaining two.
        assert_eq!(err.into_inner().len(), 3);
    }
}