rsigma_runtime/io/mod.rs
1mod delivery;
2mod file;
3#[cfg(feature = "nats")]
4pub mod nats_config;
5#[cfg(feature = "nats")]
6mod nats_sink;
7#[cfg(feature = "nats")]
8mod nats_source;
9#[cfg(feature = "otlp")]
10pub mod otlp;
11mod stdin;
12mod stdout;
13pub mod webhook;
14
15pub use delivery::{DeliveryConfig, DeliveryFailure, DeliverySink, Dispatcher, OnFull};
16pub use file::FileSink;
17#[cfg(feature = "nats")]
18pub use nats_config::NatsConnectConfig;
19#[cfg(feature = "nats")]
20pub use nats_sink::NatsSink;
21#[cfg(feature = "nats")]
22pub use nats_source::{NatsSource, ReplayPolicy};
23pub use stdin::StdinSource;
24pub use stdout::StdoutSink;
25
26use std::sync::Arc;
27
28use rsigma_eval::ProcessResult;
29
30use crate::error::RuntimeError;
31use crate::metrics::MetricsHook;
32
/// Acknowledgment handle that accompanies every event a source emits.
///
/// A NATS JetStream source hands out a token whose `ack()` confirms the
/// message with the server, while stdin/HTTP sources hand out a token whose
/// `ack()` does nothing. Expressing this as an enum keeps dispatch static and
/// follows the same pattern as the `Sink` enum.
pub enum AckToken {
    /// Nothing to acknowledge (stdin, HTTP).
    Noop,
    /// A NATS JetStream message that has to be acked once it was processed.
    #[cfg(feature = "nats")]
    Nats(Box<async_nats::jetstream::Message>),
}
45
46impl AckToken {
47 /// Acknowledge the event. For NATS, this confirms delivery to the server.
48 pub async fn ack(self) {
49 match self {
50 AckToken::Noop => {}
51 #[cfg(feature = "nats")]
52 AckToken::Nats(msg) => {
53 if let Err(e) = msg.ack().await {
54 tracing::warn!(error = %e, "Failed to ack NATS message");
55 }
56 }
57 }
58 }
59
60 /// Extract the NATS JetStream stream sequence and published timestamp.
61 ///
62 /// Returns `None` for non-NATS tokens or if message info parsing fails.
63 /// The tuple is `(stream_sequence, published_unix_timestamp_secs)`.
64 #[cfg(feature = "nats")]
65 pub fn nats_stream_position(&self) -> Option<(u64, i64)> {
66 match self {
67 AckToken::Noop => None,
68 AckToken::Nats(msg) => msg
69 .info()
70 .ok()
71 .map(|info| (info.stream_sequence, info.published.unix_timestamp())),
72 }
73 }
74}
75
76/// An event payload bundled with its acknowledgment token.
77///
78/// Sources produce `RawEvent`s; the engine extracts `payload` for processing
79/// and forwards `ack_token` through the pipeline so it can be acked after the
80/// sink successfully delivers.
81pub struct RawEvent {
82 pub payload: String,
83 pub ack_token: AckToken,
84}
85
86/// Contract for event input adapters.
87///
88/// Each source reads events from a specific input (stdin, HTTP, NATS) and
89/// yields `RawEvent`s containing the raw payload and an acknowledgment token.
90/// Sources are used as concrete types (not `dyn`), so `async fn` is valid
91/// without object-safety concerns.
92pub trait EventSource: Send + 'static {
93 /// Receive the next event with its ack token.
94 /// Returns `None` when the source is exhausted or shutting down.
95 fn recv(&mut self) -> impl std::future::Future<Output = Option<RawEvent>> + Send;
96}
97
98/// Enum dispatch for output adapters.
99///
100/// Uses enum dispatch instead of `dyn Trait` because:
101/// - Async trait methods are not object-safe
102/// - `FanOut(Vec<Sink>)` requires a sized, concrete type
103pub enum Sink {
104 /// Write NDJSON to stdout.
105 Stdout(StdoutSink),
106 /// Append NDJSON to a file.
107 File(FileSink),
108 /// Publish NDJSON to a NATS JetStream subject.
109 #[cfg(feature = "nats")]
110 Nats(Box<NatsSink>),
111 /// Export results to an OpenTelemetry collector via OTLP.
112 #[cfg(feature = "otlp")]
113 Otlp(Box<otlp::OtlpSink>),
114 /// Render and POST a templated HTTP request per result.
115 Webhook(Box<webhook::WebhookSink>),
116 /// Fan out to multiple sinks.
117 FanOut(Vec<Sink>),
118}
119
120impl Sink {
121 /// Serialize and deliver a ProcessResult to this sink.
122 ///
123 /// Synchronous sinks (Stdout, File) use `block_in_place` to avoid blocking
124 /// the Tokio runtime. Uses `Box::pin` for the FanOut case to handle
125 /// recursive async.
126 pub fn send<'a>(
127 &'a mut self,
128 result: &'a ProcessResult,
129 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
130 {
131 Box::pin(async move {
132 match self {
133 Sink::Stdout(s) => {
134 let s = &*s;
135 let result = result;
136 tokio::task::block_in_place(|| s.send(result))
137 }
138 Sink::File(s) => {
139 let s = &mut *s;
140 let result = result;
141 tokio::task::block_in_place(|| s.send(result))
142 }
143 #[cfg(feature = "nats")]
144 Sink::Nats(s) => s.send(result).await,
145 #[cfg(feature = "otlp")]
146 Sink::Otlp(s) => s.send(result).await,
147 Sink::Webhook(s) => s.send(result).await,
148 Sink::FanOut(sinks) => {
149 for (idx, sink) in sinks.iter_mut().enumerate() {
150 if let Err(e) = sink.send(result).await {
151 tracing::warn!(
152 sink_index = idx,
153 sink_type = sink.kind_label(),
154 error = %e,
155 "Fan-out child sink failed",
156 );
157 return Err(e);
158 }
159 }
160 Ok(())
161 }
162 }
163 })
164 }
165
166 /// Write a pre-serialized JSON string directly to this sink.
167 pub fn send_raw<'a>(
168 &'a mut self,
169 json: &'a str,
170 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
171 {
172 Box::pin(async move {
173 match self {
174 Sink::Stdout(s) => tokio::task::block_in_place(|| s.send_raw(json)),
175 Sink::File(s) => tokio::task::block_in_place(|| s.send_raw(json)),
176 #[cfg(feature = "nats")]
177 Sink::Nats(s) => s.send_raw(json).await,
178 #[cfg(feature = "otlp")]
179 Sink::Otlp(s) => s.send_raw(json).await,
180 // A webhook renders from structured results, not a
181 // pre-serialized JSON line, so a raw send is a no-op. The
182 // delivery path always uses `send`, so this is never hit on
183 // the webhook hot path.
184 Sink::Webhook(_) => Ok(()),
185 Sink::FanOut(sinks) => {
186 for (idx, sink) in sinks.iter_mut().enumerate() {
187 if let Err(e) = sink.send_raw(json).await {
188 tracing::warn!(
189 sink_index = idx,
190 sink_type = sink.kind_label(),
191 error = %e,
192 "Fan-out child sink failed (raw)",
193 );
194 return Err(e);
195 }
196 }
197 Ok(())
198 }
199 }
200 })
201 }
202
203 /// Short label for the sink variant, used in structured logs and per-sink
204 /// delivery metrics.
205 pub(crate) fn kind_label(&self) -> &'static str {
206 match self {
207 Sink::Stdout(_) => "stdout",
208 Sink::File(_) => "file",
209 #[cfg(feature = "nats")]
210 Sink::Nats(_) => "nats",
211 #[cfg(feature = "otlp")]
212 Sink::Otlp(_) => "otlp",
213 // The webhook id (leaked to `&'static`) so its shared per-sink
214 // series maps one-to-one to the `rsigma_webhook_*` series.
215 Sink::Webhook(s) => s.label(),
216 Sink::FanOut(_) => "fanout",
217 }
218 }
219
220 /// Flatten a (possibly nested) `FanOut` into its leaf sinks.
221 ///
222 /// The delivery layer runs one worker per leaf, so fan-out is realized by
223 /// the dispatcher rather than by a `FanOut` variant on the hot path.
224 pub fn into_leaves(self) -> Vec<Sink> {
225 match self {
226 Sink::FanOut(sinks) => sinks.into_iter().flat_map(Sink::into_leaves).collect(),
227 leaf => vec![leaf],
228 }
229 }
230}
231
232/// Spawn an EventSource as a tokio task wired to a shared event channel.
233///
234/// The source reads events in a loop via `recv()` and forwards `RawEvent`s to
235/// `event_tx`. When the source is exhausted or the channel is closed,
236/// the task completes. Tracks input queue depth and back-pressure metrics
237/// via the provided `MetricsHook`.
238pub fn spawn_source<S: EventSource>(
239 mut source: S,
240 event_tx: tokio::sync::mpsc::Sender<RawEvent>,
241 metrics: Option<Arc<dyn MetricsHook>>,
242) -> tokio::task::JoinHandle<()> {
243 tokio::spawn(async move {
244 while let Some(raw_event) = source.recv().await {
245 if let Some(ref m) = metrics {
246 match event_tx.try_send(raw_event) {
247 Ok(()) => {
248 m.on_input_queue_depth_change(1);
249 }
250 Err(tokio::sync::mpsc::error::TrySendError::Full(raw_event)) => {
251 m.on_back_pressure();
252 m.on_input_queue_depth_change(1);
253 tracing::warn!("Input channel full, backpressure applied");
254 if event_tx.send(raw_event).await.is_err() {
255 tracing::debug!("Event channel closed, source shutting down");
256 break;
257 }
258 }
259 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
260 tracing::debug!("Event channel closed, source shutting down");
261 break;
262 }
263 }
264 } else if event_tx.send(raw_event).await.is_err() {
265 tracing::debug!("Event channel closed, source shutting down");
266 break;
267 }
268 }
269 })
270}