hyperi_rustlib/transport/
file.rs1use super::error::{TransportError, TransportResult};
34use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
35use super::types::{Message, PayloadFormat, SendResult};
36use serde::{Deserialize, Serialize};
37use std::path::{Path, PathBuf};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, Ordering};
40use tokio::io::{AsyncBufReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
41use tokio::sync::Mutex;
42
/// Commit token handed out with every message read from the data file.
///
/// The token is a plain byte position, so it is cheap to copy, compare and hash.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct FileToken {
    /// Byte offset in the data file. The receiver sets this to its read position
    /// immediately after the line that produced the message, and `commit` persists
    /// the largest offset it is given.
    pub offset: u64,
}
51
52impl CommitToken for FileToken {}
53
54impl std::fmt::Display for FileToken {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 write!(f, "file:{}", self.offset)
57 }
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct FileTransportConfig {
63 pub path: String,
65
66 #[serde(default = "default_append")]
68 pub append: bool,
69
70 #[serde(default)]
72 pub filters_in: Vec<super::filter::FilterRule>,
73
74 #[serde(default)]
76 pub filters_out: Vec<super::filter::FilterRule>,
77}
78
/// Serde default for `FileTransportConfig::append`: append unless configured otherwise.
fn default_append() -> bool {
    true
}
82
83impl Default for FileTransportConfig {
84 fn default() -> Self {
85 Self {
86 path: String::new(),
87 append: true,
88 filters_in: Vec::new(),
89 filters_out: Vec::new(),
90 }
91 }
92}
93
94impl FileTransportConfig {
95 #[must_use]
97 pub fn from_cascade() -> Self {
98 <Self as super::traits::FromCascade>::from_cascade_key("transport.file")
99 }
100}
101
102struct WriteState {
104 file: tokio::fs::File,
105}
106
107struct ReadState {
109 reader: BufReader<tokio::fs::File>,
110 offset: u64,
111 line_buf: String,
112}
113
114pub struct FileTransport {
120 config: FileTransportConfig,
121 writer: Mutex<Option<WriteState>>,
122 reader: Mutex<Option<ReadState>>,
123 closed: Arc<AtomicBool>,
124 filter_engine: super::filter::TransportFilterEngine,
125}
126
127impl FileTransport {
128 pub async fn new(config: &FileTransportConfig) -> TransportResult<Self> {
134 if config.path.is_empty() {
135 return Err(TransportError::Config("file path is empty".into()));
136 }
137
138 #[cfg(feature = "logger")]
139 tracing::info!(path = %config.path, append = config.append, "File transport opened");
140
141 let filter_engine = super::filter::TransportFilterEngine::new(
144 &config.filters_in,
145 &config.filters_out,
146 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
147 )?;
148
149 let closed = Arc::new(AtomicBool::new(false));
150
151 #[cfg(feature = "health")]
152 {
153 let h = Arc::clone(&closed);
154 crate::health::HealthRegistry::register("transport:file", move || {
155 if h.load(Ordering::Relaxed) {
156 crate::health::HealthStatus::Unhealthy
157 } else {
158 crate::health::HealthStatus::Healthy
159 }
160 });
161 }
162
163 Ok(Self {
164 config: config.clone(),
165 writer: Mutex::new(None),
166 reader: Mutex::new(None),
167 closed,
168 filter_engine,
169 })
170 }
171
172 fn pos_path(data_path: &Path) -> PathBuf {
174 let mut pos_path = data_path.as_os_str().to_owned();
175 pos_path.push(".pos");
176 PathBuf::from(pos_path)
177 }
178
179 async fn load_position(data_path: &Path) -> u64 {
181 let pos_path = Self::pos_path(data_path);
182 match tokio::fs::read_to_string(&pos_path).await {
183 Ok(content) => content.trim().parse::<u64>().unwrap_or(0),
184 Err(_) => 0,
185 }
186 }
187
188 async fn save_position(data_path: &Path, offset: u64) -> TransportResult<()> {
190 let pos_path = Self::pos_path(data_path);
191 tokio::fs::write(&pos_path, offset.to_string())
192 .await
193 .map_err(|e| TransportError::Commit(format!("failed to write position file: {e}")))
194 }
195
196 async fn ensure_writer(&self) -> TransportResult<()> {
198 let mut guard = self.writer.lock().await;
199 if guard.is_none() {
200 let file = tokio::fs::OpenOptions::new()
201 .create(true)
202 .append(self.config.append)
203 .write(true)
204 .open(&self.config.path)
205 .await
206 .map_err(|e| {
207 TransportError::Connection(format!(
208 "failed to open '{}' for writing: {e}",
209 self.config.path
210 ))
211 })?;
212 *guard = Some(WriteState { file });
213 }
214 Ok(())
215 }
216
217 async fn ensure_reader(&self) -> TransportResult<()> {
219 let mut guard = self.reader.lock().await;
220 if guard.is_none() {
221 let path = Path::new(&self.config.path);
222
223 if !path.exists() {
225 return Err(TransportError::Recv(format!(
226 "file '{}' does not exist",
227 self.config.path
228 )));
229 }
230
231 let offset = Self::load_position(path).await;
232 let mut file = tokio::fs::File::open(&self.config.path)
233 .await
234 .map_err(|e| {
235 TransportError::Connection(format!(
236 "failed to open '{}' for reading: {e}",
237 self.config.path
238 ))
239 })?;
240
241 file.seek(std::io::SeekFrom::Start(offset))
243 .await
244 .map_err(|e| {
245 TransportError::Recv(format!("failed to seek to offset {offset}: {e}"))
246 })?;
247
248 *guard = Some(ReadState {
249 reader: BufReader::new(file),
250 offset,
251 line_buf: String::with_capacity(4096),
252 });
253 }
254 Ok(())
255 }
256}
257
258impl TransportBase for FileTransport {
259 async fn close(&self) -> TransportResult<()> {
260 self.closed.store(true, Ordering::Relaxed);
261
262 if let Some(mut state) = self.writer.lock().await.take() {
264 let _ = state.file.flush().await;
265 }
266
267 let _ = self.reader.lock().await.take();
269
270 Ok(())
271 }
272
273 fn is_healthy(&self) -> bool {
274 !self.closed.load(Ordering::Relaxed)
275 }
276
277 fn name(&self) -> &'static str {
278 "file"
279 }
280}
281
282impl TransportSender for FileTransport {
283 async fn send(&self, _key: &str, payload: bytes::Bytes) -> SendResult {
284 if self.closed.load(Ordering::Relaxed) {
285 return SendResult::Fatal(TransportError::Closed);
286 }
287
288 if self.filter_engine.has_outbound_filters() {
290 match self.filter_engine.apply_outbound(&payload) {
291 super::filter::FilterDisposition::Pass => {}
292 super::filter::FilterDisposition::Drop => return SendResult::Ok,
293 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
294 }
295 }
296
297 if let Err(e) = self.ensure_writer().await {
298 return SendResult::Fatal(e);
299 }
300
301 let mut guard = self.writer.lock().await;
302 let Some(state) = guard.as_mut() else {
303 return SendResult::Fatal(TransportError::Internal("writer not initialised".into()));
304 };
305
306 if let Err(e) = state.file.write_all(&payload).await {
308 #[cfg(feature = "logger")]
309 tracing::warn!(error = %e, "File transport: write error");
310 return SendResult::Fatal(TransportError::Send(format!("write failed: {e}")));
311 }
312 if let Err(e) = state.file.write_all(b"\n").await {
313 #[cfg(feature = "logger")]
314 tracing::warn!(error = %e, "File transport: newline write error");
315 return SendResult::Fatal(TransportError::Send(format!("write newline failed: {e}")));
316 }
317 if let Err(e) = state.file.flush().await {
318 #[cfg(feature = "logger")]
319 tracing::warn!(error = %e, "File transport: flush error");
320 return SendResult::Fatal(TransportError::Send(format!("flush failed: {e}")));
321 }
322
323 #[cfg(feature = "logger")]
324 tracing::debug!(bytes = payload.len(), "File transport: message sent");
325
326 #[cfg(feature = "metrics")]
327 metrics::counter!("dfe_transport_sent_total", "transport" => "file").increment(1);
328
329 SendResult::Ok
330 }
331}
332
333impl TransportReceiver for FileTransport {
334 type Token = FileToken;
335
336 async fn recv(&self, max: usize) -> TransportResult<RecvBatch<Self::Token>> {
337 if self.closed.load(Ordering::Relaxed) {
338 return Err(TransportError::Closed);
339 }
340
341 self.ensure_reader().await?;
342
343 let mut guard = self.reader.lock().await;
344 let state = guard
345 .as_mut()
346 .ok_or_else(|| TransportError::Internal("reader not initialised".into()))?;
347
348 let mut messages = Vec::with_capacity(max.min(100));
349
350 for _ in 0..max {
351 state.line_buf.clear();
352 let bytes_read = state
353 .reader
354 .read_line(&mut state.line_buf)
355 .await
356 .map_err(|e| TransportError::Recv(format!("read failed: {e}")))?;
357
358 if bytes_read == 0 {
359 break;
361 }
362
363 state.offset += bytes_read as u64;
364
365 let line = state.line_buf.trim_end_matches('\n').trim_end_matches('\r');
367 if line.is_empty() {
368 continue;
369 }
370
371 let payload = line.as_bytes().to_vec();
372 let format = PayloadFormat::detect(&payload);
373 let timestamp_ms = chrono::Utc::now().timestamp_millis();
374
375 messages.push(Message {
376 key: None,
377 payload,
378 token: FileToken {
379 offset: state.offset,
380 },
381 timestamp_ms: Some(timestamp_ms),
382 format,
383 });
384 }
385
386 let batch = self.filter_engine.partition_batch(
389 messages,
390 |m| m.payload.as_slice(),
391 |m| m.key.clone(),
392 );
393 let messages = batch.messages;
394 let dlq_entries = batch.dlq_entries;
395
396 #[cfg(feature = "logger")]
397 if !messages.is_empty() {
398 tracing::debug!(lines = messages.len(), "File transport: batch received");
399 }
400
401 #[cfg(feature = "metrics")]
402 if !messages.is_empty() {
403 metrics::counter!("dfe_transport_received_total", "transport" => "file")
404 .increment(messages.len() as u64);
405 }
406
407 Ok(RecvBatch {
408 messages,
409 dlq_entries,
410 })
411 }
412
413 async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
414 if let Some(max_token) = tokens.iter().max_by_key(|t| t.offset) {
415 let path = Path::new(&self.config.path);
416 Self::save_position(path, max_token.offset).await?;
417
418 #[cfg(feature = "logger")]
419 tracing::debug!(
420 offset = max_token.offset,
421 "File transport: position committed"
422 );
423 }
424 Ok(())
425 }
426}
427
428impl super::traits::FromCascade for FileTransportConfig {}
429
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds an append-mode config pointing at `path`.
    fn config_for(path: &std::path::Path) -> FileTransportConfig {
        FileTransportConfig {
            path: path.to_str().unwrap().to_string(),
            append: true,
            ..Default::default()
        }
    }

    /// Opens a fresh transport over `path`.
    async fn open(path: &std::path::Path) -> FileTransport {
        FileTransport::new(&config_for(path)).await.unwrap()
    }

    /// Opens a fresh transport over `filename` inside `dir`.
    async fn make_transport(dir: &TempDir, filename: &str) -> FileTransport {
        open(&dir.path().join(filename)).await
    }

    /// Sends `payload` and fails the test if the transport does not report success,
    /// so a broken write cannot hide behind a later, unrelated assertion.
    async fn send_ok(transport: &FileTransport, payload: &'static [u8]) {
        let result = transport
            .send("k", bytes::Bytes::from_static(payload))
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn send_and_receive() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.ndjson");

        // Write two records, then close the writer.
        let sender = open(&path).await;
        let r1 = sender
            .send("key", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}"))
            .await;
        assert!(r1.is_ok());
        let r2 = sender
            .send("key", bytes::Bytes::from_static(b"{\"msg\":\"world\"}"))
            .await;
        assert!(r2.is_ok());
        sender.close().await.unwrap();

        // Read them back through a separate transport instance.
        let reader = open(&path).await;
        let messages = reader.recv(10).await.unwrap().messages;

        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].payload, b"{\"msg\":\"hello\"}");
        assert_eq!(messages[1].payload, b"{\"msg\":\"world\"}");

        // Offsets must increase with each line.
        assert!(messages[1].token.offset > messages[0].token.offset);
    }

    #[tokio::test]
    async fn commit_persists_position() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("commit_test.ndjson");

        let sender = open(&path).await;
        send_ok(&sender, b"line1").await;
        send_ok(&sender, b"line2").await;
        send_ok(&sender, b"line3").await;
        sender.close().await.unwrap();

        // First reader consumes and commits two of the three lines.
        let r1 = open(&path).await;
        let msgs = r1.recv(2).await.unwrap().messages;
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].payload, b"line1");
        assert_eq!(msgs[1].payload, b"line2");

        let tokens: Vec<_> = msgs.iter().map(|m| m.token).collect();
        r1.commit(&tokens).await.unwrap();
        r1.close().await.unwrap();

        // Second reader resumes after the committed position.
        let r2 = open(&path).await;
        let remaining = r2.recv(10).await.unwrap().messages;
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].payload, b"line3");
    }

    #[tokio::test]
    async fn close_prevents_operations() {
        let dir = TempDir::new().unwrap();
        let transport = make_transport(&dir, "close_test.ndjson").await;

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let result = transport
            .send("k", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(result.is_fatal());

        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn file_token_display() {
        let token = FileToken { offset: 42 };
        assert_eq!(format!("{token}"), "file:42");
    }

    #[tokio::test]
    async fn recv_returns_empty_at_eof() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("eof_test.ndjson");

        let transport = open(&path).await;
        send_ok(&transport, b"only_line").await;
        transport.close().await.unwrap();

        let reader = open(&path).await;
        let msgs = reader.recv(10).await.unwrap().messages;
        assert_eq!(msgs.len(), 1);

        // A second read at end of file yields an empty batch, not an error.
        let more = reader.recv(10).await.unwrap().messages;
        assert!(more.is_empty());
    }

    #[tokio::test]
    async fn empty_path_is_config_error() {
        let result = FileTransport::new(&FileTransportConfig::default()).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn transport_name() {
        let dir = TempDir::new().unwrap();
        let transport = make_transport(&dir, "name_test.ndjson").await;
        assert_eq!(transport.name(), "file");
    }
}