hyperi_rustlib/transport/
pipe.rs1use super::error::{TransportError, TransportResult};
30use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
31use super::types::{Message, PayloadFormat, SendResult};
32use serde::{Deserialize, Serialize};
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
35use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
42pub struct PipeToken {
43 pub seq: u64,
45}
46
47impl CommitToken for PipeToken {}
48
49impl std::fmt::Display for PipeToken {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 write!(f, "pipe:{}", self.seq)
52 }
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct PipeTransportConfig {
58 #[serde(default = "default_recv_timeout_ms")]
60 pub recv_timeout_ms: u64,
61
62 #[serde(default)]
64 pub filters_in: Vec<super::filter::FilterRule>,
65
66 #[serde(default)]
68 pub filters_out: Vec<super::filter::FilterRule>,
69}
70
71fn default_recv_timeout_ms() -> u64 {
72 100
73}
74
75impl Default for PipeTransportConfig {
76 fn default() -> Self {
77 Self {
78 recv_timeout_ms: default_recv_timeout_ms(),
79 filters_in: Vec::new(),
80 filters_out: Vec::new(),
81 }
82 }
83}
84
85impl PipeTransportConfig {
86 #[must_use]
88 pub fn from_cascade() -> Self {
89 #[cfg(feature = "config")]
90 {
91 if let Some(cfg) = crate::config::try_get()
92 && let Ok(tc) = cfg.unmarshal_key_registered::<Self>("transport.pipe")
93 {
94 return tc;
95 }
96 }
97 Self::default()
98 }
99}
100
101pub struct PipeTransport {
107 stdin: tokio::sync::Mutex<BufReader<tokio::io::Stdin>>,
108 stdout: tokio::sync::Mutex<tokio::io::Stdout>,
109 sequence: AtomicU64,
110 closed: Arc<AtomicBool>,
111 recv_timeout_ms: u64,
112 filter_engine: super::filter::TransportFilterEngine,
113 filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
116}
117
118impl PipeTransport {
119 #[must_use]
121 pub fn new(config: &PipeTransportConfig) -> Self {
122 #[cfg(feature = "logger")]
123 tracing::info!(
124 recv_timeout_ms = config.recv_timeout_ms,
125 "Pipe transport opened"
126 );
127
128 let filter_engine = super::filter::TransportFilterEngine::new(
129 &config.filters_in,
130 &config.filters_out,
131 &crate::transport::filter::TransportFilterTierConfig::default(),
132 )
133 .unwrap_or_else(|e| {
134 tracing::warn!(error = %e, "Failed to compile transport filters, filtering disabled");
135 super::filter::TransportFilterEngine::empty()
136 });
137
138 let closed = Arc::new(AtomicBool::new(false));
139
140 #[cfg(feature = "health")]
141 {
142 let h = Arc::clone(&closed);
143 crate::health::HealthRegistry::register("transport:pipe", move || {
144 if h.load(Ordering::Relaxed) {
145 crate::health::HealthStatus::Unhealthy
146 } else {
147 crate::health::HealthStatus::Healthy
148 }
149 });
150 }
151
152 Self {
153 stdin: tokio::sync::Mutex::new(BufReader::new(tokio::io::stdin())),
154 stdout: tokio::sync::Mutex::new(tokio::io::stdout()),
155 sequence: AtomicU64::new(0),
156 closed,
157 recv_timeout_ms: config.recv_timeout_ms,
158 filter_engine,
159 filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
160 }
161 }
162}
163
164impl TransportBase for PipeTransport {
165 async fn close(&self) -> TransportResult<()> {
166 self.closed.store(true, Ordering::Relaxed);
167
168 let mut stdout = self.stdout.lock().await;
170 stdout
171 .flush()
172 .await
173 .map_err(|e| TransportError::Internal(format!("stdout flush failed: {e}")))?;
174
175 Ok(())
176 }
177
178 fn is_healthy(&self) -> bool {
179 !self.closed.load(Ordering::Relaxed)
180 }
181
182 fn name(&self) -> &'static str {
183 "pipe"
184 }
185}
186
187impl TransportSender for PipeTransport {
188 async fn send(&self, _key: &str, payload: &[u8]) -> SendResult {
189 if self.closed.load(Ordering::Relaxed) {
190 return SendResult::Fatal(TransportError::Closed);
191 }
192
193 if self.filter_engine.has_outbound_filters() {
195 match self.filter_engine.apply_outbound(payload) {
196 super::filter::FilterDisposition::Pass => {}
197 super::filter::FilterDisposition::Drop => return SendResult::Ok,
198 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
199 }
200 }
201
202 let mut stdout = self.stdout.lock().await;
203
204 if let Err(e) = stdout.write_all(payload).await {
206 return SendResult::Fatal(TransportError::Send(format!("stdout write failed: {e}")));
207 }
208 if let Err(e) = stdout.write_all(b"\n").await {
209 return SendResult::Fatal(TransportError::Send(format!(
210 "stdout newline write failed: {e}"
211 )));
212 }
213 if let Err(e) = stdout.flush().await {
214 return SendResult::Fatal(TransportError::Send(format!("stdout flush failed: {e}")));
215 }
216
217 #[cfg(feature = "logger")]
218 tracing::debug!(
219 bytes = payload.len(),
220 "Pipe transport: message sent to stdout"
221 );
222
223 #[cfg(feature = "metrics")]
224 metrics::counter!("dfe_transport_sent_total", "transport" => "pipe").increment(1);
225
226 SendResult::Ok
227 }
228}
229
230impl TransportReceiver for PipeTransport {
231 type Token = PipeToken;
232
233 async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
234 if self.closed.load(Ordering::Relaxed) {
235 return Err(TransportError::Closed);
236 }
237
238 let mut stdin = self.stdin.lock().await;
239 let mut messages = Vec::with_capacity(max.min(100));
240 let mut line_buf = String::new();
241
242 for _ in 0..max {
243 line_buf.clear();
244
245 let read_result = if self.recv_timeout_ms == 0 {
246 stdin.read_line(&mut line_buf).await
248 } else if messages.is_empty() {
249 match tokio::time::timeout(
251 std::time::Duration::from_millis(self.recv_timeout_ms),
252 stdin.read_line(&mut line_buf),
253 )
254 .await
255 {
256 Ok(result) => result,
257 Err(_) => break, }
259 } else {
260 match tokio::time::timeout(
262 std::time::Duration::from_millis(1),
263 stdin.read_line(&mut line_buf),
264 )
265 .await
266 {
267 Ok(result) => result,
268 Err(_) => break, }
270 };
271
272 match read_result {
273 Ok(0) => {
274 if messages.is_empty() {
276 return Err(TransportError::Closed);
277 }
278 break;
279 }
280 Ok(_) => {
281 let payload = line_buf.trim_end_matches('\n').trim_end_matches('\r');
283 if payload.is_empty() {
284 continue;
285 }
286
287 let payload_bytes = payload.as_bytes().to_vec();
288 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
289 let format = PayloadFormat::detect(&payload_bytes);
290 let timestamp_ms = chrono::Utc::now().timestamp_millis();
291
292 messages.push(Message {
293 key: None,
294 payload: payload_bytes,
295 token: PipeToken { seq },
296 timestamp_ms: Some(timestamp_ms),
297 format,
298 });
299
300 #[cfg(feature = "metrics")]
301 metrics::counter!("dfe_transport_received_total", "transport" => "pipe")
302 .increment(1);
303 }
304 Err(e) => {
305 return Err(TransportError::Recv(format!("stdin read failed: {e}")));
306 }
307 }
308 }
309
310 if self.filter_engine.has_inbound_filters() {
312 let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
313 messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
314 super::filter::FilterDisposition::Pass => true,
315 super::filter::FilterDisposition::Drop => false,
316 super::filter::FilterDisposition::Dlq => {
317 staged_dlq.push(super::filter::FilteredDlqEntry {
318 payload: msg.payload.clone(),
319 key: msg.key.clone(),
320 reason: "transport filter".to_string(),
321 });
322 false
323 }
324 });
325 if !staged_dlq.is_empty() {
326 self.filtered_dlq_buffer.lock().extend(staged_dlq);
327 }
328 }
329
330 #[cfg(feature = "logger")]
331 if !messages.is_empty() {
332 tracing::debug!(
333 lines = messages.len(),
334 "Pipe transport: batch received from stdin"
335 );
336 }
337
338 Ok(messages)
339 }
340
341 fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
342 std::mem::take(&mut *self.filtered_dlq_buffer.lock())
343 }
344
345 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
346 Ok(())
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use super::*;
354
355 #[test]
356 fn token_display() {
357 let token = PipeToken { seq: 42 };
358 assert_eq!(token.to_string(), "pipe:42");
359 }
360
361 #[test]
362 fn token_as_str() {
363 let token = PipeToken { seq: 7 };
364 assert_eq!(token.as_str(), "pipe:7");
365 }
366
367 #[test]
368 fn token_clone() {
369 let token = PipeToken { seq: 99 };
370 let cloned = token;
371 assert_eq!(token, cloned);
372 }
373
374 #[test]
375 fn config_defaults() {
376 let config = PipeTransportConfig::default();
377 assert_eq!(config.recv_timeout_ms, 100);
378 }
379
380 #[test]
381 fn config_serde_roundtrip() {
382 let config = PipeTransportConfig {
383 recv_timeout_ms: 500,
384 ..Default::default()
385 };
386 let json = serde_json::to_string(&config).unwrap();
387 let parsed: PipeTransportConfig = serde_json::from_str(&json).unwrap();
388 assert_eq!(parsed.recv_timeout_ms, 500);
389 }
390
391 #[test]
392 fn config_serde_default_fields() {
393 let parsed: PipeTransportConfig = serde_json::from_str("{}").unwrap();
395 assert_eq!(parsed.recv_timeout_ms, 100);
396 }
397
398 #[tokio::test]
399 async fn new_transport_is_healthy() {
400 let config = PipeTransportConfig::default();
401 let transport = PipeTransport::new(&config);
402 assert!(transport.is_healthy());
403 assert_eq!(transport.name(), "pipe");
404 }
405
406 #[tokio::test]
407 async fn close_marks_unhealthy() {
408 let config = PipeTransportConfig::default();
409 let transport = PipeTransport::new(&config);
410
411 transport.close().await.unwrap();
412 assert!(!transport.is_healthy());
413 }
414
415 #[tokio::test]
416 async fn send_after_close_returns_fatal() {
417 let config = PipeTransportConfig::default();
418 let transport = PipeTransport::new(&config);
419
420 transport.close().await.unwrap();
421 let result = transport.send("key", b"data").await;
422 assert!(result.is_fatal());
423 }
424
425 #[tokio::test]
426 async fn recv_after_close_returns_error() {
427 let config = PipeTransportConfig::default();
428 let transport = PipeTransport::new(&config);
429
430 transport.close().await.unwrap();
431 let result = transport.recv(1).await;
432 assert!(result.is_err());
433 }
434
435 #[tokio::test]
436 async fn commit_is_noop() {
437 let config = PipeTransportConfig::default();
438 let transport = PipeTransport::new(&config);
439
440 let tokens = vec![PipeToken { seq: 0 }, PipeToken { seq: 1 }];
441 let result = transport.commit(&tokens).await;
442 assert!(result.is_ok());
443 }
444}