mod file;
#[cfg(feature = "nats")]
pub mod nats_config;
#[cfg(feature = "nats")]
mod nats_sink;
#[cfg(feature = "nats")]
mod nats_source;
#[cfg(feature = "otlp")]
pub mod otlp;
mod stdin;
mod stdout;
pub use file::FileSink;
#[cfg(feature = "nats")]
pub use nats_config::NatsConnectConfig;
#[cfg(feature = "nats")]
pub use nats_sink::NatsSink;
#[cfg(feature = "nats")]
pub use nats_source::{NatsSource, ReplayPolicy};
pub use stdin::StdinSource;
pub use stdout::StdoutSink;
use std::sync::Arc;
use rsigma_eval::ProcessResult;
use crate::error::RuntimeError;
use crate::metrics::MetricsHook;
pub enum AckToken {
Noop,
#[cfg(feature = "nats")]
Nats(Box<async_nats::jetstream::Message>),
}
impl AckToken {
pub async fn ack(self) {
match self {
AckToken::Noop => {}
#[cfg(feature = "nats")]
AckToken::Nats(msg) => {
if let Err(e) = msg.ack().await {
tracing::warn!(error = %e, "Failed to ack NATS message");
}
}
}
}
#[cfg(feature = "nats")]
pub fn nats_stream_position(&self) -> Option<(u64, i64)> {
match self {
AckToken::Noop => None,
AckToken::Nats(msg) => msg
.info()
.ok()
.map(|info| (info.stream_sequence, info.published.unix_timestamp())),
}
}
}
pub struct RawEvent {
pub payload: String,
pub ack_token: AckToken,
}
pub trait EventSource: Send + 'static {
fn recv(&mut self) -> impl std::future::Future<Output = Option<RawEvent>> + Send;
}
pub enum Sink {
Stdout(StdoutSink),
File(FileSink),
#[cfg(feature = "nats")]
Nats(Box<NatsSink>),
FanOut(Vec<Sink>),
}
impl Sink {
pub fn send<'a>(
&'a mut self,
result: &'a ProcessResult,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
{
Box::pin(async move {
match self {
Sink::Stdout(s) => {
let s = &*s;
let result = result;
tokio::task::block_in_place(|| s.send(result))
}
Sink::File(s) => {
let s = &mut *s;
let result = result;
tokio::task::block_in_place(|| s.send(result))
}
#[cfg(feature = "nats")]
Sink::Nats(s) => s.send(result).await,
Sink::FanOut(sinks) => {
for (idx, sink) in sinks.iter_mut().enumerate() {
if let Err(e) = sink.send(result).await {
tracing::warn!(
sink_index = idx,
sink_type = sink.kind_label(),
error = %e,
"Fan-out child sink failed",
);
return Err(e);
}
}
Ok(())
}
}
})
}
pub fn send_raw<'a>(
&'a mut self,
json: &'a str,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RuntimeError>> + Send + 'a>>
{
Box::pin(async move {
match self {
Sink::Stdout(s) => tokio::task::block_in_place(|| s.send_raw(json)),
Sink::File(s) => tokio::task::block_in_place(|| s.send_raw(json)),
#[cfg(feature = "nats")]
Sink::Nats(s) => s.send_raw(json).await,
Sink::FanOut(sinks) => {
for (idx, sink) in sinks.iter_mut().enumerate() {
if let Err(e) = sink.send_raw(json).await {
tracing::warn!(
sink_index = idx,
sink_type = sink.kind_label(),
error = %e,
"Fan-out child sink failed (raw)",
);
return Err(e);
}
}
Ok(())
}
}
})
}
fn kind_label(&self) -> &'static str {
match self {
Sink::Stdout(_) => "stdout",
Sink::File(_) => "file",
#[cfg(feature = "nats")]
Sink::Nats(_) => "nats",
Sink::FanOut(_) => "fanout",
}
}
}
pub fn spawn_source<S: EventSource>(
mut source: S,
event_tx: tokio::sync::mpsc::Sender<RawEvent>,
metrics: Option<Arc<dyn MetricsHook>>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(raw_event) = source.recv().await {
if let Some(ref m) = metrics {
match event_tx.try_send(raw_event) {
Ok(()) => {
m.on_input_queue_depth_change(1);
}
Err(tokio::sync::mpsc::error::TrySendError::Full(raw_event)) => {
m.on_back_pressure();
m.on_input_queue_depth_change(1);
tracing::warn!("Input channel full, backpressure applied");
if event_tx.send(raw_event).await.is_err() {
tracing::debug!("Event channel closed, source shutting down");
break;
}
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
tracing::debug!("Event channel closed, source shutting down");
break;
}
}
} else if event_tx.send(raw_event).await.is_err() {
tracing::debug!("Event channel closed, source shutting down");
break;
}
}
})
}