Skip to main content

hyperi_rustlib/transport/
pipe.rs

// Project:   hyperi-rustlib
// File:      src/transport/pipe.rs
// Purpose:   Unix pipe transport (stdin/stdout)
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED
8
//! # Pipe Transport
//!
//! Reads from stdin and writes to stdout for Unix pipeline composition.
//! Newline-delimited: each line is one message.
//!
//! ## Example
//!
//! ```rust,ignore
//! use hyperi_rustlib::transport::{PipeTransport, PipeTransportConfig};
//!
//! let config = PipeTransportConfig::default();
//! let transport = PipeTransport::new(&config);
//!
//! // Send writes payload + newline to stdout
//! transport.send("ignored", bytes::Bytes::from_static(b"hello world")).await;
//!
//! // Recv reads lines from stdin
//! let records = transport.recv(10).await?.records;
//! ```
28
29use super::error::{TransportError, TransportResult};
30use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
31use super::types::{Message, PayloadFormat, SendResult};
32use super::work_batch::WorkBatch;
33use serde::{Deserialize, Serialize};
34use std::sync::Arc;
35use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
36use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
37
/// Acknowledgement token handed out by the pipe transport.
///
/// Wraps the monotonic sequence number assigned to a received line.
/// Committing one does nothing: stdin only moves forward, so there is
/// nothing to acknowledge or rewind.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct PipeToken {
    /// Sequence number of the message this token refers to.
    pub seq: u64,
}
47
48impl CommitToken for PipeToken {}
49
50impl std::fmt::Display for PipeToken {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        write!(f, "pipe:{}", self.seq)
53    }
54}
55
56/// Configuration for pipe transport.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct PipeTransportConfig {
59    /// Receive timeout in milliseconds (0 = block until data). Default: 100.
60    #[serde(default = "default_recv_timeout_ms")]
61    pub recv_timeout_ms: u64,
62
63    /// Inbound message filters (applied on recv before caller sees messages).
64    #[serde(default)]
65    pub filters_in: Vec<super::filter::FilterRule>,
66
67    /// Outbound message filters (applied on send before transport dispatches).
68    #[serde(default)]
69    pub filters_out: Vec<super::filter::FilterRule>,
70}
71
/// Default receive timeout in milliseconds, used by serde and by `Default`.
fn default_recv_timeout_ms() -> u64 {
    const DEFAULT_RECV_TIMEOUT_MS: u64 = 100;
    DEFAULT_RECV_TIMEOUT_MS
}
75
76impl Default for PipeTransportConfig {
77    fn default() -> Self {
78        Self {
79            recv_timeout_ms: default_recv_timeout_ms(),
80            filters_in: Vec::new(),
81            filters_out: Vec::new(),
82        }
83    }
84}
85
86impl PipeTransportConfig {
87    /// Load from the config cascade under the `transport.pipe` key.
88    #[must_use]
89    pub fn from_cascade() -> Self {
90        <Self as super::traits::FromCascade>::from_cascade_key("transport.pipe")
91    }
92}
93
94/// Unix pipe transport (stdin/stdout).
95///
96/// Send writes newline-delimited payloads to stdout.
97/// Receive reads lines from stdin, each becoming a message.
98/// Commit is a no-op (stdin cannot be rewound).
99pub struct PipeTransport {
100    stdin: tokio::sync::Mutex<BufReader<tokio::io::Stdin>>,
101    stdout: tokio::sync::Mutex<tokio::io::Stdout>,
102    sequence: AtomicU64,
103    closed: Arc<AtomicBool>,
104    recv_timeout_ms: u64,
105    filter_engine: super::filter::TransportFilterEngine,
106}
107
108impl PipeTransport {
109    /// Create a new pipe transport.
110    #[must_use]
111    pub fn new(config: &PipeTransportConfig) -> Self {
112        #[cfg(feature = "logger")]
113        tracing::info!(
114            recv_timeout_ms = config.recv_timeout_ms,
115            "Pipe transport opened"
116        );
117
118        let filter_engine = super::filter::TransportFilterEngine::new(
119            &config.filters_in,
120            &config.filters_out,
121            &crate::transport::filter::TransportFilterTierConfig::default(),
122        )
123        .unwrap_or_else(|e| {
124            tracing::warn!(error = %e, "Failed to compile transport filters, filtering disabled");
125            super::filter::TransportFilterEngine::empty()
126        });
127
128        let closed = Arc::new(AtomicBool::new(false));
129
130        #[cfg(feature = "health")]
131        {
132            let h = Arc::clone(&closed);
133            crate::health::HealthRegistry::register("transport:pipe", move || {
134                if h.load(Ordering::Relaxed) {
135                    crate::health::HealthStatus::Unhealthy
136                } else {
137                    crate::health::HealthStatus::Healthy
138                }
139            });
140        }
141
142        Self {
143            stdin: tokio::sync::Mutex::new(BufReader::new(tokio::io::stdin())),
144            stdout: tokio::sync::Mutex::new(tokio::io::stdout()),
145            sequence: AtomicU64::new(0),
146            closed,
147            recv_timeout_ms: config.recv_timeout_ms,
148            filter_engine,
149        }
150    }
151}
152
153impl TransportBase for PipeTransport {
154    async fn close(&self) -> TransportResult<()> {
155        self.closed.store(true, Ordering::Relaxed);
156
157        // Flush stdout before closing
158        let mut stdout = self.stdout.lock().await;
159        stdout
160            .flush()
161            .await
162            .map_err(|e| TransportError::Internal(format!("stdout flush failed: {e}")))?;
163
164        Ok(())
165    }
166
167    fn is_healthy(&self) -> bool {
168        !self.closed.load(Ordering::Relaxed)
169    }
170
171    fn name(&self) -> &'static str {
172        "pipe"
173    }
174}
175
176impl TransportSender for PipeTransport {
177    async fn send(&self, _key: &str, payload: bytes::Bytes) -> SendResult {
178        if self.closed.load(Ordering::Relaxed) {
179            return SendResult::Fatal(TransportError::Closed);
180        }
181
182        // Outbound filter check
183        if self.filter_engine.has_outbound_filters() {
184            match self.filter_engine.apply_outbound(&payload) {
185                super::filter::FilterDisposition::Pass => {}
186                super::filter::FilterDisposition::Drop => return SendResult::Ok,
187                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
188            }
189        }
190
191        let mut stdout = self.stdout.lock().await;
192
193        // Write payload + newline
194        if let Err(e) = stdout.write_all(&payload).await {
195            return SendResult::Fatal(TransportError::Send(format!("stdout write failed: {e}")));
196        }
197        if let Err(e) = stdout.write_all(b"\n").await {
198            return SendResult::Fatal(TransportError::Send(format!(
199                "stdout newline write failed: {e}"
200            )));
201        }
202        if let Err(e) = stdout.flush().await {
203            return SendResult::Fatal(TransportError::Send(format!("stdout flush failed: {e}")));
204        }
205
206        #[cfg(feature = "logger")]
207        tracing::debug!(
208            bytes = payload.len(),
209            "Pipe transport: message sent to stdout"
210        );
211
212        #[cfg(feature = "metrics")]
213        metrics::counter!("dfe_transport_sent_total", "transport" => "pipe").increment(1);
214
215        SendResult::Ok
216    }
217}
218
219impl TransportReceiver for PipeTransport {
220    type Token = PipeToken;
221
222    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
223        if self.closed.load(Ordering::Relaxed) {
224            return Err(TransportError::Closed);
225        }
226
227        let mut stdin = self.stdin.lock().await;
228        let mut messages = Vec::with_capacity(max.min(100));
229        let mut line_buf = String::new();
230
231        for _ in 0..max {
232            line_buf.clear();
233
234            let read_result = if self.recv_timeout_ms == 0 {
235                // Block until data arrives
236                stdin.read_line(&mut line_buf).await
237            } else if messages.is_empty() {
238                // First message: wait up to timeout
239                match tokio::time::timeout(
240                    std::time::Duration::from_millis(self.recv_timeout_ms),
241                    stdin.read_line(&mut line_buf),
242                )
243                .await
244                {
245                    Ok(result) => result,
246                    Err(_) => break, // Timeout, return what we have (empty)
247                }
248            } else {
249                // Subsequent messages: non-blocking attempt via short timeout
250                match tokio::time::timeout(
251                    std::time::Duration::from_millis(1),
252                    stdin.read_line(&mut line_buf),
253                )
254                .await
255                {
256                    Ok(result) => result,
257                    Err(_) => break, // No more data ready
258                }
259            };
260
261            match read_result {
262                Ok(0) => {
263                    // EOF on stdin
264                    if messages.is_empty() {
265                        return Err(TransportError::Closed);
266                    }
267                    break;
268                }
269                Ok(_) => {
270                    // Strip trailing newline
271                    let payload = line_buf.trim_end_matches('\n').trim_end_matches('\r');
272                    if payload.is_empty() {
273                        continue;
274                    }
275
276                    let payload_bytes: bytes::Bytes = payload.as_bytes().to_vec().into();
277                    let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
278                    let format = PayloadFormat::detect(&payload_bytes);
279                    let timestamp_ms = chrono::Utc::now().timestamp_millis();
280
281                    messages.push(Message {
282                        key: None,
283                        payload: payload_bytes,
284                        token: PipeToken { seq },
285                        timestamp_ms: Some(timestamp_ms),
286                        format,
287                    });
288
289                    #[cfg(feature = "metrics")]
290                    metrics::counter!("dfe_transport_received_total", "transport" => "pipe")
291                        .increment(1);
292                }
293                Err(e) => {
294                    return Err(TransportError::Recv(format!("stdin read failed: {e}")));
295                }
296            }
297        }
298
299        // Apply inbound filters via the shared partition helper; DLQ entries
300        // are returned in the RecvBatch for the caller to route onward.
301        let batch = self.filter_engine.partition_batch(
302            messages,
303            |m| m.payload.as_ref(),
304            |m| m.key.clone(),
305            |m| m.token,
306        );
307        let messages = batch.messages;
308        let dlq_entries = batch.dlq_entries;
309        let filtered_tokens = batch.filtered_tokens;
310
311        #[cfg(feature = "logger")]
312        if !messages.is_empty() {
313            tracing::debug!(
314                lines = messages.len(),
315                "Pipe transport: batch received from stdin"
316            );
317        }
318
319        Ok(RecvBatch {
320            messages,
321            dlq_entries,
322            filtered_tokens,
323        }
324        .into())
325    }
326
327    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
328        // No-op: stdin is a forward-only stream, cannot rewind or acknowledge
329        Ok(())
330    }
331}
332
333impl super::traits::FromCascade for PipeTransportConfig {}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Transport built from the default configuration.
    fn default_transport() -> PipeTransport {
        PipeTransport::new(&PipeTransportConfig::default())
    }

    // --- PipeToken ---------------------------------------------------------

    #[test]
    fn token_display() {
        let rendered = PipeToken { seq: 42 }.to_string();
        assert_eq!(rendered, "pipe:42");
    }

    #[test]
    fn token_as_str() {
        let token = PipeToken { seq: 7 };
        assert_eq!(token.as_str(), "pipe:7");
    }

    #[test]
    fn token_clone() {
        // `PipeToken` is `Copy`, so the original stays usable after the copy.
        let original = PipeToken { seq: 99 };
        let copy = original;
        assert_eq!(original, copy);
    }

    // --- PipeTransportConfig -----------------------------------------------

    #[test]
    fn config_defaults() {
        assert_eq!(PipeTransportConfig::default().recv_timeout_ms, 100);
    }

    #[test]
    fn config_serde_roundtrip() {
        let original = PipeTransportConfig {
            recv_timeout_ms: 500,
            ..Default::default()
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: PipeTransportConfig = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.recv_timeout_ms, 500);
    }

    #[test]
    fn config_serde_default_fields() {
        // An empty JSON object must fall back to the defaults.
        let decoded: PipeTransportConfig = serde_json::from_str("{}").unwrap();
        assert_eq!(decoded.recv_timeout_ms, 100);
    }

    // --- PipeTransport lifecycle -------------------------------------------

    #[tokio::test]
    async fn new_transport_is_healthy() {
        let transport = default_transport();
        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "pipe");
    }

    #[tokio::test]
    async fn close_marks_unhealthy() {
        let transport = default_transport();

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());
    }

    #[tokio::test]
    async fn send_after_close_returns_fatal() {
        let transport = default_transport();
        transport.close().await.unwrap();

        let payload = bytes::Bytes::from_static(b"data");
        let outcome = transport.send("key", payload).await;
        assert!(outcome.is_fatal());
    }

    #[tokio::test]
    async fn recv_after_close_returns_error() {
        let transport = default_transport();
        transport.close().await.unwrap();

        let outcome = transport.recv(1).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn commit_is_noop() {
        let transport = default_transport();

        let tokens = [PipeToken { seq: 0 }, PipeToken { seq: 1 }];
        let outcome = transport.commit(&tokens).await;
        assert!(outcome.is_ok());
    }
}