mod file;
#[cfg(feature = "nats")]
mod nats_sink;
#[cfg(feature = "nats")]
mod nats_source;
mod stdin;
mod stdout;
pub use file::FileSink;
#[cfg(feature = "nats")]
pub use nats_sink::NatsSink;
#[cfg(feature = "nats")]
pub use nats_source::NatsSource;
pub use stdin::StdinSource;
pub use stdout::StdoutSink;
use std::sync::Arc;
use rsigma_eval::ProcessResult;
use crate::error::RuntimeError;
use crate::metrics::MetricsHook;
pub trait EventSource: Send + 'static {
fn recv(&mut self) -> impl std::future::Future<Output = Option<String>> + Send;
}
pub enum Sink {
Stdout(StdoutSink),
File(FileSink),
#[cfg(feature = "nats")]
Nats(NatsSink),
FanOut(Vec<Sink>),
}
impl Sink {
pub fn send<'a>(
&'a mut self,
result: &'a ProcessResult,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
{
Box::pin(async move {
match self {
Sink::Stdout(s) => {
let s = &*s;
let result = result;
tokio::task::block_in_place(|| s.send(result))
}
Sink::File(s) => {
let s = &mut *s;
let result = result;
tokio::task::block_in_place(|| s.send(result))
}
#[cfg(feature = "nats")]
Sink::Nats(s) => s.send(result).await,
Sink::FanOut(sinks) => {
for sink in sinks {
sink.send(result).await?;
}
Ok(())
}
}
})
}
}
pub fn spawn_source<S: EventSource>(
mut source: S,
event_tx: tokio::sync::mpsc::Sender<String>,
metrics: Option<Arc<dyn MetricsHook>>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(line) = source.recv().await {
if let Some(ref m) = metrics {
match event_tx.try_send(line) {
Ok(()) => {
m.on_input_queue_depth_change(1);
}
Err(tokio::sync::mpsc::error::TrySendError::Full(line)) => {
m.on_back_pressure();
m.on_input_queue_depth_change(1);
if event_tx.send(line).await.is_err() {
tracing::debug!("Event channel closed, source shutting down");
break;
}
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
tracing::debug!("Event channel closed, source shutting down");
break;
}
}
} else if event_tx.send(line).await.is_err() {
tracing::debug!("Event channel closed, source shutting down");
break;
}
}
})
}