use async_trait::async_trait;
use memmap2::{Mmap, MmapOptions};
use std::fs::File;
use std::io::SeekFrom;
use std::path::Path;

use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

use parking_lot::RwLock;

use adaptive_pipeline_domain::services::file_io_service::{
    FileIOConfig, FileIOService, FileIOStats, FileInfo, ReadOptions, ReadResult, WriteOptions, WriteResult,
};
use adaptive_pipeline_domain::{FileChunk, PipelineError};
pub struct TokioFileIO {
    config: RwLock<FileIOConfig>,
    stats: RwLock<FileIOStats>,
}

impl TokioFileIO {
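    /// Creates a new service instance with the supplied configuration.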
    pub fn new(config: FileIOConfig) -> Self {
        Self {
            config: RwLock::new(config),
            stats: RwLock::new(FileIOStats::default()),
        }
    }

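    /// Creates a new service instance with the default configuration.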
    pub fn new_default() -> Self {
        Self::new(FileIOConfig::default())
    }

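    /// Returns `true` when memory mapping is enabled and the file fits
    /// within the configured maximum mapping size.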
    fn should_use_mmap(&self, file_size: u64) -> bool {
        let config = self.config.read();
        config.enable_memory_mapping && file_size <= config.max_mmap_size
    }

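    /// Splits a memory-mapped region into `FileChunk`s of at most
    /// `chunk_size` bytes, honoring `start_offset` and `max_bytes` and
    /// optionally computing a checksum per chunk.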
    fn create_chunks_from_mmap(
        &self,
        mmap: &Mmap,
        chunk_size: usize,
        calculate_checksums: bool,
        start_offset: u64,
        max_bytes: Option<u64>,
    ) -> Result<Vec<FileChunk>, PipelineError> {
        let mut chunks = Vec::new();
        let data_len = mmap.len() as u64;
        let start = start_offset.min(data_len);
        let end = match max_bytes {
            Some(max) => (start + max).min(data_len),
            None => data_len,
        };

        let mut current_offset = start;
        let mut sequence = 0u64;

        while current_offset < end {
            let chunk_end = (current_offset + (chunk_size as u64)).min(end) as usize;
            let chunk_start = current_offset as usize;
            let chunk_data = mmap[chunk_start..chunk_end].to_vec();
            let is_final = (chunk_end as u64) >= end;

            let chunk = FileChunk::new(sequence, current_offset, chunk_data, is_final)?;

            let chunk = if calculate_checksums {
                chunk.with_calculated_checksum()?
            } else {
                chunk
            };

            chunks.push(chunk);
            current_offset = chunk_end as u64;
            sequence += 1;
        }

        Ok(chunks)
    }

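    /// Applies `update_fn` to the service statistics under a write lock.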
    fn update_stats<F>(&self, update_fn: F)
    where
        F: FnOnce(&mut FileIOStats),
    {
        let mut stats = self.stats.write();
        update_fn(&mut stats);
    }

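    /// Fetches filesystem metadata for `path`, mapping failures to
    /// `PipelineError::IoError`.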
    async fn get_file_metadata(&self, path: &Path) -> Result<std::fs::Metadata, PipelineError> {
        fs::metadata(path)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to get file metadata for {}: {}", path.display(), e)))
    }
}

#[async_trait]
impl FileIOService for TokioFileIO {
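    /// Reads a file into chunks, delegating to `read_file_mmap` when memory
    /// mapping is enabled and the file is small enough, and otherwise
    /// streaming the file with buffered async reads.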
    async fn read_file_chunks(&self, path: &Path, options: ReadOptions) -> Result<ReadResult, PipelineError> {
        let start_time = std::time::Instant::now();
        let metadata = self.get_file_metadata(path).await?;
        let file_size = metadata.len();

        if options.use_memory_mapping && self.should_use_mmap(file_size) {
            return self.read_file_mmap(path, options).await;
        }

        let chunk_size = options.chunk_size.unwrap_or(self.config.read().default_chunk_size);

        let mut file = fs::File::open(path)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to open file {}: {}", path.display(), e)))?;

        if let Some(offset) = options.start_offset {
            file.seek(SeekFrom::Start(offset))
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to seek to offset {}: {}", offset, e)))?;
        }

        let mut chunks = Vec::new();
        let mut buffer = vec![0u8; chunk_size];
        let mut current_offset = options.start_offset.unwrap_or(0);
        let mut sequence = 0u64;
        let mut total_read = 0u64;

        let max_bytes = options.max_bytes.unwrap_or(file_size);

        loop {
            if total_read >= max_bytes {
                break;
            }

            let bytes_to_read = ((max_bytes - total_read) as usize).min(chunk_size);
            let bytes_read = file
                .read(&mut buffer[..bytes_to_read])
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to read from file: {}", e)))?;

            if bytes_read == 0 {
                break;
            }

            let chunk_data = buffer[..bytes_read].to_vec();
            let is_final = bytes_read < bytes_to_read || total_read + (bytes_read as u64) >= max_bytes;

            let chunk = FileChunk::new(sequence, current_offset, chunk_data, is_final)?;

            let chunk = if options.calculate_checksums {
                chunk.with_calculated_checksum()?
            } else {
                chunk
            };

            chunks.push(chunk);
            current_offset += bytes_read as u64;
            total_read += bytes_read as u64;
            sequence += 1;
        }

        let file_info = FileInfo {
            path: path.to_path_buf(),
            size: file_size,
            is_memory_mapped: false,
            modified_at: metadata.modified().unwrap_or(std::time::UNIX_EPOCH),
            created_at: metadata.created().unwrap_or(std::time::UNIX_EPOCH),
            permissions: 0o644, // default; not read from the filesystem
            mime_type: None,
        };

        self.update_stats(|stats| {
            stats.bytes_read += total_read;
            stats.chunks_processed += chunks.len() as u64;
            stats.files_processed += 1;
            stats.total_processing_time_ms += start_time.elapsed().as_millis() as u64;
        });

        Ok(ReadResult {
            chunks,
            file_info,
            bytes_read: total_read,
            complete: total_read >= file_size,
        })
    }

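    /// Reads a file through a memory map. The file is opened and mapped with
    /// blocking `std::fs` calls, then chunked by copying slices out of the
    /// mapped region.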
    async fn read_file_mmap(&self, path: &Path, options: ReadOptions) -> Result<ReadResult, PipelineError> {
        let start_time = std::time::Instant::now();
        let metadata = self.get_file_metadata(path).await?;
        let file_size = metadata.len();

        let file = File::open(path)
            .map_err(|e| PipelineError::IoError(format!("Failed to open file for mmap {}: {}", path.display(), e)))?;

        // SAFETY: the map is read-only and lives only for the duration of
        // this call; as with any memory map, mutating the underlying file
        // concurrently is undefined behavior.
        let mmap = unsafe {
            MmapOptions::new()
                .map(&file)
                .map_err(|e| PipelineError::IoError(format!("Failed to create memory map: {}", e)))?
        };

        let chunk_size = options.chunk_size.unwrap_or(self.config.read().default_chunk_size);
        let start_offset = options.start_offset.unwrap_or(0);

        let chunks = self.create_chunks_from_mmap(
            &mmap,
            chunk_size,
            options.calculate_checksums,
            start_offset,
            options.max_bytes,
        )?;

        let bytes_read: u64 = chunks.iter().map(|c| c.data_len() as u64).sum();

        let file_info = FileInfo {
            path: path.to_path_buf(),
            size: file_size,
            is_memory_mapped: true,
            modified_at: metadata.modified().unwrap_or(std::time::UNIX_EPOCH),
            created_at: metadata.created().unwrap_or(std::time::UNIX_EPOCH),
            permissions: 0o644,
            mime_type: None,
        };

        self.update_stats(|stats| {
            stats.bytes_read += bytes_read;
            stats.chunks_processed += chunks.len() as u64;
            stats.files_processed += 1;
            stats.memory_mapped_files += 1;
            stats.total_processing_time_ms += start_time.elapsed().as_millis() as u64;
        });

        Ok(ReadResult {
            chunks,
            file_info,
            bytes_read,
            complete: true,
        })
    }

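    /// Writes chunks sequentially to `path`, optionally creating parent
    /// directories, appending, syncing to disk, and computing a whole-file
    /// SHA-256 checksum.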
    async fn write_file_chunks(
        &self,
        path: &Path,
        chunks: &[FileChunk],
        options: WriteOptions,
    ) -> Result<WriteResult, PipelineError> {
        if options.create_dirs {
            if let Some(parent) = path.parent() {
                fs::create_dir_all(parent)
                    .await
                    .map_err(|e| PipelineError::IoError(format!("Failed to create directories: {}", e)))?;
            }
        }

        let mut file = (if options.append {
            fs::OpenOptions::new().create(true).append(true).open(path).await
        } else {
            fs::File::create(path).await
        })
        .map_err(|e| PipelineError::IoError(format!("Failed to create/open file {}: {}", path.display(), e)))?;

        let mut total_written = 0u64;
        let mut file_hasher = ring::digest::Context::new(&ring::digest::SHA256);

        for chunk in chunks {
            let data = chunk.data();
            file.write_all(data)
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to write chunk: {}", e)))?;

            if options.calculate_checksums {
                file_hasher.update(data);
            }

            total_written += data.len() as u64;
        }

        if options.sync {
            file.sync_all()
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to sync file: {}", e)))?;
        }

        let checksum = if options.calculate_checksums {
            Some(hex::encode(file_hasher.finish().as_ref()))
        } else {
            None
        };

        self.update_stats(|stats| {
            stats.bytes_written += total_written;
            stats.chunks_processed += chunks.len() as u64;
        });

        Ok(WriteResult {
            path: path.to_path_buf(),
            bytes_written: total_written,
            checksum,
            success: true,
        })
    }

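    /// Writes a raw byte slice by wrapping it in a single final chunk and
    /// delegating to `write_file_chunks`.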
    async fn write_file_data(
        &self,
        path: &Path,
        data: &[u8],
        options: WriteOptions,
    ) -> Result<WriteResult, PipelineError> {
        let chunk = FileChunk::new(0, 0, data.to_vec(), true)?;
        self.write_file_chunks(path, &[chunk], options).await
    }

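    /// Returns metadata for `path` as a `FileInfo` snapshot.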
    async fn get_file_info(&self, path: &Path) -> Result<FileInfo, PipelineError> {
        let metadata = self.get_file_metadata(path).await?;

        Ok(FileInfo {
            path: path.to_path_buf(),
            size: metadata.len(),
            is_memory_mapped: false,
            modified_at: metadata.modified().unwrap_or(std::time::UNIX_EPOCH),
            created_at: metadata.created().unwrap_or(std::time::UNIX_EPOCH),
            permissions: 0o644,
            mime_type: None,
        })
    }

    async fn file_exists(&self, path: &Path) -> Result<bool, PipelineError> {
        Ok(fs::metadata(path).await.is_ok())
    }

    async fn delete_file(&self, path: &Path) -> Result<(), PipelineError> {
        fs::remove_file(path)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to delete file {}: {}", path.display(), e)))
    }

    async fn copy_file(
        &self,
        source: &Path,
        destination: &Path,
        options: WriteOptions,
    ) -> Result<WriteResult, PipelineError> {
        let read_result = self.read_file_chunks(source, ReadOptions::default()).await?;
        self.write_file_chunks(destination, &read_result.chunks, options).await
    }

    async fn move_file(
        &self,
        source: &Path,
        destination: &Path,
        options: WriteOptions,
    ) -> Result<WriteResult, PipelineError> {
        let result = self.copy_file(source, destination, options).await?;
        self.delete_file(source).await?;
        Ok(result)
    }

    async fn create_directory(&self, path: &Path) -> Result<(), PipelineError> {
        fs::create_dir_all(path)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to create directory {}: {}", path.display(), e)))
    }

    async fn directory_exists(&self, path: &Path) -> Result<bool, PipelineError> {
        match fs::metadata(path).await {
            Ok(metadata) => Ok(metadata.is_dir()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
            Err(e) => Err(PipelineError::IoError(format!(
                "Failed to check directory {}: {}",
                path.display(),
                e
            ))),
        }
    }

    async fn list_directory(&self, path: &Path) -> Result<Vec<FileInfo>, PipelineError> {
        let mut entries = fs::read_dir(path)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to read directory {}: {}", path.display(), e)))?;

        let mut files = Vec::new();
        while let Some(entry) = entries
            .next_entry()
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to read directory entry: {}", e)))?
        {
            let metadata = entry
                .metadata()
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to get entry metadata: {}", e)))?;

            if metadata.is_file() {
                files.push(FileInfo {
                    path: entry.path(),
                    size: metadata.len(),
                    is_memory_mapped: false,
                    modified_at: metadata.modified().unwrap_or(std::time::UNIX_EPOCH),
                    created_at: metadata.created().unwrap_or(std::time::UNIX_EPOCH),
                    permissions: 0o644,
                    mime_type: None,
                });
            }
        }

        Ok(files)
    }

    fn get_config(&self) -> FileIOConfig {
        self.config.read().clone()
    }

    fn update_config(&mut self, config: FileIOConfig) {
        *self.config.write() = config;
    }

    fn get_stats(&self) -> FileIOStats {
        self.stats.read().clone()
    }

    fn reset_stats(&mut self) {
        *self.stats.write() = FileIOStats::default();
    }

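    /// Verifies a file against an expected hex-encoded SHA-256 checksum.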
    async fn validate_file_integrity(&self, path: &Path, expected_checksum: &str) -> Result<bool, PipelineError> {
        let calculated_checksum = self.calculate_file_checksum(path).await?;
        Ok(calculated_checksum == expected_checksum)
    }

    /// Computes the hex-encoded SHA-256 checksum of an entire file by
    /// reading it in chunks. Per-chunk checksums are disabled since only
    /// the whole-file digest is needed.
    async fn calculate_file_checksum(&self, path: &Path) -> Result<String, PipelineError> {
        let read_result = self
            .read_file_chunks(
                path,
                ReadOptions {
                    calculate_checksums: false,
                    ..Default::default()
                },
            )
            .await?;

        let mut hasher = ring::digest::Context::new(&ring::digest::SHA256);
        for chunk in &read_result.chunks {
            hasher.update(chunk.data());
        }

        Ok(hex::encode(hasher.finish().as_ref()))
    }

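    /// Returns a pinned stream that yields `FileChunk`s lazily as the file
    /// is read, so callers can process a file without buffering all of its
    /// chunks in memory at once.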
    async fn stream_file_chunks(
        &self,
        path: &Path,
        options: ReadOptions,
    ) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<FileChunk, PipelineError>> + Send>>, PipelineError>
    {
        let chunk_size = options.chunk_size.unwrap_or(self.config.read().default_chunk_size);
        let file = fs::File::open(path)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to open file {}: {}", path.display(), e)))?;

        let file = if let Some(offset) = options.start_offset {
            let mut f = file;
            f.seek(SeekFrom::Start(offset))
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to seek to offset {}: {}", offset, e)))?;
            f
        } else {
            file
        };

        // State threaded through the `unfold` stream below.
        struct StreamState {
            file: fs::File,
            buffer: Vec<u8>,
            chunk_size: usize,
            current_offset: u64,
            sequence: u64,
            total_read: u64,
            max_bytes: u64,
            calculate_checksums: bool,
        }

        let state = StreamState {
            file,
            buffer: vec![0u8; chunk_size],
            chunk_size,
            current_offset: options.start_offset.unwrap_or(0),
            sequence: 0,
            total_read: 0,
            max_bytes: options.max_bytes.unwrap_or(u64::MAX),
            calculate_checksums: options.calculate_checksums,
        };

        let stream = futures::stream::unfold(state, |mut state| async move {
            if state.total_read >= state.max_bytes {
                return None;
            }

            // Size the buffer from the configured chunk size on every
            // iteration so a short read does not permanently shrink the
            // buffer (and with it all later chunks).
            let bytes_to_read = std::cmp::min(state.chunk_size, (state.max_bytes - state.total_read) as usize);
            state.buffer.resize(bytes_to_read, 0);

            match state.file.read(&mut state.buffer[..bytes_to_read]).await {
                Ok(0) => None, // EOF
                Ok(bytes_read) => {
                    state.buffer.truncate(bytes_read);
                    let is_final =
                        bytes_read < bytes_to_read || state.total_read + (bytes_read as u64) >= state.max_bytes;

                    match FileChunk::new(state.sequence, state.current_offset, state.buffer.clone(), is_final) {
                        Ok(chunk) => {
                            let chunk = if state.calculate_checksums {
                                match chunk.with_calculated_checksum() {
                                    Ok(c) => c,
                                    Err(e) => {
                                        return Some((Err(e), state));
                                    }
                                }
                            } else {
                                chunk
                            };

                            state.current_offset += bytes_read as u64;
                            state.sequence += 1;
                            state.total_read += bytes_read as u64;

                            Some((Ok(chunk), state))
                        }
                        Err(e) => Some((Err(e), state)),
                    }
                }
                Err(e) => Some((
                    Err(PipelineError::IoError(format!("Failed to read chunk: {}", e))),
                    state,
                )),
            }
        });

        Ok(Box::pin(stream))
    }

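    /// Writes a single chunk to `path`. The first chunk creates or
    /// truncates the file; subsequent chunks append, so callers can stream
    /// chunks to disk one at a time.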
    async fn write_chunk_to_file(
        &self,
        path: &Path,
        chunk: &FileChunk,
        options: WriteOptions,
        is_first_chunk: bool,
    ) -> Result<WriteResult, PipelineError> {
        let start_time = std::time::Instant::now();

        if options.create_dirs {
            if let Some(parent) = path.parent() {
                fs::create_dir_all(parent).await.map_err(|e| {
                    PipelineError::IoError(format!("Failed to create directories for {}: {}", path.display(), e))
                })?;
            }
        }

        let mut file = (if is_first_chunk {
            // First chunk: create the file or truncate an existing one.
            fs::OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(path)
                .await
        } else {
            // Subsequent chunks: append to the existing file.
            fs::OpenOptions::new()
                .create(true)
                .write(true)
                .append(true)
                .open(path)
                .await
        })
        .map_err(|e| PipelineError::IoError(format!("Failed to open file {} for writing: {}", path.display(), e)))?;

        file.write_all(chunk.data())
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to write chunk to {}: {}", path.display(), e)))?;

        if options.sync {
            file.sync_all()
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to sync file {}: {}", path.display(), e)))?;
        }

        let bytes_written = chunk.data().len() as u64;
        let write_time = start_time.elapsed();

        self.update_stats(|stats| {
            stats.bytes_written += bytes_written;
            stats.chunks_processed += 1;
            stats.total_processing_time_ms += write_time.as_millis() as u64;
        });

        Ok(WriteResult {
            path: path.to_path_buf(),
            bytes_written,
            success: true,
            checksum: if options.calculate_checksums {
                chunk.checksum().map(|c| c.to_string())
            } else {
                None
            },
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;
    use tokio::io::AsyncWriteExt;

    #[tokio::test]
    async fn test_file_io_basic_operations() {
        let service = TokioFileIO::new_default();

        let temp_file = NamedTempFile::new().unwrap();
        let temp_path = temp_file.path().to_path_buf();
        let test_data = vec![b'A'; 2 * 1024 * 1024]; // 2 MB of test data

        let mut file = tokio::fs::File::create(&temp_path).await.unwrap();
        file.write_all(&test_data).await.unwrap();
        file.flush().await.unwrap();
        drop(file);

        let read_result = service
            .read_file_chunks(&temp_path, ReadOptions::default())
            .await
            .unwrap();

        assert!(!read_result.chunks.is_empty());
        assert_eq!(read_result.bytes_read, test_data.len() as u64);

        let copy_path = temp_path.with_extension("copy");
        let write_result = service
            .write_file_data(&copy_path, &test_data, WriteOptions::default())
            .await
            .unwrap();

        assert_eq!(write_result.bytes_written, test_data.len() as u64);
        assert!(write_result.success);
    }

    #[tokio::test]
    async fn test_memory_mapping() {
        let service = TokioFileIO::new_default();

        let temp_file = NamedTempFile::new().unwrap();
        let temp_path = temp_file.path().to_path_buf();
        let test_data = vec![0u8; 3 * 1024 * 1024]; // 3 MB of zeroed test data

        let mut file = tokio::fs::File::create(&temp_path).await.unwrap();
        file.write_all(&test_data).await.unwrap();
        file.flush().await.unwrap();
        drop(file);

        let read_result = service
            .read_file_mmap(&temp_path, ReadOptions::default())
            .await
            .unwrap();

        assert!(!read_result.chunks.is_empty());
        assert!(read_result.file_info.is_memory_mapped);
        assert_eq!(read_result.bytes_read, test_data.len() as u64);
    }
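
    // A minimal additional check of the streaming API: a sketch assuming the
    // `futures` crate (already used by `stream_file_chunks` above) is
    // available to the test build for `StreamExt::next`.
    #[tokio::test]
    async fn test_stream_file_chunks() {
        use futures::StreamExt;

        let service = TokioFileIO::new_default();

        let temp_file = NamedTempFile::new().unwrap();
        let temp_path = temp_file.path().to_path_buf();
        let test_data = vec![b'B'; 1024 * 1024]; // 1 MB of test data

        let mut file = tokio::fs::File::create(&temp_path).await.unwrap();
        file.write_all(&test_data).await.unwrap();
        file.flush().await.unwrap();
        drop(file);

        let mut stream = service
            .stream_file_chunks(&temp_path, ReadOptions::default())
            .await
            .unwrap();

        // Drain the stream and confirm the chunks cover the whole file.
        let mut total_bytes = 0u64;
        while let Some(chunk) = stream.next().await {
            total_bytes += chunk.unwrap().data_len() as u64;
        }
        assert_eq!(total_bytes, test_data.len() as u64);
    }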
}