use super::error::{TransportError, TransportResult};
use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
use super::types::{Message, PayloadFormat, SendResult};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PipeToken {
pub seq: u64,
}
impl CommitToken for PipeToken {}
impl std::fmt::Display for PipeToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "pipe:{}", self.seq)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipeTransportConfig {
#[serde(default = "default_recv_timeout_ms")]
pub recv_timeout_ms: u64,
#[serde(default)]
pub filters_in: Vec<super::filter::FilterRule>,
#[serde(default)]
pub filters_out: Vec<super::filter::FilterRule>,
}
fn default_recv_timeout_ms() -> u64 {
100
}
impl Default for PipeTransportConfig {
fn default() -> Self {
Self {
recv_timeout_ms: default_recv_timeout_ms(),
filters_in: Vec::new(),
filters_out: Vec::new(),
}
}
}
impl PipeTransportConfig {
#[must_use]
pub fn from_cascade() -> Self {
#[cfg(feature = "config")]
{
if let Some(cfg) = crate::config::try_get()
&& let Ok(tc) = cfg.unmarshal_key_registered::<Self>("transport.pipe")
{
return tc;
}
}
Self::default()
}
}
pub struct PipeTransport {
stdin: tokio::sync::Mutex<BufReader<tokio::io::Stdin>>,
stdout: tokio::sync::Mutex<tokio::io::Stdout>,
sequence: AtomicU64,
closed: Arc<AtomicBool>,
recv_timeout_ms: u64,
filter_engine: super::filter::TransportFilterEngine,
filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
}
impl PipeTransport {
#[must_use]
pub fn new(config: &PipeTransportConfig) -> Self {
#[cfg(feature = "logger")]
tracing::info!(
recv_timeout_ms = config.recv_timeout_ms,
"Pipe transport opened"
);
let filter_engine = super::filter::TransportFilterEngine::new(
&config.filters_in,
&config.filters_out,
&crate::transport::filter::TransportFilterTierConfig::default(),
)
.unwrap_or_else(|e| {
tracing::warn!(error = %e, "Failed to compile transport filters, filtering disabled");
super::filter::TransportFilterEngine::empty()
});
let closed = Arc::new(AtomicBool::new(false));
#[cfg(feature = "health")]
{
let h = Arc::clone(&closed);
crate::health::HealthRegistry::register("transport:pipe", move || {
if h.load(Ordering::Relaxed) {
crate::health::HealthStatus::Unhealthy
} else {
crate::health::HealthStatus::Healthy
}
});
}
Self {
stdin: tokio::sync::Mutex::new(BufReader::new(tokio::io::stdin())),
stdout: tokio::sync::Mutex::new(tokio::io::stdout()),
sequence: AtomicU64::new(0),
closed,
recv_timeout_ms: config.recv_timeout_ms,
filter_engine,
filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
}
}
}
impl TransportBase for PipeTransport {
async fn close(&self) -> TransportResult<()> {
self.closed.store(true, Ordering::Relaxed);
let mut stdout = self.stdout.lock().await;
stdout
.flush()
.await
.map_err(|e| TransportError::Internal(format!("stdout flush failed: {e}")))?;
Ok(())
}
fn is_healthy(&self) -> bool {
!self.closed.load(Ordering::Relaxed)
}
fn name(&self) -> &'static str {
"pipe"
}
}
impl TransportSender for PipeTransport {
async fn send(&self, _key: &str, payload: &[u8]) -> SendResult {
if self.closed.load(Ordering::Relaxed) {
return SendResult::Fatal(TransportError::Closed);
}
if self.filter_engine.has_outbound_filters() {
match self.filter_engine.apply_outbound(payload) {
super::filter::FilterDisposition::Pass => {}
super::filter::FilterDisposition::Drop => return SendResult::Ok,
super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
}
}
let mut stdout = self.stdout.lock().await;
if let Err(e) = stdout.write_all(payload).await {
return SendResult::Fatal(TransportError::Send(format!("stdout write failed: {e}")));
}
if let Err(e) = stdout.write_all(b"\n").await {
return SendResult::Fatal(TransportError::Send(format!(
"stdout newline write failed: {e}"
)));
}
if let Err(e) = stdout.flush().await {
return SendResult::Fatal(TransportError::Send(format!("stdout flush failed: {e}")));
}
#[cfg(feature = "logger")]
tracing::debug!(
bytes = payload.len(),
"Pipe transport: message sent to stdout"
);
#[cfg(feature = "metrics")]
metrics::counter!("dfe_transport_sent_total", "transport" => "pipe").increment(1);
SendResult::Ok
}
}
impl TransportReceiver for PipeTransport {
type Token = PipeToken;
async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
if self.closed.load(Ordering::Relaxed) {
return Err(TransportError::Closed);
}
let mut stdin = self.stdin.lock().await;
let mut messages = Vec::with_capacity(max.min(100));
let mut line_buf = String::new();
for _ in 0..max {
line_buf.clear();
let read_result = if self.recv_timeout_ms == 0 {
stdin.read_line(&mut line_buf).await
} else if messages.is_empty() {
match tokio::time::timeout(
std::time::Duration::from_millis(self.recv_timeout_ms),
stdin.read_line(&mut line_buf),
)
.await
{
Ok(result) => result,
Err(_) => break, }
} else {
match tokio::time::timeout(
std::time::Duration::from_millis(1),
stdin.read_line(&mut line_buf),
)
.await
{
Ok(result) => result,
Err(_) => break, }
};
match read_result {
Ok(0) => {
if messages.is_empty() {
return Err(TransportError::Closed);
}
break;
}
Ok(_) => {
let payload = line_buf.trim_end_matches('\n').trim_end_matches('\r');
if payload.is_empty() {
continue;
}
let payload_bytes = payload.as_bytes().to_vec();
let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
let format = PayloadFormat::detect(&payload_bytes);
let timestamp_ms = chrono::Utc::now().timestamp_millis();
messages.push(Message {
key: None,
payload: payload_bytes,
token: PipeToken { seq },
timestamp_ms: Some(timestamp_ms),
format,
});
#[cfg(feature = "metrics")]
metrics::counter!("dfe_transport_received_total", "transport" => "pipe")
.increment(1);
}
Err(e) => {
return Err(TransportError::Recv(format!("stdin read failed: {e}")));
}
}
}
if self.filter_engine.has_inbound_filters() {
let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
super::filter::FilterDisposition::Pass => true,
super::filter::FilterDisposition::Drop => false,
super::filter::FilterDisposition::Dlq => {
staged_dlq.push(super::filter::FilteredDlqEntry {
payload: msg.payload.clone(),
key: msg.key.clone(),
reason: "transport filter".to_string(),
});
false
}
});
if !staged_dlq.is_empty() {
self.filtered_dlq_buffer.lock().extend(staged_dlq);
}
}
#[cfg(feature = "logger")]
if !messages.is_empty() {
tracing::debug!(
lines = messages.len(),
"Pipe transport: batch received from stdin"
);
}
Ok(messages)
}
fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
std::mem::take(&mut *self.filtered_dlq_buffer.lock())
}
async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn token_display() {
let token = PipeToken { seq: 42 };
assert_eq!(token.to_string(), "pipe:42");
}
#[test]
fn token_as_str() {
let token = PipeToken { seq: 7 };
assert_eq!(token.as_str(), "pipe:7");
}
#[test]
fn token_clone() {
let token = PipeToken { seq: 99 };
let cloned = token;
assert_eq!(token, cloned);
}
#[test]
fn config_defaults() {
let config = PipeTransportConfig::default();
assert_eq!(config.recv_timeout_ms, 100);
}
#[test]
fn config_serde_roundtrip() {
let config = PipeTransportConfig {
recv_timeout_ms: 500,
..Default::default()
};
let json = serde_json::to_string(&config).unwrap();
let parsed: PipeTransportConfig = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.recv_timeout_ms, 500);
}
#[test]
fn config_serde_default_fields() {
let parsed: PipeTransportConfig = serde_json::from_str("{}").unwrap();
assert_eq!(parsed.recv_timeout_ms, 100);
}
#[tokio::test]
async fn new_transport_is_healthy() {
let config = PipeTransportConfig::default();
let transport = PipeTransport::new(&config);
assert!(transport.is_healthy());
assert_eq!(transport.name(), "pipe");
}
#[tokio::test]
async fn close_marks_unhealthy() {
let config = PipeTransportConfig::default();
let transport = PipeTransport::new(&config);
transport.close().await.unwrap();
assert!(!transport.is_healthy());
}
#[tokio::test]
async fn send_after_close_returns_fatal() {
let config = PipeTransportConfig::default();
let transport = PipeTransport::new(&config);
transport.close().await.unwrap();
let result = transport.send("key", b"data").await;
assert!(result.is_fatal());
}
#[tokio::test]
async fn recv_after_close_returns_error() {
let config = PipeTransportConfig::default();
let transport = PipeTransport::new(&config);
transport.close().await.unwrap();
let result = transport.recv(1).await;
assert!(result.is_err());
}
#[tokio::test]
async fn commit_is_noop() {
let config = PipeTransportConfig::default();
let transport = PipeTransport::new(&config);
let tokens = vec![PipeToken { seq: 0 }, PipeToken { seq: 1 }];
let result = transport.commit(&tokens).await;
assert!(result.is_ok());
}
}