use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use crate::common::message::Message;
use crate::common::types::{Audit, Event, Severity};
use crate::config::SystemConfig;
use crate::error::{Error, Result};
use crate::sink::Sink;
use crate::source::Source;
use crate::source::system::{AUDIT_SOURCE_ID, DLQ_SOURCE_ID, EVENT_SOURCE_ID};
use super::PipelineMetrics;
/// Cut-off for dead-letter routing: `should_route_to_dlq` only accepts a
/// message while its `meta.chain_depth` is strictly below this value.
const MAX_CHAIN_DEPTH: u8 = 8;
/// Decides whether a failed message may be forwarded to the dead-letter queue.
///
/// Returns `false` when the message's `source_node` is the DLQ source itself,
/// or when its `chain_depth` has reached [`MAX_CHAIN_DEPTH`]; otherwise `true`.
fn should_route_to_dlq(msg: &Message) -> bool {
    let meta = &msg.meta;

    // Messages that came out of the DLQ source are never fed back into it.
    if meta.source_node == DLQ_SOURCE_ID {
        return false;
    }

    meta.chain_depth < MAX_CHAIN_DEPTH
}
/// Returns a clone of `msg` whose `meta.chain_depth` is one higher.
///
/// The increment saturates, so a depth already at the type's maximum stays
/// there instead of wrapping. The input message is left untouched.
fn bump_chain_depth(msg: &Message) -> Message {
    let mut next = msg.clone();
    let depth = next.meta.chain_depth;
    next.meta.chain_depth = depth.saturating_add(1);
    next
}
/// Cloneable bundle of the sending halves of the system channels
/// (dead-letter messages, events and audit records).
#[derive(Clone, Debug)]
pub struct SystemChannels {
/// Sender for dead-letter `Message` values; `route_error_to_system`
/// pushes failed messages here.
pub dlq: mpsc::Sender<Message>,
/// Sender for `Event` values; the `route_*_to_system` functions in this
/// file publish their error and overflow events here.
pub event: mpsc::Sender<Event>,
/// Sender for `Audit` values. Not written to by any function in this file.
pub audit: mpsc::Sender<Audit>,
}
/// Reports a node failure on the system channels.
///
/// An event named `"<node_type>_error"` is always sent on the event channel,
/// carrying the node id, node type, error text and the id of the offending
/// message. If [`should_route_to_dlq`] accepts the message, a copy with its
/// chain depth bumped is additionally sent on the DLQ channel and the DLQ
/// metric is recorded.
///
/// Both sends are best-effort: their results are discarded. Note that the DLQ
/// metric is recorded whether or not the DLQ send succeeded.
pub async fn route_error_to_system(
    channels: &SystemChannels,
    metrics: &PipelineMetrics,
    node_id: &str,
    node_type: &str,
    error: &str,
    msg: &Message,
) {
    let event_name = format!("{}_error", node_type);
    let payload = serde_json::json!({
        "node_id": node_id,
        "node_type": node_type,
        "error": error,
        "message_id": msg.meta.id.to_string(),
    });
    let _ = channels.event.send(Event::new(event_name, payload)).await;

    if !should_route_to_dlq(msg) {
        return;
    }

    let forwarded = bump_chain_depth(msg);
    let _ = channels.dlq.send(forwarded).await;
    metrics.record_dlq();
}
/// Publishes a `"buffer_overflow"` event with [`Severity::Critical`] on the
/// event channel.
///
/// The payload records the node id, node type and the number of dropped
/// items. The send is best-effort: its result is discarded.
pub async fn route_overflow_to_system(
    channels: &SystemChannels,
    node_id: &str,
    node_type: &str,
    dropped_count: u64,
) {
    let payload = serde_json::json!({
        "node_id": node_id,
        "node_type": node_type,
        "dropped_count": dropped_count,
    });
    let overflow = Event::new("buffer_overflow", payload).with_severity(Severity::Critical);
    let _ = channels.event.send(overflow).await;
}
pub fn build_system_source(
source_id: &str,
pending_system_receivers: &mut HashMap<String, Box<dyn std::any::Any + Send>>,
) -> Result<Arc<dyn Source>> {
use crate::source::system::{AuditSource, DlqSource, EventSource, is_valid_system_source_id};
use tokio::sync::mpsc::Receiver;
if !is_valid_system_source_id(source_id) {
return Err(Error::config(format!(
"Invalid system source ID '{}'",
source_id
)));
}
let receiver = pending_system_receivers.remove(source_id).ok_or_else(|| {
Error::config(format!(
"System receiver for '{}' not found. This may indicate a duplicate source ID.",
source_id
))
})?;
match source_id {
DLQ_SOURCE_ID => {
let rx = receiver
.downcast::<Receiver<Message>>()
.map_err(|_| Error::config("Failed to downcast DLQ receiver"))?;
Ok(Arc::new(DlqSource::new(*rx)) as Arc<dyn Source>)
}
EVENT_SOURCE_ID => {
let rx = receiver
.downcast::<Receiver<Event>>()
.map_err(|_| Error::config("Failed to downcast Event receiver"))?;
Ok(Arc::new(EventSource::new(*rx)) as Arc<dyn Source>)
}
AUDIT_SOURCE_ID => {
let rx = receiver
.downcast::<Receiver<Audit>>()
.map_err(|_| Error::config("Failed to downcast Audit receiver"))?;
Ok(Arc::new(AuditSource::new(*rx)) as Arc<dyn Source>)
}
_ => Err(Error::config(format!(
"Unknown system source ID: {}",
source_id
))),
}
}
pub async fn build_system_sink(
sink_id: &str,
system_config: &SystemConfig,
) -> Result<Arc<dyn Sink>> {
use crate::sink::system::{SystemSink, is_valid_system_sink_id};
if !is_valid_system_sink_id(sink_id) {
return Err(Error::config(format!(
"Invalid system sink ID '{}'",
sink_id
)));
}
let sink = SystemSink::new(sink_id, Some(&system_config.sinks)).await?;
Ok(Arc::new(sink) as Arc<dyn Sink>)
}