1mod file;
2#[cfg(feature = "nats")]
3mod nats_sink;
4#[cfg(feature = "nats")]
5mod nats_source;
6mod stdin;
7mod stdout;
8
9pub use file::FileSink;
10#[cfg(feature = "nats")]
11pub use nats_sink::NatsSink;
12#[cfg(feature = "nats")]
13pub use nats_source::NatsSource;
14pub use stdin::StdinSource;
15pub use stdout::StdoutSink;
16
17use std::sync::Arc;
18
19use rsigma_eval::ProcessResult;
20
21use crate::error::RuntimeError;
22use crate::metrics::MetricsHook;
23
24pub trait EventSource: Send + 'static {
30 fn recv(&mut self) -> impl std::future::Future<Output = Option<String>> + Send;
33}
34
35pub enum Sink {
41 Stdout(StdoutSink),
43 File(FileSink),
45 #[cfg(feature = "nats")]
47 Nats(NatsSink),
48 FanOut(Vec<Sink>),
50}
51
52impl Sink {
53 pub fn send<'a>(
59 &'a mut self,
60 result: &'a ProcessResult,
61 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
62 {
63 Box::pin(async move {
64 match self {
65 Sink::Stdout(s) => {
66 let s = &*s;
67 let result = result;
68 tokio::task::block_in_place(|| s.send(result))
69 }
70 Sink::File(s) => {
71 let s = &mut *s;
72 let result = result;
73 tokio::task::block_in_place(|| s.send(result))
74 }
75 #[cfg(feature = "nats")]
76 Sink::Nats(s) => s.send(result).await,
77 Sink::FanOut(sinks) => {
78 for sink in sinks {
79 sink.send(result).await?;
80 }
81 Ok(())
82 }
83 }
84 })
85 }
86}
87
88pub fn spawn_source<S: EventSource>(
95 mut source: S,
96 event_tx: tokio::sync::mpsc::Sender<String>,
97 metrics: Option<Arc<dyn MetricsHook>>,
98) -> tokio::task::JoinHandle<()> {
99 tokio::spawn(async move {
100 while let Some(line) = source.recv().await {
101 if let Some(ref m) = metrics {
102 match event_tx.try_send(line) {
103 Ok(()) => {
104 m.on_input_queue_depth_change(1);
105 }
106 Err(tokio::sync::mpsc::error::TrySendError::Full(line)) => {
107 m.on_back_pressure();
108 m.on_input_queue_depth_change(1);
109 if event_tx.send(line).await.is_err() {
110 tracing::debug!("Event channel closed, source shutting down");
111 break;
112 }
113 }
114 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
115 tracing::debug!("Event channel closed, source shutting down");
116 break;
117 }
118 }
119 } else if event_tx.send(line).await.is_err() {
120 tracing::debug!("Event channel closed, source shutting down");
121 break;
122 }
123 }
124 })
125}