1use crate::metrics::SiftStreamMetrics;
2use crate::stream::flow::FlowDescriptor;
3use crate::stream::mode::ingestion_config::IngestionConfigEncoder;
4use crate::stream::send_error::{SendError, TrySendError};
5use crate::stream::tasks::{ControlMessage, DataMessage, LiveOnlyTaskConfig, TaskBuilder};
6use crate::stream::{SiftStream, Transport, private::Sealed};
7use async_trait::async_trait;
8use sift_error::prelude::*;
9use sift_rs::{
10 ingest::v1::IngestWithConfigDataStreamRequest, ingestion_configs::v2::IngestionConfig,
11 runs::v2::Run,
12};
13use std::collections::{HashMap, HashSet};
14use std::sync::Arc;
15use tokio::sync::broadcast;
16use tokio::task::JoinHandle;
17use uuid::Uuid;
18
/// Transport used by the live-only streaming mode.
///
/// Prepared requests are pushed onto an ingestion channel consumed by background tasks
/// started via `TaskBuilder::start_live_only`; `finish` closes that channel, broadcasts a
/// shutdown, and waits for the tasks to exit.
pub struct LiveStreamingOnly {
    /// Id assigned to the next prepared message; incremented once per prepared message.
    message_id_counter: u64,
    /// Sending half of the channel feeding the ingestion task. `send` waits and `try_send`
    /// reports `Full` when it has no capacity (presumably bounded — TODO confirm in `TaskBuilder`).
    ingestion_tx: async_channel::Sender<DataMessage>,
    /// Broadcast channel used to send `ControlMessage::Shutdown` to the background tasks.
    control_tx: broadcast::Sender<ControlMessage>,
    /// Handle to the ingestion task; awaited by `finish`.
    ingestion_task: JoinHandle<Result<()>>,
    /// Optional handle to a metrics-streaming task; awaited by `finish` when present.
    metrics_streaming: Option<JoinHandle<Result<()>>>,
    /// Flow names already observed, used to detect first-time flows in `prepare_message`.
    flows_seen: HashSet<String>,
    /// Shared metrics updated as messages are prepared.
    metrics: Arc<SiftStreamMetrics>,
}

// Marker impl for the crate-private `Sealed` trait (presumably a supertrait that keeps
// `Transport` from being implemented outside this crate — TODO confirm in `stream::private`).
impl Sealed for LiveStreamingOnly {}
39
40impl LiveStreamingOnly {
41 fn prepare_message(
42 &mut self,
43 stream_id: &Uuid,
44 message: IngestWithConfigDataStreamRequest,
45 ) -> DataMessage {
46 #[cfg(feature = "tracing")]
47 {
48 if !self.flows_seen.contains(&message.flow) {
49 self.metrics.unique_flows_received.increment();
50 self.flows_seen.insert(message.flow.clone());
51 tracing::info!(sift_stream_id = %stream_id, "flow '{}' being ingested for the first time", &message.flow);
52 }
53 }
54
55 self.metrics
56 .ingestion_channel_depth
57 .set(self.ingestion_tx.len() as u64);
58 self.metrics.messages_received.increment();
59
60 let data_msg = DataMessage {
61 message_id: self.message_id_counter,
62 request: Arc::new(message),
63 dropped_for_ingestion: false,
64 };
65 self.message_id_counter += 1;
66 data_msg
67 }
68}
69
70#[async_trait]
71impl Transport for LiveStreamingOnly {
72 type Encoder = IngestionConfigEncoder;
73 type Message = IngestWithConfigDataStreamRequest;
74
75 async fn send(
81 &mut self,
82 stream_id: &Uuid,
83 message: Self::Message,
84 ) -> std::result::Result<(), SendError<Self::Message>> {
85 let data_msg = self.prepare_message(stream_id, message);
86 self.ingestion_tx
87 .send(data_msg)
88 .await
89 .map_err(|async_channel::SendError(dm)| {
90 SendError(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
91 })
92 }
93
94 fn try_send(
99 &mut self,
100 stream_id: &Uuid,
101 message: Self::Message,
102 ) -> std::result::Result<(), TrySendError<Self::Message>> {
103 let data_msg = self.prepare_message(stream_id, message);
104 self.ingestion_tx.try_send(data_msg).map_err(|e| match e {
105 async_channel::TrySendError::Full(dm) => {
106 TrySendError::Full(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()))
107 }
108 async_channel::TrySendError::Closed(dm) => TrySendError::Closed(
109 Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()),
110 ),
111 })
112 }
113
114 async fn send_requests<I>(
116 &mut self,
117 stream_id: &Uuid,
118 requests: I,
119 ) -> std::result::Result<(), SendError<Vec<Self::Message>>>
120 where
121 I: IntoIterator<Item = Self::Message> + Send,
122 I::IntoIter: Send,
123 {
124 let mut iter = requests.into_iter();
125 while let Some(msg) = iter.next() {
126 if let Err(SendError(failed)) = self.send(stream_id, msg).await {
127 let mut undelivered = vec![failed];
128 undelivered.extend(iter);
129 return Err(SendError(undelivered));
130 }
131 }
132 Ok(())
133 }
134
135 fn try_send_requests<I>(
137 &mut self,
138 stream_id: &Uuid,
139 requests: I,
140 ) -> std::result::Result<(), TrySendError<Vec<Self::Message>>>
141 where
142 I: IntoIterator<Item = Self::Message> + Send,
143 I::IntoIter: Send,
144 {
145 let mut iter = requests.into_iter();
146 while let Some(msg) = iter.next() {
147 match self.try_send(stream_id, msg) {
148 Ok(()) => {}
149 Err(TrySendError::Full(failed)) => {
150 let mut undelivered = vec![failed];
151 undelivered.extend(iter);
152 return Err(TrySendError::Full(undelivered));
153 }
154 Err(TrySendError::Closed(failed)) => {
155 let mut undelivered = vec![failed];
156 undelivered.extend(iter);
157 return Err(TrySendError::Closed(undelivered));
158 }
159 }
160 }
161 Ok(())
162 }
163
164 async fn finish(self, stream_id: &Uuid) -> Result<()> {
169 self.ingestion_tx.close();
170 let _ = self.control_tx.send(ControlMessage::Shutdown);
171 let _ = self.ingestion_task.await;
172 if let Some(t) = self.metrics_streaming {
173 let _ = t.await;
174 }
175
176 #[cfg(feature = "tracing")]
177 tracing::info!(
178 sift_stream_id = %stream_id,
179 "successfully shutdown live-only streaming system"
180 );
181
182 Ok(())
183 }
184}
185
186impl SiftStream<IngestionConfigEncoder, LiveStreamingOnly> {
187 pub(crate) async fn new_live_only(
188 ingestion_config: IngestionConfig,
189 flows_by_name: HashMap<String, FlowDescriptor<String>>,
190 run: Option<Run>,
191 task_config: LiveOnlyTaskConfig,
192 metrics: Arc<SiftStreamMetrics>,
193 ) -> Result<Self> {
194 #[cfg(feature = "metrics-unstable")]
195 {
196 let uuid = task_config.sift_stream_id.to_string();
197 let m = metrics.clone();
198 tokio::spawn(async move {
199 crate::metrics::register_metrics(uuid, m).await;
200 });
201 }
202
203 metrics.loaded_flows.add(flows_by_name.len() as u64);
204 let sift_stream_id = task_config.sift_stream_id;
205 let grpc_channel = task_config.setup_channel.clone();
206
207 let tasks = TaskBuilder::start_live_only(task_config)
208 .await
209 .context("failed to start live-only streaming tasks")?;
210
211 Ok(Self {
212 grpc_channel: grpc_channel.clone(),
213 encoder: IngestionConfigEncoder {
214 grpc_channel,
215 flows_by_name,
216 ingestion_config,
217 metrics: metrics.clone(),
218 },
219 transport: LiveStreamingOnly {
220 message_id_counter: 0,
221 ingestion_tx: tasks.ingestion_tx,
222 control_tx: tasks.control_tx,
223 ingestion_task: tasks.ingestion,
224 metrics_streaming: tasks.metrics_streaming,
225 flows_seen: HashSet::new(),
226 metrics,
227 },
228 run,
229 sift_stream_id,
230 })
231 }
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::tasks::DataMessage;
    use std::time::Duration;
    use tokio::sync::broadcast;

    /// Builds a minimal request on the `test_flow` flow with a random ingestion config id.
    fn make_request() -> IngestWithConfigDataStreamRequest {
        IngestWithConfigDataStreamRequest {
            ingestion_config_id: uuid::Uuid::new_v4().to_string(),
            flow: "test_flow".to_string(),
            timestamp: None,
            channel_values: vec![],
            run_id: String::new(),
            end_stream_on_validation_error: false,
            organization_id: String::new(),
        }
    }

    /// Wraps a fresh request in a `DataMessage`, e.g. to pre-fill the ingestion channel.
    fn make_data_message(message_id: u64) -> DataMessage {
        DataMessage {
            message_id,
            request: Arc::new(make_request()),
            dropped_for_ingestion: false,
        }
    }

    /// Assembles a transport from explicit channel ends and ingestion task (no metrics task).
    fn make_transport(
        ingestion_tx: async_channel::Sender<DataMessage>,
        control_tx: broadcast::Sender<ControlMessage>,
        ingestion_task: JoinHandle<Result<()>>,
        message_id_counter: u64,
    ) -> LiveStreamingOnly {
        LiveStreamingOnly {
            message_id_counter,
            ingestion_tx,
            control_tx,
            ingestion_task,
            metrics_streaming: None,
            flows_seen: HashSet::new(),
            metrics: Arc::new(crate::metrics::SiftStreamMetrics::default()),
        }
    }

    /// Builds a transport over a bounded ingestion channel of `ingestion_capacity` whose
    /// ingestion task completes immediately, returning the channel's receiver alongside it.
    fn make_live_streaming_only(
        ingestion_capacity: usize,
    ) -> (LiveStreamingOnly, async_channel::Receiver<DataMessage>) {
        let (control_tx, _) = broadcast::channel(10);
        let (ingestion_tx, ingestion_rx) = async_channel::bounded(ingestion_capacity);
        let transport =
            make_transport(ingestion_tx, control_tx, tokio::spawn(async { Ok(()) }), 0);
        (transport, ingestion_rx)
    }

    #[tokio::test]
    async fn test_try_send_returns_full_when_channel_at_capacity() {
        let (mut transport, _ingestion_rx) = make_live_streaming_only(1);
        transport.ingestion_tx.try_send(make_data_message(0)).unwrap();

        let stream_id = uuid::Uuid::new_v4();
        let req = make_request();
        let flow = req.flow.clone();
        let err = transport.try_send(&stream_id, req).unwrap_err();
        assert!(err.is_full(), "expected Full, got {err}");
        assert_eq!(err.into_inner().flow, flow);
    }

    #[tokio::test]
    async fn test_try_send_closed_returns_closed() {
        let (mut transport, ingestion_rx) = make_live_streaming_only(10);
        drop(ingestion_rx);
        let stream_id = uuid::Uuid::new_v4();
        let req = make_request();
        let flow = req.flow.clone();
        let err = transport.try_send(&stream_id, req).unwrap_err();
        assert!(err.is_closed(), "expected Closed, got {err}");
        assert_eq!(err.into_inner().flow, flow);
    }

    #[tokio::test]
    async fn test_send_blocks_until_ingestion_space_available() {
        let (mut transport, ingestion_rx) = make_live_streaming_only(1);
        transport.ingestion_tx.try_send(make_data_message(0)).unwrap();
        let stream_id = uuid::Uuid::new_v4();

        // The channel is full and nothing is receiving, so `send` must stay pending.
        let blocked = tokio::time::timeout(
            Duration::from_millis(20),
            transport.send(&stream_id, make_request()),
        )
        .await;
        assert!(
            blocked.is_err(),
            "send completed while the ingestion channel was full"
        );

        // Once a slot is freed, `send` completes and the message lands in the channel.
        let drained = ingestion_rx.recv().await.unwrap();
        assert_eq!(drained.message_id, 0);
        transport.send(&stream_id, make_request()).await.unwrap();
        assert_eq!(ingestion_rx.len(), 1);
    }

    #[tokio::test]
    async fn test_finish_drains_queued_messages_before_exit() {
        let (control_tx, _) = broadcast::channel(10);
        let (ingestion_tx, ingestion_rx) = async_channel::bounded::<DataMessage>(5);

        for _ in 0..3 {
            ingestion_tx.try_send(make_data_message(0)).unwrap();
        }

        let consumer = tokio::spawn(async move {
            let mut count = 0;
            while ingestion_rx.recv().await.is_ok() {
                count += 1;
            }
            count
        });

        let transport =
            make_transport(ingestion_tx, control_tx, tokio::spawn(async { Ok(()) }), 3);

        let stream_id = uuid::Uuid::new_v4();
        transport.finish(&stream_id).await.unwrap();

        let count = consumer.await.unwrap();
        assert_eq!(count, 3);
    }

    #[tokio::test]
    async fn test_finish_shuts_down_ingestion_task() {
        let (control_tx, mut control_rx) = broadcast::channel(10);
        let (ingestion_tx, _ingestion_rx) = async_channel::bounded::<DataMessage>(10);

        // Stands in for the ingestion task: exits only after receiving `Shutdown`, so
        // `finish` would hang if it never broadcast one.
        let shutdown_task = tokio::spawn(async move {
            loop {
                if let Ok(ControlMessage::Shutdown) = control_rx.recv().await {
                    break;
                }
            }
            Ok(())
        });

        let transport = make_transport(ingestion_tx, control_tx, shutdown_task, 0);

        let stream_id = uuid::Uuid::new_v4();
        transport.finish(&stream_id).await.unwrap();
    }

    #[tokio::test]
    async fn test_message_id_increments_on_each_send() {
        let (mut transport, ingestion_rx) = make_live_streaming_only(10);
        let stream_id = uuid::Uuid::new_v4();

        for _ in 0..5 {
            transport.send(&stream_id, make_request()).await.unwrap();
        }

        let mut ids: Vec<u64> = Vec::new();
        while let Ok(msg) = ingestion_rx.try_recv() {
            ids.push(msg.message_id);
        }
        assert_eq!(ids, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_try_send_requests_stops_at_first_full() {
        let (mut transport, _ingestion_rx) = make_live_streaming_only(1);
        transport.ingestion_tx.try_send(make_data_message(99)).unwrap();

        let stream_id = uuid::Uuid::new_v4();
        let reqs = vec![make_request(), make_request(), make_request()];
        let err = transport.try_send_requests(&stream_id, reqs).unwrap_err();
        assert!(err.is_full(), "expected Full, got {err}");
        assert_eq!(err.into_inner().len(), 3);
    }

    #[tokio::test]
    async fn test_try_send_requests_stops_at_first_closed() {
        let (mut transport, ingestion_rx) = make_live_streaming_only(10);
        drop(ingestion_rx);

        let stream_id = uuid::Uuid::new_v4();
        let reqs = vec![make_request(), make_request(), make_request()];
        let err = transport.try_send_requests(&stream_id, reqs).unwrap_err();
        assert!(err.is_closed(), "expected Closed, got {err}");
        assert_eq!(err.into_inner().len(), 3);
    }

    #[tokio::test]
    async fn test_send_requests_stops_at_first_closed() {
        let (mut transport, ingestion_rx) = make_live_streaming_only(10);
        drop(ingestion_rx);

        let stream_id = uuid::Uuid::new_v4();
        let reqs = vec![make_request(), make_request(), make_request()];
        let err = transport.send_requests(&stream_id, reqs).await.unwrap_err();
        assert_eq!(err.into_inner().len(), 3);
    }
}