Skip to main content

hyperi_rustlib/transport/
file.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/file.rs
3// Purpose:   NDJSON file transport
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # File Transport
10//!
11//! NDJSON (newline-delimited JSON) file transport for debugging, audit
12//! trails, and replay. Wraps async file I/O behind the Transport traits.
13//!
14//! ## Send
15//!
16//! Appends one NDJSON line per `send()` call to the configured file path.
17//!
18//! ## Receive
19//!
20//! Reads NDJSON lines from the file, tracking byte offset for commit.
21//! Position is persisted to a `.pos` sidecar file so reads survive restarts.
22//!
23//! ## Example
24//!
25//! ```rust,ignore
26//! use hyperi_rustlib::transport::file::{FileTransport, FileTransportConfig};
27//!
28//! let config = FileTransportConfig { path: "/tmp/events.ndjson".into(), append: true, ..Default::default() };
29//! let transport = FileTransport::new(&config).await?;
30//! transport.send("events", b"{\"msg\":\"hello\"}").await;
31//! ```
32
33use super::error::{TransportError, TransportResult};
34use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
35use super::types::{Message, PayloadFormat, SendResult};
36use serde::{Deserialize, Serialize};
37use std::path::{Path, PathBuf};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, Ordering};
40use tokio::io::{AsyncBufReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
41use tokio::sync::Mutex;
42
/// Commit token handed out by the file transport.
///
/// Carries the byte offset in the data file immediately after the line
/// the token was issued for.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FileToken {
    /// Byte offset just past the line that was read.
    pub offset: u64,
}
51
52impl CommitToken for FileToken {}
53
54impl std::fmt::Display for FileToken {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        write!(f, "file:{}", self.offset)
57    }
58}
59
60/// Configuration for file transport.
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct FileTransportConfig {
63    /// File path for read/write.
64    pub path: String,
65
66    /// Append mode (default true for send).
67    #[serde(default = "default_append")]
68    pub append: bool,
69
70    /// Inbound message filters (applied on recv before caller sees messages).
71    #[serde(default)]
72    pub filters_in: Vec<super::filter::FilterRule>,
73
74    /// Outbound message filters (applied on send before transport dispatches).
75    #[serde(default)]
76    pub filters_out: Vec<super::filter::FilterRule>,
77}
78
/// Serde default for the `append` setting: append mode is on.
const fn default_append() -> bool {
    true
}
82
83impl Default for FileTransportConfig {
84    fn default() -> Self {
85        Self {
86            path: String::new(),
87            append: true,
88            filters_in: Vec::new(),
89            filters_out: Vec::new(),
90        }
91    }
92}
93
94impl FileTransportConfig {
95    /// Load from the config cascade under the `transport.file` key.
96    #[must_use]
97    pub fn from_cascade() -> Self {
98        #[cfg(feature = "config")]
99        {
100            if let Some(cfg) = crate::config::try_get()
101                && let Ok(tc) = cfg.unmarshal_key_registered::<Self>("transport.file")
102            {
103                return tc;
104            }
105        }
106        Self::default()
107    }
108}
109
110/// Internal state for the write side.
111struct WriteState {
112    file: tokio::fs::File,
113}
114
115/// Internal state for the read side.
116struct ReadState {
117    reader: BufReader<tokio::fs::File>,
118    offset: u64,
119    line_buf: String,
120}
121
122/// NDJSON file transport.
123///
124/// Supports both send (append) and receive (sequential read with
125/// position tracking). Position is persisted to a `.pos` sidecar
126/// file so reads survive process restarts.
127pub struct FileTransport {
128    config: FileTransportConfig,
129    writer: Mutex<Option<WriteState>>,
130    reader: Mutex<Option<ReadState>>,
131    closed: Arc<AtomicBool>,
132    filter_engine: super::filter::TransportFilterEngine,
133    /// Buffer for messages staged to DLQ by inbound filters.
134    /// Drained by `take_filtered_dlq_entries()`.
135    filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
136}
137
138impl FileTransport {
139    /// Create a new file transport.
140    ///
141    /// # Errors
142    ///
143    /// Returns error if the file path is empty.
144    pub async fn new(config: &FileTransportConfig) -> TransportResult<Self> {
145        if config.path.is_empty() {
146            return Err(TransportError::Config("file path is empty".into()));
147        }
148
149        #[cfg(feature = "logger")]
150        tracing::info!(path = %config.path, append = config.append, "File transport opened");
151
152        // Fail loud on bad filter config -- silently disabling filters
153        // turns a misconfigured `drop` / `dlq` rule into a permanent pass.
154        let filter_engine = super::filter::TransportFilterEngine::new(
155            &config.filters_in,
156            &config.filters_out,
157            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
158        )?;
159
160        let closed = Arc::new(AtomicBool::new(false));
161
162        #[cfg(feature = "health")]
163        {
164            let h = Arc::clone(&closed);
165            crate::health::HealthRegistry::register("transport:file", move || {
166                if h.load(Ordering::Relaxed) {
167                    crate::health::HealthStatus::Unhealthy
168                } else {
169                    crate::health::HealthStatus::Healthy
170                }
171            });
172        }
173
174        Ok(Self {
175            config: config.clone(),
176            writer: Mutex::new(None),
177            reader: Mutex::new(None),
178            closed,
179            filter_engine,
180            filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
181        })
182    }
183
184    /// Path to the `.pos` sidecar file that tracks read position.
185    fn pos_path(data_path: &Path) -> PathBuf {
186        let mut pos_path = data_path.as_os_str().to_owned();
187        pos_path.push(".pos");
188        PathBuf::from(pos_path)
189    }
190
191    /// Load committed read position from the sidecar file.
192    async fn load_position(data_path: &Path) -> u64 {
193        let pos_path = Self::pos_path(data_path);
194        match tokio::fs::read_to_string(&pos_path).await {
195            Ok(content) => content.trim().parse::<u64>().unwrap_or(0),
196            Err(_) => 0,
197        }
198    }
199
200    /// Save read position to the sidecar file.
201    async fn save_position(data_path: &Path, offset: u64) -> TransportResult<()> {
202        let pos_path = Self::pos_path(data_path);
203        tokio::fs::write(&pos_path, offset.to_string())
204            .await
205            .map_err(|e| TransportError::Commit(format!("failed to write position file: {e}")))
206    }
207
208    /// Lazily open the write file handle.
209    async fn ensure_writer(&self) -> TransportResult<()> {
210        let mut guard = self.writer.lock().await;
211        if guard.is_none() {
212            let file = tokio::fs::OpenOptions::new()
213                .create(true)
214                .append(self.config.append)
215                .write(true)
216                .open(&self.config.path)
217                .await
218                .map_err(|e| {
219                    TransportError::Connection(format!(
220                        "failed to open '{}' for writing: {e}",
221                        self.config.path
222                    ))
223                })?;
224            *guard = Some(WriteState { file });
225        }
226        Ok(())
227    }
228
229    /// Lazily open the read file handle and seek to committed position.
230    async fn ensure_reader(&self) -> TransportResult<()> {
231        let mut guard = self.reader.lock().await;
232        if guard.is_none() {
233            let path = Path::new(&self.config.path);
234
235            // If the file does not exist yet, there is nothing to read
236            if !path.exists() {
237                return Err(TransportError::Recv(format!(
238                    "file '{}' does not exist",
239                    self.config.path
240                )));
241            }
242
243            let offset = Self::load_position(path).await;
244            let mut file = tokio::fs::File::open(&self.config.path)
245                .await
246                .map_err(|e| {
247                    TransportError::Connection(format!(
248                        "failed to open '{}' for reading: {e}",
249                        self.config.path
250                    ))
251                })?;
252
253            // Seek to committed position
254            file.seek(std::io::SeekFrom::Start(offset))
255                .await
256                .map_err(|e| {
257                    TransportError::Recv(format!("failed to seek to offset {offset}: {e}"))
258                })?;
259
260            *guard = Some(ReadState {
261                reader: BufReader::new(file),
262                offset,
263                line_buf: String::with_capacity(4096),
264            });
265        }
266        Ok(())
267    }
268}
269
270impl TransportBase for FileTransport {
271    async fn close(&self) -> TransportResult<()> {
272        self.closed.store(true, Ordering::Relaxed);
273
274        // Flush and drop writer
275        if let Some(mut state) = self.writer.lock().await.take() {
276            let _ = state.file.flush().await;
277        }
278
279        // Drop reader
280        let _ = self.reader.lock().await.take();
281
282        Ok(())
283    }
284
285    fn is_healthy(&self) -> bool {
286        !self.closed.load(Ordering::Relaxed)
287    }
288
289    fn name(&self) -> &'static str {
290        "file"
291    }
292}
293
294impl TransportSender for FileTransport {
295    async fn send(&self, _key: &str, payload: &[u8]) -> SendResult {
296        if self.closed.load(Ordering::Relaxed) {
297            return SendResult::Fatal(TransportError::Closed);
298        }
299
300        // Outbound filter check
301        if self.filter_engine.has_outbound_filters() {
302            match self.filter_engine.apply_outbound(payload) {
303                super::filter::FilterDisposition::Pass => {}
304                super::filter::FilterDisposition::Drop => return SendResult::Ok,
305                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
306            }
307        }
308
309        if let Err(e) = self.ensure_writer().await {
310            return SendResult::Fatal(e);
311        }
312
313        let mut guard = self.writer.lock().await;
314        let Some(state) = guard.as_mut() else {
315            return SendResult::Fatal(TransportError::Internal("writer not initialised".into()));
316        };
317
318        // Write payload + newline as a single operation
319        if let Err(e) = state.file.write_all(payload).await {
320            #[cfg(feature = "logger")]
321            tracing::warn!(error = %e, "File transport: write error");
322            return SendResult::Fatal(TransportError::Send(format!("write failed: {e}")));
323        }
324        if let Err(e) = state.file.write_all(b"\n").await {
325            #[cfg(feature = "logger")]
326            tracing::warn!(error = %e, "File transport: newline write error");
327            return SendResult::Fatal(TransportError::Send(format!("write newline failed: {e}")));
328        }
329        if let Err(e) = state.file.flush().await {
330            #[cfg(feature = "logger")]
331            tracing::warn!(error = %e, "File transport: flush error");
332            return SendResult::Fatal(TransportError::Send(format!("flush failed: {e}")));
333        }
334
335        #[cfg(feature = "logger")]
336        tracing::debug!(bytes = payload.len(), "File transport: message sent");
337
338        #[cfg(feature = "metrics")]
339        metrics::counter!("dfe_transport_sent_total", "transport" => "file").increment(1);
340
341        SendResult::Ok
342    }
343}
344
345impl TransportReceiver for FileTransport {
346    type Token = FileToken;
347
348    async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
349        if self.closed.load(Ordering::Relaxed) {
350            return Err(TransportError::Closed);
351        }
352
353        self.ensure_reader().await?;
354
355        let mut guard = self.reader.lock().await;
356        let state = guard
357            .as_mut()
358            .ok_or_else(|| TransportError::Internal("reader not initialised".into()))?;
359
360        let mut messages = Vec::with_capacity(max.min(100));
361
362        for _ in 0..max {
363            state.line_buf.clear();
364            let bytes_read = state
365                .reader
366                .read_line(&mut state.line_buf)
367                .await
368                .map_err(|e| TransportError::Recv(format!("read failed: {e}")))?;
369
370            if bytes_read == 0 {
371                // EOF
372                break;
373            }
374
375            state.offset += bytes_read as u64;
376
377            // Strip trailing newline
378            let line = state.line_buf.trim_end_matches('\n').trim_end_matches('\r');
379            if line.is_empty() {
380                continue;
381            }
382
383            let payload = line.as_bytes().to_vec();
384            let format = PayloadFormat::detect(&payload);
385            let timestamp_ms = chrono::Utc::now().timestamp_millis();
386
387            messages.push(Message {
388                key: None,
389                payload,
390                token: FileToken {
391                    offset: state.offset,
392                },
393                timestamp_ms: Some(timestamp_ms),
394                format,
395            });
396        }
397
398        // Apply inbound filters: drop messages, stage DLQ entries
399        if self.filter_engine.has_inbound_filters() {
400            let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
401            messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
402                super::filter::FilterDisposition::Pass => true,
403                super::filter::FilterDisposition::Drop => false,
404                super::filter::FilterDisposition::Dlq => {
405                    staged_dlq.push(super::filter::FilteredDlqEntry {
406                        payload: msg.payload.clone(),
407                        key: msg.key.clone(),
408                        reason: "transport filter".to_string(),
409                    });
410                    false
411                }
412            });
413            if !staged_dlq.is_empty() {
414                self.filtered_dlq_buffer.lock().extend(staged_dlq);
415            }
416        }
417
418        #[cfg(feature = "logger")]
419        if !messages.is_empty() {
420            tracing::debug!(lines = messages.len(), "File transport: batch received");
421        }
422
423        #[cfg(feature = "metrics")]
424        if !messages.is_empty() {
425            metrics::counter!("dfe_transport_received_total", "transport" => "file")
426                .increment(messages.len() as u64);
427        }
428
429        Ok(messages)
430    }
431
432    fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
433        std::mem::take(&mut *self.filtered_dlq_buffer.lock())
434    }
435
436    async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
437        if let Some(max_token) = tokens.iter().max_by_key(|t| t.offset) {
438            let path = Path::new(&self.config.path);
439            Self::save_position(path, max_token.offset).await?;
440
441            #[cfg(feature = "logger")]
442            tracing::debug!(
443                offset = max_token.offset,
444                "File transport: position committed"
445            );
446        }
447        Ok(())
448    }
449}
450
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Append-mode config for `path` with no filters.
    fn config_for(path: &std::path::Path) -> FileTransportConfig {
        FileTransportConfig {
            path: path.to_str().unwrap().to_string(),
            append: true,
            ..Default::default()
        }
    }

    /// Fresh transport over `path`; each call is an independent instance.
    async fn open_transport(path: &std::path::Path) -> FileTransport {
        FileTransport::new(&config_for(path)).await.unwrap()
    }

    #[tokio::test]
    async fn send_and_receive() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.ndjson");

        // Write messages
        let sender = open_transport(&path).await;
        assert!(sender.send("key", b"{\"msg\":\"hello\"}").await.is_ok());
        assert!(sender.send("key", b"{\"msg\":\"world\"}").await.is_ok());
        sender.close().await.unwrap();

        // Read messages back
        let reader = open_transport(&path).await;
        let messages = reader.recv(10).await.unwrap();

        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].payload, b"{\"msg\":\"hello\"}");
        assert_eq!(messages[1].payload, b"{\"msg\":\"world\"}");

        // Tokens should have increasing offsets
        assert!(messages[1].token.offset > messages[0].token.offset);
    }

    #[tokio::test]
    async fn commit_persists_position() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("commit_test.ndjson");

        // Write 3 messages; a failed write should fail here, not later
        // as a confusing message-count mismatch.
        let sender = open_transport(&path).await;
        assert!(sender.send("k", b"line1").await.is_ok());
        assert!(sender.send("k", b"line2").await.is_ok());
        assert!(sender.send("k", b"line3").await.is_ok());
        sender.close().await.unwrap();

        // Read first 2 messages and commit
        let r1 = open_transport(&path).await;
        let msgs = r1.recv(2).await.unwrap();
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].payload, b"line1");
        assert_eq!(msgs[1].payload, b"line2");

        // Commit up to message 2
        let tokens: Vec<_> = msgs.iter().map(|m| m.token).collect();
        r1.commit(&tokens).await.unwrap();
        r1.close().await.unwrap();

        // Open a new transport -- should resume from committed position
        let r2 = open_transport(&path).await;
        let remaining = r2.recv(10).await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].payload, b"line3");
    }

    #[tokio::test]
    async fn close_prevents_operations() {
        let dir = TempDir::new().unwrap();
        let transport = open_transport(&dir.path().join("close_test.ndjson")).await;

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let result = transport.send("k", b"data").await;
        assert!(result.is_fatal());

        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    // Pure formatting check: no async runtime needed.
    #[test]
    fn file_token_display() {
        let token = FileToken { offset: 42 };
        assert_eq!(format!("{token}"), "file:42");
    }

    #[tokio::test]
    async fn recv_returns_empty_at_eof() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("eof_test.ndjson");

        // Write one line
        let transport = open_transport(&path).await;
        assert!(transport.send("k", b"only_line").await.is_ok());
        transport.close().await.unwrap();

        // Read all, then read again -- should get empty
        let reader = open_transport(&path).await;
        let msgs = reader.recv(10).await.unwrap();
        assert_eq!(msgs.len(), 1);

        let more = reader.recv(10).await.unwrap();
        assert!(more.is_empty());
    }

    #[tokio::test]
    async fn empty_path_is_config_error() {
        let result = FileTransport::new(&FileTransportConfig::default()).await;
        // Pin the variant, not just "some error".
        assert!(matches!(result, Err(TransportError::Config(_))));
    }

    #[tokio::test]
    async fn transport_name() {
        let dir = TempDir::new().unwrap();
        let transport = open_transport(&dir.path().join("name_test.ndjson")).await;
        assert_eq!(transport.name(), "file");
    }
}