Skip to main content

hyperi_rustlib/transport/
pipe.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/pipe.rs
3// Purpose:   Unix pipe transport (stdin/stdout)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Pipe Transport
10//!
11//! Reads from stdin and writes to stdout for Unix pipeline composition.
12//! Newline-delimited: each line is one message.
13//!
14//! ## Example
15//!
16//! ```rust,ignore
17//! use hyperi_rustlib::transport::{PipeTransport, PipeTransportConfig};
18//!
19//! let config = PipeTransportConfig::default();
20//! let transport = PipeTransport::new(&config);
21//!
22//! // Send writes payload + newline to stdout
23//! transport.send("ignored", bytes::Bytes::from_static(b"hello world")).await;
24//!
25//! // Recv reads lines from stdin
26//! let records = transport.recv(10).await?.records;
27//! ```
28
29use super::error::{TransportError, TransportResult};
30use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
31use super::types::{Message, PayloadFormat, SendResult};
32use super::work_batch::WorkBatch;
33use serde::{Deserialize, Serialize};
34use std::sync::Arc;
35use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
36use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
37
38/// Commit token for pipe transport.
39///
40/// Contains a monotonic sequence number. Commit is a no-op
41/// because stdin is a forward-only stream.
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
43pub struct PipeToken {
44    /// Message sequence number.
45    pub seq: u64,
46}
47
48impl CommitToken for PipeToken {}
49
50impl std::fmt::Display for PipeToken {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        write!(f, "pipe:{}", self.seq)
53    }
54}
55
56/// Configuration for pipe transport.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct PipeTransportConfig {
59    /// Receive timeout in milliseconds (0 = block until data). Default: 100.
60    #[serde(default = "default_recv_timeout_ms")]
61    pub recv_timeout_ms: u64,
62
63    /// Inbound message filters (applied on recv before caller sees messages).
64    #[serde(default)]
65    pub filters_in: Vec<super::filter::FilterRule>,
66
67    /// Outbound message filters (applied on send before transport dispatches).
68    #[serde(default)]
69    pub filters_out: Vec<super::filter::FilterRule>,
70}
71
72fn default_recv_timeout_ms() -> u64 {
73    100
74}
75
76impl Default for PipeTransportConfig {
77    fn default() -> Self {
78        Self {
79            recv_timeout_ms: default_recv_timeout_ms(),
80            filters_in: Vec::new(),
81            filters_out: Vec::new(),
82        }
83    }
84}
85
86impl PipeTransportConfig {
87    /// Load from the config cascade under the `transport.pipe` key.
88    #[must_use]
89    pub fn from_cascade() -> Self {
90        <Self as super::traits::FromCascade>::from_cascade_key("transport.pipe")
91    }
92}
93
94/// Unix pipe transport (stdin/stdout).
95///
96/// Send writes newline-delimited payloads to stdout.
97/// Receive reads lines from stdin, each becoming a message.
98/// Commit is a no-op (stdin cannot be rewound).
99pub struct PipeTransport {
100    stdin: tokio::sync::Mutex<BufReader<tokio::io::Stdin>>,
101    stdout: tokio::sync::Mutex<tokio::io::Stdout>,
102    sequence: AtomicU64,
103    closed: Arc<AtomicBool>,
104    recv_timeout_ms: u64,
105    filter_engine: super::filter::TransportFilterEngine,
106}
107
108impl PipeTransport {
109    /// Create a new pipe transport.
110    #[must_use]
111    pub fn new(config: &PipeTransportConfig) -> Self {
112        #[cfg(feature = "logger")]
113        tracing::info!(
114            recv_timeout_ms = config.recv_timeout_ms,
115            "Pipe transport opened"
116        );
117
118        let filter_engine = super::filter::TransportFilterEngine::new(
119            &config.filters_in,
120            &config.filters_out,
121            &crate::transport::filter::TransportFilterTierConfig::default(),
122        )
123        .unwrap_or_else(|e| {
124            tracing::warn!(error = %e, "Failed to compile transport filters, filtering disabled");
125            super::filter::TransportFilterEngine::empty()
126        });
127
128        let closed = Arc::new(AtomicBool::new(false));
129
130        #[cfg(feature = "health")]
131        {
132            let h = Arc::clone(&closed);
133            crate::health::HealthRegistry::register("transport:pipe", move || {
134                if h.load(Ordering::Relaxed) {
135                    crate::health::HealthStatus::Unhealthy
136                } else {
137                    crate::health::HealthStatus::Healthy
138                }
139            });
140        }
141
142        Self {
143            stdin: tokio::sync::Mutex::new(BufReader::new(tokio::io::stdin())),
144            stdout: tokio::sync::Mutex::new(tokio::io::stdout()),
145            sequence: AtomicU64::new(0),
146            closed,
147            recv_timeout_ms: config.recv_timeout_ms,
148            filter_engine,
149        }
150    }
151}
152
153impl TransportBase for PipeTransport {
154    async fn close(&self) -> TransportResult<()> {
155        self.closed.store(true, Ordering::Relaxed);
156
157        // Flush stdout before closing
158        let mut stdout = self.stdout.lock().await;
159        stdout
160            .flush()
161            .await
162            .map_err(|e| TransportError::Internal(format!("stdout flush failed: {e}")))?;
163
164        Ok(())
165    }
166
167    fn is_healthy(&self) -> bool {
168        !self.closed.load(Ordering::Relaxed)
169    }
170
171    fn name(&self) -> &'static str {
172        "pipe"
173    }
174}
175
176impl TransportSender for PipeTransport {
177    async fn send(&self, _key: &str, payload: bytes::Bytes) -> SendResult {
178        if self.closed.load(Ordering::Relaxed) {
179            return SendResult::Fatal(TransportError::Closed);
180        }
181
182        // Outbound filter check
183        if self.filter_engine.has_outbound_filters() {
184            match self.filter_engine.apply_outbound(&payload) {
185                super::filter::FilterDisposition::Pass => {}
186                super::filter::FilterDisposition::Drop => return SendResult::Ok,
187                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
188            }
189        }
190
191        let mut stdout = self.stdout.lock().await;
192
193        // Write payload + newline
194        if let Err(e) = stdout.write_all(&payload).await {
195            return SendResult::Fatal(TransportError::Send(format!("stdout write failed: {e}")));
196        }
197        if let Err(e) = stdout.write_all(b"\n").await {
198            return SendResult::Fatal(TransportError::Send(format!(
199                "stdout newline write failed: {e}"
200            )));
201        }
202        if let Err(e) = stdout.flush().await {
203            return SendResult::Fatal(TransportError::Send(format!("stdout flush failed: {e}")));
204        }
205
206        #[cfg(feature = "logger")]
207        tracing::debug!(
208            bytes = payload.len(),
209            "Pipe transport: message sent to stdout"
210        );
211
212        #[cfg(feature = "metrics")]
213        metrics::counter!("dfe_transport_sent_total", "transport" => "pipe").increment(1);
214
215        SendResult::Ok
216    }
217}
218
219impl TransportReceiver for PipeTransport {
220    type Token = PipeToken;
221
222    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
223        if self.closed.load(Ordering::Relaxed) {
224            return Err(TransportError::Closed);
225        }
226
227        let mut stdin = self.stdin.lock().await;
228        let mut messages = Vec::with_capacity(max.min(100));
229        let mut line_buf = String::new();
230
231        for _ in 0..max {
232            line_buf.clear();
233
234            let read_result = if self.recv_timeout_ms == 0 {
235                // Block until data arrives
236                stdin.read_line(&mut line_buf).await
237            } else if messages.is_empty() {
238                // First message: wait up to timeout
239                match tokio::time::timeout(
240                    std::time::Duration::from_millis(self.recv_timeout_ms),
241                    stdin.read_line(&mut line_buf),
242                )
243                .await
244                {
245                    Ok(result) => result,
246                    Err(_) => break, // Timeout, return what we have (empty)
247                }
248            } else {
249                // Subsequent messages: non-blocking attempt via short timeout
250                match tokio::time::timeout(
251                    std::time::Duration::from_millis(1),
252                    stdin.read_line(&mut line_buf),
253                )
254                .await
255                {
256                    Ok(result) => result,
257                    Err(_) => break, // No more data ready
258                }
259            };
260
261            match read_result {
262                Ok(0) => {
263                    // EOF on stdin
264                    if messages.is_empty() {
265                        return Err(TransportError::Closed);
266                    }
267                    break;
268                }
269                Ok(_) => {
270                    // Strip trailing newline
271                    let payload = line_buf.trim_end_matches('\n').trim_end_matches('\r');
272                    if payload.is_empty() {
273                        continue;
274                    }
275
276                    let payload_bytes: bytes::Bytes = payload.as_bytes().to_vec().into();
277                    let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
278                    let format = PayloadFormat::detect(&payload_bytes);
279                    let timestamp_ms = chrono::Utc::now().timestamp_millis();
280
281                    messages.push(Message {
282                        key: None,
283                        payload: payload_bytes,
284                        token: PipeToken { seq },
285                        timestamp_ms: Some(timestamp_ms),
286                        format,
287                    });
288
289                    #[cfg(feature = "metrics")]
290                    metrics::counter!("dfe_transport_received_total", "transport" => "pipe")
291                        .increment(1);
292                }
293                Err(e) => {
294                    return Err(TransportError::Recv(format!("stdin read failed: {e}")));
295                }
296            }
297        }
298
299        // Apply inbound filters via the shared partition helper; DLQ entries
300        // are returned in the RecvBatch for the caller to route onward.
301        let batch =
302            self.filter_engine
303                .partition_batch(messages, |m| m.payload.as_ref(), |m| m.key.clone());
304        let messages = batch.messages;
305        let dlq_entries = batch.dlq_entries;
306
307        #[cfg(feature = "logger")]
308        if !messages.is_empty() {
309            tracing::debug!(
310                lines = messages.len(),
311                "Pipe transport: batch received from stdin"
312            );
313        }
314
315        Ok(RecvBatch {
316            messages,
317            dlq_entries,
318        }
319        .into())
320    }
321
322    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
323        // No-op: stdin is a forward-only stream, cannot rewind or acknowledge
324        Ok(())
325    }
326}
327
328impl super::traits::FromCascade for PipeTransportConfig {}
329
330#[cfg(test)]
331mod tests {
332    use super::*;
333
334    #[test]
335    fn token_display() {
336        let token = PipeToken { seq: 42 };
337        assert_eq!(token.to_string(), "pipe:42");
338    }
339
340    #[test]
341    fn token_as_str() {
342        let token = PipeToken { seq: 7 };
343        assert_eq!(token.as_str(), "pipe:7");
344    }
345
346    #[test]
347    fn token_clone() {
348        let token = PipeToken { seq: 99 };
349        let cloned = token;
350        assert_eq!(token, cloned);
351    }
352
353    #[test]
354    fn config_defaults() {
355        let config = PipeTransportConfig::default();
356        assert_eq!(config.recv_timeout_ms, 100);
357    }
358
359    #[test]
360    fn config_serde_roundtrip() {
361        let config = PipeTransportConfig {
362            recv_timeout_ms: 500,
363            ..Default::default()
364        };
365        let json = serde_json::to_string(&config).unwrap();
366        let parsed: PipeTransportConfig = serde_json::from_str(&json).unwrap();
367        assert_eq!(parsed.recv_timeout_ms, 500);
368    }
369
370    #[test]
371    fn config_serde_default_fields() {
372        // Empty JSON should use defaults
373        let parsed: PipeTransportConfig = serde_json::from_str("{}").unwrap();
374        assert_eq!(parsed.recv_timeout_ms, 100);
375    }
376
377    #[tokio::test]
378    async fn new_transport_is_healthy() {
379        let config = PipeTransportConfig::default();
380        let transport = PipeTransport::new(&config);
381        assert!(transport.is_healthy());
382        assert_eq!(transport.name(), "pipe");
383    }
384
385    #[tokio::test]
386    async fn close_marks_unhealthy() {
387        let config = PipeTransportConfig::default();
388        let transport = PipeTransport::new(&config);
389
390        transport.close().await.unwrap();
391        assert!(!transport.is_healthy());
392    }
393
394    #[tokio::test]
395    async fn send_after_close_returns_fatal() {
396        let config = PipeTransportConfig::default();
397        let transport = PipeTransport::new(&config);
398
399        transport.close().await.unwrap();
400        let result = transport
401            .send("key", bytes::Bytes::from_static(b"data"))
402            .await;
403        assert!(result.is_fatal());
404    }
405
406    #[tokio::test]
407    async fn recv_after_close_returns_error() {
408        let config = PipeTransportConfig::default();
409        let transport = PipeTransport::new(&config);
410
411        transport.close().await.unwrap();
412        let result = transport.recv(1).await;
413        assert!(result.is_err());
414    }
415
416    #[tokio::test]
417    async fn commit_is_noop() {
418        let config = PipeTransportConfig::default();
419        let transport = PipeTransport::new(&config);
420
421        let tokens = vec![PipeToken { seq: 0 }, PipeToken { seq: 1 }];
422        let result = transport.commit(&tokens).await;
423        assert!(result.is_ok());
424    }
425}