hyperi_rustlib/transport/
file.rs1use super::error::{TransportError, TransportResult};
34use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
35use super::types::{Message, PayloadFormat, SendResult};
36use super::work_batch::WorkBatch;
37use serde::{Deserialize, Serialize};
38use std::path::{Path, PathBuf};
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, Ordering};
41use tokio::io::{AsyncBufReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
42use tokio::sync::Mutex;
43
/// Commit token for the file transport: a position inside the data file.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct FileToken {
    /// Byte offset into the data file. The receiver fills this with its read
    /// position immediately after the line the token was issued for.
    pub offset: u64,
}
52
53impl CommitToken for FileToken {}
54
55impl std::fmt::Display for FileToken {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 write!(f, "file:{}", self.offset)
58 }
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct FileTransportConfig {
64 pub path: String,
66
67 #[serde(default = "default_append")]
69 pub append: bool,
70
71 #[serde(default)]
73 pub filters_in: Vec<super::filter::FilterRule>,
74
75 #[serde(default)]
77 pub filters_out: Vec<super::filter::FilterRule>,
78}
79
/// Serde default for `FileTransportConfig::append`: append mode is on unless
/// the configuration says otherwise.
fn default_append() -> bool {
    true
}
83
84impl Default for FileTransportConfig {
85 fn default() -> Self {
86 Self {
87 path: String::new(),
88 append: true,
89 filters_in: Vec::new(),
90 filters_out: Vec::new(),
91 }
92 }
93}
94
95impl FileTransportConfig {
96 #[must_use]
98 pub fn from_cascade() -> Self {
99 <Self as super::traits::FromCascade>::from_cascade_key("transport.file")
100 }
101}
102
103struct WriteState {
105 file: tokio::fs::File,
106}
107
108struct ReadState {
110 reader: BufReader<tokio::fs::File>,
111 offset: u64,
112 line_buf: String,
113}
114
115pub struct FileTransport {
121 config: FileTransportConfig,
122 writer: Mutex<Option<WriteState>>,
123 reader: Mutex<Option<ReadState>>,
124 closed: Arc<AtomicBool>,
125 filter_engine: super::filter::TransportFilterEngine,
126}
127
128impl FileTransport {
129 pub async fn new(config: &FileTransportConfig) -> TransportResult<Self> {
135 if config.path.is_empty() {
136 return Err(TransportError::Config("file path is empty".into()));
137 }
138
139 #[cfg(feature = "logger")]
140 tracing::info!(path = %config.path, append = config.append, "File transport opened");
141
142 let filter_engine = super::filter::TransportFilterEngine::new(
145 &config.filters_in,
146 &config.filters_out,
147 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
148 )?;
149
150 let closed = Arc::new(AtomicBool::new(false));
151
152 #[cfg(feature = "health")]
153 {
154 let h = Arc::clone(&closed);
155 crate::health::HealthRegistry::register("transport:file", move || {
156 if h.load(Ordering::Relaxed) {
157 crate::health::HealthStatus::Unhealthy
158 } else {
159 crate::health::HealthStatus::Healthy
160 }
161 });
162 }
163
164 Ok(Self {
165 config: config.clone(),
166 writer: Mutex::new(None),
167 reader: Mutex::new(None),
168 closed,
169 filter_engine,
170 })
171 }
172
173 fn pos_path(data_path: &Path) -> PathBuf {
175 let mut pos_path = data_path.as_os_str().to_owned();
176 pos_path.push(".pos");
177 PathBuf::from(pos_path)
178 }
179
180 async fn load_position(data_path: &Path) -> u64 {
182 let pos_path = Self::pos_path(data_path);
183 match tokio::fs::read_to_string(&pos_path).await {
184 Ok(content) => content.trim().parse::<u64>().unwrap_or(0),
185 Err(_) => 0,
186 }
187 }
188
189 async fn save_position(data_path: &Path, offset: u64) -> TransportResult<()> {
191 let pos_path = Self::pos_path(data_path);
192 tokio::fs::write(&pos_path, offset.to_string())
193 .await
194 .map_err(|e| TransportError::Commit(format!("failed to write position file: {e}")))
195 }
196
197 async fn ensure_writer(&self) -> TransportResult<()> {
199 let mut guard = self.writer.lock().await;
200 if guard.is_none() {
201 let file = tokio::fs::OpenOptions::new()
202 .create(true)
203 .append(self.config.append)
204 .write(true)
205 .open(&self.config.path)
206 .await
207 .map_err(|e| {
208 TransportError::Connection(format!(
209 "failed to open '{}' for writing: {e}",
210 self.config.path
211 ))
212 })?;
213 *guard = Some(WriteState { file });
214 }
215 Ok(())
216 }
217
218 async fn ensure_reader(&self) -> TransportResult<()> {
220 let mut guard = self.reader.lock().await;
221 if guard.is_none() {
222 let path = Path::new(&self.config.path);
223
224 if !path.exists() {
226 return Err(TransportError::Recv(format!(
227 "file '{}' does not exist",
228 self.config.path
229 )));
230 }
231
232 let offset = Self::load_position(path).await;
233 let mut file = tokio::fs::File::open(&self.config.path)
234 .await
235 .map_err(|e| {
236 TransportError::Connection(format!(
237 "failed to open '{}' for reading: {e}",
238 self.config.path
239 ))
240 })?;
241
242 file.seek(std::io::SeekFrom::Start(offset))
244 .await
245 .map_err(|e| {
246 TransportError::Recv(format!("failed to seek to offset {offset}: {e}"))
247 })?;
248
249 *guard = Some(ReadState {
250 reader: BufReader::new(file),
251 offset,
252 line_buf: String::with_capacity(4096),
253 });
254 }
255 Ok(())
256 }
257}
258
259impl TransportBase for FileTransport {
260 async fn close(&self) -> TransportResult<()> {
261 self.closed.store(true, Ordering::Relaxed);
262
263 if let Some(mut state) = self.writer.lock().await.take() {
265 let _ = state.file.flush().await;
266 }
267
268 let _ = self.reader.lock().await.take();
270
271 Ok(())
272 }
273
274 fn is_healthy(&self) -> bool {
275 !self.closed.load(Ordering::Relaxed)
276 }
277
278 fn name(&self) -> &'static str {
279 "file"
280 }
281}
282
283impl TransportSender for FileTransport {
284 async fn send(&self, _key: &str, payload: bytes::Bytes) -> SendResult {
285 if self.closed.load(Ordering::Relaxed) {
286 return SendResult::Fatal(TransportError::Closed);
287 }
288
289 if self.filter_engine.has_outbound_filters() {
291 match self.filter_engine.apply_outbound(&payload) {
292 super::filter::FilterDisposition::Pass => {}
293 super::filter::FilterDisposition::Drop => return SendResult::Ok,
294 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
295 }
296 }
297
298 if let Err(e) = self.ensure_writer().await {
299 return SendResult::Fatal(e);
300 }
301
302 let mut guard = self.writer.lock().await;
303 let Some(state) = guard.as_mut() else {
304 return SendResult::Fatal(TransportError::Internal("writer not initialised".into()));
305 };
306
307 if let Err(e) = state.file.write_all(&payload).await {
309 #[cfg(feature = "logger")]
310 tracing::warn!(error = %e, "File transport: write error");
311 return SendResult::Fatal(TransportError::Send(format!("write failed: {e}")));
312 }
313 if let Err(e) = state.file.write_all(b"\n").await {
314 #[cfg(feature = "logger")]
315 tracing::warn!(error = %e, "File transport: newline write error");
316 return SendResult::Fatal(TransportError::Send(format!("write newline failed: {e}")));
317 }
318 if let Err(e) = state.file.flush().await {
319 #[cfg(feature = "logger")]
320 tracing::warn!(error = %e, "File transport: flush error");
321 return SendResult::Fatal(TransportError::Send(format!("flush failed: {e}")));
322 }
323
324 #[cfg(feature = "logger")]
325 tracing::debug!(bytes = payload.len(), "File transport: message sent");
326
327 #[cfg(feature = "metrics")]
328 metrics::counter!("dfe_transport_sent_total", "transport" => "file").increment(1);
329
330 SendResult::Ok
331 }
332}
333
334impl TransportReceiver for FileTransport {
335 type Token = FileToken;
336
337 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
338 if self.closed.load(Ordering::Relaxed) {
339 return Err(TransportError::Closed);
340 }
341
342 self.ensure_reader().await?;
343
344 let mut guard = self.reader.lock().await;
345 let state = guard
346 .as_mut()
347 .ok_or_else(|| TransportError::Internal("reader not initialised".into()))?;
348
349 let mut messages = Vec::with_capacity(max.min(100));
350
351 for _ in 0..max {
352 state.line_buf.clear();
353 let bytes_read = state
354 .reader
355 .read_line(&mut state.line_buf)
356 .await
357 .map_err(|e| TransportError::Recv(format!("read failed: {e}")))?;
358
359 if bytes_read == 0 {
360 break;
362 }
363
364 state.offset += bytes_read as u64;
365
366 let line = state.line_buf.trim_end_matches('\n').trim_end_matches('\r');
368 if line.is_empty() {
369 continue;
370 }
371
372 let payload: bytes::Bytes = line.as_bytes().to_vec().into();
373 let format = PayloadFormat::detect(&payload);
374 let timestamp_ms = chrono::Utc::now().timestamp_millis();
375
376 messages.push(Message {
377 key: None,
378 payload,
379 token: FileToken {
380 offset: state.offset,
381 },
382 timestamp_ms: Some(timestamp_ms),
383 format,
384 });
385 }
386
387 let batch = self.filter_engine.partition_batch(
390 messages,
391 |m| m.payload.as_ref(),
392 |m| m.key.clone(),
393 |m| m.token,
394 );
395 let messages = batch.messages;
396 let dlq_entries = batch.dlq_entries;
397 let filtered_tokens = batch.filtered_tokens;
398
399 #[cfg(feature = "logger")]
400 if !messages.is_empty() {
401 tracing::debug!(lines = messages.len(), "File transport: batch received");
402 }
403
404 #[cfg(feature = "metrics")]
405 if !messages.is_empty() {
406 metrics::counter!("dfe_transport_received_total", "transport" => "file")
407 .increment(messages.len() as u64);
408 }
409
410 Ok(RecvBatch {
411 messages,
412 dlq_entries,
413 filtered_tokens,
414 }
415 .into())
416 }
417
418 async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
419 if let Some(max_token) = tokens.iter().max_by_key(|t| t.offset) {
420 let path = Path::new(&self.config.path);
421 Self::save_position(path, max_token.offset).await?;
422
423 #[cfg(feature = "logger")]
424 tracing::debug!(
425 offset = max_token.offset,
426 "File transport: position committed"
427 );
428 }
429 Ok(())
430 }
431}
432
433impl super::traits::FromCascade for FileTransportConfig {}
434
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds an append-mode config pointing at `path`, with no filters.
    fn config_for(path: &std::path::Path) -> FileTransportConfig {
        FileTransportConfig {
            path: path.to_str().unwrap().to_string(),
            append: true,
            ..Default::default()
        }
    }

    /// Creates a transport on `path`; panics if construction fails.
    async fn open(path: &std::path::Path) -> FileTransport {
        FileTransport::new(&config_for(path)).await.unwrap()
    }

    /// Creates a transport on `filename` inside the temporary directory.
    async fn make_transport(dir: &TempDir, filename: &str) -> FileTransport {
        open(&dir.path().join(filename)).await
    }

    /// Sends one payload and fails the test right away if the send did not
    /// succeed, instead of letting a later read assertion fail confusingly.
    async fn send_ok(transport: &FileTransport, key: &str, payload: &'static [u8]) {
        let result = transport
            .send(key, bytes::Bytes::from_static(payload))
            .await;
        assert!(result.is_ok(), "send did not succeed");
    }

    #[tokio::test]
    async fn send_and_receive() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.ndjson");

        // Write two records, then close so the handle is released.
        let sender = open(&path).await;
        send_ok(&sender, "key", b"{\"msg\":\"hello\"}").await;
        send_ok(&sender, "key", b"{\"msg\":\"world\"}").await;
        sender.close().await.unwrap();

        // A fresh transport on the same path reads them back in order.
        let reader = open(&path).await;
        let batch = reader.recv(10).await.unwrap();

        assert_eq!(batch.records.len(), 2);
        assert_eq!(batch.records[0].payload.as_ref(), b"{\"msg\":\"hello\"}");
        assert_eq!(batch.records[1].payload.as_ref(), b"{\"msg\":\"world\"}");

        // Tokens are byte offsets, so they must grow with each line.
        assert!(batch.commit_tokens[1].offset > batch.commit_tokens[0].offset);
    }

    #[tokio::test]
    async fn commit_persists_position() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("commit_test.ndjson");

        let sender = open(&path).await;
        send_ok(&sender, "k", b"line1").await;
        send_ok(&sender, "k", b"line2").await;
        send_ok(&sender, "k", b"line3").await;
        sender.close().await.unwrap();

        // First reader consumes two of the three lines and commits.
        let r1 = open(&path).await;
        let batch = r1.recv(2).await.unwrap();
        assert_eq!(batch.records.len(), 2);
        assert_eq!(batch.records[0].payload.as_ref(), b"line1");
        assert_eq!(batch.records[1].payload.as_ref(), b"line2");

        r1.commit(&batch.commit_tokens).await.unwrap();
        r1.close().await.unwrap();

        // Second reader resumes from the committed position.
        let r2 = open(&path).await;
        let remaining = r2.recv(10).await.unwrap().records;
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].payload.as_ref(), b"line3");
    }

    #[tokio::test]
    async fn close_prevents_operations() {
        let dir = TempDir::new().unwrap();
        let transport = make_transport(&dir, "close_test.ndjson").await;

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let result = transport
            .send("k", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(result.is_fatal());

        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    // Pure formatting check: no async runtime needed.
    #[test]
    fn file_token_display() {
        let token = FileToken { offset: 42 };
        assert_eq!(format!("{token}"), "file:42");
    }

    #[tokio::test]
    async fn recv_returns_empty_at_eof() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("eof_test.ndjson");

        let transport = open(&path).await;
        send_ok(&transport, "k", b"only_line").await;
        transport.close().await.unwrap();

        let reader = open(&path).await;
        let msgs = reader.recv(10).await.unwrap().records;
        assert_eq!(msgs.len(), 1);

        // Nothing left: a second receive yields an empty batch, not an error.
        let more = reader.recv(10).await.unwrap().records;
        assert!(more.is_empty());
    }

    #[tokio::test]
    async fn empty_path_is_config_error() {
        let result = FileTransport::new(&FileTransportConfig::default()).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn transport_name() {
        let dir = TempDir::new().unwrap();
        let transport = make_transport(&dir, "name_test.ndjson").await;
        assert_eq!(transport.name(), "file");
    }
}