Skip to main content

rsigma_runtime/io/
mod.rs

1mod delivery;
2mod file;
3#[cfg(feature = "nats")]
4pub mod nats_config;
5#[cfg(feature = "nats")]
6mod nats_sink;
7#[cfg(feature = "nats")]
8mod nats_source;
9#[cfg(feature = "otlp")]
10pub mod otlp;
11mod stdin;
12mod stdout;
13#[cfg(all(unix, feature = "uds"))]
14mod unix;
15#[cfg(all(unix, feature = "uds"))]
16mod unix_sink;
17#[cfg(all(unix, feature = "uds"))]
18mod unix_source;
19pub mod webhook;
20
21pub use delivery::{
22    DeliveryConfig, DeliveryContext, DeliveryFailure, DeliverySink, Dispatcher, OnFull,
23};
24pub use file::FileSink;
25#[cfg(feature = "nats")]
26pub use nats_config::NatsConnectConfig;
27#[cfg(feature = "nats")]
28pub use nats_sink::NatsSink;
29#[cfg(feature = "nats")]
30pub use nats_source::{NatsSource, ReplayPolicy};
31pub use stdin::StdinSource;
32pub use stdout::StdoutSink;
33#[cfg(all(unix, feature = "uds"))]
34pub use unix::{UnixSocketGuard, bind_unix_listener, parse_unix_scheme};
35#[cfg(all(unix, feature = "uds"))]
36pub use unix_sink::UnixSocketSink;
37#[cfg(all(unix, feature = "uds"))]
38pub use unix_source::UnixSocketSource;
39
40use std::sync::Arc;
41
42use rsigma_eval::ProcessResult;
43
44use crate::error::RuntimeError;
45use crate::metrics::MetricsHook;
46
/// Acknowledgment handle that travels with every event a source yields.
///
/// A NATS JetStream source hands out a token whose `ack()` confirms delivery
/// of the message to the server; stdin and HTTP sources hand out a token whose
/// `ack()` does nothing. Modelled as an enum (like `Sink`) so no dynamic
/// dispatch is involved.
pub enum AckToken {
    /// Nothing to acknowledge (stdin, HTTP).
    Noop,
    /// A NATS JetStream message that has to be acked once processing is done.
    #[cfg(feature = "nats")]
    Nats(Box<async_nats::jetstream::Message>),
}
59
60impl AckToken {
61    /// Acknowledge the event. For NATS, this confirms delivery to the server.
62    pub async fn ack(self) {
63        match self {
64            AckToken::Noop => {}
65            #[cfg(feature = "nats")]
66            AckToken::Nats(msg) => {
67                if let Err(e) = msg.ack().await {
68                    tracing::warn!(error = %e, "Failed to ack NATS message");
69                }
70            }
71        }
72    }
73
74    /// Extract the NATS JetStream stream sequence and published timestamp.
75    ///
76    /// Returns `None` for non-NATS tokens or if message info parsing fails.
77    /// The tuple is `(stream_sequence, published_unix_timestamp_secs)`.
78    #[cfg(feature = "nats")]
79    pub fn nats_stream_position(&self) -> Option<(u64, i64)> {
80        match self {
81            AckToken::Noop => None,
82            AckToken::Nats(msg) => msg
83                .info()
84                .ok()
85                .map(|info| (info.stream_sequence, info.published.unix_timestamp())),
86        }
87    }
88}
89
/// A pre-serialized incident line plus an optional NATS subject override.
///
/// The alert pipeline produces structured `IncidentResult`s; the sink task
/// serializes each to NDJSON and pairs it with the configured subject override
/// (if any) so the delivery layer can route incidents without depending on the
/// alert-pipeline types. Delivered via [`Sink::send_incident`].
///
/// The envelope is plain owned data, so it derives `Debug` (for logging),
/// `Clone` (so one envelope can be handed to several consumers) and
/// `PartialEq`/`Eq` (so tests can compare envelopes directly).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IncidentEnvelope {
    /// The serialized incident NDJSON line.
    pub json: String,
    /// Optional NATS subject override for incident consumers. `None` means
    /// the sink's own configured subject is used.
    pub nats_subject: Option<String>,
}
102
103/// An event payload bundled with its acknowledgment token.
104///
105/// Sources produce `RawEvent`s; the engine extracts `payload` for processing
106/// and forwards `ack_token` through the pipeline so it can be acked after the
107/// sink successfully delivers.
108pub struct RawEvent {
109    pub payload: String,
110    pub ack_token: AckToken,
111}
112
113/// Contract for event input adapters.
114///
115/// Each source reads events from a specific input (stdin, HTTP, NATS) and
116/// yields `RawEvent`s containing the raw payload and an acknowledgment token.
117/// Sources are used as concrete types (not `dyn`), so `async fn` is valid
118/// without object-safety concerns.
119pub trait EventSource: Send + 'static {
120    /// Receive the next event with its ack token.
121    /// Returns `None` when the source is exhausted or shutting down.
122    fn recv(&mut self) -> impl std::future::Future<Output = Option<RawEvent>> + Send;
123}
124
125/// Enum dispatch for output adapters.
126///
127/// Uses enum dispatch instead of `dyn Trait` because:
128/// - Async trait methods are not object-safe
129/// - `FanOut(Vec<Sink>)` requires a sized, concrete type
130pub enum Sink {
131    /// Write NDJSON to stdout.
132    Stdout(StdoutSink),
133    /// Append NDJSON to a file.
134    File(FileSink),
135    /// Publish NDJSON to a NATS JetStream subject.
136    #[cfg(feature = "nats")]
137    Nats(Box<NatsSink>),
138    /// Export results to an OpenTelemetry collector via OTLP.
139    #[cfg(feature = "otlp")]
140    Otlp(Box<otlp::OtlpSink>),
141    /// Render and POST a templated HTTP request per result.
142    Webhook(Box<webhook::WebhookSink>),
143    /// Write NDJSON to a Unix domain socket (client connection).
144    #[cfg(all(unix, feature = "uds"))]
145    Unix(Box<UnixSocketSink>),
146    /// Fan out to multiple sinks.
147    FanOut(Vec<Sink>),
148}
149
150impl Sink {
151    /// Serialize and deliver a ProcessResult to this sink.
152    ///
153    /// Synchronous sinks (Stdout, File) use `block_in_place` to avoid blocking
154    /// the Tokio runtime. Uses `Box::pin` for the FanOut case to handle
155    /// recursive async.
156    pub fn send<'a>(
157        &'a mut self,
158        result: &'a ProcessResult,
159    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
160    {
161        Box::pin(async move {
162            match self {
163                Sink::Stdout(s) => {
164                    let s = &*s;
165                    let result = result;
166                    tokio::task::block_in_place(|| s.send(result))
167                }
168                Sink::File(s) => {
169                    let s = &mut *s;
170                    let result = result;
171                    tokio::task::block_in_place(|| s.send(result))
172                }
173                #[cfg(feature = "nats")]
174                Sink::Nats(s) => s.send(result).await,
175                #[cfg(feature = "otlp")]
176                Sink::Otlp(s) => s.send(result).await,
177                // The delivery layer drives webhooks through `DeliverySink`
178                // with a per-item context; this direct path is a completeness
179                // fallback, so it mints a one-shot context.
180                Sink::Webhook(s) => s.send(result, &DeliveryContext::new()).await,
181                #[cfg(all(unix, feature = "uds"))]
182                Sink::Unix(s) => s.send(result).await,
183                Sink::FanOut(sinks) => {
184                    for (idx, sink) in sinks.iter_mut().enumerate() {
185                        if let Err(e) = sink.send(result).await {
186                            tracing::warn!(
187                                sink_index = idx,
188                                sink_type = sink.kind_label(),
189                                error = %e,
190                                "Fan-out child sink failed",
191                            );
192                            return Err(e);
193                        }
194                    }
195                    Ok(())
196                }
197            }
198        })
199    }
200
201    /// Write a pre-serialized JSON string directly to this sink.
202    pub fn send_raw<'a>(
203        &'a mut self,
204        json: &'a str,
205    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
206    {
207        Box::pin(async move {
208            match self {
209                Sink::Stdout(s) => tokio::task::block_in_place(|| s.send_raw(json)),
210                Sink::File(s) => tokio::task::block_in_place(|| s.send_raw(json)),
211                #[cfg(feature = "nats")]
212                Sink::Nats(s) => s.send_raw(json).await,
213                #[cfg(feature = "otlp")]
214                Sink::Otlp(s) => s.send_raw(json).await,
215                // A webhook renders from structured results, not a
216                // pre-serialized JSON line, so a raw send is a no-op. The
217                // delivery path always uses `send`, so this is never hit on
218                // the webhook hot path.
219                Sink::Webhook(_) => Ok(()),
220                #[cfg(all(unix, feature = "uds"))]
221                Sink::Unix(s) => s.send_raw(json).await,
222                Sink::FanOut(sinks) => {
223                    for (idx, sink) in sinks.iter_mut().enumerate() {
224                        if let Err(e) = sink.send_raw(json).await {
225                            tracing::warn!(
226                                sink_index = idx,
227                                sink_type = sink.kind_label(),
228                                error = %e,
229                                "Fan-out child sink failed (raw)",
230                            );
231                            return Err(e);
232                        }
233                    }
234                    Ok(())
235                }
236            }
237        })
238    }
239
240    /// Deliver a pre-serialized incident line to this sink.
241    ///
242    /// Stdout/file write the line inline; NATS publishes to the per-incident
243    /// subject override when set, else the sink's configured subject. OTLP and
244    /// webhook sinks no-op, since incidents are not OTLP log records and the
245    /// webhook renderer templates from structured results, not incidents.
246    pub fn send_incident<'a>(
247        &'a mut self,
248        env: &'a IncidentEnvelope,
249    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
250    {
251        Box::pin(async move {
252            match self {
253                Sink::Stdout(s) => tokio::task::block_in_place(|| s.send_raw(&env.json)),
254                Sink::File(s) => tokio::task::block_in_place(|| s.send_raw(&env.json)),
255                #[cfg(feature = "nats")]
256                Sink::Nats(s) => {
257                    s.send_incident(&env.json, env.nats_subject.as_deref())
258                        .await
259                }
260                #[cfg(feature = "otlp")]
261                Sink::Otlp(_) => Ok(()),
262                Sink::Webhook(_) => Ok(()),
263                #[cfg(all(unix, feature = "uds"))]
264                Sink::Unix(s) => s.send_raw(&env.json).await,
265                Sink::FanOut(sinks) => {
266                    for (idx, sink) in sinks.iter_mut().enumerate() {
267                        if let Err(e) = sink.send_incident(env).await {
268                            tracing::warn!(
269                                sink_index = idx,
270                                sink_type = sink.kind_label(),
271                                error = %e,
272                                "Fan-out child sink failed (incident)",
273                            );
274                            return Err(e);
275                        }
276                    }
277                    Ok(())
278                }
279            }
280        })
281    }
282
283    /// Short label for the sink variant, used in structured logs and per-sink
284    /// delivery metrics.
285    pub(crate) fn kind_label(&self) -> &'static str {
286        match self {
287            Sink::Stdout(_) => "stdout",
288            Sink::File(_) => "file",
289            #[cfg(feature = "nats")]
290            Sink::Nats(_) => "nats",
291            #[cfg(feature = "otlp")]
292            Sink::Otlp(_) => "otlp",
293            // The webhook id (leaked to `&'static`) so its shared per-sink
294            // series maps one-to-one to the `rsigma_webhook_*` series.
295            Sink::Webhook(s) => s.label(),
296            #[cfg(all(unix, feature = "uds"))]
297            Sink::Unix(_) => "unix",
298            Sink::FanOut(_) => "fanout",
299        }
300    }
301
302    /// Flatten a (possibly nested) `FanOut` into its leaf sinks.
303    ///
304    /// The delivery layer runs one worker per leaf, so fan-out is realized by
305    /// the dispatcher rather than by a `FanOut` variant on the hot path.
306    pub fn into_leaves(self) -> Vec<Sink> {
307        match self {
308            Sink::FanOut(sinks) => sinks.into_iter().flat_map(Sink::into_leaves).collect(),
309            leaf => vec![leaf],
310        }
311    }
312}
313
314/// Spawn an EventSource as a tokio task wired to a shared event channel.
315///
316/// The source reads events in a loop via `recv()` and forwards `RawEvent`s to
317/// `event_tx`. When the source is exhausted or the channel is closed,
318/// the task completes. Tracks input queue depth and back-pressure metrics
319/// via the provided `MetricsHook`.
320pub fn spawn_source<S: EventSource>(
321    mut source: S,
322    event_tx: tokio::sync::mpsc::Sender<RawEvent>,
323    metrics: Option<Arc<dyn MetricsHook>>,
324) -> tokio::task::JoinHandle<()> {
325    tokio::spawn(async move {
326        while let Some(raw_event) = source.recv().await {
327            if let Some(ref m) = metrics {
328                match event_tx.try_send(raw_event) {
329                    Ok(()) => {
330                        m.on_input_queue_depth_change(1);
331                    }
332                    Err(tokio::sync::mpsc::error::TrySendError::Full(raw_event)) => {
333                        m.on_back_pressure();
334                        m.on_input_queue_depth_change(1);
335                        tracing::warn!("Input channel full, backpressure applied");
336                        if event_tx.send(raw_event).await.is_err() {
337                            tracing::debug!("Event channel closed, source shutting down");
338                            break;
339                        }
340                    }
341                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
342                        tracing::debug!("Event channel closed, source shutting down");
343                        break;
344                    }
345                }
346            } else if event_tx.send(raw_event).await.is_err() {
347                tracing::debug!("Event channel closed, source shutting down");
348                break;
349            }
350        }
351    })
352}