1mod file;
2#[cfg(feature = "nats")]
3pub mod nats_config;
4#[cfg(feature = "nats")]
5mod nats_sink;
6#[cfg(feature = "nats")]
7mod nats_source;
8#[cfg(feature = "otlp")]
9pub mod otlp;
10mod stdin;
11mod stdout;
12
13pub use file::FileSink;
14#[cfg(feature = "nats")]
15pub use nats_config::NatsConnectConfig;
16#[cfg(feature = "nats")]
17pub use nats_sink::NatsSink;
18#[cfg(feature = "nats")]
19pub use nats_source::{NatsSource, ReplayPolicy};
20pub use stdin::StdinSource;
21pub use stdout::StdoutSink;
22
23use std::sync::Arc;
24
25use rsigma_eval::ProcessResult;
26
27use crate::error::RuntimeError;
28use crate::metrics::MetricsHook;
29
/// Token handed out alongside an event so the event can be acknowledged
/// back to wherever it came from.
pub enum AckToken {
    /// Nothing to acknowledge; acking this token is a no-op.
    Noop,
    /// A NATS JetStream message that is acknowledged when the token is acked.
    ///
    /// Boxed so the enum stays small regardless of the message's size.
    #[cfg(feature = "nats")]
    Nats(Box<async_nats::jetstream::Message>),
}
42
43impl AckToken {
44 pub async fn ack(self) {
46 match self {
47 AckToken::Noop => {}
48 #[cfg(feature = "nats")]
49 AckToken::Nats(msg) => {
50 if let Err(e) = msg.ack().await {
51 tracing::warn!(error = %e, "Failed to ack NATS message");
52 }
53 }
54 }
55 }
56
57 #[cfg(feature = "nats")]
62 pub fn nats_stream_position(&self) -> Option<(u64, i64)> {
63 match self {
64 AckToken::Noop => None,
65 AckToken::Nats(msg) => msg
66 .info()
67 .ok()
68 .map(|info| (info.stream_sequence, info.published.unix_timestamp())),
69 }
70 }
71}
72
73pub struct RawEvent {
79 pub payload: String,
80 pub ack_token: AckToken,
81}
82
83pub trait EventSource: Send + 'static {
90 fn recv(&mut self) -> impl std::future::Future<Output = Option<RawEvent>> + Send;
93}
94
95pub enum Sink {
101 Stdout(StdoutSink),
103 File(FileSink),
105 #[cfg(feature = "nats")]
107 Nats(Box<NatsSink>),
108 FanOut(Vec<Sink>),
110}
111
112impl Sink {
113 pub fn send<'a>(
119 &'a mut self,
120 result: &'a ProcessResult,
121 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
122 {
123 Box::pin(async move {
124 match self {
125 Sink::Stdout(s) => {
126 let s = &*s;
127 let result = result;
128 tokio::task::block_in_place(|| s.send(result))
129 }
130 Sink::File(s) => {
131 let s = &mut *s;
132 let result = result;
133 tokio::task::block_in_place(|| s.send(result))
134 }
135 #[cfg(feature = "nats")]
136 Sink::Nats(s) => s.send(result).await,
137 Sink::FanOut(sinks) => {
138 for (idx, sink) in sinks.iter_mut().enumerate() {
139 if let Err(e) = sink.send(result).await {
140 tracing::warn!(
141 sink_index = idx,
142 sink_type = sink.kind_label(),
143 error = %e,
144 "Fan-out child sink failed",
145 );
146 return Err(e);
147 }
148 }
149 Ok(())
150 }
151 }
152 })
153 }
154
155 pub fn send_raw<'a>(
157 &'a mut self,
158 json: &'a str,
159 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
160 {
161 Box::pin(async move {
162 match self {
163 Sink::Stdout(s) => tokio::task::block_in_place(|| s.send_raw(json)),
164 Sink::File(s) => tokio::task::block_in_place(|| s.send_raw(json)),
165 #[cfg(feature = "nats")]
166 Sink::Nats(s) => s.send_raw(json).await,
167 Sink::FanOut(sinks) => {
168 for (idx, sink) in sinks.iter_mut().enumerate() {
169 if let Err(e) = sink.send_raw(json).await {
170 tracing::warn!(
171 sink_index = idx,
172 sink_type = sink.kind_label(),
173 error = %e,
174 "Fan-out child sink failed (raw)",
175 );
176 return Err(e);
177 }
178 }
179 Ok(())
180 }
181 }
182 })
183 }
184
185 fn kind_label(&self) -> &'static str {
187 match self {
188 Sink::Stdout(_) => "stdout",
189 Sink::File(_) => "file",
190 #[cfg(feature = "nats")]
191 Sink::Nats(_) => "nats",
192 Sink::FanOut(_) => "fanout",
193 }
194 }
195}
196
197pub fn spawn_source<S: EventSource>(
204 mut source: S,
205 event_tx: tokio::sync::mpsc::Sender<RawEvent>,
206 metrics: Option<Arc<dyn MetricsHook>>,
207) -> tokio::task::JoinHandle<()> {
208 tokio::spawn(async move {
209 while let Some(raw_event) = source.recv().await {
210 if let Some(ref m) = metrics {
211 match event_tx.try_send(raw_event) {
212 Ok(()) => {
213 m.on_input_queue_depth_change(1);
214 }
215 Err(tokio::sync::mpsc::error::TrySendError::Full(raw_event)) => {
216 m.on_back_pressure();
217 m.on_input_queue_depth_change(1);
218 tracing::warn!("Input channel full, backpressure applied");
219 if event_tx.send(raw_event).await.is_err() {
220 tracing::debug!("Event channel closed, source shutting down");
221 break;
222 }
223 }
224 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
225 tracing::debug!("Event channel closed, source shutting down");
226 break;
227 }
228 }
229 } else if event_tx.send(raw_event).await.is_err() {
230 tracing::debug!("Event channel closed, source shutting down");
231 break;
232 }
233 }
234 })
235}