1mod file;
2#[cfg(feature = "nats")]
3pub mod nats_config;
4#[cfg(feature = "nats")]
5mod nats_sink;
6#[cfg(feature = "nats")]
7mod nats_source;
8#[cfg(feature = "otlp")]
9pub mod otlp;
10mod stdin;
11mod stdout;
12
13pub use file::FileSink;
14#[cfg(feature = "nats")]
15pub use nats_config::NatsConnectConfig;
16#[cfg(feature = "nats")]
17pub use nats_sink::NatsSink;
18#[cfg(feature = "nats")]
19pub use nats_source::{NatsSource, ReplayPolicy};
20pub use stdin::StdinSource;
21pub use stdout::StdoutSink;
22
23use std::sync::Arc;
24
25use rsigma_eval::ProcessResult;
26
27use crate::error::RuntimeError;
28use crate::metrics::MetricsHook;
29
/// Handle used to acknowledge a received event back to whatever produced it.
///
/// The token is consumed by [`AckToken::ack`].
pub enum AckToken {
    /// Nothing to acknowledge; acking this token is a no-op.
    Noop,
    /// A NATS JetStream message, acknowledged by calling `ack` on the message.
    /// NOTE(review): boxed presumably to keep the enum small — confirm.
    #[cfg(feature = "nats")]
    Nats(Box<async_nats::jetstream::Message>),
}
42
43impl AckToken {
44 pub async fn ack(self) {
46 match self {
47 AckToken::Noop => {}
48 #[cfg(feature = "nats")]
49 AckToken::Nats(msg) => {
50 if let Err(e) = msg.ack().await {
51 tracing::warn!(error = %e, "Failed to ack NATS message");
52 }
53 }
54 }
55 }
56
57 #[cfg(feature = "nats")]
62 pub fn nats_stream_position(&self) -> Option<(u64, i64)> {
63 match self {
64 AckToken::Noop => None,
65 AckToken::Nats(msg) => msg
66 .info()
67 .ok()
68 .map(|info| (info.stream_sequence, info.published.unix_timestamp())),
69 }
70 }
71}
72
/// An event as handed over by an [`EventSource`], paired with its ack token.
pub struct RawEvent {
    /// Event content as text.
    /// NOTE(review): presumably one unparsed event (e.g. a single JSON line) —
    /// confirm against the concrete sources.
    pub payload: String,
    /// Token used to acknowledge this event via [`AckToken::ack`].
    pub ack_token: AckToken,
}
82
/// A producer of [`RawEvent`]s.
///
/// The `Send + 'static` bounds let an implementation be moved into a spawned
/// task, which is how [`spawn_source`] drives it.
pub trait EventSource: Send + 'static {
    /// Wait for the next event.
    ///
    /// The returned future must be `Send`. It resolves to `Some(event)` when an
    /// event is available; a `None` result makes [`spawn_source`] stop its
    /// forwarding loop (presumably signalling that the source is exhausted —
    /// confirm with the implementations).
    fn recv(&mut self) -> impl std::future::Future<Output = Option<RawEvent>> + Send;
}
94
/// Output destination for processing results and raw JSON strings.
pub enum Sink {
    /// Delivers through a [`StdoutSink`].
    Stdout(StdoutSink),
    /// Delivers through a [`FileSink`].
    File(FileSink),
    /// Delivers through a boxed [`NatsSink`]; only present with the `nats` feature.
    #[cfg(feature = "nats")]
    Nats(Box<NatsSink>),
    /// Delivers to each contained sink in order; delivery stops at the first
    /// sink that returns an error.
    FanOut(Vec<Sink>),
}
111
112impl Sink {
113 pub fn send<'a>(
119 &'a mut self,
120 result: &'a ProcessResult,
121 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
122 {
123 Box::pin(async move {
124 match self {
125 Sink::Stdout(s) => {
126 let s = &*s;
127 let result = result;
128 tokio::task::block_in_place(|| s.send(result))
129 }
130 Sink::File(s) => {
131 let s = &mut *s;
132 let result = result;
133 tokio::task::block_in_place(|| s.send(result))
134 }
135 #[cfg(feature = "nats")]
136 Sink::Nats(s) => s.send(result).await,
137 Sink::FanOut(sinks) => {
138 for sink in sinks {
139 sink.send(result).await?;
140 }
141 Ok(())
142 }
143 }
144 })
145 }
146
147 pub fn send_raw<'a>(
149 &'a mut self,
150 json: &'a str,
151 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
152 {
153 Box::pin(async move {
154 match self {
155 Sink::Stdout(s) => tokio::task::block_in_place(|| s.send_raw(json)),
156 Sink::File(s) => tokio::task::block_in_place(|| s.send_raw(json)),
157 #[cfg(feature = "nats")]
158 Sink::Nats(s) => s.send_raw(json).await,
159 Sink::FanOut(sinks) => {
160 for sink in sinks {
161 sink.send_raw(json).await?;
162 }
163 Ok(())
164 }
165 }
166 })
167 }
168}
169
170pub fn spawn_source<S: EventSource>(
177 mut source: S,
178 event_tx: tokio::sync::mpsc::Sender<RawEvent>,
179 metrics: Option<Arc<dyn MetricsHook>>,
180) -> tokio::task::JoinHandle<()> {
181 tokio::spawn(async move {
182 while let Some(raw_event) = source.recv().await {
183 if let Some(ref m) = metrics {
184 match event_tx.try_send(raw_event) {
185 Ok(()) => {
186 m.on_input_queue_depth_change(1);
187 }
188 Err(tokio::sync::mpsc::error::TrySendError::Full(raw_event)) => {
189 m.on_back_pressure();
190 m.on_input_queue_depth_change(1);
191 if event_tx.send(raw_event).await.is_err() {
192 tracing::debug!("Event channel closed, source shutting down");
193 break;
194 }
195 }
196 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
197 tracing::debug!("Event channel closed, source shutting down");
198 break;
199 }
200 }
201 } else if event_tx.send(raw_event).await.is_err() {
202 tracing::debug!("Event channel closed, source shutting down");
203 break;
204 }
205 }
206 })
207}