hyperi_rustlib/transport/
file.rs1use super::error::{TransportError, TransportResult};
34use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
35use super::types::{Message, PayloadFormat, SendResult};
36use serde::{Deserialize, Serialize};
37use std::path::{Path, PathBuf};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, Ordering};
40use tokio::io::{AsyncBufReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
41use tokio::sync::Mutex;
42
/// Commit token for the file transport: a byte position in the data file.
///
/// `recv` stamps every message with the reader's running offset taken right
/// after that message's line (terminator included) was consumed, and `commit`
/// persists the largest offset among the tokens it is given.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct FileToken {
    /// Reader offset, in bytes, just past the line this token was issued for.
    pub offset: u64,
}
51
52impl CommitToken for FileToken {}
53
54impl std::fmt::Display for FileToken {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 write!(f, "file:{}", self.offset)
57 }
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct FileTransportConfig {
63 pub path: String,
65
66 #[serde(default = "default_append")]
68 pub append: bool,
69
70 #[serde(default)]
72 pub filters_in: Vec<super::filter::FilterRule>,
73
74 #[serde(default)]
76 pub filters_out: Vec<super::filter::FilterRule>,
77}
78
/// Serde default for `FileTransportConfig::append`: append unless configured
/// otherwise.
fn default_append() -> bool {
    true
}
82
83impl Default for FileTransportConfig {
84 fn default() -> Self {
85 Self {
86 path: String::new(),
87 append: true,
88 filters_in: Vec::new(),
89 filters_out: Vec::new(),
90 }
91 }
92}
93
94impl FileTransportConfig {
95 #[must_use]
97 pub fn from_cascade() -> Self {
98 #[cfg(feature = "config")]
99 {
100 if let Some(cfg) = crate::config::try_get()
101 && let Ok(tc) = cfg.unmarshal_key_registered::<Self>("transport.file")
102 {
103 return tc;
104 }
105 }
106 Self::default()
107 }
108}
109
110struct WriteState {
112 file: tokio::fs::File,
113}
114
115struct ReadState {
117 reader: BufReader<tokio::fs::File>,
118 offset: u64,
119 line_buf: String,
120}
121
122pub struct FileTransport {
128 config: FileTransportConfig,
129 writer: Mutex<Option<WriteState>>,
130 reader: Mutex<Option<ReadState>>,
131 closed: Arc<AtomicBool>,
132 filter_engine: super::filter::TransportFilterEngine,
133 filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
136}
137
138impl FileTransport {
139 pub async fn new(config: &FileTransportConfig) -> TransportResult<Self> {
145 if config.path.is_empty() {
146 return Err(TransportError::Config("file path is empty".into()));
147 }
148
149 #[cfg(feature = "logger")]
150 tracing::info!(path = %config.path, append = config.append, "File transport opened");
151
152 let filter_engine = super::filter::TransportFilterEngine::new(
155 &config.filters_in,
156 &config.filters_out,
157 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
158 )?;
159
160 let closed = Arc::new(AtomicBool::new(false));
161
162 #[cfg(feature = "health")]
163 {
164 let h = Arc::clone(&closed);
165 crate::health::HealthRegistry::register("transport:file", move || {
166 if h.load(Ordering::Relaxed) {
167 crate::health::HealthStatus::Unhealthy
168 } else {
169 crate::health::HealthStatus::Healthy
170 }
171 });
172 }
173
174 Ok(Self {
175 config: config.clone(),
176 writer: Mutex::new(None),
177 reader: Mutex::new(None),
178 closed,
179 filter_engine,
180 filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
181 })
182 }
183
184 fn pos_path(data_path: &Path) -> PathBuf {
186 let mut pos_path = data_path.as_os_str().to_owned();
187 pos_path.push(".pos");
188 PathBuf::from(pos_path)
189 }
190
191 async fn load_position(data_path: &Path) -> u64 {
193 let pos_path = Self::pos_path(data_path);
194 match tokio::fs::read_to_string(&pos_path).await {
195 Ok(content) => content.trim().parse::<u64>().unwrap_or(0),
196 Err(_) => 0,
197 }
198 }
199
200 async fn save_position(data_path: &Path, offset: u64) -> TransportResult<()> {
202 let pos_path = Self::pos_path(data_path);
203 tokio::fs::write(&pos_path, offset.to_string())
204 .await
205 .map_err(|e| TransportError::Commit(format!("failed to write position file: {e}")))
206 }
207
208 async fn ensure_writer(&self) -> TransportResult<()> {
210 let mut guard = self.writer.lock().await;
211 if guard.is_none() {
212 let file = tokio::fs::OpenOptions::new()
213 .create(true)
214 .append(self.config.append)
215 .write(true)
216 .open(&self.config.path)
217 .await
218 .map_err(|e| {
219 TransportError::Connection(format!(
220 "failed to open '{}' for writing: {e}",
221 self.config.path
222 ))
223 })?;
224 *guard = Some(WriteState { file });
225 }
226 Ok(())
227 }
228
229 async fn ensure_reader(&self) -> TransportResult<()> {
231 let mut guard = self.reader.lock().await;
232 if guard.is_none() {
233 let path = Path::new(&self.config.path);
234
235 if !path.exists() {
237 return Err(TransportError::Recv(format!(
238 "file '{}' does not exist",
239 self.config.path
240 )));
241 }
242
243 let offset = Self::load_position(path).await;
244 let mut file = tokio::fs::File::open(&self.config.path)
245 .await
246 .map_err(|e| {
247 TransportError::Connection(format!(
248 "failed to open '{}' for reading: {e}",
249 self.config.path
250 ))
251 })?;
252
253 file.seek(std::io::SeekFrom::Start(offset))
255 .await
256 .map_err(|e| {
257 TransportError::Recv(format!("failed to seek to offset {offset}: {e}"))
258 })?;
259
260 *guard = Some(ReadState {
261 reader: BufReader::new(file),
262 offset,
263 line_buf: String::with_capacity(4096),
264 });
265 }
266 Ok(())
267 }
268}
269
270impl TransportBase for FileTransport {
271 async fn close(&self) -> TransportResult<()> {
272 self.closed.store(true, Ordering::Relaxed);
273
274 if let Some(mut state) = self.writer.lock().await.take() {
276 let _ = state.file.flush().await;
277 }
278
279 let _ = self.reader.lock().await.take();
281
282 Ok(())
283 }
284
285 fn is_healthy(&self) -> bool {
286 !self.closed.load(Ordering::Relaxed)
287 }
288
289 fn name(&self) -> &'static str {
290 "file"
291 }
292}
293
294impl TransportSender for FileTransport {
295 async fn send(&self, _key: &str, payload: &[u8]) -> SendResult {
296 if self.closed.load(Ordering::Relaxed) {
297 return SendResult::Fatal(TransportError::Closed);
298 }
299
300 if self.filter_engine.has_outbound_filters() {
302 match self.filter_engine.apply_outbound(payload) {
303 super::filter::FilterDisposition::Pass => {}
304 super::filter::FilterDisposition::Drop => return SendResult::Ok,
305 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
306 }
307 }
308
309 if let Err(e) = self.ensure_writer().await {
310 return SendResult::Fatal(e);
311 }
312
313 let mut guard = self.writer.lock().await;
314 let Some(state) = guard.as_mut() else {
315 return SendResult::Fatal(TransportError::Internal("writer not initialised".into()));
316 };
317
318 if let Err(e) = state.file.write_all(payload).await {
320 #[cfg(feature = "logger")]
321 tracing::warn!(error = %e, "File transport: write error");
322 return SendResult::Fatal(TransportError::Send(format!("write failed: {e}")));
323 }
324 if let Err(e) = state.file.write_all(b"\n").await {
325 #[cfg(feature = "logger")]
326 tracing::warn!(error = %e, "File transport: newline write error");
327 return SendResult::Fatal(TransportError::Send(format!("write newline failed: {e}")));
328 }
329 if let Err(e) = state.file.flush().await {
330 #[cfg(feature = "logger")]
331 tracing::warn!(error = %e, "File transport: flush error");
332 return SendResult::Fatal(TransportError::Send(format!("flush failed: {e}")));
333 }
334
335 #[cfg(feature = "logger")]
336 tracing::debug!(bytes = payload.len(), "File transport: message sent");
337
338 #[cfg(feature = "metrics")]
339 metrics::counter!("dfe_transport_sent_total", "transport" => "file").increment(1);
340
341 SendResult::Ok
342 }
343}
344
345impl TransportReceiver for FileTransport {
346 type Token = FileToken;
347
348 async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
349 if self.closed.load(Ordering::Relaxed) {
350 return Err(TransportError::Closed);
351 }
352
353 self.ensure_reader().await?;
354
355 let mut guard = self.reader.lock().await;
356 let state = guard
357 .as_mut()
358 .ok_or_else(|| TransportError::Internal("reader not initialised".into()))?;
359
360 let mut messages = Vec::with_capacity(max.min(100));
361
362 for _ in 0..max {
363 state.line_buf.clear();
364 let bytes_read = state
365 .reader
366 .read_line(&mut state.line_buf)
367 .await
368 .map_err(|e| TransportError::Recv(format!("read failed: {e}")))?;
369
370 if bytes_read == 0 {
371 break;
373 }
374
375 state.offset += bytes_read as u64;
376
377 let line = state.line_buf.trim_end_matches('\n').trim_end_matches('\r');
379 if line.is_empty() {
380 continue;
381 }
382
383 let payload = line.as_bytes().to_vec();
384 let format = PayloadFormat::detect(&payload);
385 let timestamp_ms = chrono::Utc::now().timestamp_millis();
386
387 messages.push(Message {
388 key: None,
389 payload,
390 token: FileToken {
391 offset: state.offset,
392 },
393 timestamp_ms: Some(timestamp_ms),
394 format,
395 });
396 }
397
398 if self.filter_engine.has_inbound_filters() {
400 let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
401 messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
402 super::filter::FilterDisposition::Pass => true,
403 super::filter::FilterDisposition::Drop => false,
404 super::filter::FilterDisposition::Dlq => {
405 staged_dlq.push(super::filter::FilteredDlqEntry {
406 payload: msg.payload.clone(),
407 key: msg.key.clone(),
408 reason: "transport filter".to_string(),
409 });
410 false
411 }
412 });
413 if !staged_dlq.is_empty() {
414 self.filtered_dlq_buffer.lock().extend(staged_dlq);
415 }
416 }
417
418 #[cfg(feature = "logger")]
419 if !messages.is_empty() {
420 tracing::debug!(lines = messages.len(), "File transport: batch received");
421 }
422
423 #[cfg(feature = "metrics")]
424 if !messages.is_empty() {
425 metrics::counter!("dfe_transport_received_total", "transport" => "file")
426 .increment(messages.len() as u64);
427 }
428
429 Ok(messages)
430 }
431
432 fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
433 std::mem::take(&mut *self.filtered_dlq_buffer.lock())
434 }
435
436 async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
437 if let Some(max_token) = tokens.iter().max_by_key(|t| t.offset) {
438 let path = Path::new(&self.config.path);
439 Self::save_position(path, max_token.offset).await?;
440
441 #[cfg(feature = "logger")]
442 tracing::debug!(
443 offset = max_token.offset,
444 "File transport: position committed"
445 );
446 }
447 Ok(())
448 }
449}
450
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Append-mode configuration for `path`, everything else defaulted.
    fn config_for(path: &Path) -> FileTransportConfig {
        FileTransportConfig {
            path: path.to_str().unwrap().to_string(),
            append: true,
            ..Default::default()
        }
    }

    /// Builds a transport on `path`, panicking if construction fails.
    async fn open_transport(path: &Path) -> FileTransport {
        FileTransport::new(&config_for(path)).await.unwrap()
    }

    /// Builds a transport on `filename` inside the temporary directory.
    async fn make_transport(dir: &TempDir, filename: &str) -> FileTransport {
        open_transport(&dir.path().join(filename)).await
    }

    #[tokio::test]
    async fn send_and_receive() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.ndjson");

        // Write two records, then close the sending side.
        let sender = open_transport(&path).await;
        assert!(sender.send("key", b"{\"msg\":\"hello\"}").await.is_ok());
        assert!(sender.send("key", b"{\"msg\":\"world\"}").await.is_ok());
        sender.close().await.unwrap();

        // A fresh transport on the same file reads them back in order.
        let reader = open_transport(&path).await;
        let messages = reader.recv(10).await.unwrap();

        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].payload, b"{\"msg\":\"hello\"}");
        assert_eq!(messages[1].payload, b"{\"msg\":\"world\"}");

        // Offsets grow with every line consumed.
        assert!(messages[1].token.offset > messages[0].token.offset);
    }

    #[tokio::test]
    async fn commit_persists_position() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("commit_test.ndjson");

        // Setup must succeed, otherwise the assertions below are meaningless.
        let sender = open_transport(&path).await;
        assert!(sender.send("k", b"line1").await.is_ok());
        assert!(sender.send("k", b"line2").await.is_ok());
        assert!(sender.send("k", b"line3").await.is_ok());
        sender.close().await.unwrap();

        // First reader consumes and commits two of the three lines.
        let r1 = open_transport(&path).await;
        let msgs = r1.recv(2).await.unwrap();
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].payload, b"line1");
        assert_eq!(msgs[1].payload, b"line2");

        let tokens: Vec<_> = msgs.iter().map(|m| m.token).collect();
        r1.commit(&tokens).await.unwrap();
        r1.close().await.unwrap();

        // Second reader resumes after the committed position.
        let r2 = open_transport(&path).await;
        let remaining = r2.recv(10).await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].payload, b"line3");
    }

    #[tokio::test]
    async fn close_prevents_operations() {
        let dir = TempDir::new().unwrap();
        let transport = make_transport(&dir, "close_test.ndjson").await;

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let result = transport.send("k", b"data").await;
        assert!(result.is_fatal());

        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    #[test]
    fn file_token_display() {
        let token = FileToken { offset: 42 };
        assert_eq!(format!("{token}"), "file:42");
    }

    #[tokio::test]
    async fn recv_returns_empty_at_eof() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("eof_test.ndjson");

        let transport = open_transport(&path).await;
        assert!(transport.send("k", b"only_line").await.is_ok());
        transport.close().await.unwrap();

        let reader = open_transport(&path).await;
        let msgs = reader.recv(10).await.unwrap();
        assert_eq!(msgs.len(), 1);

        // Nothing left: a second read yields an empty batch, not an error.
        let more = reader.recv(10).await.unwrap();
        assert!(more.is_empty());
    }

    #[tokio::test]
    async fn empty_path_is_config_error() {
        let result = FileTransport::new(&FileTransportConfig::default()).await;
        assert!(matches!(result, Err(TransportError::Config(_))));
    }

    #[tokio::test]
    async fn transport_name() {
        let dir = TempDir::new().unwrap();
        let transport = make_transport(&dir, "name_test.ndjson").await;
        assert_eq!(transport.name(), "file");
    }
}