Skip to main content

hyperi_rustlib/transport/
pipe.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/pipe.rs
3// Purpose:   Unix pipe transport (stdin/stdout)
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Pipe Transport
10//!
11//! Reads from stdin and writes to stdout for Unix pipeline composition.
12//! Newline-delimited: each line is one message.
13//!
14//! ## Example
15//!
16//! ```rust,ignore
17//! use hyperi_rustlib::transport::{PipeTransport, PipeTransportConfig};
18//!
19//! let config = PipeTransportConfig::default();
20//! let transport = PipeTransport::new(&config);
21//!
22//! // Send writes payload + newline to stdout
23//! transport.send("ignored", b"hello world").await;
24//!
25//! // Recv reads lines from stdin
26//! let messages = transport.recv(10).await?;
27//! ```
28
29use super::error::{TransportError, TransportResult};
30use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
31use super::types::{Message, PayloadFormat, SendResult};
32use serde::{Deserialize, Serialize};
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
35use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
36
37/// Commit token for pipe transport.
38///
39/// Contains a monotonic sequence number. Commit is a no-op
40/// because stdin is a forward-only stream.
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
42pub struct PipeToken {
43    /// Message sequence number.
44    pub seq: u64,
45}
46
47impl CommitToken for PipeToken {}
48
49impl std::fmt::Display for PipeToken {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        write!(f, "pipe:{}", self.seq)
52    }
53}
54
55/// Configuration for pipe transport.
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct PipeTransportConfig {
58    /// Receive timeout in milliseconds (0 = block until data). Default: 100.
59    #[serde(default = "default_recv_timeout_ms")]
60    pub recv_timeout_ms: u64,
61
62    /// Inbound message filters (applied on recv before caller sees messages).
63    #[serde(default)]
64    pub filters_in: Vec<super::filter::FilterRule>,
65
66    /// Outbound message filters (applied on send before transport dispatches).
67    #[serde(default)]
68    pub filters_out: Vec<super::filter::FilterRule>,
69}
70
71fn default_recv_timeout_ms() -> u64 {
72    100
73}
74
75impl Default for PipeTransportConfig {
76    fn default() -> Self {
77        Self {
78            recv_timeout_ms: default_recv_timeout_ms(),
79            filters_in: Vec::new(),
80            filters_out: Vec::new(),
81        }
82    }
83}
84
85impl PipeTransportConfig {
86    /// Load from the config cascade under the `transport.pipe` key.
87    #[must_use]
88    pub fn from_cascade() -> Self {
89        #[cfg(feature = "config")]
90        {
91            if let Some(cfg) = crate::config::try_get()
92                && let Ok(tc) = cfg.unmarshal_key_registered::<Self>("transport.pipe")
93            {
94                return tc;
95            }
96        }
97        Self::default()
98    }
99}
100
101/// Unix pipe transport (stdin/stdout).
102///
103/// Send writes newline-delimited payloads to stdout.
104/// Receive reads lines from stdin, each becoming a message.
105/// Commit is a no-op (stdin cannot be rewound).
106pub struct PipeTransport {
107    stdin: tokio::sync::Mutex<BufReader<tokio::io::Stdin>>,
108    stdout: tokio::sync::Mutex<tokio::io::Stdout>,
109    sequence: AtomicU64,
110    closed: Arc<AtomicBool>,
111    recv_timeout_ms: u64,
112    filter_engine: super::filter::TransportFilterEngine,
113    /// Buffer for messages staged to DLQ by inbound filters.
114    /// Drained by `take_filtered_dlq_entries()`.
115    filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
116}
117
118impl PipeTransport {
119    /// Create a new pipe transport.
120    #[must_use]
121    pub fn new(config: &PipeTransportConfig) -> Self {
122        #[cfg(feature = "logger")]
123        tracing::info!(
124            recv_timeout_ms = config.recv_timeout_ms,
125            "Pipe transport opened"
126        );
127
128        let filter_engine = super::filter::TransportFilterEngine::new(
129            &config.filters_in,
130            &config.filters_out,
131            &crate::transport::filter::TransportFilterTierConfig::default(),
132        )
133        .unwrap_or_else(|e| {
134            tracing::warn!(error = %e, "Failed to compile transport filters, filtering disabled");
135            super::filter::TransportFilterEngine::empty()
136        });
137
138        let closed = Arc::new(AtomicBool::new(false));
139
140        #[cfg(feature = "health")]
141        {
142            let h = Arc::clone(&closed);
143            crate::health::HealthRegistry::register("transport:pipe", move || {
144                if h.load(Ordering::Relaxed) {
145                    crate::health::HealthStatus::Unhealthy
146                } else {
147                    crate::health::HealthStatus::Healthy
148                }
149            });
150        }
151
152        Self {
153            stdin: tokio::sync::Mutex::new(BufReader::new(tokio::io::stdin())),
154            stdout: tokio::sync::Mutex::new(tokio::io::stdout()),
155            sequence: AtomicU64::new(0),
156            closed,
157            recv_timeout_ms: config.recv_timeout_ms,
158            filter_engine,
159            filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
160        }
161    }
162}
163
164impl TransportBase for PipeTransport {
165    async fn close(&self) -> TransportResult<()> {
166        self.closed.store(true, Ordering::Relaxed);
167
168        // Flush stdout before closing
169        let mut stdout = self.stdout.lock().await;
170        stdout
171            .flush()
172            .await
173            .map_err(|e| TransportError::Internal(format!("stdout flush failed: {e}")))?;
174
175        Ok(())
176    }
177
178    fn is_healthy(&self) -> bool {
179        !self.closed.load(Ordering::Relaxed)
180    }
181
182    fn name(&self) -> &'static str {
183        "pipe"
184    }
185}
186
187impl TransportSender for PipeTransport {
188    async fn send(&self, _key: &str, payload: &[u8]) -> SendResult {
189        if self.closed.load(Ordering::Relaxed) {
190            return SendResult::Fatal(TransportError::Closed);
191        }
192
193        // Outbound filter check
194        if self.filter_engine.has_outbound_filters() {
195            match self.filter_engine.apply_outbound(payload) {
196                super::filter::FilterDisposition::Pass => {}
197                super::filter::FilterDisposition::Drop => return SendResult::Ok,
198                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
199            }
200        }
201
202        let mut stdout = self.stdout.lock().await;
203
204        // Write payload + newline
205        if let Err(e) = stdout.write_all(payload).await {
206            return SendResult::Fatal(TransportError::Send(format!("stdout write failed: {e}")));
207        }
208        if let Err(e) = stdout.write_all(b"\n").await {
209            return SendResult::Fatal(TransportError::Send(format!(
210                "stdout newline write failed: {e}"
211            )));
212        }
213        if let Err(e) = stdout.flush().await {
214            return SendResult::Fatal(TransportError::Send(format!("stdout flush failed: {e}")));
215        }
216
217        #[cfg(feature = "logger")]
218        tracing::debug!(
219            bytes = payload.len(),
220            "Pipe transport: message sent to stdout"
221        );
222
223        #[cfg(feature = "metrics")]
224        metrics::counter!("dfe_transport_sent_total", "transport" => "pipe").increment(1);
225
226        SendResult::Ok
227    }
228}
229
230impl TransportReceiver for PipeTransport {
231    type Token = PipeToken;
232
233    async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
234        if self.closed.load(Ordering::Relaxed) {
235            return Err(TransportError::Closed);
236        }
237
238        let mut stdin = self.stdin.lock().await;
239        let mut messages = Vec::with_capacity(max.min(100));
240        let mut line_buf = String::new();
241
242        for _ in 0..max {
243            line_buf.clear();
244
245            let read_result = if self.recv_timeout_ms == 0 {
246                // Block until data arrives
247                stdin.read_line(&mut line_buf).await
248            } else if messages.is_empty() {
249                // First message: wait up to timeout
250                match tokio::time::timeout(
251                    std::time::Duration::from_millis(self.recv_timeout_ms),
252                    stdin.read_line(&mut line_buf),
253                )
254                .await
255                {
256                    Ok(result) => result,
257                    Err(_) => break, // Timeout, return what we have (empty)
258                }
259            } else {
260                // Subsequent messages: non-blocking attempt via short timeout
261                match tokio::time::timeout(
262                    std::time::Duration::from_millis(1),
263                    stdin.read_line(&mut line_buf),
264                )
265                .await
266                {
267                    Ok(result) => result,
268                    Err(_) => break, // No more data ready
269                }
270            };
271
272            match read_result {
273                Ok(0) => {
274                    // EOF on stdin
275                    if messages.is_empty() {
276                        return Err(TransportError::Closed);
277                    }
278                    break;
279                }
280                Ok(_) => {
281                    // Strip trailing newline
282                    let payload = line_buf.trim_end_matches('\n').trim_end_matches('\r');
283                    if payload.is_empty() {
284                        continue;
285                    }
286
287                    let payload_bytes = payload.as_bytes().to_vec();
288                    let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
289                    let format = PayloadFormat::detect(&payload_bytes);
290                    let timestamp_ms = chrono::Utc::now().timestamp_millis();
291
292                    messages.push(Message {
293                        key: None,
294                        payload: payload_bytes,
295                        token: PipeToken { seq },
296                        timestamp_ms: Some(timestamp_ms),
297                        format,
298                    });
299
300                    #[cfg(feature = "metrics")]
301                    metrics::counter!("dfe_transport_received_total", "transport" => "pipe")
302                        .increment(1);
303                }
304                Err(e) => {
305                    return Err(TransportError::Recv(format!("stdin read failed: {e}")));
306                }
307            }
308        }
309
310        // Apply inbound filters: drop messages, stage DLQ entries
311        if self.filter_engine.has_inbound_filters() {
312            let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
313            messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
314                super::filter::FilterDisposition::Pass => true,
315                super::filter::FilterDisposition::Drop => false,
316                super::filter::FilterDisposition::Dlq => {
317                    staged_dlq.push(super::filter::FilteredDlqEntry {
318                        payload: msg.payload.clone(),
319                        key: msg.key.clone(),
320                        reason: "transport filter".to_string(),
321                    });
322                    false
323                }
324            });
325            if !staged_dlq.is_empty() {
326                self.filtered_dlq_buffer.lock().extend(staged_dlq);
327            }
328        }
329
330        #[cfg(feature = "logger")]
331        if !messages.is_empty() {
332            tracing::debug!(
333                lines = messages.len(),
334                "Pipe transport: batch received from stdin"
335            );
336        }
337
338        Ok(messages)
339    }
340
341    fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
342        std::mem::take(&mut *self.filtered_dlq_buffer.lock())
343    }
344
345    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
346        // No-op: stdin is a forward-only stream, cannot rewind or acknowledge
347        Ok(())
348    }
349}
350
351#[cfg(test)]
352mod tests {
353    use super::*;
354
355    #[test]
356    fn token_display() {
357        let token = PipeToken { seq: 42 };
358        assert_eq!(token.to_string(), "pipe:42");
359    }
360
361    #[test]
362    fn token_as_str() {
363        let token = PipeToken { seq: 7 };
364        assert_eq!(token.as_str(), "pipe:7");
365    }
366
367    #[test]
368    fn token_clone() {
369        let token = PipeToken { seq: 99 };
370        let cloned = token;
371        assert_eq!(token, cloned);
372    }
373
374    #[test]
375    fn config_defaults() {
376        let config = PipeTransportConfig::default();
377        assert_eq!(config.recv_timeout_ms, 100);
378    }
379
380    #[test]
381    fn config_serde_roundtrip() {
382        let config = PipeTransportConfig {
383            recv_timeout_ms: 500,
384            ..Default::default()
385        };
386        let json = serde_json::to_string(&config).unwrap();
387        let parsed: PipeTransportConfig = serde_json::from_str(&json).unwrap();
388        assert_eq!(parsed.recv_timeout_ms, 500);
389    }
390
391    #[test]
392    fn config_serde_default_fields() {
393        // Empty JSON should use defaults
394        let parsed: PipeTransportConfig = serde_json::from_str("{}").unwrap();
395        assert_eq!(parsed.recv_timeout_ms, 100);
396    }
397
398    #[tokio::test]
399    async fn new_transport_is_healthy() {
400        let config = PipeTransportConfig::default();
401        let transport = PipeTransport::new(&config);
402        assert!(transport.is_healthy());
403        assert_eq!(transport.name(), "pipe");
404    }
405
406    #[tokio::test]
407    async fn close_marks_unhealthy() {
408        let config = PipeTransportConfig::default();
409        let transport = PipeTransport::new(&config);
410
411        transport.close().await.unwrap();
412        assert!(!transport.is_healthy());
413    }
414
415    #[tokio::test]
416    async fn send_after_close_returns_fatal() {
417        let config = PipeTransportConfig::default();
418        let transport = PipeTransport::new(&config);
419
420        transport.close().await.unwrap();
421        let result = transport.send("key", b"data").await;
422        assert!(result.is_fatal());
423    }
424
425    #[tokio::test]
426    async fn recv_after_close_returns_error() {
427        let config = PipeTransportConfig::default();
428        let transport = PipeTransport::new(&config);
429
430        transport.close().await.unwrap();
431        let result = transport.recv(1).await;
432        assert!(result.is_err());
433    }
434
435    #[tokio::test]
436    async fn commit_is_noop() {
437        let config = PipeTransportConfig::default();
438        let transport = PipeTransport::new(&config);
439
440        let tokens = vec![PipeToken { seq: 0 }, PipeToken { seq: 1 }];
441        let result = transport.commit(&tokens).await;
442        assert!(result.is_ok());
443    }
444}