Skip to main content

rsigma_runtime/io/
mod.rs

1mod delivery;
2mod file;
3#[cfg(feature = "nats")]
4pub mod nats_config;
5#[cfg(feature = "nats")]
6mod nats_sink;
7#[cfg(feature = "nats")]
8mod nats_source;
9#[cfg(feature = "otlp")]
10pub mod otlp;
11mod stdin;
12mod stdout;
13pub mod webhook;
14
15pub use delivery::{DeliveryConfig, DeliveryFailure, DeliverySink, Dispatcher, OnFull};
16pub use file::FileSink;
17#[cfg(feature = "nats")]
18pub use nats_config::NatsConnectConfig;
19#[cfg(feature = "nats")]
20pub use nats_sink::NatsSink;
21#[cfg(feature = "nats")]
22pub use nats_source::{NatsSource, ReplayPolicy};
23pub use stdin::StdinSource;
24pub use stdout::StdoutSink;
25
26use std::sync::Arc;
27
28use rsigma_eval::ProcessResult;
29
30use crate::error::RuntimeError;
31use crate::metrics::MetricsHook;
32
/// Acknowledgment handle that travels with every event a source emits.
///
/// Only NATS JetStream messages carry real acknowledgment state; every other
/// source (stdin, HTTP) hands out [`AckToken::Noop`]. The handle is a plain
/// enum rather than a trait object, so acking involves no dynamic dispatch;
/// this is the same enum-dispatch approach used by `Sink`.
pub enum AckToken {
    /// Nothing to acknowledge (stdin, HTTP).
    Noop,
    /// A NATS JetStream message that must be acked once it has been processed.
    #[cfg(feature = "nats")]
    Nats(Box<async_nats::jetstream::Message>),
}
45
46impl AckToken {
47    /// Acknowledge the event. For NATS, this confirms delivery to the server.
48    pub async fn ack(self) {
49        match self {
50            AckToken::Noop => {}
51            #[cfg(feature = "nats")]
52            AckToken::Nats(msg) => {
53                if let Err(e) = msg.ack().await {
54                    tracing::warn!(error = %e, "Failed to ack NATS message");
55                }
56            }
57        }
58    }
59
60    /// Extract the NATS JetStream stream sequence and published timestamp.
61    ///
62    /// Returns `None` for non-NATS tokens or if message info parsing fails.
63    /// The tuple is `(stream_sequence, published_unix_timestamp_secs)`.
64    #[cfg(feature = "nats")]
65    pub fn nats_stream_position(&self) -> Option<(u64, i64)> {
66        match self {
67            AckToken::Noop => None,
68            AckToken::Nats(msg) => msg
69                .info()
70                .ok()
71                .map(|info| (info.stream_sequence, info.published.unix_timestamp())),
72        }
73    }
74}
75
76/// An event payload bundled with its acknowledgment token.
77///
78/// Sources produce `RawEvent`s; the engine extracts `payload` for processing
79/// and forwards `ack_token` through the pipeline so it can be acked after the
80/// sink successfully delivers.
81pub struct RawEvent {
82    pub payload: String,
83    pub ack_token: AckToken,
84}
85
86/// Contract for event input adapters.
87///
88/// Each source reads events from a specific input (stdin, HTTP, NATS) and
89/// yields `RawEvent`s containing the raw payload and an acknowledgment token.
90/// Sources are used as concrete types (not `dyn`), so `async fn` is valid
91/// without object-safety concerns.
92pub trait EventSource: Send + 'static {
93    /// Receive the next event with its ack token.
94    /// Returns `None` when the source is exhausted or shutting down.
95    fn recv(&mut self) -> impl std::future::Future<Output = Option<RawEvent>> + Send;
96}
97
98/// Enum dispatch for output adapters.
99///
100/// Uses enum dispatch instead of `dyn Trait` because:
101/// - Async trait methods are not object-safe
102/// - `FanOut(Vec<Sink>)` requires a sized, concrete type
103pub enum Sink {
104    /// Write NDJSON to stdout.
105    Stdout(StdoutSink),
106    /// Append NDJSON to a file.
107    File(FileSink),
108    /// Publish NDJSON to a NATS JetStream subject.
109    #[cfg(feature = "nats")]
110    Nats(Box<NatsSink>),
111    /// Export results to an OpenTelemetry collector via OTLP.
112    #[cfg(feature = "otlp")]
113    Otlp(Box<otlp::OtlpSink>),
114    /// Render and POST a templated HTTP request per result.
115    Webhook(Box<webhook::WebhookSink>),
116    /// Fan out to multiple sinks.
117    FanOut(Vec<Sink>),
118}
119
120impl Sink {
121    /// Serialize and deliver a ProcessResult to this sink.
122    ///
123    /// Synchronous sinks (Stdout, File) use `block_in_place` to avoid blocking
124    /// the Tokio runtime. Uses `Box::pin` for the FanOut case to handle
125    /// recursive async.
126    pub fn send<'a>(
127        &'a mut self,
128        result: &'a ProcessResult,
129    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
130    {
131        Box::pin(async move {
132            match self {
133                Sink::Stdout(s) => {
134                    let s = &*s;
135                    let result = result;
136                    tokio::task::block_in_place(|| s.send(result))
137                }
138                Sink::File(s) => {
139                    let s = &mut *s;
140                    let result = result;
141                    tokio::task::block_in_place(|| s.send(result))
142                }
143                #[cfg(feature = "nats")]
144                Sink::Nats(s) => s.send(result).await,
145                #[cfg(feature = "otlp")]
146                Sink::Otlp(s) => s.send(result).await,
147                Sink::Webhook(s) => s.send(result).await,
148                Sink::FanOut(sinks) => {
149                    for (idx, sink) in sinks.iter_mut().enumerate() {
150                        if let Err(e) = sink.send(result).await {
151                            tracing::warn!(
152                                sink_index = idx,
153                                sink_type = sink.kind_label(),
154                                error = %e,
155                                "Fan-out child sink failed",
156                            );
157                            return Err(e);
158                        }
159                    }
160                    Ok(())
161                }
162            }
163        })
164    }
165
166    /// Write a pre-serialized JSON string directly to this sink.
167    pub fn send_raw<'a>(
168        &'a mut self,
169        json: &'a str,
170    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
171    {
172        Box::pin(async move {
173            match self {
174                Sink::Stdout(s) => tokio::task::block_in_place(|| s.send_raw(json)),
175                Sink::File(s) => tokio::task::block_in_place(|| s.send_raw(json)),
176                #[cfg(feature = "nats")]
177                Sink::Nats(s) => s.send_raw(json).await,
178                #[cfg(feature = "otlp")]
179                Sink::Otlp(s) => s.send_raw(json).await,
180                // A webhook renders from structured results, not a
181                // pre-serialized JSON line, so a raw send is a no-op. The
182                // delivery path always uses `send`, so this is never hit on
183                // the webhook hot path.
184                Sink::Webhook(_) => Ok(()),
185                Sink::FanOut(sinks) => {
186                    for (idx, sink) in sinks.iter_mut().enumerate() {
187                        if let Err(e) = sink.send_raw(json).await {
188                            tracing::warn!(
189                                sink_index = idx,
190                                sink_type = sink.kind_label(),
191                                error = %e,
192                                "Fan-out child sink failed (raw)",
193                            );
194                            return Err(e);
195                        }
196                    }
197                    Ok(())
198                }
199            }
200        })
201    }
202
203    /// Short label for the sink variant, used in structured logs and per-sink
204    /// delivery metrics.
205    pub(crate) fn kind_label(&self) -> &'static str {
206        match self {
207            Sink::Stdout(_) => "stdout",
208            Sink::File(_) => "file",
209            #[cfg(feature = "nats")]
210            Sink::Nats(_) => "nats",
211            #[cfg(feature = "otlp")]
212            Sink::Otlp(_) => "otlp",
213            // The webhook id (leaked to `&'static`) so its shared per-sink
214            // series maps one-to-one to the `rsigma_webhook_*` series.
215            Sink::Webhook(s) => s.label(),
216            Sink::FanOut(_) => "fanout",
217        }
218    }
219
220    /// Flatten a (possibly nested) `FanOut` into its leaf sinks.
221    ///
222    /// The delivery layer runs one worker per leaf, so fan-out is realized by
223    /// the dispatcher rather than by a `FanOut` variant on the hot path.
224    pub fn into_leaves(self) -> Vec<Sink> {
225        match self {
226            Sink::FanOut(sinks) => sinks.into_iter().flat_map(Sink::into_leaves).collect(),
227            leaf => vec![leaf],
228        }
229    }
230}
231
232/// Spawn an EventSource as a tokio task wired to a shared event channel.
233///
234/// The source reads events in a loop via `recv()` and forwards `RawEvent`s to
235/// `event_tx`. When the source is exhausted or the channel is closed,
236/// the task completes. Tracks input queue depth and back-pressure metrics
237/// via the provided `MetricsHook`.
238pub fn spawn_source<S: EventSource>(
239    mut source: S,
240    event_tx: tokio::sync::mpsc::Sender<RawEvent>,
241    metrics: Option<Arc<dyn MetricsHook>>,
242) -> tokio::task::JoinHandle<()> {
243    tokio::spawn(async move {
244        while let Some(raw_event) = source.recv().await {
245            if let Some(ref m) = metrics {
246                match event_tx.try_send(raw_event) {
247                    Ok(()) => {
248                        m.on_input_queue_depth_change(1);
249                    }
250                    Err(tokio::sync::mpsc::error::TrySendError::Full(raw_event)) => {
251                        m.on_back_pressure();
252                        m.on_input_queue_depth_change(1);
253                        tracing::warn!("Input channel full, backpressure applied");
254                        if event_tx.send(raw_event).await.is_err() {
255                            tracing::debug!("Event channel closed, source shutting down");
256                            break;
257                        }
258                    }
259                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
260                        tracing::debug!("Event channel closed, source shutting down");
261                        break;
262                    }
263                }
264            } else if event_tx.send(raw_event).await.is_err() {
265                tracing::debug!("Event channel closed, source shutting down");
266                break;
267            }
268        }
269    })
270}