rsigma_runtime/io/mod.rs
1mod delivery;
2mod file;
3#[cfg(feature = "nats")]
4pub mod nats_config;
5#[cfg(feature = "nats")]
6mod nats_sink;
7#[cfg(feature = "nats")]
8mod nats_source;
9#[cfg(feature = "otlp")]
10pub mod otlp;
11mod stdin;
12mod stdout;
13#[cfg(all(unix, feature = "uds"))]
14mod unix;
15#[cfg(all(unix, feature = "uds"))]
16mod unix_sink;
17#[cfg(all(unix, feature = "uds"))]
18mod unix_source;
19pub mod webhook;
20
21pub use delivery::{
22 DeliveryConfig, DeliveryContext, DeliveryFailure, DeliverySink, Dispatcher, OnFull,
23};
24pub use file::FileSink;
25#[cfg(feature = "nats")]
26pub use nats_config::NatsConnectConfig;
27#[cfg(feature = "nats")]
28pub use nats_sink::NatsSink;
29#[cfg(feature = "nats")]
30pub use nats_source::{NatsSource, ReplayPolicy};
31pub use stdin::StdinSource;
32pub use stdout::StdoutSink;
33#[cfg(all(unix, feature = "uds"))]
34pub use unix::{UnixSocketGuard, bind_unix_listener, parse_unix_scheme};
35#[cfg(all(unix, feature = "uds"))]
36pub use unix_sink::UnixSocketSink;
37#[cfg(all(unix, feature = "uds"))]
38pub use unix_source::UnixSocketSource;
39
40use std::sync::Arc;
41
42use rsigma_eval::ProcessResult;
43
44use crate::error::RuntimeError;
45use crate::metrics::MetricsHook;
46
/// Acknowledgment handle that travels alongside every event a source emits.
///
/// Calling `ack()` on a NATS JetStream token confirms message delivery to the
/// server; for stdin/HTTP sources the token is `Noop` and acking does nothing.
/// This is a plain enum rather than a trait object, which avoids dynamic
/// dispatch and follows the same pattern as the `Sink` enum.
pub enum AckToken {
    /// Nothing to acknowledge (stdin, HTTP).
    Noop,
    /// A NATS JetStream message that has to be acked once processing is done.
    #[cfg(feature = "nats")]
    Nats(Box<async_nats::jetstream::Message>),
}
59
60impl AckToken {
61 /// Acknowledge the event. For NATS, this confirms delivery to the server.
62 pub async fn ack(self) {
63 match self {
64 AckToken::Noop => {}
65 #[cfg(feature = "nats")]
66 AckToken::Nats(msg) => {
67 if let Err(e) = msg.ack().await {
68 tracing::warn!(error = %e, "Failed to ack NATS message");
69 }
70 }
71 }
72 }
73
74 /// Extract the NATS JetStream stream sequence and published timestamp.
75 ///
76 /// Returns `None` for non-NATS tokens or if message info parsing fails.
77 /// The tuple is `(stream_sequence, published_unix_timestamp_secs)`.
78 #[cfg(feature = "nats")]
79 pub fn nats_stream_position(&self) -> Option<(u64, i64)> {
80 match self {
81 AckToken::Noop => None,
82 AckToken::Nats(msg) => msg
83 .info()
84 .ok()
85 .map(|info| (info.stream_sequence, info.published.unix_timestamp())),
86 }
87 }
88}
89
/// A pre-serialized incident line plus an optional NATS subject override.
///
/// The alert pipeline produces structured `IncidentResult`s; the sink task
/// serializes each to NDJSON and pairs it with the configured subject override
/// (if any) so the delivery layer can route incidents without depending on the
/// alert-pipeline types. Delivered via [`Sink::send_incident`].
///
/// The envelope is plain owned data, so it derives the common traits: `Debug`
/// for structured logging, `Clone` so one envelope can be handed to several
/// consumers, and `PartialEq`/`Eq` so tests can compare envelopes directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IncidentEnvelope {
    /// The serialized incident NDJSON line.
    pub json: String,
    /// Optional NATS subject override for incident consumers. `None` means
    /// the sink's own configured subject is used.
    pub nats_subject: Option<String>,
}
102
103/// An event payload bundled with its acknowledgment token.
104///
105/// Sources produce `RawEvent`s; the engine extracts `payload` for processing
106/// and forwards `ack_token` through the pipeline so it can be acked after the
107/// sink successfully delivers.
108pub struct RawEvent {
109 pub payload: String,
110 pub ack_token: AckToken,
111}
112
113/// Contract for event input adapters.
114///
115/// Each source reads events from a specific input (stdin, HTTP, NATS) and
116/// yields `RawEvent`s containing the raw payload and an acknowledgment token.
117/// Sources are used as concrete types (not `dyn`), so `async fn` is valid
118/// without object-safety concerns.
119pub trait EventSource: Send + 'static {
120 /// Receive the next event with its ack token.
121 /// Returns `None` when the source is exhausted or shutting down.
122 fn recv(&mut self) -> impl std::future::Future<Output = Option<RawEvent>> + Send;
123}
124
125/// Enum dispatch for output adapters.
126///
127/// Uses enum dispatch instead of `dyn Trait` because:
128/// - Async trait methods are not object-safe
129/// - `FanOut(Vec<Sink>)` requires a sized, concrete type
130pub enum Sink {
131 /// Write NDJSON to stdout.
132 Stdout(StdoutSink),
133 /// Append NDJSON to a file.
134 File(FileSink),
135 /// Publish NDJSON to a NATS JetStream subject.
136 #[cfg(feature = "nats")]
137 Nats(Box<NatsSink>),
138 /// Export results to an OpenTelemetry collector via OTLP.
139 #[cfg(feature = "otlp")]
140 Otlp(Box<otlp::OtlpSink>),
141 /// Render and POST a templated HTTP request per result.
142 Webhook(Box<webhook::WebhookSink>),
143 /// Write NDJSON to a Unix domain socket (client connection).
144 #[cfg(all(unix, feature = "uds"))]
145 Unix(Box<UnixSocketSink>),
146 /// Fan out to multiple sinks.
147 FanOut(Vec<Sink>),
148}
149
150impl Sink {
151 /// Serialize and deliver a ProcessResult to this sink.
152 ///
153 /// Synchronous sinks (Stdout, File) use `block_in_place` to avoid blocking
154 /// the Tokio runtime. Uses `Box::pin` for the FanOut case to handle
155 /// recursive async.
156 pub fn send<'a>(
157 &'a mut self,
158 result: &'a ProcessResult,
159 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
160 {
161 Box::pin(async move {
162 match self {
163 Sink::Stdout(s) => {
164 let s = &*s;
165 let result = result;
166 tokio::task::block_in_place(|| s.send(result))
167 }
168 Sink::File(s) => {
169 let s = &mut *s;
170 let result = result;
171 tokio::task::block_in_place(|| s.send(result))
172 }
173 #[cfg(feature = "nats")]
174 Sink::Nats(s) => s.send(result).await,
175 #[cfg(feature = "otlp")]
176 Sink::Otlp(s) => s.send(result).await,
177 // The delivery layer drives webhooks through `DeliverySink`
178 // with a per-item context; this direct path is a completeness
179 // fallback, so it mints a one-shot context.
180 Sink::Webhook(s) => s.send(result, &DeliveryContext::new()).await,
181 #[cfg(all(unix, feature = "uds"))]
182 Sink::Unix(s) => s.send(result).await,
183 Sink::FanOut(sinks) => {
184 for (idx, sink) in sinks.iter_mut().enumerate() {
185 if let Err(e) = sink.send(result).await {
186 tracing::warn!(
187 sink_index = idx,
188 sink_type = sink.kind_label(),
189 error = %e,
190 "Fan-out child sink failed",
191 );
192 return Err(e);
193 }
194 }
195 Ok(())
196 }
197 }
198 })
199 }
200
201 /// Write a pre-serialized JSON string directly to this sink.
202 pub fn send_raw<'a>(
203 &'a mut self,
204 json: &'a str,
205 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
206 {
207 Box::pin(async move {
208 match self {
209 Sink::Stdout(s) => tokio::task::block_in_place(|| s.send_raw(json)),
210 Sink::File(s) => tokio::task::block_in_place(|| s.send_raw(json)),
211 #[cfg(feature = "nats")]
212 Sink::Nats(s) => s.send_raw(json).await,
213 #[cfg(feature = "otlp")]
214 Sink::Otlp(s) => s.send_raw(json).await,
215 // A webhook renders from structured results, not a
216 // pre-serialized JSON line, so a raw send is a no-op. The
217 // delivery path always uses `send`, so this is never hit on
218 // the webhook hot path.
219 Sink::Webhook(_) => Ok(()),
220 #[cfg(all(unix, feature = "uds"))]
221 Sink::Unix(s) => s.send_raw(json).await,
222 Sink::FanOut(sinks) => {
223 for (idx, sink) in sinks.iter_mut().enumerate() {
224 if let Err(e) = sink.send_raw(json).await {
225 tracing::warn!(
226 sink_index = idx,
227 sink_type = sink.kind_label(),
228 error = %e,
229 "Fan-out child sink failed (raw)",
230 );
231 return Err(e);
232 }
233 }
234 Ok(())
235 }
236 }
237 })
238 }
239
240 /// Deliver a pre-serialized incident line to this sink.
241 ///
242 /// Stdout/file write the line inline; NATS publishes to the per-incident
243 /// subject override when set, else the sink's configured subject. OTLP and
244 /// webhook sinks no-op, since incidents are not OTLP log records and the
245 /// webhook renderer templates from structured results, not incidents.
246 pub fn send_incident<'a>(
247 &'a mut self,
248 env: &'a IncidentEnvelope,
249 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
250 {
251 Box::pin(async move {
252 match self {
253 Sink::Stdout(s) => tokio::task::block_in_place(|| s.send_raw(&env.json)),
254 Sink::File(s) => tokio::task::block_in_place(|| s.send_raw(&env.json)),
255 #[cfg(feature = "nats")]
256 Sink::Nats(s) => {
257 s.send_incident(&env.json, env.nats_subject.as_deref())
258 .await
259 }
260 #[cfg(feature = "otlp")]
261 Sink::Otlp(_) => Ok(()),
262 Sink::Webhook(_) => Ok(()),
263 #[cfg(all(unix, feature = "uds"))]
264 Sink::Unix(s) => s.send_raw(&env.json).await,
265 Sink::FanOut(sinks) => {
266 for (idx, sink) in sinks.iter_mut().enumerate() {
267 if let Err(e) = sink.send_incident(env).await {
268 tracing::warn!(
269 sink_index = idx,
270 sink_type = sink.kind_label(),
271 error = %e,
272 "Fan-out child sink failed (incident)",
273 );
274 return Err(e);
275 }
276 }
277 Ok(())
278 }
279 }
280 })
281 }
282
283 /// Short label for the sink variant, used in structured logs and per-sink
284 /// delivery metrics.
285 pub(crate) fn kind_label(&self) -> &'static str {
286 match self {
287 Sink::Stdout(_) => "stdout",
288 Sink::File(_) => "file",
289 #[cfg(feature = "nats")]
290 Sink::Nats(_) => "nats",
291 #[cfg(feature = "otlp")]
292 Sink::Otlp(_) => "otlp",
293 // The webhook id (leaked to `&'static`) so its shared per-sink
294 // series maps one-to-one to the `rsigma_webhook_*` series.
295 Sink::Webhook(s) => s.label(),
296 #[cfg(all(unix, feature = "uds"))]
297 Sink::Unix(_) => "unix",
298 Sink::FanOut(_) => "fanout",
299 }
300 }
301
302 /// Flatten a (possibly nested) `FanOut` into its leaf sinks.
303 ///
304 /// The delivery layer runs one worker per leaf, so fan-out is realized by
305 /// the dispatcher rather than by a `FanOut` variant on the hot path.
306 pub fn into_leaves(self) -> Vec<Sink> {
307 match self {
308 Sink::FanOut(sinks) => sinks.into_iter().flat_map(Sink::into_leaves).collect(),
309 leaf => vec![leaf],
310 }
311 }
312}
313
314/// Spawn an EventSource as a tokio task wired to a shared event channel.
315///
316/// The source reads events in a loop via `recv()` and forwards `RawEvent`s to
317/// `event_tx`. When the source is exhausted or the channel is closed,
318/// the task completes. Tracks input queue depth and back-pressure metrics
319/// via the provided `MetricsHook`.
320pub fn spawn_source<S: EventSource>(
321 mut source: S,
322 event_tx: tokio::sync::mpsc::Sender<RawEvent>,
323 metrics: Option<Arc<dyn MetricsHook>>,
324) -> tokio::task::JoinHandle<()> {
325 tokio::spawn(async move {
326 while let Some(raw_event) = source.recv().await {
327 if let Some(ref m) = metrics {
328 match event_tx.try_send(raw_event) {
329 Ok(()) => {
330 m.on_input_queue_depth_change(1);
331 }
332 Err(tokio::sync::mpsc::error::TrySendError::Full(raw_event)) => {
333 m.on_back_pressure();
334 m.on_input_queue_depth_change(1);
335 tracing::warn!("Input channel full, backpressure applied");
336 if event_tx.send(raw_event).await.is_err() {
337 tracing::debug!("Event channel closed, source shutting down");
338 break;
339 }
340 }
341 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
342 tracing::debug!("Event channel closed, source shutting down");
343 break;
344 }
345 }
346 } else if event_tx.send(raw_event).await.is_err() {
347 tracing::debug!("Event channel closed, source shutting down");
348 break;
349 }
350 }
351 })
352}