Skip to main content

hyperi_rustlib/transport/
pipe.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/pipe.rs
3// Purpose:   Unix pipe transport (stdin/stdout)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Pipe Transport
10//!
11//! Reads from stdin and writes to stdout for Unix pipeline composition.
12//! Newline-delimited: each line is one message.
13//!
14//! ## Example
15//!
16//! ```rust,ignore
17//! use hyperi_rustlib::transport::{PipeTransport, PipeTransportConfig};
18//!
19//! let config = PipeTransportConfig::default();
20//! let transport = PipeTransport::new(&config);
21//!
22//! // Send writes payload + newline to stdout
23//! transport.send("ignored", bytes::Bytes::from_static(b"hello world")).await;
24//!
25//! // Recv reads lines from stdin
26//! let messages = transport.recv(10).await?.messages;
27//! ```
28
29use super::error::{TransportError, TransportResult};
30use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
31use super::types::{Message, PayloadFormat, SendResult};
32use serde::{Deserialize, Serialize};
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
35use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
36
/// Commit token handed out by the pipe transport.
///
/// Wraps a monotonic sequence number. Committing a token has no effect,
/// because stdin is a forward-only stream.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct PipeToken {
    /// Sequence number of the message this token was issued for.
    pub seq: u64,
}
46
// Empty impl: no trait items are overridden for pipe tokens.
impl CommitToken for PipeToken {}
48
49impl std::fmt::Display for PipeToken {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        write!(f, "pipe:{}", self.seq)
52    }
53}
54
55/// Configuration for pipe transport.
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct PipeTransportConfig {
58    /// Receive timeout in milliseconds (0 = block until data). Default: 100.
59    #[serde(default = "default_recv_timeout_ms")]
60    pub recv_timeout_ms: u64,
61
62    /// Inbound message filters (applied on recv before caller sees messages).
63    #[serde(default)]
64    pub filters_in: Vec<super::filter::FilterRule>,
65
66    /// Outbound message filters (applied on send before transport dispatches).
67    #[serde(default)]
68    pub filters_out: Vec<super::filter::FilterRule>,
69}
70
/// Default value for `recv_timeout_ms`, in milliseconds.
fn default_recv_timeout_ms() -> u64 {
    const DEFAULT_RECV_TIMEOUT_MS: u64 = 100;
    DEFAULT_RECV_TIMEOUT_MS
}
74
75impl Default for PipeTransportConfig {
76    fn default() -> Self {
77        Self {
78            recv_timeout_ms: default_recv_timeout_ms(),
79            filters_in: Vec::new(),
80            filters_out: Vec::new(),
81        }
82    }
83}
84
85impl PipeTransportConfig {
86    /// Load from the config cascade under the `transport.pipe` key.
87    #[must_use]
88    pub fn from_cascade() -> Self {
89        <Self as super::traits::FromCascade>::from_cascade_key("transport.pipe")
90    }
91}
92
93/// Unix pipe transport (stdin/stdout).
94///
95/// Send writes newline-delimited payloads to stdout.
96/// Receive reads lines from stdin, each becoming a message.
97/// Commit is a no-op (stdin cannot be rewound).
98pub struct PipeTransport {
99    stdin: tokio::sync::Mutex<BufReader<tokio::io::Stdin>>,
100    stdout: tokio::sync::Mutex<tokio::io::Stdout>,
101    sequence: AtomicU64,
102    closed: Arc<AtomicBool>,
103    recv_timeout_ms: u64,
104    filter_engine: super::filter::TransportFilterEngine,
105}
106
107impl PipeTransport {
108    /// Create a new pipe transport.
109    #[must_use]
110    pub fn new(config: &PipeTransportConfig) -> Self {
111        #[cfg(feature = "logger")]
112        tracing::info!(
113            recv_timeout_ms = config.recv_timeout_ms,
114            "Pipe transport opened"
115        );
116
117        let filter_engine = super::filter::TransportFilterEngine::new(
118            &config.filters_in,
119            &config.filters_out,
120            &crate::transport::filter::TransportFilterTierConfig::default(),
121        )
122        .unwrap_or_else(|e| {
123            tracing::warn!(error = %e, "Failed to compile transport filters, filtering disabled");
124            super::filter::TransportFilterEngine::empty()
125        });
126
127        let closed = Arc::new(AtomicBool::new(false));
128
129        #[cfg(feature = "health")]
130        {
131            let h = Arc::clone(&closed);
132            crate::health::HealthRegistry::register("transport:pipe", move || {
133                if h.load(Ordering::Relaxed) {
134                    crate::health::HealthStatus::Unhealthy
135                } else {
136                    crate::health::HealthStatus::Healthy
137                }
138            });
139        }
140
141        Self {
142            stdin: tokio::sync::Mutex::new(BufReader::new(tokio::io::stdin())),
143            stdout: tokio::sync::Mutex::new(tokio::io::stdout()),
144            sequence: AtomicU64::new(0),
145            closed,
146            recv_timeout_ms: config.recv_timeout_ms,
147            filter_engine,
148        }
149    }
150}
151
152impl TransportBase for PipeTransport {
153    async fn close(&self) -> TransportResult<()> {
154        self.closed.store(true, Ordering::Relaxed);
155
156        // Flush stdout before closing
157        let mut stdout = self.stdout.lock().await;
158        stdout
159            .flush()
160            .await
161            .map_err(|e| TransportError::Internal(format!("stdout flush failed: {e}")))?;
162
163        Ok(())
164    }
165
166    fn is_healthy(&self) -> bool {
167        !self.closed.load(Ordering::Relaxed)
168    }
169
170    fn name(&self) -> &'static str {
171        "pipe"
172    }
173}
174
175impl TransportSender for PipeTransport {
176    async fn send(&self, _key: &str, payload: bytes::Bytes) -> SendResult {
177        if self.closed.load(Ordering::Relaxed) {
178            return SendResult::Fatal(TransportError::Closed);
179        }
180
181        // Outbound filter check
182        if self.filter_engine.has_outbound_filters() {
183            match self.filter_engine.apply_outbound(&payload) {
184                super::filter::FilterDisposition::Pass => {}
185                super::filter::FilterDisposition::Drop => return SendResult::Ok,
186                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
187            }
188        }
189
190        let mut stdout = self.stdout.lock().await;
191
192        // Write payload + newline
193        if let Err(e) = stdout.write_all(&payload).await {
194            return SendResult::Fatal(TransportError::Send(format!("stdout write failed: {e}")));
195        }
196        if let Err(e) = stdout.write_all(b"\n").await {
197            return SendResult::Fatal(TransportError::Send(format!(
198                "stdout newline write failed: {e}"
199            )));
200        }
201        if let Err(e) = stdout.flush().await {
202            return SendResult::Fatal(TransportError::Send(format!("stdout flush failed: {e}")));
203        }
204
205        #[cfg(feature = "logger")]
206        tracing::debug!(
207            bytes = payload.len(),
208            "Pipe transport: message sent to stdout"
209        );
210
211        #[cfg(feature = "metrics")]
212        metrics::counter!("dfe_transport_sent_total", "transport" => "pipe").increment(1);
213
214        SendResult::Ok
215    }
216}
217
218impl TransportReceiver for PipeTransport {
219    type Token = PipeToken;
220
221    async fn recv(&self, max: usize) -> TransportResult<RecvBatch<Self::Token>> {
222        if self.closed.load(Ordering::Relaxed) {
223            return Err(TransportError::Closed);
224        }
225
226        let mut stdin = self.stdin.lock().await;
227        let mut messages = Vec::with_capacity(max.min(100));
228        let mut line_buf = String::new();
229
230        for _ in 0..max {
231            line_buf.clear();
232
233            let read_result = if self.recv_timeout_ms == 0 {
234                // Block until data arrives
235                stdin.read_line(&mut line_buf).await
236            } else if messages.is_empty() {
237                // First message: wait up to timeout
238                match tokio::time::timeout(
239                    std::time::Duration::from_millis(self.recv_timeout_ms),
240                    stdin.read_line(&mut line_buf),
241                )
242                .await
243                {
244                    Ok(result) => result,
245                    Err(_) => break, // Timeout, return what we have (empty)
246                }
247            } else {
248                // Subsequent messages: non-blocking attempt via short timeout
249                match tokio::time::timeout(
250                    std::time::Duration::from_millis(1),
251                    stdin.read_line(&mut line_buf),
252                )
253                .await
254                {
255                    Ok(result) => result,
256                    Err(_) => break, // No more data ready
257                }
258            };
259
260            match read_result {
261                Ok(0) => {
262                    // EOF on stdin
263                    if messages.is_empty() {
264                        return Err(TransportError::Closed);
265                    }
266                    break;
267                }
268                Ok(_) => {
269                    // Strip trailing newline
270                    let payload = line_buf.trim_end_matches('\n').trim_end_matches('\r');
271                    if payload.is_empty() {
272                        continue;
273                    }
274
275                    let payload_bytes = payload.as_bytes().to_vec();
276                    let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
277                    let format = PayloadFormat::detect(&payload_bytes);
278                    let timestamp_ms = chrono::Utc::now().timestamp_millis();
279
280                    messages.push(Message {
281                        key: None,
282                        payload: payload_bytes,
283                        token: PipeToken { seq },
284                        timestamp_ms: Some(timestamp_ms),
285                        format,
286                    });
287
288                    #[cfg(feature = "metrics")]
289                    metrics::counter!("dfe_transport_received_total", "transport" => "pipe")
290                        .increment(1);
291                }
292                Err(e) => {
293                    return Err(TransportError::Recv(format!("stdin read failed: {e}")));
294                }
295            }
296        }
297
298        // Apply inbound filters via the shared partition helper; DLQ entries
299        // are returned in the RecvBatch for the caller to route onward.
300        let batch = self.filter_engine.partition_batch(
301            messages,
302            |m| m.payload.as_slice(),
303            |m| m.key.clone(),
304        );
305        let messages = batch.messages;
306        let dlq_entries = batch.dlq_entries;
307
308        #[cfg(feature = "logger")]
309        if !messages.is_empty() {
310            tracing::debug!(
311                lines = messages.len(),
312                "Pipe transport: batch received from stdin"
313            );
314        }
315
316        Ok(RecvBatch {
317            messages,
318            dlq_entries,
319        })
320    }
321
322    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
323        // No-op: stdin is a forward-only stream, cannot rewind or acknowledge
324        Ok(())
325    }
326}
327
// Empty impl: no `FromCascade` items are overridden for the pipe config.
impl super::traits::FromCascade for PipeTransportConfig {}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// Transport built from the default configuration.
    fn default_transport() -> PipeTransport {
        PipeTransport::new(&PipeTransportConfig::default())
    }

    // --- PipeToken -------------------------------------------------------

    #[test]
    fn token_display() {
        assert_eq!(PipeToken { seq: 42 }.to_string(), "pipe:42");
    }

    #[test]
    fn token_as_str() {
        let token = PipeToken { seq: 7 };
        assert_eq!(token.as_str(), "pipe:7");
    }

    #[test]
    fn token_clone() {
        let original = PipeToken { seq: 99 };
        let duplicate = original;
        assert_eq!(original, duplicate);
    }

    // --- PipeTransportConfig ---------------------------------------------

    #[test]
    fn config_defaults() {
        assert_eq!(PipeTransportConfig::default().recv_timeout_ms, 100);
    }

    #[test]
    fn config_serde_roundtrip() {
        let original = PipeTransportConfig {
            recv_timeout_ms: 500,
            ..Default::default()
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: PipeTransportConfig = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.recv_timeout_ms, 500);
    }

    #[test]
    fn config_serde_default_fields() {
        // An empty JSON object must fall back to the defaults.
        let decoded: PipeTransportConfig = serde_json::from_str("{}").unwrap();
        assert_eq!(decoded.recv_timeout_ms, 100);
    }

    // --- PipeTransport lifecycle -----------------------------------------

    #[tokio::test]
    async fn new_transport_is_healthy() {
        let transport = default_transport();
        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "pipe");
    }

    #[tokio::test]
    async fn close_marks_unhealthy() {
        let transport = default_transport();

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());
    }

    #[tokio::test]
    async fn send_after_close_returns_fatal() {
        let transport = default_transport();
        transport.close().await.unwrap();

        let outcome = transport
            .send("key", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(outcome.is_fatal());
    }

    #[tokio::test]
    async fn recv_after_close_returns_error() {
        let transport = default_transport();
        transport.close().await.unwrap();

        assert!(transport.recv(1).await.is_err());
    }

    #[tokio::test]
    async fn commit_is_noop() {
        let transport = default_transport();

        let tokens = [PipeToken { seq: 0 }, PipeToken { seq: 1 }];
        assert!(transport.commit(&tokens).await.is_ok());
    }
}
421        let tokens = vec![PipeToken { seq: 0 }, PipeToken { seq: 1 }];
422        let result = transport.commit(&tokens).await;
423        assert!(result.is_ok());
424    }
425}