Skip to main content

rsigma_runtime/io/
mod.rs

1mod file;
2#[cfg(feature = "nats")]
3mod nats_sink;
4#[cfg(feature = "nats")]
5mod nats_source;
6mod stdin;
7mod stdout;
8
9pub use file::FileSink;
10#[cfg(feature = "nats")]
11pub use nats_sink::NatsSink;
12#[cfg(feature = "nats")]
13pub use nats_source::NatsSource;
14pub use stdin::StdinSource;
15pub use stdout::StdoutSink;
16
17use std::sync::Arc;
18
19use rsigma_eval::ProcessResult;
20
21use crate::error::RuntimeError;
22use crate::metrics::MetricsHook;
23
/// Interface implemented by every event input adapter.
///
/// A source pulls events from one particular input (for example stdin or
/// NATS) and hands them out one at a time as raw strings, typically one JSON
/// document per line.
///
/// The method is declared as returning `impl Future + Send` rather than as a
/// plain `async fn` so that the returned future is guaranteed to be `Send`;
/// [`spawn_source`] relies on this when it drives the source inside
/// `tokio::spawn`. Sources are consumed through generics (never as
/// `dyn EventSource`), so object safety is not a concern.
pub trait EventSource: Send + 'static {
    /// Wait for and return the next event as a raw string.
    ///
    /// Yields `None` once the source is exhausted or is shutting down; callers
    /// should stop polling after that.
    fn recv(&mut self) -> impl std::future::Future<Output = Option<String>> + Send;
}
34
35/// Enum dispatch for output adapters.
36///
37/// Uses enum dispatch instead of `dyn Trait` because:
38/// - Async trait methods are not object-safe
39/// - `FanOut(Vec<Sink>)` requires a sized, concrete type
40pub enum Sink {
41    /// Write NDJSON to stdout.
42    Stdout(StdoutSink),
43    /// Append NDJSON to a file.
44    File(FileSink),
45    /// Publish NDJSON to a NATS subject.
46    #[cfg(feature = "nats")]
47    Nats(NatsSink),
48    /// Fan out to multiple sinks.
49    FanOut(Vec<Sink>),
50}
51
52impl Sink {
53    /// Serialize and deliver a ProcessResult to this sink.
54    ///
55    /// Synchronous sinks (Stdout, File) use `block_in_place` to avoid blocking
56    /// the Tokio runtime. Uses `Box::pin` for the FanOut case to handle
57    /// recursive async.
58    pub fn send<'a>(
59        &'a mut self,
60        result: &'a ProcessResult,
61    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
62    {
63        Box::pin(async move {
64            match self {
65                Sink::Stdout(s) => {
66                    let s = &*s;
67                    let result = result;
68                    tokio::task::block_in_place(|| s.send(result))
69                }
70                Sink::File(s) => {
71                    let s = &mut *s;
72                    let result = result;
73                    tokio::task::block_in_place(|| s.send(result))
74                }
75                #[cfg(feature = "nats")]
76                Sink::Nats(s) => s.send(result).await,
77                Sink::FanOut(sinks) => {
78                    for sink in sinks {
79                        sink.send(result).await?;
80                    }
81                    Ok(())
82                }
83            }
84        })
85    }
86}
87
88/// Spawn an EventSource as a tokio task wired to a shared event channel.
89///
90/// The source reads events in a loop via `recv()` and forwards them to
91/// `event_tx`. When the source is exhausted or the channel is closed,
92/// the task completes. Tracks input queue depth and back-pressure metrics
93/// via the provided `MetricsHook`.
94pub fn spawn_source<S: EventSource>(
95    mut source: S,
96    event_tx: tokio::sync::mpsc::Sender<String>,
97    metrics: Option<Arc<dyn MetricsHook>>,
98) -> tokio::task::JoinHandle<()> {
99    tokio::spawn(async move {
100        while let Some(line) = source.recv().await {
101            if let Some(ref m) = metrics {
102                match event_tx.try_send(line) {
103                    Ok(()) => {
104                        m.on_input_queue_depth_change(1);
105                    }
106                    Err(tokio::sync::mpsc::error::TrySendError::Full(line)) => {
107                        m.on_back_pressure();
108                        m.on_input_queue_depth_change(1);
109                        if event_tx.send(line).await.is_err() {
110                            tracing::debug!("Event channel closed, source shutting down");
111                            break;
112                        }
113                    }
114                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
115                        tracing::debug!("Event channel closed, source shutting down");
116                        break;
117                    }
118                }
119            } else if event_tx.send(line).await.is_err() {
120                tracing::debug!("Event channel closed, source shutting down");
121                break;
122            }
123        }
124    })
125}