hyperi_rustlib/transport/
pipe.rs1use super::error::{TransportError, TransportResult};
30use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
31use super::types::{Message, PayloadFormat, SendResult};
32use super::work_batch::WorkBatch;
33use serde::{Deserialize, Serialize};
34use std::sync::Arc;
35use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
36use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
37
/// Commit token attached to each message produced by the pipe transport.
///
/// The token is a plain sequence number; it is `Copy`, comparable and hashable
/// so callers can store and compare tokens cheaply.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PipeToken {
    /// Sequence number the transport assigned to the message.
    pub seq: u64,
}
47
48impl CommitToken for PipeToken {}
49
50impl std::fmt::Display for PipeToken {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 write!(f, "pipe:{}", self.seq)
53 }
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct PipeTransportConfig {
59 #[serde(default = "default_recv_timeout_ms")]
61 pub recv_timeout_ms: u64,
62
63 #[serde(default)]
65 pub filters_in: Vec<super::filter::FilterRule>,
66
67 #[serde(default)]
69 pub filters_out: Vec<super::filter::FilterRule>,
70}
71
/// Serde default for `PipeTransportConfig::recv_timeout_ms`, in milliseconds.
fn default_recv_timeout_ms() -> u64 {
    const DEFAULT_RECV_TIMEOUT_MS: u64 = 100;
    DEFAULT_RECV_TIMEOUT_MS
}
75
76impl Default for PipeTransportConfig {
77 fn default() -> Self {
78 Self {
79 recv_timeout_ms: default_recv_timeout_ms(),
80 filters_in: Vec::new(),
81 filters_out: Vec::new(),
82 }
83 }
84}
85
86impl PipeTransportConfig {
87 #[must_use]
89 pub fn from_cascade() -> Self {
90 <Self as super::traits::FromCascade>::from_cascade_key("transport.pipe")
91 }
92}
93
94pub struct PipeTransport {
100 stdin: tokio::sync::Mutex<BufReader<tokio::io::Stdin>>,
101 stdout: tokio::sync::Mutex<tokio::io::Stdout>,
102 sequence: AtomicU64,
103 closed: Arc<AtomicBool>,
104 recv_timeout_ms: u64,
105 filter_engine: super::filter::TransportFilterEngine,
106}
107
108impl PipeTransport {
109 #[must_use]
111 pub fn new(config: &PipeTransportConfig) -> Self {
112 #[cfg(feature = "logger")]
113 tracing::info!(
114 recv_timeout_ms = config.recv_timeout_ms,
115 "Pipe transport opened"
116 );
117
118 let filter_engine = super::filter::TransportFilterEngine::new(
119 &config.filters_in,
120 &config.filters_out,
121 &crate::transport::filter::TransportFilterTierConfig::default(),
122 )
123 .unwrap_or_else(|e| {
124 tracing::warn!(error = %e, "Failed to compile transport filters, filtering disabled");
125 super::filter::TransportFilterEngine::empty()
126 });
127
128 let closed = Arc::new(AtomicBool::new(false));
129
130 #[cfg(feature = "health")]
131 {
132 let h = Arc::clone(&closed);
133 crate::health::HealthRegistry::register("transport:pipe", move || {
134 if h.load(Ordering::Relaxed) {
135 crate::health::HealthStatus::Unhealthy
136 } else {
137 crate::health::HealthStatus::Healthy
138 }
139 });
140 }
141
142 Self {
143 stdin: tokio::sync::Mutex::new(BufReader::new(tokio::io::stdin())),
144 stdout: tokio::sync::Mutex::new(tokio::io::stdout()),
145 sequence: AtomicU64::new(0),
146 closed,
147 recv_timeout_ms: config.recv_timeout_ms,
148 filter_engine,
149 }
150 }
151}
152
153impl TransportBase for PipeTransport {
154 async fn close(&self) -> TransportResult<()> {
155 self.closed.store(true, Ordering::Relaxed);
156
157 let mut stdout = self.stdout.lock().await;
159 stdout
160 .flush()
161 .await
162 .map_err(|e| TransportError::Internal(format!("stdout flush failed: {e}")))?;
163
164 Ok(())
165 }
166
167 fn is_healthy(&self) -> bool {
168 !self.closed.load(Ordering::Relaxed)
169 }
170
171 fn name(&self) -> &'static str {
172 "pipe"
173 }
174}
175
176impl TransportSender for PipeTransport {
177 async fn send(&self, _key: &str, payload: bytes::Bytes) -> SendResult {
178 if self.closed.load(Ordering::Relaxed) {
179 return SendResult::Fatal(TransportError::Closed);
180 }
181
182 if self.filter_engine.has_outbound_filters() {
184 match self.filter_engine.apply_outbound(&payload) {
185 super::filter::FilterDisposition::Pass => {}
186 super::filter::FilterDisposition::Drop => return SendResult::Ok,
187 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
188 }
189 }
190
191 let mut stdout = self.stdout.lock().await;
192
193 if let Err(e) = stdout.write_all(&payload).await {
195 return SendResult::Fatal(TransportError::Send(format!("stdout write failed: {e}")));
196 }
197 if let Err(e) = stdout.write_all(b"\n").await {
198 return SendResult::Fatal(TransportError::Send(format!(
199 "stdout newline write failed: {e}"
200 )));
201 }
202 if let Err(e) = stdout.flush().await {
203 return SendResult::Fatal(TransportError::Send(format!("stdout flush failed: {e}")));
204 }
205
206 #[cfg(feature = "logger")]
207 tracing::debug!(
208 bytes = payload.len(),
209 "Pipe transport: message sent to stdout"
210 );
211
212 #[cfg(feature = "metrics")]
213 metrics::counter!("dfe_transport_sent_total", "transport" => "pipe").increment(1);
214
215 SendResult::Ok
216 }
217}
218
219impl TransportReceiver for PipeTransport {
220 type Token = PipeToken;
221
222 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
223 if self.closed.load(Ordering::Relaxed) {
224 return Err(TransportError::Closed);
225 }
226
227 let mut stdin = self.stdin.lock().await;
228 let mut messages = Vec::with_capacity(max.min(100));
229 let mut line_buf = String::new();
230
231 for _ in 0..max {
232 line_buf.clear();
233
234 let read_result = if self.recv_timeout_ms == 0 {
235 stdin.read_line(&mut line_buf).await
237 } else if messages.is_empty() {
238 match tokio::time::timeout(
240 std::time::Duration::from_millis(self.recv_timeout_ms),
241 stdin.read_line(&mut line_buf),
242 )
243 .await
244 {
245 Ok(result) => result,
246 Err(_) => break, }
248 } else {
249 match tokio::time::timeout(
251 std::time::Duration::from_millis(1),
252 stdin.read_line(&mut line_buf),
253 )
254 .await
255 {
256 Ok(result) => result,
257 Err(_) => break, }
259 };
260
261 match read_result {
262 Ok(0) => {
263 if messages.is_empty() {
265 return Err(TransportError::Closed);
266 }
267 break;
268 }
269 Ok(_) => {
270 let payload = line_buf.trim_end_matches('\n').trim_end_matches('\r');
272 if payload.is_empty() {
273 continue;
274 }
275
276 let payload_bytes: bytes::Bytes = payload.as_bytes().to_vec().into();
277 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
278 let format = PayloadFormat::detect(&payload_bytes);
279 let timestamp_ms = chrono::Utc::now().timestamp_millis();
280
281 messages.push(Message {
282 key: None,
283 payload: payload_bytes,
284 token: PipeToken { seq },
285 timestamp_ms: Some(timestamp_ms),
286 format,
287 });
288
289 #[cfg(feature = "metrics")]
290 metrics::counter!("dfe_transport_received_total", "transport" => "pipe")
291 .increment(1);
292 }
293 Err(e) => {
294 return Err(TransportError::Recv(format!("stdin read failed: {e}")));
295 }
296 }
297 }
298
299 let batch = self.filter_engine.partition_batch(
302 messages,
303 |m| m.payload.as_ref(),
304 |m| m.key.clone(),
305 |m| m.token,
306 );
307 let messages = batch.messages;
308 let dlq_entries = batch.dlq_entries;
309 let filtered_tokens = batch.filtered_tokens;
310
311 #[cfg(feature = "logger")]
312 if !messages.is_empty() {
313 tracing::debug!(
314 lines = messages.len(),
315 "Pipe transport: batch received from stdin"
316 );
317 }
318
319 Ok(RecvBatch {
320 messages,
321 dlq_entries,
322 filtered_tokens,
323 }
324 .into())
325 }
326
327 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
328 Ok(())
330 }
331}
332
333impl super::traits::FromCascade for PipeTransportConfig {}
334
#[cfg(test)]
mod tests {
    use super::*;

    // --- PipeToken -------------------------------------------------------

    #[test]
    fn token_display() {
        let rendered = PipeToken { seq: 42 }.to_string();
        assert_eq!(rendered, "pipe:42");
    }

    #[test]
    fn token_as_str() {
        let tok = PipeToken { seq: 7 };
        assert_eq!(tok.as_str(), "pipe:7");
    }

    #[test]
    fn token_clone() {
        // `PipeToken` is `Copy`, so the original stays usable after the copy.
        let original = PipeToken { seq: 99 };
        let duplicate = original;
        assert_eq!(original, duplicate);
    }

    // --- PipeTransportConfig ---------------------------------------------

    #[test]
    fn config_defaults() {
        let defaults = PipeTransportConfig::default();
        assert_eq!(defaults.recv_timeout_ms, 100);
    }

    #[test]
    fn config_serde_roundtrip() {
        let source = PipeTransportConfig {
            recv_timeout_ms: 500,
            ..Default::default()
        };
        let encoded = serde_json::to_string(&source).unwrap();
        let decoded: PipeTransportConfig = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.recv_timeout_ms, 500);
    }

    #[test]
    fn config_serde_default_fields() {
        // An empty document must fall back to the serde defaults.
        let decoded: PipeTransportConfig = serde_json::from_str("{}").unwrap();
        assert_eq!(decoded.recv_timeout_ms, 100);
    }

    // --- PipeTransport ---------------------------------------------------

    #[tokio::test]
    async fn new_transport_is_healthy() {
        let pipe = PipeTransport::new(&PipeTransportConfig::default());
        assert!(pipe.is_healthy());
        assert_eq!(pipe.name(), "pipe");
    }

    #[tokio::test]
    async fn close_marks_unhealthy() {
        let pipe = PipeTransport::new(&PipeTransportConfig::default());

        pipe.close().await.unwrap();
        assert!(!pipe.is_healthy());
    }

    #[tokio::test]
    async fn send_after_close_returns_fatal() {
        let pipe = PipeTransport::new(&PipeTransportConfig::default());

        pipe.close().await.unwrap();
        let outcome = pipe.send("key", bytes::Bytes::from_static(b"data")).await;
        assert!(outcome.is_fatal());
    }

    #[tokio::test]
    async fn recv_after_close_returns_error() {
        let pipe = PipeTransport::new(&PipeTransportConfig::default());

        pipe.close().await.unwrap();
        let outcome = pipe.recv(1).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn commit_is_noop() {
        let pipe = PipeTransport::new(&PipeTransportConfig::default());

        let pending = vec![PipeToken { seq: 0 }, PipeToken { seq: 1 }];
        let outcome = pipe.commit(&pending).await;
        assert!(outcome.is_ok());
    }
}