Skip to main content

hyperi_rustlib/transport/
file.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/file.rs
3// Purpose:   NDJSON file transport
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # File Transport
10//!
11//! NDJSON (newline-delimited JSON) file transport for debugging, audit
12//! trails, and replay. Wraps async file I/O behind the Transport traits.
13//!
14//! ## Send
15//!
16//! Appends one NDJSON line per `send()` call to the configured file path.
17//!
18//! ## Receive
19//!
20//! Reads NDJSON lines from the file, tracking byte offset for commit.
21//! Position is persisted to a `.pos` sidecar file so reads survive restarts.
22//!
23//! ## Example
24//!
25//! ```rust,ignore
26//! use hyperi_rustlib::transport::file::{FileTransport, FileTransportConfig};
27//!
28//! let config = FileTransportConfig { path: "/tmp/events.ndjson".into(), append: true, ..Default::default() };
29//! let transport = FileTransport::new(&config).await?;
30//! transport.send("events", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}")).await;
31//! ```
32
33use super::error::{TransportError, TransportResult};
34use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
35use super::types::{Message, PayloadFormat, SendResult};
36use serde::{Deserialize, Serialize};
37use std::path::{Path, PathBuf};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, Ordering};
40use tokio::io::{AsyncBufReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
41use tokio::sync::Mutex;
42
/// Commit token for file transport.
///
/// Carries the reader's byte offset in the file immediately after the line
/// that produced the message. `commit()` persists the largest offset among
/// the tokens it is given.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct FileToken {
    /// Byte offset just past the line this token was issued for (the line
    /// terminator is counted, since the offset advances by the full number
    /// of bytes read for the line).
    pub offset: u64,
}
51
// `FileToken` opts into `CommitToken` with no method bodies of its own; it is
// the `Token` type used by the `TransportReceiver` impl for `FileTransport`.
impl CommitToken for FileToken {}
53
54impl std::fmt::Display for FileToken {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        write!(f, "file:{}", self.offset)
57    }
58}
59
/// Configuration for file transport.
///
/// Deserialisable via serde; every field except `path` has a serde default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileTransportConfig {
    /// File path for read/write. Must be non-empty: `FileTransport::new`
    /// rejects an empty path with a config error.
    pub path: String,

    /// Append mode (default true for send). Passed to `OpenOptions::append`
    /// when the write handle is opened.
    #[serde(default = "default_append")]
    pub append: bool,

    /// Inbound message filters (applied on recv before caller sees messages).
    #[serde(default)]
    pub filters_in: Vec<super::filter::FilterRule>,

    /// Outbound message filters (applied on send before transport dispatches).
    #[serde(default)]
    pub filters_out: Vec<super::filter::FilterRule>,
}
78
/// Serde default for the `append` field of the file transport config:
/// append mode is on unless the configuration says otherwise.
fn default_append() -> bool {
    true
}
82
83impl Default for FileTransportConfig {
84    fn default() -> Self {
85        Self {
86            path: String::new(),
87            append: true,
88            filters_in: Vec::new(),
89            filters_out: Vec::new(),
90        }
91    }
92}
93
94impl FileTransportConfig {
95    /// Load from the config cascade under the `transport.file` key.
96    #[must_use]
97    pub fn from_cascade() -> Self {
98        <Self as super::traits::FromCascade>::from_cascade_key("transport.file")
99    }
100}
101
/// Internal state for the write side.
///
/// Created lazily by `ensure_writer()` and taken (dropped) by `close()`.
struct WriteState {
    /// Open handle that `send()` writes each line to.
    file: tokio::fs::File,
}
106
/// Internal state for the read side.
///
/// Created lazily by `ensure_reader()` and taken (dropped) by `close()`.
struct ReadState {
    /// Buffered reader over the data file, initially positioned at the
    /// offset loaded from the `.pos` sidecar.
    reader: BufReader<tokio::fs::File>,
    /// Byte offset of the next unread line: starts at the loaded position
    /// and grows by the byte count of every line read.
    offset: u64,
    /// Scratch buffer reused for each line; cleared before every read.
    line_buf: String,
}
113
/// NDJSON file transport.
///
/// Supports both send (append) and receive (sequential read with
/// position tracking). Position is persisted to a `.pos` sidecar
/// file so reads survive process restarts.
pub struct FileTransport {
    /// Copy of the configuration the transport was created with.
    config: FileTransportConfig,
    /// Write handle; `None` until the first `send()` opens it, and again
    /// after `close()`.
    writer: Mutex<Option<WriteState>>,
    /// Read handle; `None` until the first `recv()` opens it, and again
    /// after `close()`.
    reader: Mutex<Option<ReadState>>,
    /// Set to `true` by `close()`; checked by `send()`, `recv()` and
    /// `is_healthy()`. Shared via `Arc` with the health-check closure when
    /// the `health` feature is enabled.
    closed: Arc<AtomicBool>,
    /// Inbound/outbound filter engine built from the config's filter rules.
    filter_engine: super::filter::TransportFilterEngine,
}
126
127impl FileTransport {
128    /// Create a new file transport.
129    ///
130    /// # Errors
131    ///
132    /// Returns error if the file path is empty.
133    pub async fn new(config: &FileTransportConfig) -> TransportResult<Self> {
134        if config.path.is_empty() {
135            return Err(TransportError::Config("file path is empty".into()));
136        }
137
138        #[cfg(feature = "logger")]
139        tracing::info!(path = %config.path, append = config.append, "File transport opened");
140
141        // Fail loud on bad filter config -- silently disabling filters
142        // turns a misconfigured `drop` / `dlq` rule into a permanent pass.
143        let filter_engine = super::filter::TransportFilterEngine::new(
144            &config.filters_in,
145            &config.filters_out,
146            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
147        )?;
148
149        let closed = Arc::new(AtomicBool::new(false));
150
151        #[cfg(feature = "health")]
152        {
153            let h = Arc::clone(&closed);
154            crate::health::HealthRegistry::register("transport:file", move || {
155                if h.load(Ordering::Relaxed) {
156                    crate::health::HealthStatus::Unhealthy
157                } else {
158                    crate::health::HealthStatus::Healthy
159                }
160            });
161        }
162
163        Ok(Self {
164            config: config.clone(),
165            writer: Mutex::new(None),
166            reader: Mutex::new(None),
167            closed,
168            filter_engine,
169        })
170    }
171
172    /// Path to the `.pos` sidecar file that tracks read position.
173    fn pos_path(data_path: &Path) -> PathBuf {
174        let mut pos_path = data_path.as_os_str().to_owned();
175        pos_path.push(".pos");
176        PathBuf::from(pos_path)
177    }
178
179    /// Load committed read position from the sidecar file.
180    async fn load_position(data_path: &Path) -> u64 {
181        let pos_path = Self::pos_path(data_path);
182        match tokio::fs::read_to_string(&pos_path).await {
183            Ok(content) => content.trim().parse::<u64>().unwrap_or(0),
184            Err(_) => 0,
185        }
186    }
187
188    /// Save read position to the sidecar file.
189    async fn save_position(data_path: &Path, offset: u64) -> TransportResult<()> {
190        let pos_path = Self::pos_path(data_path);
191        tokio::fs::write(&pos_path, offset.to_string())
192            .await
193            .map_err(|e| TransportError::Commit(format!("failed to write position file: {e}")))
194    }
195
196    /// Lazily open the write file handle.
197    async fn ensure_writer(&self) -> TransportResult<()> {
198        let mut guard = self.writer.lock().await;
199        if guard.is_none() {
200            let file = tokio::fs::OpenOptions::new()
201                .create(true)
202                .append(self.config.append)
203                .write(true)
204                .open(&self.config.path)
205                .await
206                .map_err(|e| {
207                    TransportError::Connection(format!(
208                        "failed to open '{}' for writing: {e}",
209                        self.config.path
210                    ))
211                })?;
212            *guard = Some(WriteState { file });
213        }
214        Ok(())
215    }
216
217    /// Lazily open the read file handle and seek to committed position.
218    async fn ensure_reader(&self) -> TransportResult<()> {
219        let mut guard = self.reader.lock().await;
220        if guard.is_none() {
221            let path = Path::new(&self.config.path);
222
223            // If the file does not exist yet, there is nothing to read
224            if !path.exists() {
225                return Err(TransportError::Recv(format!(
226                    "file '{}' does not exist",
227                    self.config.path
228                )));
229            }
230
231            let offset = Self::load_position(path).await;
232            let mut file = tokio::fs::File::open(&self.config.path)
233                .await
234                .map_err(|e| {
235                    TransportError::Connection(format!(
236                        "failed to open '{}' for reading: {e}",
237                        self.config.path
238                    ))
239                })?;
240
241            // Seek to committed position
242            file.seek(std::io::SeekFrom::Start(offset))
243                .await
244                .map_err(|e| {
245                    TransportError::Recv(format!("failed to seek to offset {offset}: {e}"))
246                })?;
247
248            *guard = Some(ReadState {
249                reader: BufReader::new(file),
250                offset,
251                line_buf: String::with_capacity(4096),
252            });
253        }
254        Ok(())
255    }
256}
257
258impl TransportBase for FileTransport {
259    async fn close(&self) -> TransportResult<()> {
260        self.closed.store(true, Ordering::Relaxed);
261
262        // Flush and drop writer
263        if let Some(mut state) = self.writer.lock().await.take() {
264            let _ = state.file.flush().await;
265        }
266
267        // Drop reader
268        let _ = self.reader.lock().await.take();
269
270        Ok(())
271    }
272
273    fn is_healthy(&self) -> bool {
274        !self.closed.load(Ordering::Relaxed)
275    }
276
277    fn name(&self) -> &'static str {
278        "file"
279    }
280}
281
282impl TransportSender for FileTransport {
283    async fn send(&self, _key: &str, payload: bytes::Bytes) -> SendResult {
284        if self.closed.load(Ordering::Relaxed) {
285            return SendResult::Fatal(TransportError::Closed);
286        }
287
288        // Outbound filter check
289        if self.filter_engine.has_outbound_filters() {
290            match self.filter_engine.apply_outbound(&payload) {
291                super::filter::FilterDisposition::Pass => {}
292                super::filter::FilterDisposition::Drop => return SendResult::Ok,
293                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
294            }
295        }
296
297        if let Err(e) = self.ensure_writer().await {
298            return SendResult::Fatal(e);
299        }
300
301        let mut guard = self.writer.lock().await;
302        let Some(state) = guard.as_mut() else {
303            return SendResult::Fatal(TransportError::Internal("writer not initialised".into()));
304        };
305
306        // Write payload + newline as a single operation
307        if let Err(e) = state.file.write_all(&payload).await {
308            #[cfg(feature = "logger")]
309            tracing::warn!(error = %e, "File transport: write error");
310            return SendResult::Fatal(TransportError::Send(format!("write failed: {e}")));
311        }
312        if let Err(e) = state.file.write_all(b"\n").await {
313            #[cfg(feature = "logger")]
314            tracing::warn!(error = %e, "File transport: newline write error");
315            return SendResult::Fatal(TransportError::Send(format!("write newline failed: {e}")));
316        }
317        if let Err(e) = state.file.flush().await {
318            #[cfg(feature = "logger")]
319            tracing::warn!(error = %e, "File transport: flush error");
320            return SendResult::Fatal(TransportError::Send(format!("flush failed: {e}")));
321        }
322
323        #[cfg(feature = "logger")]
324        tracing::debug!(bytes = payload.len(), "File transport: message sent");
325
326        #[cfg(feature = "metrics")]
327        metrics::counter!("dfe_transport_sent_total", "transport" => "file").increment(1);
328
329        SendResult::Ok
330    }
331}
332
333impl TransportReceiver for FileTransport {
334    type Token = FileToken;
335
336    async fn recv(&self, max: usize) -> TransportResult<RecvBatch<Self::Token>> {
337        if self.closed.load(Ordering::Relaxed) {
338            return Err(TransportError::Closed);
339        }
340
341        self.ensure_reader().await?;
342
343        let mut guard = self.reader.lock().await;
344        let state = guard
345            .as_mut()
346            .ok_or_else(|| TransportError::Internal("reader not initialised".into()))?;
347
348        let mut messages = Vec::with_capacity(max.min(100));
349
350        for _ in 0..max {
351            state.line_buf.clear();
352            let bytes_read = state
353                .reader
354                .read_line(&mut state.line_buf)
355                .await
356                .map_err(|e| TransportError::Recv(format!("read failed: {e}")))?;
357
358            if bytes_read == 0 {
359                // EOF
360                break;
361            }
362
363            state.offset += bytes_read as u64;
364
365            // Strip trailing newline
366            let line = state.line_buf.trim_end_matches('\n').trim_end_matches('\r');
367            if line.is_empty() {
368                continue;
369            }
370
371            let payload = line.as_bytes().to_vec();
372            let format = PayloadFormat::detect(&payload);
373            let timestamp_ms = chrono::Utc::now().timestamp_millis();
374
375            messages.push(Message {
376                key: None,
377                payload,
378                token: FileToken {
379                    offset: state.offset,
380                },
381                timestamp_ms: Some(timestamp_ms),
382                format,
383            });
384        }
385
386        // Apply inbound filters via the shared partition helper; DLQ entries
387        // are returned in the RecvBatch for the caller to route onward.
388        let batch = self.filter_engine.partition_batch(
389            messages,
390            |m| m.payload.as_slice(),
391            |m| m.key.clone(),
392        );
393        let messages = batch.messages;
394        let dlq_entries = batch.dlq_entries;
395
396        #[cfg(feature = "logger")]
397        if !messages.is_empty() {
398            tracing::debug!(lines = messages.len(), "File transport: batch received");
399        }
400
401        #[cfg(feature = "metrics")]
402        if !messages.is_empty() {
403            metrics::counter!("dfe_transport_received_total", "transport" => "file")
404                .increment(messages.len() as u64);
405        }
406
407        Ok(RecvBatch {
408            messages,
409            dlq_entries,
410        })
411    }
412
413    async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
414        if let Some(max_token) = tokens.iter().max_by_key(|t| t.offset) {
415            let path = Path::new(&self.config.path);
416            Self::save_position(path, max_token.offset).await?;
417
418            #[cfg(feature = "logger")]
419            tracing::debug!(
420                offset = max_token.offset,
421                "File transport: position committed"
422            );
423        }
424        Ok(())
425    }
426}
427
// Empty impl: `FileTransportConfig` relies on the trait's provided items,
// which is what `FileTransportConfig::from_cascade()` calls into.
impl super::traits::FromCascade for FileTransportConfig {}
429
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Append-mode config without filters, pointing at `path`.
    fn config_for(path: &std::path::Path) -> FileTransportConfig {
        FileTransportConfig {
            path: path.to_str().unwrap().to_string(),
            append: true,
            ..Default::default()
        }
    }

    /// Construct a transport over `path`; panics if construction fails.
    async fn open(path: &std::path::Path) -> FileTransport {
        FileTransport::new(&config_for(path)).await.unwrap()
    }

    /// Construct a transport over `filename` inside the temp directory.
    async fn make_transport(dir: &TempDir, filename: &str) -> FileTransport {
        open(&dir.path().join(filename)).await
    }

    /// Two records written by one transport are read back, in order, by another.
    #[tokio::test]
    async fn send_and_receive() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.ndjson");

        // Producer side
        let sender = open(&path).await;
        for body in ["{\"msg\":\"hello\"}", "{\"msg\":\"world\"}"] {
            let outcome = sender
                .send("key", bytes::Bytes::from_static(body.as_bytes()))
                .await;
            assert!(outcome.is_ok());
        }
        sender.close().await.unwrap();

        // Consumer side
        let reader = open(&path).await;
        let messages = reader.recv(10).await.unwrap().messages;

        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].payload, b"{\"msg\":\"hello\"}");
        assert_eq!(messages[1].payload, b"{\"msg\":\"world\"}");

        // Later lines carry strictly larger offsets.
        assert!(messages[1].token.offset > messages[0].token.offset);
    }

    /// A committed offset is picked up by a transport opened afterwards.
    #[tokio::test]
    async fn commit_persists_position() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("commit_test.ndjson");

        // Three lines on disk
        let sender = open(&path).await;
        for body in ["line1", "line2", "line3"] {
            sender
                .send("k", bytes::Bytes::from_static(body.as_bytes()))
                .await;
        }
        sender.close().await.unwrap();

        // First reader consumes two of them and commits
        let first = open(&path).await;
        let consumed = first.recv(2).await.unwrap().messages;
        assert_eq!(consumed.len(), 2);
        assert_eq!(consumed[0].payload, b"line1");
        assert_eq!(consumed[1].payload, b"line2");

        let tokens: Vec<_> = consumed.iter().map(|m| m.token).collect();
        first.commit(&tokens).await.unwrap();
        first.close().await.unwrap();

        // Second reader resumes after the committed position
        let second = open(&path).await;
        let remaining = second.recv(10).await.unwrap().messages;
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].payload, b"line3");
    }

    /// After `close()`, the transport is unhealthy and rejects send and recv.
    #[tokio::test]
    async fn close_prevents_operations() {
        let dir = TempDir::new().unwrap();
        let transport = make_transport(&dir, "close_test.ndjson").await;

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let send_outcome = transport
            .send("k", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(send_outcome.is_fatal());

        let recv_outcome = transport.recv(1).await;
        assert!(recv_outcome.is_err());
    }

    /// Tokens render as `file:<offset>`.
    #[tokio::test]
    async fn file_token_display() {
        let token = FileToken { offset: 42 };
        assert_eq!(format!("{token}"), "file:42");
    }

    /// Once every line has been read, a further recv yields an empty batch.
    #[tokio::test]
    async fn recv_returns_empty_at_eof() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("eof_test.ndjson");

        // One line on disk
        let writer = open(&path).await;
        writer
            .send("k", bytes::Bytes::from_static(b"only_line"))
            .await;
        writer.close().await.unwrap();

        // Drain it, then ask again
        let reader = open(&path).await;
        let first_batch = reader.recv(10).await.unwrap().messages;
        assert_eq!(first_batch.len(), 1);

        let second_batch = reader.recv(10).await.unwrap().messages;
        assert!(second_batch.is_empty());
    }

    /// The default config has an empty path, which `new` rejects.
    #[tokio::test]
    async fn empty_path_is_config_error() {
        let outcome = FileTransport::new(&FileTransportConfig::default()).await;
        assert!(outcome.is_err());
    }

    /// The transport reports its static name.
    #[tokio::test]
    async fn transport_name() {
        let dir = TempDir::new().unwrap();
        let transport = make_transport(&dir, "name_test.ndjson").await;
        assert_eq!(transport.name(), "file");
    }
}