Skip to main content

rsigma_runtime/io/
mod.rs

1mod file;
2#[cfg(feature = "nats")]
3pub mod nats_config;
4#[cfg(feature = "nats")]
5mod nats_sink;
6#[cfg(feature = "nats")]
7mod nats_source;
8#[cfg(feature = "otlp")]
9pub mod otlp;
10mod stdin;
11mod stdout;
12
13pub use file::FileSink;
14#[cfg(feature = "nats")]
15pub use nats_config::NatsConnectConfig;
16#[cfg(feature = "nats")]
17pub use nats_sink::NatsSink;
18#[cfg(feature = "nats")]
19pub use nats_source::{NatsSource, ReplayPolicy};
20pub use stdin::StdinSource;
21pub use stdout::StdoutSink;
22
23use std::sync::Arc;
24
25use rsigma_eval::ProcessResult;
26
27use crate::error::RuntimeError;
28use crate::metrics::MetricsHook;
29
/// Handle used to acknowledge a single event once it has been processed.
///
/// Every event produced by a source carries one of these. Sources backed by
/// NATS JetStream hand out a token whose `ack()` confirms delivery to the
/// server; stdin and HTTP sources hand out a token whose `ack()` does nothing.
///
/// Modelled as an enum rather than a trait object so no dynamic dispatch is
/// involved, following the same approach as the `Sink` enum.
pub enum AckToken {
    /// Nothing to acknowledge (stdin, HTTP).
    Noop,
    /// A NATS JetStream message that has to be acked after processing.
    #[cfg(feature = "nats")]
    Nats(Box<async_nats::jetstream::Message>),
}
42
43impl AckToken {
44    /// Acknowledge the event. For NATS, this confirms delivery to the server.
45    pub async fn ack(self) {
46        match self {
47            AckToken::Noop => {}
48            #[cfg(feature = "nats")]
49            AckToken::Nats(msg) => {
50                if let Err(e) = msg.ack().await {
51                    tracing::warn!(error = %e, "Failed to ack NATS message");
52                }
53            }
54        }
55    }
56
57    /// Extract the NATS JetStream stream sequence and published timestamp.
58    ///
59    /// Returns `None` for non-NATS tokens or if message info parsing fails.
60    /// The tuple is `(stream_sequence, published_unix_timestamp_secs)`.
61    #[cfg(feature = "nats")]
62    pub fn nats_stream_position(&self) -> Option<(u64, i64)> {
63        match self {
64            AckToken::Noop => None,
65            AckToken::Nats(msg) => msg
66                .info()
67                .ok()
68                .map(|info| (info.stream_sequence, info.published.unix_timestamp())),
69        }
70    }
71}
72
73/// An event payload bundled with its acknowledgment token.
74///
75/// Sources produce `RawEvent`s; the engine extracts `payload` for processing
76/// and forwards `ack_token` through the pipeline so it can be acked after the
77/// sink successfully delivers.
78pub struct RawEvent {
79    pub payload: String,
80    pub ack_token: AckToken,
81}
82
83/// Contract for event input adapters.
84///
85/// Each source reads events from a specific input (stdin, HTTP, NATS) and
86/// yields `RawEvent`s containing the raw payload and an acknowledgment token.
87/// Sources are used as concrete types (not `dyn`), so `async fn` is valid
88/// without object-safety concerns.
89pub trait EventSource: Send + 'static {
90    /// Receive the next event with its ack token.
91    /// Returns `None` when the source is exhausted or shutting down.
92    fn recv(&mut self) -> impl std::future::Future<Output = Option<RawEvent>> + Send;
93}
94
95/// Enum dispatch for output adapters.
96///
97/// Uses enum dispatch instead of `dyn Trait` because:
98/// - Async trait methods are not object-safe
99/// - `FanOut(Vec<Sink>)` requires a sized, concrete type
100pub enum Sink {
101    /// Write NDJSON to stdout.
102    Stdout(StdoutSink),
103    /// Append NDJSON to a file.
104    File(FileSink),
105    /// Publish NDJSON to a NATS JetStream subject.
106    #[cfg(feature = "nats")]
107    Nats(Box<NatsSink>),
108    /// Fan out to multiple sinks.
109    FanOut(Vec<Sink>),
110}
111
112impl Sink {
113    /// Serialize and deliver a ProcessResult to this sink.
114    ///
115    /// Synchronous sinks (Stdout, File) use `block_in_place` to avoid blocking
116    /// the Tokio runtime. Uses `Box::pin` for the FanOut case to handle
117    /// recursive async.
118    pub fn send<'a>(
119        &'a mut self,
120        result: &'a ProcessResult,
121    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
122    {
123        Box::pin(async move {
124            match self {
125                Sink::Stdout(s) => {
126                    let s = &*s;
127                    let result = result;
128                    tokio::task::block_in_place(|| s.send(result))
129                }
130                Sink::File(s) => {
131                    let s = &mut *s;
132                    let result = result;
133                    tokio::task::block_in_place(|| s.send(result))
134                }
135                #[cfg(feature = "nats")]
136                Sink::Nats(s) => s.send(result).await,
137                Sink::FanOut(sinks) => {
138                    for sink in sinks {
139                        sink.send(result).await?;
140                    }
141                    Ok(())
142                }
143            }
144        })
145    }
146
147    /// Write a pre-serialized JSON string directly to this sink.
148    pub fn send_raw<'a>(
149        &'a mut self,
150        json: &'a str,
151    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
152    {
153        Box::pin(async move {
154            match self {
155                Sink::Stdout(s) => tokio::task::block_in_place(|| s.send_raw(json)),
156                Sink::File(s) => tokio::task::block_in_place(|| s.send_raw(json)),
157                #[cfg(feature = "nats")]
158                Sink::Nats(s) => s.send_raw(json).await,
159                Sink::FanOut(sinks) => {
160                    for sink in sinks {
161                        sink.send_raw(json).await?;
162                    }
163                    Ok(())
164                }
165            }
166        })
167    }
168}
169
170/// Spawn an EventSource as a tokio task wired to a shared event channel.
171///
172/// The source reads events in a loop via `recv()` and forwards `RawEvent`s to
173/// `event_tx`. When the source is exhausted or the channel is closed,
174/// the task completes. Tracks input queue depth and back-pressure metrics
175/// via the provided `MetricsHook`.
176pub fn spawn_source<S: EventSource>(
177    mut source: S,
178    event_tx: tokio::sync::mpsc::Sender<RawEvent>,
179    metrics: Option<Arc<dyn MetricsHook>>,
180) -> tokio::task::JoinHandle<()> {
181    tokio::spawn(async move {
182        while let Some(raw_event) = source.recv().await {
183            if let Some(ref m) = metrics {
184                match event_tx.try_send(raw_event) {
185                    Ok(()) => {
186                        m.on_input_queue_depth_change(1);
187                    }
188                    Err(tokio::sync::mpsc::error::TrySendError::Full(raw_event)) => {
189                        m.on_back_pressure();
190                        m.on_input_queue_depth_change(1);
191                        if event_tx.send(raw_event).await.is_err() {
192                            tracing::debug!("Event channel closed, source shutting down");
193                            break;
194                        }
195                    }
196                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
197                        tracing::debug!("Event channel closed, source shutting down");
198                        break;
199                    }
200                }
201            } else if event_tx.send(raw_event).await.is_err() {
202                tracing::debug!("Event channel closed, source shutting down");
203                break;
204            }
205        }
206    })
207}