Skip to main content

hyperi_rustlib/transport/
file.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/file.rs
3// Purpose:   NDJSON file transport
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # File Transport
10//!
11//! NDJSON (newline-delimited JSON) file transport for debugging, audit
12//! trails, and replay. Wraps async file I/O behind the Transport traits.
13//!
14//! ## Send
15//!
16//! Appends one NDJSON line per `send()` call to the configured file path.
17//!
18//! ## Receive
19//!
20//! Reads NDJSON lines from the file, tracking byte offset for commit.
21//! Position is persisted to a `.pos` sidecar file so reads survive restarts.
22//!
23//! ## Example
24//!
25//! ```rust,ignore
26//! use hyperi_rustlib::transport::file::{FileTransport, FileTransportConfig};
27//!
28//! let config = FileTransportConfig { path: "/tmp/events.ndjson".into(), append: true, ..Default::default() };
29//! let transport = FileTransport::new(&config).await?;
30//! transport.send("events", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}")).await;
31//! ```
32
33use super::error::{TransportError, TransportResult};
34use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
35use super::types::{Message, PayloadFormat, SendResult};
36use super::work_batch::WorkBatch;
37use serde::{Deserialize, Serialize};
38use std::path::{Path, PathBuf};
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, Ordering};
41use tokio::io::{AsyncBufReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
42use tokio::sync::Mutex;
43
/// Commit token handed out by the file transport for each received line.
///
/// Holds the byte offset just past the line it belongs to (the line's
/// newline included). Committing it stores that offset so a later reader of
/// the same file resumes from there.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct FileToken {
    /// Byte offset immediately after the line, counted from the start of the file.
    pub offset: u64,
}
52
53impl CommitToken for FileToken {}
54
55impl std::fmt::Display for FileToken {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        write!(f, "file:{}", self.offset)
58    }
59}
60
61/// Configuration for file transport.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct FileTransportConfig {
64    /// File path for read/write.
65    pub path: String,
66
67    /// Append mode (default true for send).
68    #[serde(default = "default_append")]
69    pub append: bool,
70
71    /// Inbound message filters (applied on recv before caller sees messages).
72    #[serde(default)]
73    pub filters_in: Vec<super::filter::FilterRule>,
74
75    /// Outbound message filters (applied on send before transport dispatches).
76    #[serde(default)]
77    pub filters_out: Vec<super::filter::FilterRule>,
78}
79
/// Serde fallback for `FileTransportConfig::append`: append mode is on.
const fn default_append() -> bool {
    true
}
83
84impl Default for FileTransportConfig {
85    fn default() -> Self {
86        Self {
87            path: String::new(),
88            append: true,
89            filters_in: Vec::new(),
90            filters_out: Vec::new(),
91        }
92    }
93}
94
95impl FileTransportConfig {
96    /// Load from the config cascade under the `transport.file` key.
97    #[must_use]
98    pub fn from_cascade() -> Self {
99        <Self as super::traits::FromCascade>::from_cascade_key("transport.file")
100    }
101}
102
103/// Internal state for the write side.
104struct WriteState {
105    file: tokio::fs::File,
106}
107
108/// Internal state for the read side.
109struct ReadState {
110    reader: BufReader<tokio::fs::File>,
111    offset: u64,
112    line_buf: String,
113}
114
115/// NDJSON file transport.
116///
117/// Supports both send (append) and receive (sequential read with
118/// position tracking). Position is persisted to a `.pos` sidecar
119/// file so reads survive process restarts.
120pub struct FileTransport {
121    config: FileTransportConfig,
122    writer: Mutex<Option<WriteState>>,
123    reader: Mutex<Option<ReadState>>,
124    closed: Arc<AtomicBool>,
125    filter_engine: super::filter::TransportFilterEngine,
126}
127
128impl FileTransport {
129    /// Create a new file transport.
130    ///
131    /// # Errors
132    ///
133    /// Returns error if the file path is empty.
134    pub async fn new(config: &FileTransportConfig) -> TransportResult<Self> {
135        if config.path.is_empty() {
136            return Err(TransportError::Config("file path is empty".into()));
137        }
138
139        #[cfg(feature = "logger")]
140        tracing::info!(path = %config.path, append = config.append, "File transport opened");
141
142        // Fail loud on bad filter config -- silently disabling filters
143        // turns a misconfigured `drop` / `dlq` rule into a permanent pass.
144        let filter_engine = super::filter::TransportFilterEngine::new(
145            &config.filters_in,
146            &config.filters_out,
147            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
148        )?;
149
150        let closed = Arc::new(AtomicBool::new(false));
151
152        #[cfg(feature = "health")]
153        {
154            let h = Arc::clone(&closed);
155            crate::health::HealthRegistry::register("transport:file", move || {
156                if h.load(Ordering::Relaxed) {
157                    crate::health::HealthStatus::Unhealthy
158                } else {
159                    crate::health::HealthStatus::Healthy
160                }
161            });
162        }
163
164        Ok(Self {
165            config: config.clone(),
166            writer: Mutex::new(None),
167            reader: Mutex::new(None),
168            closed,
169            filter_engine,
170        })
171    }
172
173    /// Path to the `.pos` sidecar file that tracks read position.
174    fn pos_path(data_path: &Path) -> PathBuf {
175        let mut pos_path = data_path.as_os_str().to_owned();
176        pos_path.push(".pos");
177        PathBuf::from(pos_path)
178    }
179
180    /// Load committed read position from the sidecar file.
181    async fn load_position(data_path: &Path) -> u64 {
182        let pos_path = Self::pos_path(data_path);
183        match tokio::fs::read_to_string(&pos_path).await {
184            Ok(content) => content.trim().parse::<u64>().unwrap_or(0),
185            Err(_) => 0,
186        }
187    }
188
189    /// Save read position to the sidecar file.
190    async fn save_position(data_path: &Path, offset: u64) -> TransportResult<()> {
191        let pos_path = Self::pos_path(data_path);
192        tokio::fs::write(&pos_path, offset.to_string())
193            .await
194            .map_err(|e| TransportError::Commit(format!("failed to write position file: {e}")))
195    }
196
197    /// Lazily open the write file handle.
198    async fn ensure_writer(&self) -> TransportResult<()> {
199        let mut guard = self.writer.lock().await;
200        if guard.is_none() {
201            let file = tokio::fs::OpenOptions::new()
202                .create(true)
203                .append(self.config.append)
204                .write(true)
205                .open(&self.config.path)
206                .await
207                .map_err(|e| {
208                    TransportError::Connection(format!(
209                        "failed to open '{}' for writing: {e}",
210                        self.config.path
211                    ))
212                })?;
213            *guard = Some(WriteState { file });
214        }
215        Ok(())
216    }
217
218    /// Lazily open the read file handle and seek to committed position.
219    async fn ensure_reader(&self) -> TransportResult<()> {
220        let mut guard = self.reader.lock().await;
221        if guard.is_none() {
222            let path = Path::new(&self.config.path);
223
224            // If the file does not exist yet, there is nothing to read
225            if !path.exists() {
226                return Err(TransportError::Recv(format!(
227                    "file '{}' does not exist",
228                    self.config.path
229                )));
230            }
231
232            let offset = Self::load_position(path).await;
233            let mut file = tokio::fs::File::open(&self.config.path)
234                .await
235                .map_err(|e| {
236                    TransportError::Connection(format!(
237                        "failed to open '{}' for reading: {e}",
238                        self.config.path
239                    ))
240                })?;
241
242            // Seek to committed position
243            file.seek(std::io::SeekFrom::Start(offset))
244                .await
245                .map_err(|e| {
246                    TransportError::Recv(format!("failed to seek to offset {offset}: {e}"))
247                })?;
248
249            *guard = Some(ReadState {
250                reader: BufReader::new(file),
251                offset,
252                line_buf: String::with_capacity(4096),
253            });
254        }
255        Ok(())
256    }
257}
258
259impl TransportBase for FileTransport {
260    async fn close(&self) -> TransportResult<()> {
261        self.closed.store(true, Ordering::Relaxed);
262
263        // Flush and drop writer
264        if let Some(mut state) = self.writer.lock().await.take() {
265            let _ = state.file.flush().await;
266        }
267
268        // Drop reader
269        let _ = self.reader.lock().await.take();
270
271        Ok(())
272    }
273
274    fn is_healthy(&self) -> bool {
275        !self.closed.load(Ordering::Relaxed)
276    }
277
278    fn name(&self) -> &'static str {
279        "file"
280    }
281}
282
283impl TransportSender for FileTransport {
284    async fn send(&self, _key: &str, payload: bytes::Bytes) -> SendResult {
285        if self.closed.load(Ordering::Relaxed) {
286            return SendResult::Fatal(TransportError::Closed);
287        }
288
289        // Outbound filter check
290        if self.filter_engine.has_outbound_filters() {
291            match self.filter_engine.apply_outbound(&payload) {
292                super::filter::FilterDisposition::Pass => {}
293                super::filter::FilterDisposition::Drop => return SendResult::Ok,
294                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
295            }
296        }
297
298        if let Err(e) = self.ensure_writer().await {
299            return SendResult::Fatal(e);
300        }
301
302        let mut guard = self.writer.lock().await;
303        let Some(state) = guard.as_mut() else {
304            return SendResult::Fatal(TransportError::Internal("writer not initialised".into()));
305        };
306
307        // Write payload + newline as a single operation
308        if let Err(e) = state.file.write_all(&payload).await {
309            #[cfg(feature = "logger")]
310            tracing::warn!(error = %e, "File transport: write error");
311            return SendResult::Fatal(TransportError::Send(format!("write failed: {e}")));
312        }
313        if let Err(e) = state.file.write_all(b"\n").await {
314            #[cfg(feature = "logger")]
315            tracing::warn!(error = %e, "File transport: newline write error");
316            return SendResult::Fatal(TransportError::Send(format!("write newline failed: {e}")));
317        }
318        if let Err(e) = state.file.flush().await {
319            #[cfg(feature = "logger")]
320            tracing::warn!(error = %e, "File transport: flush error");
321            return SendResult::Fatal(TransportError::Send(format!("flush failed: {e}")));
322        }
323
324        #[cfg(feature = "logger")]
325        tracing::debug!(bytes = payload.len(), "File transport: message sent");
326
327        #[cfg(feature = "metrics")]
328        metrics::counter!("dfe_transport_sent_total", "transport" => "file").increment(1);
329
330        SendResult::Ok
331    }
332}
333
334impl TransportReceiver for FileTransport {
335    type Token = FileToken;
336
337    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
338        if self.closed.load(Ordering::Relaxed) {
339            return Err(TransportError::Closed);
340        }
341
342        self.ensure_reader().await?;
343
344        let mut guard = self.reader.lock().await;
345        let state = guard
346            .as_mut()
347            .ok_or_else(|| TransportError::Internal("reader not initialised".into()))?;
348
349        let mut messages = Vec::with_capacity(max.min(100));
350
351        for _ in 0..max {
352            state.line_buf.clear();
353            let bytes_read = state
354                .reader
355                .read_line(&mut state.line_buf)
356                .await
357                .map_err(|e| TransportError::Recv(format!("read failed: {e}")))?;
358
359            if bytes_read == 0 {
360                // EOF
361                break;
362            }
363
364            state.offset += bytes_read as u64;
365
366            // Strip trailing newline
367            let line = state.line_buf.trim_end_matches('\n').trim_end_matches('\r');
368            if line.is_empty() {
369                continue;
370            }
371
372            let payload: bytes::Bytes = line.as_bytes().to_vec().into();
373            let format = PayloadFormat::detect(&payload);
374            let timestamp_ms = chrono::Utc::now().timestamp_millis();
375
376            messages.push(Message {
377                key: None,
378                payload,
379                token: FileToken {
380                    offset: state.offset,
381                },
382                timestamp_ms: Some(timestamp_ms),
383                format,
384            });
385        }
386
387        // Apply inbound filters via the shared partition helper; DLQ entries
388        // are returned in the RecvBatch for the caller to route onward.
389        let batch =
390            self.filter_engine
391                .partition_batch(messages, |m| m.payload.as_ref(), |m| m.key.clone());
392        let messages = batch.messages;
393        let dlq_entries = batch.dlq_entries;
394
395        #[cfg(feature = "logger")]
396        if !messages.is_empty() {
397            tracing::debug!(lines = messages.len(), "File transport: batch received");
398        }
399
400        #[cfg(feature = "metrics")]
401        if !messages.is_empty() {
402            metrics::counter!("dfe_transport_received_total", "transport" => "file")
403                .increment(messages.len() as u64);
404        }
405
406        Ok(RecvBatch {
407            messages,
408            dlq_entries,
409        }
410        .into())
411    }
412
413    async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
414        if let Some(max_token) = tokens.iter().max_by_key(|t| t.offset) {
415            let path = Path::new(&self.config.path);
416            Self::save_position(path, max_token.offset).await?;
417
418            #[cfg(feature = "logger")]
419            tracing::debug!(
420                offset = max_token.offset,
421                "File transport: position committed"
422            );
423        }
424        Ok(())
425    }
426}
427
428impl super::traits::FromCascade for FileTransportConfig {}
429
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Appending, filter-free config for `path`.
    fn config_for(path: &str) -> FileTransportConfig {
        FileTransportConfig {
            path: path.to_owned(),
            append: true,
            ..Default::default()
        }
    }

    /// `filename` inside `dir`, as a UTF-8 path string.
    fn path_in(dir: &TempDir, filename: &str) -> String {
        dir.path().join(filename).to_str().unwrap().to_string()
    }

    /// Construct a transport on `path`, panicking if construction fails.
    async fn open(path: &str) -> FileTransport {
        FileTransport::new(&config_for(path)).await.unwrap()
    }

    async fn make_transport(dir: &TempDir, filename: &str) -> FileTransport {
        open(&path_in(dir, filename)).await
    }

    #[tokio::test]
    async fn send_and_receive() {
        let dir = TempDir::new().unwrap();
        let path = path_in(&dir, "test.ndjson");
        let hello: &'static [u8] = b"{\"msg\":\"hello\"}";
        let world: &'static [u8] = b"{\"msg\":\"world\"}";

        // Write two lines, then close the writer.
        let writer = open(&path).await;
        for body in [hello, world] {
            let outcome = writer.send("key", bytes::Bytes::from_static(body)).await;
            assert!(outcome.is_ok());
        }
        writer.close().await.unwrap();

        // A fresh transport reads both back, in order.
        let reader = open(&path).await;
        let batch = reader.recv(10).await.unwrap();

        assert_eq!(batch.records.len(), 2);
        assert_eq!(batch.records[0].payload.as_ref(), hello);
        assert_eq!(batch.records[1].payload.as_ref(), world);

        // Tokens travel with the batch in record order; offsets must grow.
        let tokens = &batch.commit_tokens;
        assert!(tokens[1].offset > tokens[0].offset);
    }

    #[tokio::test]
    async fn commit_persists_position() {
        let dir = TempDir::new().unwrap();
        let path = path_in(&dir, "commit_test.ndjson");

        let writer = open(&path).await;
        let bodies: [&'static [u8]; 3] = [b"line1", b"line2", b"line3"];
        for body in bodies {
            writer.send("k", bytes::Bytes::from_static(body)).await;
        }
        writer.close().await.unwrap();

        // First session: take two lines and commit them.
        let first = open(&path).await;
        let batch = first.recv(2).await.unwrap();
        assert_eq!(batch.records.len(), 2);
        assert_eq!(batch.records[0].payload.as_ref(), b"line1");
        assert_eq!(batch.records[1].payload.as_ref(), b"line2");
        first.commit(&batch.commit_tokens).await.unwrap();
        first.close().await.unwrap();

        // Second session resumes right after the committed position.
        let second = open(&path).await;
        let remaining = second.recv(10).await.unwrap().records;
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].payload.as_ref(), b"line3");
    }

    #[tokio::test]
    async fn close_prevents_operations() {
        let dir = TempDir::new().unwrap();
        let transport = make_transport(&dir, "close_test.ndjson").await;

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let sent = transport
            .send("k", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(sent.is_fatal());
        assert!(transport.recv(1).await.is_err());
    }

    #[test]
    fn file_token_display() {
        assert_eq!(FileToken { offset: 42 }.to_string(), "file:42");
    }

    #[tokio::test]
    async fn recv_returns_empty_at_eof() {
        let dir = TempDir::new().unwrap();
        let path = path_in(&dir, "eof_test.ndjson");

        let writer = open(&path).await;
        writer
            .send("k", bytes::Bytes::from_static(b"only_line"))
            .await;
        writer.close().await.unwrap();

        // The first read drains the file; the second finds nothing left.
        let reader = open(&path).await;
        assert_eq!(reader.recv(10).await.unwrap().records.len(), 1);
        assert!(reader.recv(10).await.unwrap().records.is_empty());
    }

    #[tokio::test]
    async fn empty_path_is_config_error() {
        let outcome = FileTransport::new(&FileTransportConfig::default()).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn transport_name() {
        let dir = TempDir::new().unwrap();
        let transport = make_transport(&dir, "name_test.ndjson").await;
        assert_eq!(transport.name(), "file");
    }
}