sift_stream/stream/mod.rs
1use crate::stream::run::{RunSelector, load_run_by_form, load_run_by_id};
2use async_trait::async_trait;
3use sift_connect::SiftChannel;
4use sift_error::prelude::*;
5use sift_rs::runs::v2::Run;
6use uuid::Uuid;
7
8use crate::metrics::SiftStreamMetricsSnapshot;
9
/// Concerned with building and configuring an instance of [SiftStream].
11pub mod builder;
12
13/// Concerned with constructing values for channels/sensors that get telemetered.
14pub mod channel;
15
16/// Shared helper functions used across stream implementations.
17mod helpers;
18
19/// Implementations for different modes of streaming.
20pub mod mode;
21
22/// Concerned with gRPC retries.
23pub mod retry;
24pub use retry::RetryPolicy;
25
26/// Concerned with accessing or creating runs for [SiftStream]
27pub mod run;
28
/// Concerned with constructing values of time that make up the time-series sent to Sift.
30pub mod time;
31
32/// Concerned with validating flows and detecting if changes are being made to an ingestion config
33/// in a manner that isn't backwards compatible.
34pub(crate) mod flow;
35
36/// Task-based architecture for non-blocking SiftStream operations
37pub mod tasks;
38
39/// Error types returned by [`Transport`] send methods.
40pub mod send_error;
41pub use send_error::{SendError, SiftStreamSendError, SiftStreamTrySendError, TrySendError};
42
43#[cfg(test)]
44mod test;
45
46/// Provides a point-in-time snapshot of stream metrics.
47///
48/// Implemented by [`IngestionConfigEncoder`](crate::IngestionConfigEncoder). Snapshots are
49/// non-blocking and do not affect stream operation. Obtain one via
50/// [`SiftStream::get_metrics_snapshot`].
51pub trait MetricsSnapshot: private::Sealed {
52 fn snapshot(&self) -> SiftStreamMetricsSnapshot;
53}
54
55/// Implemented by types that can be encoded and sent via [`SiftStream::send`].
56///
57/// The two concrete implementations are [`Flow`](crate::mode::ingestion_config::Flow) and
58/// [`FlowBuilder`](crate::flow::FlowBuilder). The associated `Encoder` type links each
59/// encodeable to the specific encoder implementation that processes it — external types cannot
60/// implement this trait because `Encoder` is sealed.
61pub trait Encodeable {
62 type Output: Send + Sync;
63 type Encoder: Encoder<Message = Self::Output>;
64
65 fn encode(
66 self,
67 encoder: &mut Self::Encoder,
68 stream_id: &Uuid,
69 run: Option<&Run>,
70 ) -> Option<Self::Output>;
71}
72
73/// A trait that indicates that a type can be encoded by it.
74///
75/// This trait is used to tie an [`Encoder`] to the [`Encodeable`]s that
76/// it can encode.
77pub trait Encoder: private::Sealed {
78 type Message: Send + Sync;
79}
80
81/// Defines how encoded telemetry messages are delivered to their destination.
82///
83/// Three concrete implementations are provided:
84///
85/// - [`LiveStreamingOnly`](crate::LiveStreamingOnly) — delivers messages to Sift in real-time
86/// over a single bounded ingestion channel. No checkpointing, no disk backups.
87/// - [`LiveStreamingWithBackups`](crate::LiveStreamingWithBackups) — delivers messages to Sift
88/// in real-time with periodic checkpointing and disk backups. Uses a dual-channel
89/// architecture; see below.
90/// - [`FileBackup`](crate::FileBackup) — writes messages to rolling disk files without
91/// streaming live to Sift.
92///
93/// ## Send API
94///
95/// Each implementation exposes four send methods that differ in their backpressure behaviour:
96///
97/// | Method | Blocks? | Error on failure |
98/// |---|---|---|
99/// | [`send`](Transport::send) | Yes — awaits until the channel has capacity | [`SendError<T>`] with the undelivered message |
100/// | [`send_requests`](Transport::send_requests) | Yes — per-message backpressure | [`SendError<Vec<T>>`] with all undelivered messages |
101/// | [`try_send`](Transport::try_send) | No — returns immediately | [`TrySendError<T>`] as `Full(T)` or `Closed(T)` |
102/// | [`try_send_requests`](Transport::try_send_requests) | No — fails on first undeliverable message | [`TrySendError<Vec<T>>`] with all undelivered |
103///
104/// In every failure case the undelivered message(s) are returned inside the error variant so
105/// that the caller can decide whether to retry, log, buffer locally, or discard them.
106///
107/// ## Backpressure sources
108///
109/// The channel that applies backpressure to [`send`](Transport::send) differs per mode. Knowing
110/// which channel to tune is important when adjusting capacity via the mode builders:
111///
112/// | Mode | [`send`](Transport::send) awaits on | Capacity setting |
113/// |---|---|---|
114/// | [`LiveStreamingOnly`](crate::LiveStreamingOnly) | ingestion channel | [`ingestion_data_channel_capacity`](crate::LiveOnlyBuilder::ingestion_data_channel_capacity) |
115/// | [`LiveStreamingWithBackups`](crate::LiveStreamingWithBackups) | backup channel only — ingestion uses force-send | [`backup_data_channel_capacity`](crate::LiveWithBackupsBuilder::backup_data_channel_capacity) |
116/// | [`FileBackup`](crate::FileBackup) | write channel | [`backup_data_channel_capacity`](crate::FileBackupBuilder::backup_data_channel_capacity) |
117///
118/// ## Channel semantics for `LiveStreamingWithBackups`
119///
120/// `LiveStreamingWithBackups` maintains two internal bounded channels:
121///
122/// - **backup channel** — the primary durability path. [`send`](Transport::send) awaits here.
123/// - **ingestion channel** — forwards messages to the gRPC task using a *force-send* strategy:
124/// when full, the **oldest buffered message is evicted** to make room for the incoming one.
125/// Evicted messages are redirected to the backup channel.
126///
127/// Because of force-send eviction, the message returned inside an error variant from
128/// [`send`](Transport::send) or [`send_requests`](Transport::send_requests) may be an **older
129/// displaced message**, not necessarily the one you just sent.
130///
131/// This trait is sealed: only implementations within this crate are permitted.
132#[async_trait]
133pub trait Transport: private::Sealed {
134 type Message: Send + Sync;
135 type Encoder: Encoder<Message = Self::Message>;
136
137 /// Send a single message with backpressure.
138 ///
139 /// Awaits until the backing channel has capacity, then delivers the message.
140 ///
141 /// # Errors
142 ///
143 /// Returns [`SendError<Self::Message>`] containing a potentially undelivered message.
144 ///
145 /// Depending on the implementation of [`Transport`], the undelivered message is not
146 /// necessarily the message that was provided to the current invocation of [`Self::send`].
147 ///
148 /// See implementation documentation for details.
149 async fn send(
150 &mut self,
151 stream_id: &Uuid,
152 message: Self::Message,
153 ) -> std::result::Result<(), SendError<Self::Message>>;
154
155 /// Send a batch of messages with backpressure.
156 ///
157 /// Awaits channel capacity for each message in turn. Stops on the first failure and
158 /// returns the failed message together with all remaining (not-yet-attempted) messages.
159 ///
160 /// # Errors
161 ///
162 /// Returns [`SendError<Vec<Self::Message>>`] containing potentially undelivered messages.
163 ///
164 /// Depending on the implementation of [`Transport`], the undelivered messages are not
165 /// necessarily the messages that were provided to the current invocation of [`Self::send_requests`].
166 ///
167 /// See implementation documentation for details.
168 async fn send_requests<I>(
169 &mut self,
170 stream_id: &Uuid,
171 requests: I,
172 ) -> std::result::Result<(), SendError<Vec<Self::Message>>>
173 where
174 I: IntoIterator<Item = Self::Message> + Send,
175 I::IntoIter: Send;
176
177 /// Attempt to send a single message without blocking.
178 ///
179 /// Returns immediately regardless of whether the channel has capacity.
180 ///
181 /// # Errors
182 ///
183 /// Returns [`TrySendError<Self::Message>`] containing a potentially undelivered message:
184 /// - [`TrySendError::Full`] — the channel is at capacity; consider retrying with
185 /// [`send`](Transport::send) to apply backpressure instead.
186 /// - [`TrySendError::Closed`] — the channel has been closed.
187 ///
188 /// Depending on the implementation of [`Transport`], the undelivered messages are not
189 /// necessarily the messages that were provided to the current invocation of [`Self::try_send`].
190 ///
191 /// See implementation documentation for details.
192 fn try_send(
193 &mut self,
194 stream_id: &Uuid,
195 message: Self::Message,
196 ) -> std::result::Result<(), TrySendError<Self::Message>>;
197
198 /// Attempt to send a batch of messages without blocking.
199 ///
200 /// Calls [`try_send`](Transport::try_send) for each message in turn. Returns immediately
201 /// on the first failure, bundling the failed message with any remaining unprocessed
202 /// messages.
203 ///
204 /// # Errors
205 ///
206 /// Returns [`TrySendError<Vec<Self::Message>>`] containing potentially undelivered messages.
207 /// - [`TrySendError::Full`] — the channel was at capacity for one of the messages.
208 /// - [`TrySendError::Closed`] — the channel was closed.
209 ///
210 /// Depending on the implementation of [`Transport`], the undelivered messages are not
211 /// necessarily the messages that were provided to the current invocation of [`Self::try_send_requests`].
212 ///
213 /// See implementation documentation for details.
214 fn try_send_requests<I>(
215 &mut self,
216 stream_id: &Uuid,
217 requests: I,
218 ) -> std::result::Result<(), TrySendError<Vec<Self::Message>>>
219 where
220 I: IntoIterator<Item = Self::Message> + Send,
221 I::IntoIter: Send;
222
223 /// Flush any remaining messages and cleanly shut down the transport.
224 ///
225 /// Must be called when ingestion is complete. Dropping a [`SiftStream`] without
226 /// calling `finish` may result in tail-end data not reaching Sift.
227 async fn finish(self, stream_id: &Uuid) -> Result<()>;
228}
229
230/// Generic wrapper over a telemetry transport that provides a consistent send API regardless
231/// of the underlying mode.
232///
233/// `E` is the encoder (e.g. [`IngestionConfigEncoder`](crate::IngestionConfigEncoder)) and `T`
234/// is the transport (e.g. [`LiveStreamingOnly`](crate::LiveStreamingOnly),
235/// [`LiveStreamingWithBackups`](crate::LiveStreamingWithBackups), or
236/// [`FileBackup`](crate::FileBackup)). The available features — checkpointing, retry, disk
237/// backups — depend entirely on the transport mode chosen at build time.
238///
239/// Construct a `SiftStream` via [`SiftStreamBuilder`](builder::SiftStreamBuilder). Refer to the
240/// [crate-level documentation](crate) for mode comparison, examples, and tuning guidance.
241pub struct SiftStream<E, T> {
242 grpc_channel: SiftChannel,
243 encoder: E,
244 transport: T,
245 run: Option<Run>,
246 sift_stream_id: Uuid,
247}
248
249impl<E, T> SiftStream<E, T>
250where
251 E: Encoder + MetricsSnapshot,
252 T: Transport<Encoder = E>,
253{
254 #[cfg(feature = "metrics-unstable")]
255 /// Retrieve a snapshot of the current metrics for this stream.
256 pub fn get_metrics_snapshot(&self) -> SiftStreamMetricsSnapshot {
257 self.encoder.snapshot()
258 }
259
260 /// Attach a run to the stream. Any data provided through [SiftStream::send] after return
261 /// of this function will be associated with the run.
262 pub async fn attach_run(&mut self, run_selector: RunSelector) -> Result<()> {
263 let run = match run_selector {
264 RunSelector::ById(run_id) => load_run_by_id(self.grpc_channel.clone(), &run_id).await?,
265 RunSelector::ByForm(run_form) => {
266 load_run_by_form(self.grpc_channel.clone(), run_form).await?
267 }
268 };
269
270 self.run = Some(run);
271
272 Ok(())
273 }
274
275 /// Detach the run, if any, associated with the stream. Any data provided through [SiftStream::send] after
276 /// this function is called will not be associated with a run.
277 pub fn detach_run(&mut self) {
278 self.run = None;
279 }
280
281 /// Retrieves the attached run if it exists.
282 pub fn run(&self) -> Option<&Run> {
283 self.run.as_ref()
284 }
285
286 /// Send telemetry with backpressure.
287 ///
288 /// Encodes `message` and then awaits until the backing channel has capacity. See the
289 /// [`Transport`] implementation for specific details on backpressure.
290 ///
291 /// Use this method when you want the caller to slow down naturally when the pipeline
292 /// is under load. For a non-blocking alternative see [`try_send`](SiftStream::try_send).
293 ///
294 /// # Errors
295 ///
296 /// - [`SiftStreamSendError::EncodeError`] — the message could not be encoded. This
297 /// indicates a schema mismatch or invalid value and is not recoverable by retrying.
298 /// - [`SiftStreamSendError::ChannelClosed`] — the backing channel was closed before the
299 /// message could be delivered. The undelivered message is returned inside the variant.
300 ///
301 /// # Cancellation safety
302 ///
303 /// If the returned future is dropped while waiting for channel capacity, no message is
304 /// lost — either the send completed before the drop, or the channel slot was never taken.
305 pub async fn send<M>(
306 &mut self,
307 message: M,
308 ) -> std::result::Result<(), SiftStreamSendError<<T as Transport>::Message>>
309 where
310 M: Encodeable<Encoder = E, Output = <T as Transport>::Message> + Send + Sync,
311 {
312 let encoded = message
313 .encode(&mut self.encoder, &self.sift_stream_id, self.run.as_ref())
314 .ok_or_else(|| SiftStreamSendError::encode_error("Failed to encode message"))?;
315
316 self.transport
317 .send(&self.sift_stream_id, encoded)
318 .await
319 .map_err(|SendError(msg)| SiftStreamSendError::ChannelClosed(msg))
320 }
321
322 /// Send a batch of pre-encoded requests with backpressure.
323 ///
324 /// Awaits channel capacity for each request in turn. Stops on the first failure and
325 /// returns all undelivered messages (the failing one plus any not yet attempted).
326 ///
327 /// Unlike [`send`](SiftStream::send), this method accepts pre-encoded
328 /// [`Transport::Message`](crate::stream::Transport::Message) values directly, bypassing
329 /// the encode step. Use [`FlowBuilder`](crate::FlowBuilder) to construct them for maximum
330 /// performance.
331 ///
332 /// # Errors
333 ///
334 /// [`SendError<Vec<T>>`] containing every message that was not delivered.
335 pub async fn send_requests<I>(
336 &mut self,
337 requests: I,
338 ) -> std::result::Result<(), SendError<Vec<<T as Transport>::Message>>>
339 where
340 I: IntoIterator<Item = <T as Transport>::Message> + Send,
341 I::IntoIter: Send,
342 {
343 self.transport
344 .send_requests(&self.sift_stream_id, requests)
345 .await
346 }
347
348 /// Attempt to send telemetry without blocking.
349 ///
350 /// Encodes `message` and immediately attempts to place it on the backing channel. Returns
351 /// at once regardless of whether the channel has capacity.
352 ///
353 /// Use this method in tight loops or real-time contexts where blocking is unacceptable.
354 /// For backpressure-aware sending see [`send`](SiftStream::send).
355 ///
356 /// # Errors
357 ///
358 /// - [`SiftStreamTrySendError::EncodeError`] — the message could not be encoded.
359 /// - [`SiftStreamTrySendError::Channel`] wrapping one of:
360 /// - [`TrySendError::Full`] — the backing channel is at capacity; the undelivered
361 /// message is returned. Consider switching to [`send`](SiftStream::send) to apply
362 /// backpressure, or retrying after a short delay.
363 /// - [`TrySendError::Closed`] — the backing channel has been closed; the undelivered
364 /// message is returned.
365 pub fn try_send<M>(
366 &mut self,
367 message: M,
368 ) -> std::result::Result<(), SiftStreamTrySendError<<T as Transport>::Message>>
369 where
370 M: Encodeable<Encoder = E, Output = <T as Transport>::Message> + Send + Sync,
371 {
372 let encoded = message
373 .encode(&mut self.encoder, &self.sift_stream_id, self.run.as_ref())
374 .ok_or_else(|| SiftStreamTrySendError::encode_error("Failed to encode message"))?;
375
376 self.transport
377 .try_send(&self.sift_stream_id, encoded)
378 .map_err(SiftStreamTrySendError::Channel)
379 }
380
381 /// Attempt to send a batch of pre-encoded requests without blocking.
382 ///
383 /// Calls `try_send` on the backing channel for each request. Returns immediately on
384 /// the first failure with every undelivered message (the failing one plus any not yet
385 /// attempted).
386 ///
387 /// Unlike [`try_send`](SiftStream::try_send), this method accepts pre-encoded
388 /// [`Transport::Message`](crate::stream::Transport::Message) values directly. Use
389 /// [`FlowBuilder`](crate::FlowBuilder) to construct them for maximum performance.
390 ///
391 /// # Errors
392 ///
393 /// [`TrySendError<Vec<T>>`] containing every message that was not delivered:
394 /// - [`TrySendError::Full`] — the backing channel was at capacity.
395 /// - [`TrySendError::Closed`] — the backing channel was closed.
396 pub fn try_send_requests<I>(
397 &mut self,
398 requests: I,
399 ) -> std::result::Result<(), TrySendError<Vec<<T as Transport>::Message>>>
400 where
401 I: IntoIterator<Item = <T as Transport>::Message> + Send,
402 I::IntoIter: Send,
403 {
404 self.transport
405 .try_send_requests(&self.sift_stream_id, requests)
406 }
407
408 /// Gracefully finish the stream, draining any remaining data before returning.
409 ///
410 /// It is important to always call this method when you are done sending data and
411 /// before the object is dropped.
412 pub async fn finish(self) -> Result<()> {
413 self.transport.finish(&self.sift_stream_id).await
414 }
415}
416
417impl<E, T> std::ops::Deref for SiftStream<E, T>
418where
419 E: Encoder + MetricsSnapshot,
420 T: Transport<Encoder = E>,
421{
422 type Target = E;
423 fn deref(&self) -> &Self::Target {
424 &self.encoder
425 }
426}
427
428impl<E, T> std::ops::DerefMut for SiftStream<E, T>
429where
430 E: Encoder + MetricsSnapshot,
431 T: Transport<Encoder = E>,
432{
433 fn deref_mut(&mut self) -> &mut Self::Target {
434 &mut self.encoder
435 }
436}
437
/// Holds the sealing trait used by this module's public traits.
///
/// [`MetricsSnapshot`], [`Encoder`] and [`Transport`] all name [`private::Sealed`] as a
/// supertrait, which prevents code outside this crate from implementing them.
mod private {
    /// This trait is sealed and cannot be implemented outside this crate.
    ///
    /// The trait itself is `pub` so that it can appear as a supertrait of public traits, but
    /// the enclosing module is private, so external code cannot name it and therefore cannot
    /// implement it.
    pub trait Sealed {}
}
445}