hyperi_rustlib/transport/
file.rs1use super::error::{TransportError, TransportResult};
34use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
35use super::types::{Message, PayloadFormat, SendResult};
36use super::work_batch::WorkBatch;
37use serde::{Deserialize, Serialize};
38use std::path::{Path, PathBuf};
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, Ordering};
41use tokio::io::{AsyncBufReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
42use tokio::sync::Mutex;
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
48pub struct FileToken {
49 pub offset: u64,
51}
52
53impl CommitToken for FileToken {}
54
55impl std::fmt::Display for FileToken {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 write!(f, "file:{}", self.offset)
58 }
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct FileTransportConfig {
64 pub path: String,
66
67 #[serde(default = "default_append")]
69 pub append: bool,
70
71 #[serde(default)]
73 pub filters_in: Vec<super::filter::FilterRule>,
74
75 #[serde(default)]
77 pub filters_out: Vec<super::filter::FilterRule>,
78}
79
80fn default_append() -> bool {
81 true
82}
83
84impl Default for FileTransportConfig {
85 fn default() -> Self {
86 Self {
87 path: String::new(),
88 append: true,
89 filters_in: Vec::new(),
90 filters_out: Vec::new(),
91 }
92 }
93}
94
95impl FileTransportConfig {
96 #[must_use]
98 pub fn from_cascade() -> Self {
99 <Self as super::traits::FromCascade>::from_cascade_key("transport.file")
100 }
101}
102
103struct WriteState {
105 file: tokio::fs::File,
106}
107
108struct ReadState {
110 reader: BufReader<tokio::fs::File>,
111 offset: u64,
112 line_buf: String,
113}
114
115pub struct FileTransport {
121 config: FileTransportConfig,
122 writer: Mutex<Option<WriteState>>,
123 reader: Mutex<Option<ReadState>>,
124 closed: Arc<AtomicBool>,
125 filter_engine: super::filter::TransportFilterEngine,
126}
127
128impl FileTransport {
129 pub async fn new(config: &FileTransportConfig) -> TransportResult<Self> {
135 if config.path.is_empty() {
136 return Err(TransportError::Config("file path is empty".into()));
137 }
138
139 #[cfg(feature = "logger")]
140 tracing::info!(path = %config.path, append = config.append, "File transport opened");
141
142 let filter_engine = super::filter::TransportFilterEngine::new(
145 &config.filters_in,
146 &config.filters_out,
147 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
148 )?;
149
150 let closed = Arc::new(AtomicBool::new(false));
151
152 #[cfg(feature = "health")]
153 {
154 let h = Arc::clone(&closed);
155 crate::health::HealthRegistry::register("transport:file", move || {
156 if h.load(Ordering::Relaxed) {
157 crate::health::HealthStatus::Unhealthy
158 } else {
159 crate::health::HealthStatus::Healthy
160 }
161 });
162 }
163
164 Ok(Self {
165 config: config.clone(),
166 writer: Mutex::new(None),
167 reader: Mutex::new(None),
168 closed,
169 filter_engine,
170 })
171 }
172
173 fn pos_path(data_path: &Path) -> PathBuf {
175 let mut pos_path = data_path.as_os_str().to_owned();
176 pos_path.push(".pos");
177 PathBuf::from(pos_path)
178 }
179
180 async fn load_position(data_path: &Path) -> u64 {
182 let pos_path = Self::pos_path(data_path);
183 match tokio::fs::read_to_string(&pos_path).await {
184 Ok(content) => content.trim().parse::<u64>().unwrap_or(0),
185 Err(_) => 0,
186 }
187 }
188
189 async fn save_position(data_path: &Path, offset: u64) -> TransportResult<()> {
191 let pos_path = Self::pos_path(data_path);
192 tokio::fs::write(&pos_path, offset.to_string())
193 .await
194 .map_err(|e| TransportError::Commit(format!("failed to write position file: {e}")))
195 }
196
197 async fn ensure_writer(&self) -> TransportResult<()> {
199 let mut guard = self.writer.lock().await;
200 if guard.is_none() {
201 let file = tokio::fs::OpenOptions::new()
202 .create(true)
203 .append(self.config.append)
204 .write(true)
205 .open(&self.config.path)
206 .await
207 .map_err(|e| {
208 TransportError::Connection(format!(
209 "failed to open '{}' for writing: {e}",
210 self.config.path
211 ))
212 })?;
213 *guard = Some(WriteState { file });
214 }
215 Ok(())
216 }
217
218 async fn ensure_reader(&self) -> TransportResult<()> {
220 let mut guard = self.reader.lock().await;
221 if guard.is_none() {
222 let path = Path::new(&self.config.path);
223
224 if !path.exists() {
226 return Err(TransportError::Recv(format!(
227 "file '{}' does not exist",
228 self.config.path
229 )));
230 }
231
232 let offset = Self::load_position(path).await;
233 let mut file = tokio::fs::File::open(&self.config.path)
234 .await
235 .map_err(|e| {
236 TransportError::Connection(format!(
237 "failed to open '{}' for reading: {e}",
238 self.config.path
239 ))
240 })?;
241
242 file.seek(std::io::SeekFrom::Start(offset))
244 .await
245 .map_err(|e| {
246 TransportError::Recv(format!("failed to seek to offset {offset}: {e}"))
247 })?;
248
249 *guard = Some(ReadState {
250 reader: BufReader::new(file),
251 offset,
252 line_buf: String::with_capacity(4096),
253 });
254 }
255 Ok(())
256 }
257}
258
259impl TransportBase for FileTransport {
260 async fn close(&self) -> TransportResult<()> {
261 self.closed.store(true, Ordering::Relaxed);
262
263 if let Some(mut state) = self.writer.lock().await.take() {
265 let _ = state.file.flush().await;
266 }
267
268 let _ = self.reader.lock().await.take();
270
271 Ok(())
272 }
273
274 fn is_healthy(&self) -> bool {
275 !self.closed.load(Ordering::Relaxed)
276 }
277
278 fn name(&self) -> &'static str {
279 "file"
280 }
281}
282
283impl TransportSender for FileTransport {
284 async fn send(&self, _key: &str, payload: bytes::Bytes) -> SendResult {
285 if self.closed.load(Ordering::Relaxed) {
286 return SendResult::Fatal(TransportError::Closed);
287 }
288
289 if self.filter_engine.has_outbound_filters() {
291 match self.filter_engine.apply_outbound(&payload) {
292 super::filter::FilterDisposition::Pass => {}
293 super::filter::FilterDisposition::Drop => return SendResult::Ok,
294 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
295 }
296 }
297
298 if let Err(e) = self.ensure_writer().await {
299 return SendResult::Fatal(e);
300 }
301
302 let mut guard = self.writer.lock().await;
303 let Some(state) = guard.as_mut() else {
304 return SendResult::Fatal(TransportError::Internal("writer not initialised".into()));
305 };
306
307 if let Err(e) = state.file.write_all(&payload).await {
309 #[cfg(feature = "logger")]
310 tracing::warn!(error = %e, "File transport: write error");
311 return SendResult::Fatal(TransportError::Send(format!("write failed: {e}")));
312 }
313 if let Err(e) = state.file.write_all(b"\n").await {
314 #[cfg(feature = "logger")]
315 tracing::warn!(error = %e, "File transport: newline write error");
316 return SendResult::Fatal(TransportError::Send(format!("write newline failed: {e}")));
317 }
318 if let Err(e) = state.file.flush().await {
319 #[cfg(feature = "logger")]
320 tracing::warn!(error = %e, "File transport: flush error");
321 return SendResult::Fatal(TransportError::Send(format!("flush failed: {e}")));
322 }
323
324 #[cfg(feature = "logger")]
325 tracing::debug!(bytes = payload.len(), "File transport: message sent");
326
327 #[cfg(feature = "metrics")]
328 metrics::counter!("dfe_transport_sent_total", "transport" => "file").increment(1);
329
330 SendResult::Ok
331 }
332}
333
334impl TransportReceiver for FileTransport {
335 type Token = FileToken;
336
337 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
338 if self.closed.load(Ordering::Relaxed) {
339 return Err(TransportError::Closed);
340 }
341
342 self.ensure_reader().await?;
343
344 let mut guard = self.reader.lock().await;
345 let state = guard
346 .as_mut()
347 .ok_or_else(|| TransportError::Internal("reader not initialised".into()))?;
348
349 let mut messages = Vec::with_capacity(max.min(100));
350
351 for _ in 0..max {
352 state.line_buf.clear();
353 let bytes_read = state
354 .reader
355 .read_line(&mut state.line_buf)
356 .await
357 .map_err(|e| TransportError::Recv(format!("read failed: {e}")))?;
358
359 if bytes_read == 0 {
360 break;
362 }
363
364 state.offset += bytes_read as u64;
365
366 let line = state.line_buf.trim_end_matches('\n').trim_end_matches('\r');
368 if line.is_empty() {
369 continue;
370 }
371
372 let payload: bytes::Bytes = line.as_bytes().to_vec().into();
373 let format = PayloadFormat::detect(&payload);
374 let timestamp_ms = chrono::Utc::now().timestamp_millis();
375
376 messages.push(Message {
377 key: None,
378 payload,
379 token: FileToken {
380 offset: state.offset,
381 },
382 timestamp_ms: Some(timestamp_ms),
383 format,
384 });
385 }
386
387 let batch =
390 self.filter_engine
391 .partition_batch(messages, |m| m.payload.as_ref(), |m| m.key.clone());
392 let messages = batch.messages;
393 let dlq_entries = batch.dlq_entries;
394
395 #[cfg(feature = "logger")]
396 if !messages.is_empty() {
397 tracing::debug!(lines = messages.len(), "File transport: batch received");
398 }
399
400 #[cfg(feature = "metrics")]
401 if !messages.is_empty() {
402 metrics::counter!("dfe_transport_received_total", "transport" => "file")
403 .increment(messages.len() as u64);
404 }
405
406 Ok(RecvBatch {
407 messages,
408 dlq_entries,
409 }
410 .into())
411 }
412
413 async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
414 if let Some(max_token) = tokens.iter().max_by_key(|t| t.offset) {
415 let path = Path::new(&self.config.path);
416 Self::save_position(path, max_token.offset).await?;
417
418 #[cfg(feature = "logger")]
419 tracing::debug!(
420 offset = max_token.offset,
421 "File transport: position committed"
422 );
423 }
424 Ok(())
425 }
426}
427
428impl super::traits::FromCascade for FileTransportConfig {}
429
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Appending config for the data file at `path`.
    fn config_for(path: &str) -> FileTransportConfig {
        FileTransportConfig {
            path: path.to_string(),
            append: true,
            ..Default::default()
        }
    }

    /// Path of `filename` inside `dir`, as the `String` the config expects.
    fn path_in(dir: &TempDir, filename: &str) -> String {
        dir.path().join(filename).to_str().unwrap().to_string()
    }

    async fn make_transport(dir: &TempDir, filename: &str) -> FileTransport {
        FileTransport::new(&config_for(&path_in(dir, filename)))
            .await
            .unwrap()
    }

    /// Send each of `lines` through a fresh transport, failing the test on any send
    /// error, then close it so a separate reader sees the data.
    async fn write_lines(path: &str, lines: &[&'static str]) {
        let sender = FileTransport::new(&config_for(path)).await.unwrap();
        for &line in lines {
            let result = sender
                .send("k", bytes::Bytes::from_static(line.as_bytes()))
                .await;
            assert!(result.is_ok(), "send failed for line {line:?}");
        }
        sender.close().await.unwrap();
    }

    #[tokio::test]
    async fn send_and_receive() {
        let dir = TempDir::new().unwrap();
        let path = path_in(&dir, "test.ndjson");
        write_lines(&path, &["{\"msg\":\"hello\"}", "{\"msg\":\"world\"}"]).await;

        let reader = FileTransport::new(&config_for(&path)).await.unwrap();
        let batch = reader.recv(10).await.unwrap();

        assert_eq!(batch.records.len(), 2);
        assert_eq!(batch.records[0].payload.as_ref(), b"{\"msg\":\"hello\"}");
        assert_eq!(batch.records[1].payload.as_ref(), b"{\"msg\":\"world\"}");

        // Tokens are byte offsets, so later lines carry larger ones.
        assert!(batch.commit_tokens[1].offset > batch.commit_tokens[0].offset);
    }

    #[tokio::test]
    async fn commit_persists_position() {
        let dir = TempDir::new().unwrap();
        let path = path_in(&dir, "commit_test.ndjson");
        write_lines(&path, &["line1", "line2", "line3"]).await;

        let r1 = FileTransport::new(&config_for(&path)).await.unwrap();
        let batch = r1.recv(2).await.unwrap();
        assert_eq!(batch.records.len(), 2);
        assert_eq!(batch.records[0].payload.as_ref(), b"line1");
        assert_eq!(batch.records[1].payload.as_ref(), b"line2");

        r1.commit(&batch.commit_tokens).await.unwrap();
        r1.close().await.unwrap();

        // A fresh reader resumes right after the committed position.
        let r2 = FileTransport::new(&config_for(&path)).await.unwrap();
        let remaining = r2.recv(10).await.unwrap().records;
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].payload.as_ref(), b"line3");
    }

    #[tokio::test]
    async fn close_prevents_operations() {
        let dir = TempDir::new().unwrap();
        let transport = make_transport(&dir, "close_test.ndjson").await;

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let result = transport
            .send("k", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(result.is_fatal());

        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    #[test]
    fn file_token_display() {
        let token = FileToken { offset: 42 };
        assert_eq!(format!("{token}"), "file:42");
    }

    #[tokio::test]
    async fn recv_returns_empty_at_eof() {
        let dir = TempDir::new().unwrap();
        let path = path_in(&dir, "eof_test.ndjson");
        write_lines(&path, &["only_line"]).await;

        let reader = FileTransport::new(&config_for(&path)).await.unwrap();
        let msgs = reader.recv(10).await.unwrap().records;
        assert_eq!(msgs.len(), 1);

        let more = reader.recv(10).await.unwrap().records;
        assert!(more.is_empty());
    }

    #[tokio::test]
    async fn empty_path_is_config_error() {
        let result = FileTransport::new(&FileTransportConfig::default()).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn transport_name() {
        let dir = TempDir::new().unwrap();
        let transport = make_transport(&dir, "name_test.ndjson").await;
        assert_eq!(transport.name(), "file");
    }
}