Skip to main content

sift_stream/stream/
mod.rs

1use crate::stream::run::{RunSelector, load_run_by_form, load_run_by_id};
2use async_trait::async_trait;
3use sift_connect::SiftChannel;
4use sift_error::prelude::*;
5use sift_rs::runs::v2::Run;
6use uuid::Uuid;
7
8use crate::metrics::SiftStreamMetricsSnapshot;
9
10/// Concerned with building and configuring and instance of [SiftStream].
11pub mod builder;
12
13/// Concerned with constructing values for channels/sensors that get telemetered.
14pub mod channel;
15
16/// Shared helper functions used across stream implementations.
17mod helpers;
18
19/// Implementations for different modes of streaming.
20pub mod mode;
21
22/// Concerned with gRPC retries.
23pub mod retry;
24pub use retry::RetryPolicy;
25
26/// Concerned with accessing or creating runs for [SiftStream]
27pub mod run;
28
29/// Concerned with constructing values of time that make up the time-series sent ot Sift.
30pub mod time;
31
32/// Concerned with validating flows and detecting if changes are being made to an ingestion config
33/// in a manner that isn't backwards compatible.
34pub(crate) mod flow;
35
36/// Task-based architecture for non-blocking SiftStream operations
37pub mod tasks;
38
39/// Error types returned by [`Transport`] send methods.
40pub mod send_error;
41pub use send_error::{SendError, SiftStreamSendError, SiftStreamTrySendError, TrySendError};
42
43#[cfg(test)]
44mod test;
45
46/// Provides a point-in-time snapshot of stream metrics.
47///
48/// Implemented by [`IngestionConfigEncoder`](crate::IngestionConfigEncoder). Snapshots are
49/// non-blocking and do not affect stream operation. Obtain one via
50/// [`SiftStream::get_metrics_snapshot`].
51pub trait MetricsSnapshot: private::Sealed {
52    fn snapshot(&self) -> SiftStreamMetricsSnapshot;
53}
54
55/// Implemented by types that can be encoded and sent via [`SiftStream::send`].
56///
57/// The two concrete implementations are [`Flow`](crate::mode::ingestion_config::Flow) and
58/// [`FlowBuilder`](crate::flow::FlowBuilder). The associated `Encoder` type links each
59/// encodeable to the specific encoder implementation that processes it — external types cannot
60/// implement this trait because `Encoder` is sealed.
61pub trait Encodeable {
62    type Output: Send + Sync;
63    type Encoder: Encoder<Message = Self::Output>;
64
65    fn encode(
66        self,
67        encoder: &mut Self::Encoder,
68        stream_id: &Uuid,
69        run: Option<&Run>,
70    ) -> Option<Self::Output>;
71}
72
73/// A trait that indicates that a type can be encoded by it.
74///
75/// This trait is used to tie an [`Encoder`] to the [`Encodeable`]s that
76/// it can encode.
77pub trait Encoder: private::Sealed {
78    type Message: Send + Sync;
79}
80
81/// Defines how encoded telemetry messages are delivered to their destination.
82///
83/// Three concrete implementations are provided:
84///
85/// - [`LiveStreamingOnly`](crate::LiveStreamingOnly) — delivers messages to Sift in real-time
86///   over a single bounded ingestion channel. No checkpointing, no disk backups.
87/// - [`LiveStreamingWithBackups`](crate::LiveStreamingWithBackups) — delivers messages to Sift
88///   in real-time with periodic checkpointing and disk backups. Uses a dual-channel
89///   architecture; see below.
90/// - [`FileBackup`](crate::FileBackup) — writes messages to rolling disk files without
91///   streaming live to Sift.
92///
93/// ## Send API
94///
95/// Each implementation exposes four send methods that differ in their backpressure behaviour:
96///
97/// | Method | Blocks? | Error on failure |
98/// |---|---|---|
99/// | [`send`](Transport::send) | Yes — awaits until the channel has capacity | [`SendError<T>`] with the undelivered message |
100/// | [`send_requests`](Transport::send_requests) | Yes — per-message backpressure | [`SendError<Vec<T>>`] with all undelivered messages |
101/// | [`try_send`](Transport::try_send) | No — returns immediately | [`TrySendError<T>`] as `Full(T)` or `Closed(T)` |
102/// | [`try_send_requests`](Transport::try_send_requests) | No — fails on first undeliverable message | [`TrySendError<Vec<T>>`] with all undelivered |
103///
104/// In every failure case the undelivered message(s) are returned inside the error variant so
105/// that the caller can decide whether to retry, log, buffer locally, or discard them.
106///
107/// ## Backpressure sources
108///
109/// The channel that applies backpressure to [`send`](Transport::send) differs per mode. Knowing
110/// which channel to tune is important when adjusting capacity via the mode builders:
111///
112/// | Mode | [`send`](Transport::send) awaits on | Capacity setting |
113/// |---|---|---|
114/// | [`LiveStreamingOnly`](crate::LiveStreamingOnly) | ingestion channel | [`ingestion_data_channel_capacity`](crate::LiveOnlyBuilder::ingestion_data_channel_capacity) |
115/// | [`LiveStreamingWithBackups`](crate::LiveStreamingWithBackups) | backup channel only — ingestion uses force-send | [`backup_data_channel_capacity`](crate::LiveWithBackupsBuilder::backup_data_channel_capacity) |
116/// | [`FileBackup`](crate::FileBackup) | write channel | [`backup_data_channel_capacity`](crate::FileBackupBuilder::backup_data_channel_capacity) |
117///
118/// ## Channel semantics for `LiveStreamingWithBackups`
119///
120/// `LiveStreamingWithBackups` maintains two internal bounded channels:
121///
122/// - **backup channel** — the primary durability path. [`send`](Transport::send) awaits here.
123/// - **ingestion channel** — forwards messages to the gRPC task using a *force-send* strategy:
124///   when full, the **oldest buffered message is evicted** to make room for the incoming one.
125///   Evicted messages are redirected to the backup channel.
126///
127/// Because of force-send eviction, the message returned inside an error variant from
128/// [`send`](Transport::send) or [`send_requests`](Transport::send_requests) may be an **older
129/// displaced message**, not necessarily the one you just sent.
130///
131/// This trait is sealed: only implementations within this crate are permitted.
132#[async_trait]
133pub trait Transport: private::Sealed {
134    type Message: Send + Sync;
135    type Encoder: Encoder<Message = Self::Message>;
136
137    /// Send a single message with backpressure.
138    ///
139    /// Awaits until the backing channel has capacity, then delivers the message.
140    ///
141    /// # Errors
142    ///
143    /// Returns [`SendError<Self::Message>`] containing a potentially undelivered message.
144    ///
145    /// Depending on the implementation of [`Transport`], the undelivered message is not
146    /// necessarily the message that was provided to the current invocation of [`Self::send`].
147    ///
148    /// See implementation documentation for details.
149    async fn send(
150        &mut self,
151        stream_id: &Uuid,
152        message: Self::Message,
153    ) -> std::result::Result<(), SendError<Self::Message>>;
154
155    /// Send a batch of messages with backpressure.
156    ///
157    /// Awaits channel capacity for each message in turn. Stops on the first failure and
158    /// returns the failed message together with all remaining (not-yet-attempted) messages.
159    ///
160    /// # Errors
161    ///
162    /// Returns [`SendError<Vec<Self::Message>>`] containing potentially undelivered messages.
163    ///
164    /// Depending on the implementation of [`Transport`], the undelivered messages are not
165    /// necessarily the messages that were provided to the current invocation of [`Self::send_requests`].
166    ///
167    /// See implementation documentation for details.
168    async fn send_requests<I>(
169        &mut self,
170        stream_id: &Uuid,
171        requests: I,
172    ) -> std::result::Result<(), SendError<Vec<Self::Message>>>
173    where
174        I: IntoIterator<Item = Self::Message> + Send,
175        I::IntoIter: Send;
176
177    /// Attempt to send a single message without blocking.
178    ///
179    /// Returns immediately regardless of whether the channel has capacity.
180    ///
181    /// # Errors
182    ///
183    /// Returns [`TrySendError<Self::Message>`] containing a potentially undelivered message:
184    /// - [`TrySendError::Full`] — the channel is at capacity; consider retrying with
185    ///   [`send`](Transport::send) to apply backpressure instead.
186    /// - [`TrySendError::Closed`] — the channel has been closed.
187    ///
188    /// Depending on the implementation of [`Transport`], the undelivered messages are not
189    /// necessarily the messages that were provided to the current invocation of [`Self::try_send`].
190    ///
191    /// See implementation documentation for details.
192    fn try_send(
193        &mut self,
194        stream_id: &Uuid,
195        message: Self::Message,
196    ) -> std::result::Result<(), TrySendError<Self::Message>>;
197
198    /// Attempt to send a batch of messages without blocking.
199    ///
200    /// Calls [`try_send`](Transport::try_send) for each message in turn. Returns immediately
201    /// on the first failure, bundling the failed message with any remaining unprocessed
202    /// messages.
203    ///
204    /// # Errors
205    ///
206    /// Returns [`TrySendError<Vec<Self::Message>>`] containing potentially undelivered messages.
207    /// - [`TrySendError::Full`] — the channel was at capacity for one of the messages.
208    /// - [`TrySendError::Closed`] — the channel was closed.
209    ///
210    /// Depending on the implementation of [`Transport`], the undelivered messages are not
211    /// necessarily the messages that were provided to the current invocation of [`Self::try_send_requests`].
212    ///
213    /// See implementation documentation for details.
214    fn try_send_requests<I>(
215        &mut self,
216        stream_id: &Uuid,
217        requests: I,
218    ) -> std::result::Result<(), TrySendError<Vec<Self::Message>>>
219    where
220        I: IntoIterator<Item = Self::Message> + Send,
221        I::IntoIter: Send;
222
223    /// Flush any remaining messages and cleanly shut down the transport.
224    ///
225    /// Must be called when ingestion is complete. Dropping a [`SiftStream`] without
226    /// calling `finish` may result in tail-end data not reaching Sift.
227    async fn finish(self, stream_id: &Uuid) -> Result<()>;
228}
229
230/// Generic wrapper over a telemetry transport that provides a consistent send API regardless
231/// of the underlying mode.
232///
233/// `E` is the encoder (e.g. [`IngestionConfigEncoder`](crate::IngestionConfigEncoder)) and `T`
234/// is the transport (e.g. [`LiveStreamingOnly`](crate::LiveStreamingOnly),
235/// [`LiveStreamingWithBackups`](crate::LiveStreamingWithBackups), or
236/// [`FileBackup`](crate::FileBackup)). The available features — checkpointing, retry, disk
237/// backups — depend entirely on the transport mode chosen at build time.
238///
239/// Construct a `SiftStream` via [`SiftStreamBuilder`](builder::SiftStreamBuilder). Refer to the
240/// [crate-level documentation](crate) for mode comparison, examples, and tuning guidance.
241pub struct SiftStream<E, T> {
242    grpc_channel: SiftChannel,
243    encoder: E,
244    transport: T,
245    run: Option<Run>,
246    sift_stream_id: Uuid,
247}
248
249impl<E, T> SiftStream<E, T>
250where
251    E: Encoder + MetricsSnapshot,
252    T: Transport<Encoder = E>,
253{
254    #[cfg(feature = "metrics-unstable")]
255    /// Retrieve a snapshot of the current metrics for this stream.
256    pub fn get_metrics_snapshot(&self) -> SiftStreamMetricsSnapshot {
257        self.encoder.snapshot()
258    }
259
260    /// Attach a run to the stream. Any data provided through [SiftStream::send] after return
261    /// of this function will be associated with the run.
262    pub async fn attach_run(&mut self, run_selector: RunSelector) -> Result<()> {
263        let run = match run_selector {
264            RunSelector::ById(run_id) => load_run_by_id(self.grpc_channel.clone(), &run_id).await?,
265            RunSelector::ByForm(run_form) => {
266                load_run_by_form(self.grpc_channel.clone(), run_form).await?
267            }
268        };
269
270        self.run = Some(run);
271
272        Ok(())
273    }
274
275    /// Detach the run, if any, associated with the stream. Any data provided through [SiftStream::send] after
276    /// this function is called will not be associated with a run.
277    pub fn detach_run(&mut self) {
278        self.run = None;
279    }
280
281    /// Retrieves the attached run if it exists.
282    pub fn run(&self) -> Option<&Run> {
283        self.run.as_ref()
284    }
285
286    /// Send telemetry with backpressure.
287    ///
288    /// Encodes `message` and then awaits until the backing channel has capacity. See the
289    /// [`Transport`] implementation for specific details on backpressure.
290    ///
291    /// Use this method when you want the caller to slow down naturally when the pipeline
292    /// is under load. For a non-blocking alternative see [`try_send`](SiftStream::try_send).
293    ///
294    /// # Errors
295    ///
296    /// - [`SiftStreamSendError::EncodeError`] — the message could not be encoded. This
297    ///   indicates a schema mismatch or invalid value and is not recoverable by retrying.
298    /// - [`SiftStreamSendError::ChannelClosed`] — the backing channel was closed before the
299    ///   message could be delivered. The undelivered message is returned inside the variant.
300    ///
301    /// # Cancellation safety
302    ///
303    /// If the returned future is dropped while waiting for channel capacity, no message is
304    /// lost — either the send completed before the drop, or the channel slot was never taken.
305    pub async fn send<M>(
306        &mut self,
307        message: M,
308    ) -> std::result::Result<(), SiftStreamSendError<<T as Transport>::Message>>
309    where
310        M: Encodeable<Encoder = E, Output = <T as Transport>::Message> + Send + Sync,
311    {
312        let encoded = message
313            .encode(&mut self.encoder, &self.sift_stream_id, self.run.as_ref())
314            .ok_or_else(|| SiftStreamSendError::encode_error("Failed to encode message"))?;
315
316        self.transport
317            .send(&self.sift_stream_id, encoded)
318            .await
319            .map_err(|SendError(msg)| SiftStreamSendError::ChannelClosed(msg))
320    }
321
322    /// Send a batch of pre-encoded requests with backpressure.
323    ///
324    /// Awaits channel capacity for each request in turn. Stops on the first failure and
325    /// returns all undelivered messages (the failing one plus any not yet attempted).
326    ///
327    /// Unlike [`send`](SiftStream::send), this method accepts pre-encoded
328    /// [`Transport::Message`](crate::stream::Transport::Message) values directly, bypassing
329    /// the encode step. Use [`FlowBuilder`](crate::FlowBuilder) to construct them for maximum
330    /// performance.
331    ///
332    /// # Errors
333    ///
334    /// [`SendError<Vec<T>>`] containing every message that was not delivered.
335    pub async fn send_requests<I>(
336        &mut self,
337        requests: I,
338    ) -> std::result::Result<(), SendError<Vec<<T as Transport>::Message>>>
339    where
340        I: IntoIterator<Item = <T as Transport>::Message> + Send,
341        I::IntoIter: Send,
342    {
343        self.transport
344            .send_requests(&self.sift_stream_id, requests)
345            .await
346    }
347
348    /// Attempt to send telemetry without blocking.
349    ///
350    /// Encodes `message` and immediately attempts to place it on the backing channel. Returns
351    /// at once regardless of whether the channel has capacity.
352    ///
353    /// Use this method in tight loops or real-time contexts where blocking is unacceptable.
354    /// For backpressure-aware sending see [`send`](SiftStream::send).
355    ///
356    /// # Errors
357    ///
358    /// - [`SiftStreamTrySendError::EncodeError`] — the message could not be encoded.
359    /// - [`SiftStreamTrySendError::Channel`] wrapping one of:
360    ///   - [`TrySendError::Full`] — the backing channel is at capacity; the undelivered
361    ///     message is returned. Consider switching to [`send`](SiftStream::send) to apply
362    ///     backpressure, or retrying after a short delay.
363    ///   - [`TrySendError::Closed`] — the backing channel has been closed; the undelivered
364    ///     message is returned.
365    pub fn try_send<M>(
366        &mut self,
367        message: M,
368    ) -> std::result::Result<(), SiftStreamTrySendError<<T as Transport>::Message>>
369    where
370        M: Encodeable<Encoder = E, Output = <T as Transport>::Message> + Send + Sync,
371    {
372        let encoded = message
373            .encode(&mut self.encoder, &self.sift_stream_id, self.run.as_ref())
374            .ok_or_else(|| SiftStreamTrySendError::encode_error("Failed to encode message"))?;
375
376        self.transport
377            .try_send(&self.sift_stream_id, encoded)
378            .map_err(SiftStreamTrySendError::Channel)
379    }
380
381    /// Attempt to send a batch of pre-encoded requests without blocking.
382    ///
383    /// Calls `try_send` on the backing channel for each request. Returns immediately on
384    /// the first failure with every undelivered message (the failing one plus any not yet
385    /// attempted).
386    ///
387    /// Unlike [`try_send`](SiftStream::try_send), this method accepts pre-encoded
388    /// [`Transport::Message`](crate::stream::Transport::Message) values directly. Use
389    /// [`FlowBuilder`](crate::FlowBuilder) to construct them for maximum performance.
390    ///
391    /// # Errors
392    ///
393    /// [`TrySendError<Vec<T>>`] containing every message that was not delivered:
394    /// - [`TrySendError::Full`] — the backing channel was at capacity.
395    /// - [`TrySendError::Closed`] — the backing channel was closed.
396    pub fn try_send_requests<I>(
397        &mut self,
398        requests: I,
399    ) -> std::result::Result<(), TrySendError<Vec<<T as Transport>::Message>>>
400    where
401        I: IntoIterator<Item = <T as Transport>::Message> + Send,
402        I::IntoIter: Send,
403    {
404        self.transport
405            .try_send_requests(&self.sift_stream_id, requests)
406    }
407
408    /// Gracefully finish the stream, draining any remaining data before returning.
409    ///
410    /// It is important to always call this method when you are done sending data and
411    /// before the object is dropped.
412    pub async fn finish(self) -> Result<()> {
413        self.transport.finish(&self.sift_stream_id).await
414    }
415}
416
417impl<E, T> std::ops::Deref for SiftStream<E, T>
418where
419    E: Encoder + MetricsSnapshot,
420    T: Transport<Encoder = E>,
421{
422    type Target = E;
423    fn deref(&self) -> &Self::Target {
424        &self.encoder
425    }
426}
427
428impl<E, T> std::ops::DerefMut for SiftStream<E, T>
429where
430    E: Encoder + MetricsSnapshot,
431    T: Transport<Encoder = E>,
432{
433    fn deref_mut(&mut self) -> &mut Self::Target {
434        &mut self.encoder
435    }
436}
437
// NOTE(review): this comment previously said the module prevents external implementations of
// `SiftStreamMode`; no item of that name is defined in this file — confirm whether it still
// exists elsewhere in the crate.
/// Holds the `Sealed` marker trait used to prevent external implementations of this module's
/// sealed traits.
mod private {
    /// Marker supertrait of `MetricsSnapshot`, `Encoder`, and `Transport`.
    ///
    /// The trait is `pub` so that it can be used as a supertrait, but the enclosing module is
    /// private, which prevents code outside this crate from implementing it.
    pub trait Sealed {}
}
445}