hyperi_rustlib/transport/
pipe.rs1use super::error::{TransportError, TransportResult};
30use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
31use super::types::{Message, PayloadFormat, SendResult};
32use serde::{Deserialize, Serialize};
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
35use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
42pub struct PipeToken {
43 pub seq: u64,
45}
46
47impl CommitToken for PipeToken {}
48
49impl std::fmt::Display for PipeToken {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 write!(f, "pipe:{}", self.seq)
52 }
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct PipeTransportConfig {
58 #[serde(default = "default_recv_timeout_ms")]
60 pub recv_timeout_ms: u64,
61
62 #[serde(default)]
64 pub filters_in: Vec<super::filter::FilterRule>,
65
66 #[serde(default)]
68 pub filters_out: Vec<super::filter::FilterRule>,
69}
70
71fn default_recv_timeout_ms() -> u64 {
72 100
73}
74
75impl Default for PipeTransportConfig {
76 fn default() -> Self {
77 Self {
78 recv_timeout_ms: default_recv_timeout_ms(),
79 filters_in: Vec::new(),
80 filters_out: Vec::new(),
81 }
82 }
83}
84
85impl PipeTransportConfig {
86 #[must_use]
88 pub fn from_cascade() -> Self {
89 <Self as super::traits::FromCascade>::from_cascade_key("transport.pipe")
90 }
91}
92
93pub struct PipeTransport {
99 stdin: tokio::sync::Mutex<BufReader<tokio::io::Stdin>>,
100 stdout: tokio::sync::Mutex<tokio::io::Stdout>,
101 sequence: AtomicU64,
102 closed: Arc<AtomicBool>,
103 recv_timeout_ms: u64,
104 filter_engine: super::filter::TransportFilterEngine,
105}
106
107impl PipeTransport {
108 #[must_use]
110 pub fn new(config: &PipeTransportConfig) -> Self {
111 #[cfg(feature = "logger")]
112 tracing::info!(
113 recv_timeout_ms = config.recv_timeout_ms,
114 "Pipe transport opened"
115 );
116
117 let filter_engine = super::filter::TransportFilterEngine::new(
118 &config.filters_in,
119 &config.filters_out,
120 &crate::transport::filter::TransportFilterTierConfig::default(),
121 )
122 .unwrap_or_else(|e| {
123 tracing::warn!(error = %e, "Failed to compile transport filters, filtering disabled");
124 super::filter::TransportFilterEngine::empty()
125 });
126
127 let closed = Arc::new(AtomicBool::new(false));
128
129 #[cfg(feature = "health")]
130 {
131 let h = Arc::clone(&closed);
132 crate::health::HealthRegistry::register("transport:pipe", move || {
133 if h.load(Ordering::Relaxed) {
134 crate::health::HealthStatus::Unhealthy
135 } else {
136 crate::health::HealthStatus::Healthy
137 }
138 });
139 }
140
141 Self {
142 stdin: tokio::sync::Mutex::new(BufReader::new(tokio::io::stdin())),
143 stdout: tokio::sync::Mutex::new(tokio::io::stdout()),
144 sequence: AtomicU64::new(0),
145 closed,
146 recv_timeout_ms: config.recv_timeout_ms,
147 filter_engine,
148 }
149 }
150}
151
152impl TransportBase for PipeTransport {
153 async fn close(&self) -> TransportResult<()> {
154 self.closed.store(true, Ordering::Relaxed);
155
156 let mut stdout = self.stdout.lock().await;
158 stdout
159 .flush()
160 .await
161 .map_err(|e| TransportError::Internal(format!("stdout flush failed: {e}")))?;
162
163 Ok(())
164 }
165
166 fn is_healthy(&self) -> bool {
167 !self.closed.load(Ordering::Relaxed)
168 }
169
170 fn name(&self) -> &'static str {
171 "pipe"
172 }
173}
174
175impl TransportSender for PipeTransport {
176 async fn send(&self, _key: &str, payload: bytes::Bytes) -> SendResult {
177 if self.closed.load(Ordering::Relaxed) {
178 return SendResult::Fatal(TransportError::Closed);
179 }
180
181 if self.filter_engine.has_outbound_filters() {
183 match self.filter_engine.apply_outbound(&payload) {
184 super::filter::FilterDisposition::Pass => {}
185 super::filter::FilterDisposition::Drop => return SendResult::Ok,
186 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
187 }
188 }
189
190 let mut stdout = self.stdout.lock().await;
191
192 if let Err(e) = stdout.write_all(&payload).await {
194 return SendResult::Fatal(TransportError::Send(format!("stdout write failed: {e}")));
195 }
196 if let Err(e) = stdout.write_all(b"\n").await {
197 return SendResult::Fatal(TransportError::Send(format!(
198 "stdout newline write failed: {e}"
199 )));
200 }
201 if let Err(e) = stdout.flush().await {
202 return SendResult::Fatal(TransportError::Send(format!("stdout flush failed: {e}")));
203 }
204
205 #[cfg(feature = "logger")]
206 tracing::debug!(
207 bytes = payload.len(),
208 "Pipe transport: message sent to stdout"
209 );
210
211 #[cfg(feature = "metrics")]
212 metrics::counter!("dfe_transport_sent_total", "transport" => "pipe").increment(1);
213
214 SendResult::Ok
215 }
216}
217
218impl TransportReceiver for PipeTransport {
219 type Token = PipeToken;
220
221 async fn recv(&self, max: usize) -> TransportResult<RecvBatch<Self::Token>> {
222 if self.closed.load(Ordering::Relaxed) {
223 return Err(TransportError::Closed);
224 }
225
226 let mut stdin = self.stdin.lock().await;
227 let mut messages = Vec::with_capacity(max.min(100));
228 let mut line_buf = String::new();
229
230 for _ in 0..max {
231 line_buf.clear();
232
233 let read_result = if self.recv_timeout_ms == 0 {
234 stdin.read_line(&mut line_buf).await
236 } else if messages.is_empty() {
237 match tokio::time::timeout(
239 std::time::Duration::from_millis(self.recv_timeout_ms),
240 stdin.read_line(&mut line_buf),
241 )
242 .await
243 {
244 Ok(result) => result,
245 Err(_) => break, }
247 } else {
248 match tokio::time::timeout(
250 std::time::Duration::from_millis(1),
251 stdin.read_line(&mut line_buf),
252 )
253 .await
254 {
255 Ok(result) => result,
256 Err(_) => break, }
258 };
259
260 match read_result {
261 Ok(0) => {
262 if messages.is_empty() {
264 return Err(TransportError::Closed);
265 }
266 break;
267 }
268 Ok(_) => {
269 let payload = line_buf.trim_end_matches('\n').trim_end_matches('\r');
271 if payload.is_empty() {
272 continue;
273 }
274
275 let payload_bytes = payload.as_bytes().to_vec();
276 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
277 let format = PayloadFormat::detect(&payload_bytes);
278 let timestamp_ms = chrono::Utc::now().timestamp_millis();
279
280 messages.push(Message {
281 key: None,
282 payload: payload_bytes,
283 token: PipeToken { seq },
284 timestamp_ms: Some(timestamp_ms),
285 format,
286 });
287
288 #[cfg(feature = "metrics")]
289 metrics::counter!("dfe_transport_received_total", "transport" => "pipe")
290 .increment(1);
291 }
292 Err(e) => {
293 return Err(TransportError::Recv(format!("stdin read failed: {e}")));
294 }
295 }
296 }
297
298 let batch = self.filter_engine.partition_batch(
301 messages,
302 |m| m.payload.as_slice(),
303 |m| m.key.clone(),
304 );
305 let messages = batch.messages;
306 let dlq_entries = batch.dlq_entries;
307
308 #[cfg(feature = "logger")]
309 if !messages.is_empty() {
310 tracing::debug!(
311 lines = messages.len(),
312 "Pipe transport: batch received from stdin"
313 );
314 }
315
316 Ok(RecvBatch {
317 messages,
318 dlq_entries,
319 })
320 }
321
322 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
323 Ok(())
325 }
326}
327
328impl super::traits::FromCascade for PipeTransportConfig {}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333
334 #[test]
335 fn token_display() {
336 let token = PipeToken { seq: 42 };
337 assert_eq!(token.to_string(), "pipe:42");
338 }
339
340 #[test]
341 fn token_as_str() {
342 let token = PipeToken { seq: 7 };
343 assert_eq!(token.as_str(), "pipe:7");
344 }
345
346 #[test]
347 fn token_clone() {
348 let token = PipeToken { seq: 99 };
349 let cloned = token;
350 assert_eq!(token, cloned);
351 }
352
353 #[test]
354 fn config_defaults() {
355 let config = PipeTransportConfig::default();
356 assert_eq!(config.recv_timeout_ms, 100);
357 }
358
359 #[test]
360 fn config_serde_roundtrip() {
361 let config = PipeTransportConfig {
362 recv_timeout_ms: 500,
363 ..Default::default()
364 };
365 let json = serde_json::to_string(&config).unwrap();
366 let parsed: PipeTransportConfig = serde_json::from_str(&json).unwrap();
367 assert_eq!(parsed.recv_timeout_ms, 500);
368 }
369
370 #[test]
371 fn config_serde_default_fields() {
372 let parsed: PipeTransportConfig = serde_json::from_str("{}").unwrap();
374 assert_eq!(parsed.recv_timeout_ms, 100);
375 }
376
377 #[tokio::test]
378 async fn new_transport_is_healthy() {
379 let config = PipeTransportConfig::default();
380 let transport = PipeTransport::new(&config);
381 assert!(transport.is_healthy());
382 assert_eq!(transport.name(), "pipe");
383 }
384
385 #[tokio::test]
386 async fn close_marks_unhealthy() {
387 let config = PipeTransportConfig::default();
388 let transport = PipeTransport::new(&config);
389
390 transport.close().await.unwrap();
391 assert!(!transport.is_healthy());
392 }
393
394 #[tokio::test]
395 async fn send_after_close_returns_fatal() {
396 let config = PipeTransportConfig::default();
397 let transport = PipeTransport::new(&config);
398
399 transport.close().await.unwrap();
400 let result = transport
401 .send("key", bytes::Bytes::from_static(b"data"))
402 .await;
403 assert!(result.is_fatal());
404 }
405
406 #[tokio::test]
407 async fn recv_after_close_returns_error() {
408 let config = PipeTransportConfig::default();
409 let transport = PipeTransport::new(&config);
410
411 transport.close().await.unwrap();
412 let result = transport.recv(1).await;
413 assert!(result.is_err());
414 }
415
416 #[tokio::test]
417 async fn commit_is_noop() {
418 let config = PipeTransportConfig::default();
419 let transport = PipeTransport::new(&config);
420
421 let tokens = vec![PipeToken { seq: 0 }, PipeToken { seq: 1 }];
422 let result = transport.commit(&tokens).await;
423 assert!(result.is_ok());
424 }
425}