mod file_sink;
#[cfg(feature = "daemon-nats")]
mod nats_sink;
#[cfg(feature = "daemon-nats")]
mod nats_source;
mod stdin_source;
mod stdout_sink;
pub use file_sink::FileSink;
#[cfg(feature = "daemon-nats")]
pub use nats_sink::NatsSink;
#[cfg(feature = "daemon-nats")]
pub use nats_source::NatsSource;
pub use stdin_source::StdinSource;
pub use stdout_sink::StdoutSink;
use rsigma_eval::ProcessResult;
/// Error type returned by [`Sink::send`].
///
/// Each variant wraps the lower-level error it was converted from; see the
/// `From` impls in this module.
#[derive(Debug)]
pub enum StreamingError {
/// Wraps a [`std::io::Error`].
Io(std::io::Error),
/// Wraps a [`serde_json::Error`].
Serialization(serde_json::Error),
}
impl std::fmt::Display for StreamingError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StreamingError::Io(e) => write!(f, "I/O error: {e}"),
StreamingError::Serialization(e) => write!(f, "serialization error: {e}"),
}
}
}
impl std::error::Error for StreamingError {}
impl From<std::io::Error> for StreamingError {
fn from(e: std::io::Error) -> Self {
StreamingError::Io(e)
}
}
impl From<serde_json::Error> for StreamingError {
fn from(e: serde_json::Error) -> Self {
StreamingError::Serialization(e)
}
}
/// An asynchronous producer of input events, each delivered as a `String`.
///
/// Implementors must be `Send + 'static` because [`spawn_source`] moves the
/// source into a task created with `tokio::spawn`.
pub trait EventSource: Send + 'static {
/// Returns a `Send` future that resolves to the next event, or `None`.
///
/// [`spawn_source`] stops reading from the source after the first `None`.
fn recv(&mut self) -> impl std::future::Future<Output = Option<String>> + Send;
}
/// Destination for [`ProcessResult`] values, dispatched through [`Sink::send`].
pub enum Sink {
/// Delegates to a [`StdoutSink`].
Stdout(StdoutSink),
/// Delegates to a [`FileSink`].
File(FileSink),
/// Delegates to a [`NatsSink`]; only compiled with the `daemon-nats` feature.
#[cfg(feature = "daemon-nats")]
Nats(NatsSink),
/// Forwards each result to every contained sink, in order.
FanOut(Vec<Sink>),
}
impl Sink {
pub fn send<'a>(
&'a mut self,
result: &'a ProcessResult,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), StreamingError>> + Send + 'a>>
{
Box::pin(async move {
match self {
Sink::Stdout(s) => {
let s = &*s;
let result = result;
tokio::task::block_in_place(|| s.send(result))
}
Sink::File(s) => {
let s = &mut *s;
let result = result;
tokio::task::block_in_place(|| s.send(result))
}
#[cfg(feature = "daemon-nats")]
Sink::Nats(s) => s.send(result).await,
Sink::FanOut(sinks) => {
for sink in sinks {
sink.send(result).await?;
}
Ok(())
}
}
})
}
}
pub fn spawn_source<S: EventSource>(
mut source: S,
event_tx: tokio::sync::mpsc::Sender<String>,
metrics: Option<crate::daemon::metrics::Metrics>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(line) = source.recv().await {
if let Some(ref m) = metrics {
match event_tx.try_send(line) {
Ok(()) => {
m.input_queue_depth.inc();
}
Err(tokio::sync::mpsc::error::TrySendError::Full(line)) => {
m.back_pressure_events.inc();
m.input_queue_depth.inc();
if event_tx.send(line).await.is_err() {
tracing::debug!("Event channel closed, source shutting down");
break;
}
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
tracing::debug!("Event channel closed, source shutting down");
break;
}
}
} else if event_tx.send(line).await.is_err() {
tracing::debug!("Event channel closed, source shutting down");
break;
}
}
})
}