Skip to main content

rsigma_runtime/io/
mod.rs

1mod file;
2#[cfg(feature = "nats")]
3pub mod nats_config;
4#[cfg(feature = "nats")]
5mod nats_sink;
6#[cfg(feature = "nats")]
7mod nats_source;
8#[cfg(feature = "otlp")]
9pub mod otlp;
10mod stdin;
11mod stdout;
12
13pub use file::FileSink;
14#[cfg(feature = "nats")]
15pub use nats_config::NatsConnectConfig;
16#[cfg(feature = "nats")]
17pub use nats_sink::NatsSink;
18#[cfg(feature = "nats")]
19pub use nats_source::{NatsSource, ReplayPolicy};
20pub use stdin::StdinSource;
21pub use stdout::StdoutSink;
22
23use std::sync::Arc;
24
25use rsigma_eval::ProcessResult;
26
27use crate::error::RuntimeError;
28use crate::metrics::MetricsHook;
29
/// Opaque acknowledgment handle returned alongside each event.
///
/// For NATS JetStream sources, calling `ack()` confirms message delivery to the
/// server. For stdin/HTTP sources, ack is a no-op. This enum avoids dynamic
/// dispatch and mirrors the `Sink` enum pattern.
///
/// `ack()` takes the token by value, so a given token can be acknowledged at
/// most once.
pub enum AckToken {
    /// No acknowledgment needed (stdin, HTTP).
    Noop,
    /// NATS JetStream message that must be acked after processing.
    ///
    /// Only available when the crate is built with the `nats` feature.
    // NOTE(review): the message is boxed, presumably to keep `AckToken` small
    // for the common `Noop` case — confirm before unboxing.
    #[cfg(feature = "nats")]
    Nats(Box<async_nats::jetstream::Message>),
}
42
43impl AckToken {
44    /// Acknowledge the event. For NATS, this confirms delivery to the server.
45    pub async fn ack(self) {
46        match self {
47            AckToken::Noop => {}
48            #[cfg(feature = "nats")]
49            AckToken::Nats(msg) => {
50                if let Err(e) = msg.ack().await {
51                    tracing::warn!(error = %e, "Failed to ack NATS message");
52                }
53            }
54        }
55    }
56
57    /// Extract the NATS JetStream stream sequence and published timestamp.
58    ///
59    /// Returns `None` for non-NATS tokens or if message info parsing fails.
60    /// The tuple is `(stream_sequence, published_unix_timestamp_secs)`.
61    #[cfg(feature = "nats")]
62    pub fn nats_stream_position(&self) -> Option<(u64, i64)> {
63        match self {
64            AckToken::Noop => None,
65            AckToken::Nats(msg) => msg
66                .info()
67                .ok()
68                .map(|info| (info.stream_sequence, info.published.unix_timestamp())),
69        }
70    }
71}
72
/// An event payload bundled with its acknowledgment token.
///
/// Sources produce `RawEvent`s; the engine extracts `payload` for processing
/// and forwards `ack_token` through the pipeline so it can be acked after the
/// sink successfully delivers.
pub struct RawEvent {
    /// Raw event text as yielded by the source; this module does not interpret it.
    pub payload: String,
    /// Handle used to acknowledge this event (a no-op for sources that need no ack).
    pub ack_token: AckToken,
}
82
/// Contract for event input adapters.
///
/// Each source reads events from a specific input (stdin, HTTP, NATS) and
/// yields `RawEvent`s containing the raw payload and an acknowledgment token.
/// Sources are used as concrete types (not `dyn`), so `async fn` is valid
/// without object-safety concerns.
///
/// The `Send + 'static` supertraits let a source be moved into a spawned task,
/// which is how `spawn_source` drives it.
pub trait EventSource: Send + 'static {
    /// Receive the next event with its ack token.
    /// Returns `None` when the source is exhausted or shutting down.
    ///
    /// Declared as a method returning `impl Future + Send` rather than a plain
    /// `async fn`, so that the `Send` bound on the returned future is part of
    /// the trait contract. Implementations may still be written as `async fn`
    /// as long as their future is `Send`.
    fn recv(&mut self) -> impl std::future::Future<Output = Option<RawEvent>> + Send;
}
94
/// Enum dispatch for output adapters.
///
/// Uses enum dispatch instead of `dyn Trait` because:
/// - Async trait methods are not object-safe
/// - `FanOut(Vec<Sink>)` requires a sized, concrete type
pub enum Sink {
    /// Write NDJSON to stdout.
    Stdout(StdoutSink),
    /// Append NDJSON to a file.
    File(FileSink),
    /// Publish NDJSON to a NATS JetStream subject.
    ///
    /// Only available when the crate is built with the `nats` feature.
    #[cfg(feature = "nats")]
    Nats(Box<NatsSink>),
    /// Fan out to multiple sinks.
    ///
    /// Children are tried in vector order; `send` / `send_raw` stop at the
    /// first child that returns an error, so later children do not receive
    /// that item. Children may themselves be `FanOut`.
    FanOut(Vec<Sink>),
}
111
112impl Sink {
113    /// Serialize and deliver a ProcessResult to this sink.
114    ///
115    /// Synchronous sinks (Stdout, File) use `block_in_place` to avoid blocking
116    /// the Tokio runtime. Uses `Box::pin` for the FanOut case to handle
117    /// recursive async.
118    pub fn send<'a>(
119        &'a mut self,
120        result: &'a ProcessResult,
121    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
122    {
123        Box::pin(async move {
124            match self {
125                Sink::Stdout(s) => {
126                    let s = &*s;
127                    let result = result;
128                    tokio::task::block_in_place(|| s.send(result))
129                }
130                Sink::File(s) => {
131                    let s = &mut *s;
132                    let result = result;
133                    tokio::task::block_in_place(|| s.send(result))
134                }
135                #[cfg(feature = "nats")]
136                Sink::Nats(s) => s.send(result).await,
137                Sink::FanOut(sinks) => {
138                    for (idx, sink) in sinks.iter_mut().enumerate() {
139                        if let Err(e) = sink.send(result).await {
140                            tracing::warn!(
141                                sink_index = idx,
142                                sink_type = sink.kind_label(),
143                                error = %e,
144                                "Fan-out child sink failed",
145                            );
146                            return Err(e);
147                        }
148                    }
149                    Ok(())
150                }
151            }
152        })
153    }
154
155    /// Write a pre-serialized JSON string directly to this sink.
156    pub fn send_raw<'a>(
157        &'a mut self,
158        json: &'a str,
159    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
160    {
161        Box::pin(async move {
162            match self {
163                Sink::Stdout(s) => tokio::task::block_in_place(|| s.send_raw(json)),
164                Sink::File(s) => tokio::task::block_in_place(|| s.send_raw(json)),
165                #[cfg(feature = "nats")]
166                Sink::Nats(s) => s.send_raw(json).await,
167                Sink::FanOut(sinks) => {
168                    for (idx, sink) in sinks.iter_mut().enumerate() {
169                        if let Err(e) = sink.send_raw(json).await {
170                            tracing::warn!(
171                                sink_index = idx,
172                                sink_type = sink.kind_label(),
173                                error = %e,
174                                "Fan-out child sink failed (raw)",
175                            );
176                            return Err(e);
177                        }
178                    }
179                    Ok(())
180                }
181            }
182        })
183    }
184
185    /// Short label for the sink variant, used in structured logs.
186    fn kind_label(&self) -> &'static str {
187        match self {
188            Sink::Stdout(_) => "stdout",
189            Sink::File(_) => "file",
190            #[cfg(feature = "nats")]
191            Sink::Nats(_) => "nats",
192            Sink::FanOut(_) => "fanout",
193        }
194    }
195}
196
197/// Spawn an EventSource as a tokio task wired to a shared event channel.
198///
199/// The source reads events in a loop via `recv()` and forwards `RawEvent`s to
200/// `event_tx`. When the source is exhausted or the channel is closed,
201/// the task completes. Tracks input queue depth and back-pressure metrics
202/// via the provided `MetricsHook`.
203pub fn spawn_source<S: EventSource>(
204    mut source: S,
205    event_tx: tokio::sync::mpsc::Sender<RawEvent>,
206    metrics: Option<Arc<dyn MetricsHook>>,
207) -> tokio::task::JoinHandle<()> {
208    tokio::spawn(async move {
209        while let Some(raw_event) = source.recv().await {
210            if let Some(ref m) = metrics {
211                match event_tx.try_send(raw_event) {
212                    Ok(()) => {
213                        m.on_input_queue_depth_change(1);
214                    }
215                    Err(tokio::sync::mpsc::error::TrySendError::Full(raw_event)) => {
216                        m.on_back_pressure();
217                        m.on_input_queue_depth_change(1);
218                        tracing::warn!("Input channel full, backpressure applied");
219                        if event_tx.send(raw_event).await.is_err() {
220                            tracing::debug!("Event channel closed, source shutting down");
221                            break;
222                        }
223                    }
224                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
225                        tracing::debug!("Event channel closed, source shutting down");
226                        break;
227                    }
228                }
229            } else if event_tx.send(raw_event).await.is_err() {
230                tracing::debug!("Event channel closed, source shutting down");
231                break;
232            }
233        }
234    })
235}