hyperi_rustlib/transport/
pipe.rs1use super::error::{TransportError, TransportResult};
30use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
31use super::types::{Message, PayloadFormat, SendResult};
32use super::work_batch::WorkBatch;
33use serde::{Deserialize, Serialize};
34use std::sync::Arc;
35use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
36use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
43pub struct PipeToken {
44 pub seq: u64,
46}
47
48impl CommitToken for PipeToken {}
49
50impl std::fmt::Display for PipeToken {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 write!(f, "pipe:{}", self.seq)
53 }
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct PipeTransportConfig {
59 #[serde(default = "default_recv_timeout_ms")]
61 pub recv_timeout_ms: u64,
62
63 #[serde(default)]
65 pub filters_in: Vec<super::filter::FilterRule>,
66
67 #[serde(default)]
69 pub filters_out: Vec<super::filter::FilterRule>,
70}
71
72fn default_recv_timeout_ms() -> u64 {
73 100
74}
75
76impl Default for PipeTransportConfig {
77 fn default() -> Self {
78 Self {
79 recv_timeout_ms: default_recv_timeout_ms(),
80 filters_in: Vec::new(),
81 filters_out: Vec::new(),
82 }
83 }
84}
85
86impl PipeTransportConfig {
87 #[must_use]
89 pub fn from_cascade() -> Self {
90 <Self as super::traits::FromCascade>::from_cascade_key("transport.pipe")
91 }
92}
93
94pub struct PipeTransport {
100 stdin: tokio::sync::Mutex<BufReader<tokio::io::Stdin>>,
101 stdout: tokio::sync::Mutex<tokio::io::Stdout>,
102 sequence: AtomicU64,
103 closed: Arc<AtomicBool>,
104 recv_timeout_ms: u64,
105 filter_engine: super::filter::TransportFilterEngine,
106}
107
108impl PipeTransport {
109 #[must_use]
111 pub fn new(config: &PipeTransportConfig) -> Self {
112 #[cfg(feature = "logger")]
113 tracing::info!(
114 recv_timeout_ms = config.recv_timeout_ms,
115 "Pipe transport opened"
116 );
117
118 let filter_engine = super::filter::TransportFilterEngine::new(
119 &config.filters_in,
120 &config.filters_out,
121 &crate::transport::filter::TransportFilterTierConfig::default(),
122 )
123 .unwrap_or_else(|e| {
124 tracing::warn!(error = %e, "Failed to compile transport filters, filtering disabled");
125 super::filter::TransportFilterEngine::empty()
126 });
127
128 let closed = Arc::new(AtomicBool::new(false));
129
130 #[cfg(feature = "health")]
131 {
132 let h = Arc::clone(&closed);
133 crate::health::HealthRegistry::register("transport:pipe", move || {
134 if h.load(Ordering::Relaxed) {
135 crate::health::HealthStatus::Unhealthy
136 } else {
137 crate::health::HealthStatus::Healthy
138 }
139 });
140 }
141
142 Self {
143 stdin: tokio::sync::Mutex::new(BufReader::new(tokio::io::stdin())),
144 stdout: tokio::sync::Mutex::new(tokio::io::stdout()),
145 sequence: AtomicU64::new(0),
146 closed,
147 recv_timeout_ms: config.recv_timeout_ms,
148 filter_engine,
149 }
150 }
151}
152
153impl TransportBase for PipeTransport {
154 async fn close(&self) -> TransportResult<()> {
155 self.closed.store(true, Ordering::Relaxed);
156
157 let mut stdout = self.stdout.lock().await;
159 stdout
160 .flush()
161 .await
162 .map_err(|e| TransportError::Internal(format!("stdout flush failed: {e}")))?;
163
164 Ok(())
165 }
166
167 fn is_healthy(&self) -> bool {
168 !self.closed.load(Ordering::Relaxed)
169 }
170
171 fn name(&self) -> &'static str {
172 "pipe"
173 }
174}
175
176impl TransportSender for PipeTransport {
177 async fn send(&self, _key: &str, payload: bytes::Bytes) -> SendResult {
178 if self.closed.load(Ordering::Relaxed) {
179 return SendResult::Fatal(TransportError::Closed);
180 }
181
182 if self.filter_engine.has_outbound_filters() {
184 match self.filter_engine.apply_outbound(&payload) {
185 super::filter::FilterDisposition::Pass => {}
186 super::filter::FilterDisposition::Drop => return SendResult::Ok,
187 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
188 }
189 }
190
191 let mut stdout = self.stdout.lock().await;
192
193 if let Err(e) = stdout.write_all(&payload).await {
195 return SendResult::Fatal(TransportError::Send(format!("stdout write failed: {e}")));
196 }
197 if let Err(e) = stdout.write_all(b"\n").await {
198 return SendResult::Fatal(TransportError::Send(format!(
199 "stdout newline write failed: {e}"
200 )));
201 }
202 if let Err(e) = stdout.flush().await {
203 return SendResult::Fatal(TransportError::Send(format!("stdout flush failed: {e}")));
204 }
205
206 #[cfg(feature = "logger")]
207 tracing::debug!(
208 bytes = payload.len(),
209 "Pipe transport: message sent to stdout"
210 );
211
212 #[cfg(feature = "metrics")]
213 metrics::counter!("dfe_transport_sent_total", "transport" => "pipe").increment(1);
214
215 SendResult::Ok
216 }
217}
218
219impl TransportReceiver for PipeTransport {
220 type Token = PipeToken;
221
222 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
223 if self.closed.load(Ordering::Relaxed) {
224 return Err(TransportError::Closed);
225 }
226
227 let mut stdin = self.stdin.lock().await;
228 let mut messages = Vec::with_capacity(max.min(100));
229 let mut line_buf = String::new();
230
231 for _ in 0..max {
232 line_buf.clear();
233
234 let read_result = if self.recv_timeout_ms == 0 {
235 stdin.read_line(&mut line_buf).await
237 } else if messages.is_empty() {
238 match tokio::time::timeout(
240 std::time::Duration::from_millis(self.recv_timeout_ms),
241 stdin.read_line(&mut line_buf),
242 )
243 .await
244 {
245 Ok(result) => result,
246 Err(_) => break, }
248 } else {
249 match tokio::time::timeout(
251 std::time::Duration::from_millis(1),
252 stdin.read_line(&mut line_buf),
253 )
254 .await
255 {
256 Ok(result) => result,
257 Err(_) => break, }
259 };
260
261 match read_result {
262 Ok(0) => {
263 if messages.is_empty() {
265 return Err(TransportError::Closed);
266 }
267 break;
268 }
269 Ok(_) => {
270 let payload = line_buf.trim_end_matches('\n').trim_end_matches('\r');
272 if payload.is_empty() {
273 continue;
274 }
275
276 let payload_bytes: bytes::Bytes = payload.as_bytes().to_vec().into();
277 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
278 let format = PayloadFormat::detect(&payload_bytes);
279 let timestamp_ms = chrono::Utc::now().timestamp_millis();
280
281 messages.push(Message {
282 key: None,
283 payload: payload_bytes,
284 token: PipeToken { seq },
285 timestamp_ms: Some(timestamp_ms),
286 format,
287 });
288
289 #[cfg(feature = "metrics")]
290 metrics::counter!("dfe_transport_received_total", "transport" => "pipe")
291 .increment(1);
292 }
293 Err(e) => {
294 return Err(TransportError::Recv(format!("stdin read failed: {e}")));
295 }
296 }
297 }
298
299 let batch =
302 self.filter_engine
303 .partition_batch(messages, |m| m.payload.as_ref(), |m| m.key.clone());
304 let messages = batch.messages;
305 let dlq_entries = batch.dlq_entries;
306
307 #[cfg(feature = "logger")]
308 if !messages.is_empty() {
309 tracing::debug!(
310 lines = messages.len(),
311 "Pipe transport: batch received from stdin"
312 );
313 }
314
315 Ok(RecvBatch {
316 messages,
317 dlq_entries,
318 }
319 .into())
320 }
321
322 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
323 Ok(())
325 }
326}
327
328impl super::traits::FromCascade for PipeTransportConfig {}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333
334 #[test]
335 fn token_display() {
336 let token = PipeToken { seq: 42 };
337 assert_eq!(token.to_string(), "pipe:42");
338 }
339
340 #[test]
341 fn token_as_str() {
342 let token = PipeToken { seq: 7 };
343 assert_eq!(token.as_str(), "pipe:7");
344 }
345
346 #[test]
347 fn token_clone() {
348 let token = PipeToken { seq: 99 };
349 let cloned = token;
350 assert_eq!(token, cloned);
351 }
352
353 #[test]
354 fn config_defaults() {
355 let config = PipeTransportConfig::default();
356 assert_eq!(config.recv_timeout_ms, 100);
357 }
358
359 #[test]
360 fn config_serde_roundtrip() {
361 let config = PipeTransportConfig {
362 recv_timeout_ms: 500,
363 ..Default::default()
364 };
365 let json = serde_json::to_string(&config).unwrap();
366 let parsed: PipeTransportConfig = serde_json::from_str(&json).unwrap();
367 assert_eq!(parsed.recv_timeout_ms, 500);
368 }
369
370 #[test]
371 fn config_serde_default_fields() {
372 let parsed: PipeTransportConfig = serde_json::from_str("{}").unwrap();
374 assert_eq!(parsed.recv_timeout_ms, 100);
375 }
376
377 #[tokio::test]
378 async fn new_transport_is_healthy() {
379 let config = PipeTransportConfig::default();
380 let transport = PipeTransport::new(&config);
381 assert!(transport.is_healthy());
382 assert_eq!(transport.name(), "pipe");
383 }
384
385 #[tokio::test]
386 async fn close_marks_unhealthy() {
387 let config = PipeTransportConfig::default();
388 let transport = PipeTransport::new(&config);
389
390 transport.close().await.unwrap();
391 assert!(!transport.is_healthy());
392 }
393
394 #[tokio::test]
395 async fn send_after_close_returns_fatal() {
396 let config = PipeTransportConfig::default();
397 let transport = PipeTransport::new(&config);
398
399 transport.close().await.unwrap();
400 let result = transport
401 .send("key", bytes::Bytes::from_static(b"data"))
402 .await;
403 assert!(result.is_fatal());
404 }
405
406 #[tokio::test]
407 async fn recv_after_close_returns_error() {
408 let config = PipeTransportConfig::default();
409 let transport = PipeTransport::new(&config);
410
411 transport.close().await.unwrap();
412 let result = transport.recv(1).await;
413 assert!(result.is_err());
414 }
415
416 #[tokio::test]
417 async fn commit_is_noop() {
418 let config = PipeTransportConfig::default();
419 let transport = PipeTransport::new(&config);
420
421 let tokens = vec![PipeToken { seq: 0 }, PipeToken { seq: 1 }];
422 let result = transport.commit(&tokens).await;
423 assert!(result.is_ok());
424 }
425}