1use crate::config::{CleanupStrategy, FileConfig};
7use crate::handle::{FileHandle, FileIndexEntry, FileMetadata};
8use crate::hooks::HookRegistry;
9use crate::index::SqliteIndex;
10use crate::storage::{compute_hash, FileStorage, StorageStats};
11use crate::{FileError, Result};
12use std::path::PathBuf;
13use std::sync::Arc;
14use tracing::{debug, info, warn};
15
16pub struct FileManager {
18 config: FileConfig,
19 storage: FileStorage,
20 index: Arc<SqliteIndex>,
21 hooks: HookRegistry,
23}
24
25impl FileManager {
26 pub async fn new(config: FileConfig) -> Result<Self> {
30 Self::new_with_hooks(config, HookRegistry::new()).await
31 }
32
33 pub async fn new_with_hooks(config: FileConfig, hooks: HookRegistry) -> Result<Self> {
49 tokio::fs::create_dir_all(&config.storage_path).await?;
51
52 let storage = FileStorage::new(config.clone());
54 storage.initialize().await?;
55
56 let index_path = config.index_path().with_extension("db");
58 let index = SqliteIndex::new(index_path).await?;
59
60 let jsonl_path = config.index_path();
62 if jsonl_path.exists() {
63 let migrated = index.migrate_from_jsonl(&jsonl_path).await?;
64 if migrated > 0 {
65 let backup_path = jsonl_path.with_extension("jsonl.bak");
67 tokio::fs::rename(&jsonl_path, &backup_path).await?;
68 info!("Migrated {} entries and backed up old index", migrated);
69 }
70 }
71
72 info!(
73 "FileManager initialized with {} storage hooks, {} read hooks",
74 hooks.hook_counts().storage,
75 hooks.hook_counts().read
76 );
77
78 Ok(Self {
79 config,
80 storage,
81 index: Arc::new(index),
82 hooks,
83 })
84 }
85
86 pub async fn default() -> Result<Self> {
88 Self::new(FileConfig::default()).await
89 }
90
91 pub async fn store(&self, data: &[u8], metadata: FileMetadata) -> Result<FileHandle> {
101 let data_size = data.len() as u64;
103 if data_size > self.config.max_file_size {
104 return Err(FileError::TooLarge(data_size, self.config.max_file_size));
105 }
106
107 let (processed_data, should_continue) =
112 self.hooks.run_before_store(data, &metadata).await?;
113
114 if !should_continue {
115 return Err(FileError::Storage("Storage stopped by hook".to_string()));
117 }
118
119 let final_data = if processed_data != data {
121 debug!(
123 "Storage hook modified data: {} -> {} bytes",
124 data.len(),
125 processed_data.len()
126 );
127 processed_data
128 } else {
129 data.to_vec()
130 };
131
132 let hash = compute_hash(&final_data);
134 let hash_id = format!("sha256:{}", hash);
135
136 self.hooks.run_validate_metadata(&metadata).await?;
140
141 if let Some(entry) = self.index.get(&hash_id, false).await? {
143 let handle = entry.to_handle();
145 let new_count = entry.ref_count + 1;
146
147 self.index.update_ref_count(&hash_id, 1).await?;
149 debug!(
150 "File with hash {} already exists, incrementing ref count to {}",
151 hash_id, new_count
152 );
153
154 return Ok(handle);
155 }
156
157 let relative_path = self.storage.store_data(&hash, &final_data).await?;
161
162 let entry = FileIndexEntry {
166 id: hash_id.clone(),
167 path: relative_path.clone(),
168 size: final_data.len() as u64,
169 ref_count: 1,
170 created_at: metadata.created_at,
171 last_accessed_at: Some(chrono::Utc::now()),
172 metadata: metadata.clone(),
173 };
174
175 self.index.insert(entry).await?;
176
177 let handle = FileHandle::new(hash_id.clone(), relative_path.clone(), metadata.clone());
179
180 let _ = self.hooks.run_after_store(&handle).await;
186
187 info!(
188 "Stored new file {} ({}, {} bytes)",
189 metadata.name,
190 hash_id,
191 final_data.len()
192 );
193
194 Ok(handle)
195 }
196
197 pub async fn store_from_path(
199 &self,
200 source_path: &PathBuf,
201 metadata: Option<FileMetadata>,
202 ) -> Result<FileHandle> {
203 let data = tokio::fs::read(source_path).await?;
204
205 let meta = metadata.unwrap_or_else(|| FileMetadata {
206 name: source_path
207 .file_name()
208 .and_then(|n| n.to_str())
209 .unwrap_or("unknown")
210 .to_string(),
211 size: data.len() as u64,
212 mime_type: None,
213 source: None,
214 created_at: chrono::Utc::now(),
215 last_accessed_at: None,
216 preview: None,
217 });
218
219 self.store(&data, meta).await
220 }
221
222 pub async fn get(&self, id: &str) -> Result<FileHandle> {
227 if let Some(entry) = self.index.get(id, false).await? {
229 let handle = entry.to_handle();
231
232 self.index.update_ref_count(id, 0).await?; return Ok(handle);
236 }
237
238 Err(FileError::NotFound(format!(
239 "File with ID {} not found",
240 id
241 )))
242 }
243
244 pub async fn clone_ref(&self, handle: &FileHandle) -> Result<FileHandle> {
246 if let Some(new_count) = self.index.update_ref_count(&handle.id, 1).await? {
247 let cloned = handle.clone();
248
249 debug!(
250 "Cloned file {} reference, new count: {}",
251 handle.id, new_count
252 );
253 Ok(cloned)
254 } else {
255 Err(FileError::InvalidHandle(format!(
256 "Handle {} not found in index",
257 handle.id
258 )))
259 }
260 }
261
262 pub async fn release(&self, handle: &FileHandle) -> Result<()> {
267 if let Some(new_count) = self.index.update_ref_count(&handle.id, -1).await? {
268 debug!(
269 "Released file {} reference, new count: {}",
270 handle.id, new_count
271 );
272
273 if self.config.cleanup.strategy == CleanupStrategy::Immediate && new_count == 0 {
275 self.cleanup_single(&handle.id).await?;
276 }
277
278 Ok(())
279 } else {
280 Err(FileError::InvalidHandle(format!(
281 "Handle {} not found in index",
282 handle.id
283 )))
284 }
285 }
286
287 pub async fn soft_delete(&self, id: &str, deleted_by: Option<&str>) -> Result<bool> {
313 let deleted = self.index.soft_delete(id, deleted_by).await?;
314
315 if deleted {
316 info!("Soft deleted file {} (by {:?})", id, deleted_by);
317 } else {
318 debug!("File {} not found or already deleted", id);
319 }
320
321 Ok(deleted)
322 }
323
324 pub async fn restore(&self, id: &str) -> Result<bool> {
336 let restored = self.index.restore(id).await?;
337
338 if restored {
339 info!("Restored file {}", id);
340 } else {
341 debug!("File {} not found or not in deleted state", id);
342 }
343
344 Ok(restored)
345 }
346
347 pub async fn list_deleted(&self) -> Result<Vec<FileIndexEntry>> {
355 let entries = self.index.list_deleted().await?;
356 debug!("Found {} soft-deleted files", entries.len());
357 Ok(entries)
358 }
359
360 pub async fn hard_delete(&self, id: &str) -> Result<Option<FileIndexEntry>> {
371 let entry = self.index.get(id, true).await?;
373
374 match entry {
375 Some(e) => {
376 if let Err(e) = self.storage.delete_data(&e.path).await {
378 warn!("Failed to delete physical file {}: {}", id, e);
379 }
381
382 self.index.hard_delete(id).await?;
384 info!(
385 "Hard deleted file {} (was deleted_at={:?})",
386 id, e.metadata.last_accessed_at
387 );
388
389 Ok(Some(e))
390 }
391 None => {
392 debug!("File {} not found for hard delete", id);
393 Ok(None)
394 }
395 }
396 }
397
398 pub async fn purge_expired(&self, retention_days: u32) -> Result<usize> {
409 let expired = self.index.get_expired_deletions(retention_days).await?;
410
411 let mut purged = 0;
412 for entry in expired {
413 if let Err(e) = self.storage.delete_data(&entry.path).await {
415 warn!("Failed to delete expired file {}: {}", entry.id, e);
416 continue;
417 }
418
419 self.index.hard_delete(&entry.id).await?;
421 purged += 1;
422
423 info!(
424 "Purged expired file {} (deleted at {:?})",
425 entry.id, entry.last_accessed_at
426 );
427 }
428
429 if purged > 0 {
430 info!(
431 "Purge completed: {} expired files permanently deleted",
432 purged
433 );
434 }
435
436 Ok(purged)
437 }
438
439 pub async fn is_deleted(&self, id: &str) -> Result<bool> {
447 let entry = self.index.get(id, true).await?;
449
450 match entry {
451 Some(_) => {
452 let deleted_entries = self.index.list_deleted().await?;
455 Ok(deleted_entries.iter().any(|e| e.id == id))
456 }
457 None => Ok(false),
458 }
459 }
460
461 pub async fn read(&self, handle: &FileHandle) -> Result<Vec<u8>> {
467 let should_read = self.hooks.run_before_read(&handle.id, None).await?;
472
473 if !should_read {
474 return Err(FileError::NotFound(format!(
475 "File {} read was blocked by hook",
476 handle.id
477 )));
478 }
479
480 let data = self.storage.read_data(&handle.path).await?;
484
485 let processed_data = self.hooks.run_after_read(&data).await?;
490
491 Ok(processed_data)
492 }
493
494 pub async fn read_string(&self, handle: &FileHandle) -> Result<String> {
496 let data = self.read(handle).await?;
497 String::from_utf8(data).map_err(|e| FileError::Storage(format!("Invalid UTF-8: {}", e)))
498 }
499
500 pub fn full_path(&self, handle: &FileHandle) -> PathBuf {
502 handle.full_path(&self.config.data_dir())
503 }
504
505 pub async fn exists(&self, id: &str) -> bool {
507 self.index.get(id, false).await.ok().flatten().is_some()
508 }
509
510 pub async fn metadata(&self, id: &str) -> Result<FileMetadata> {
514 self.index
515 .get(id, false)
516 .await?
517 .map(|e| e.metadata)
518 .ok_or_else(|| FileError::NotFound(format!("File {} not found", id)))
519 }
520
521 pub async fn cleanup(&self) -> Result<usize> {
529 let threshold = self.config.cleanup.min_ref_count;
530 let max_age_days = self.config.cleanup.max_age_days;
531
532 let candidates = self
533 .index
534 .get_candidates_for_cleanup(threshold, max_age_days)
535 .await?;
536
537 let mut deleted = 0;
538
539 let deleted_entries = self.index.list_deleted().await?;
542 let deleted_ids: std::collections::HashSet<_> =
543 deleted_entries.iter().map(|e| e.id.clone()).collect();
544
545 for entry in candidates {
546 if deleted_ids.contains(&entry.id) {
548 debug!("Skipping soft-deleted file {} in regular cleanup", entry.id);
549 continue;
550 }
551
552 let can_delete = match self.hooks.run_should_cleanup(&entry).await {
557 Ok(should) => should,
558 Err(e) => {
559 warn!("Cleanup hook error for {}: {}", entry.id, e);
560 continue;
561 }
562 };
563
564 if !can_delete {
565 debug!("Cleanup of {} was blocked by hook", entry.id);
566 continue;
567 }
568
569 if let Err(e) = self.storage.delete_data(&entry.path).await {
573 warn!("Failed to delete file {}: {}", entry.id, e);
574 continue;
575 }
576
577 if let Err(e) = self.index.remove(&entry.id).await {
581 warn!("Failed to remove {} from index: {}", entry.id, e);
582 }
584
585 let _ = self.hooks.run_after_cleanup(&entry).await;
590
591 deleted += 1;
592 info!("Cleaned up file {}", entry.id);
593 }
594
595 if deleted > 0 {
596 info!("Cleanup completed: {} files deleted", deleted);
597 }
598
599 Ok(deleted)
600 }
601
602 async fn cleanup_single(&self, id: &str) -> Result<()> {
606 if let Some(entry) = self.index.get(id, false).await? {
608 if entry.ref_count == 0 {
609 self.storage.delete_data(&entry.path).await?;
610 self.index.remove(id).await?;
611 info!("Immediately cleaned up file {}", id);
612 }
613 } else {
614 return Err(FileError::NotFound(format!("File {} not found", id)));
615 }
616
617 Ok(())
618 }
619
620 pub async fn stats(&self) -> Result<StorageStats> {
622 let mut stats = self.storage.stats().await?;
623
624 let index_stats = self.index.stats().await?;
626 stats.total_refs = index_stats.total_refs;
627
628 Ok(stats)
629 }
630
631 pub fn config(&self) -> &FileConfig {
633 &self.config
634 }
635
636 pub fn hooks(&self) -> &HookRegistry {
640 &self.hooks
641 }
642
643 pub fn hooks_mut(&mut self) -> &mut HookRegistry {
647 &mut self.hooks
648 }
649
650 pub fn start_cleanup_task(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
654 let interval_secs = self.config.cleanup.interval_secs;
655
656 tokio::spawn(async move {
657 let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
658
659 loop {
660 interval.tick().await;
661
662 match self.cleanup().await {
663 Ok(count) => {
664 if count > 0 {
665 info!("Background cleanup removed {} files", count);
666 }
667 }
668 Err(e) => {
669 warn!("Background cleanup failed: {}", e);
670 }
671 }
672 }
673 })
674 }
675}
676
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned so the caller can keep it in scope next to
    /// the manager.
    async fn create_test_manager() -> (FileManager, TempDir) {
        let dir = TempDir::new().unwrap();
        let root = dir.path().to_path_buf();
        let manager = FileManager::new(FileConfig::with_path(root))
            .await
            .unwrap();
        (manager, dir)
    }

    /// Produces plain-text metadata named `name` with a nominal size of 100.
    fn create_test_metadata(name: &str) -> FileMetadata {
        let name = name.to_string();
        let mime_type = Some("text/plain".to_string());
        let source = Some("test".to_string());

        FileMetadata {
            name,
            size: 100,
            mime_type,
            source,
            created_at: chrono::Utc::now(),
            last_accessed_at: None,
            preview: None,
        }
    }

    /// A stored file can be fetched by id and read back byte-for-byte.
    #[tokio::test]
    async fn test_store_and_get() {
        let (manager, _temp) = create_test_manager().await;
        let payload = b"hello world";

        let stored = manager
            .store(payload, create_test_metadata("test.txt"))
            .await
            .unwrap();
        assert!(stored.id.starts_with("sha256:"));
        assert_eq!(stored.metadata.name, "test.txt");

        let fetched = manager.get(&stored.id).await.unwrap();
        assert_eq!(fetched.id, stored.id);

        let contents = manager.read(&fetched).await.unwrap();
        assert_eq!(contents, payload);
    }

    /// Storing identical bytes twice yields one file with two references.
    #[tokio::test]
    async fn test_deduplication() {
        let (manager, _temp) = create_test_manager().await;
        let payload = b"test content for dedup";

        let first = manager
            .store(payload, create_test_metadata("file1.txt"))
            .await
            .unwrap();
        let second = manager
            .store(payload, create_test_metadata("file2.txt"))
            .await
            .unwrap();

        assert_eq!(first.id, second.id);

        let stats = manager.stats().await.unwrap();
        assert_eq!(stats.total_files, 1);
        assert_eq!(stats.total_refs, 2);
    }

    /// Cloning a reference bumps the count; releasing it brings it back down.
    #[tokio::test]
    async fn test_reference_counting() {
        let (manager, _temp) = create_test_manager().await;

        let original = manager
            .store(b"ref counting test", create_test_metadata("test.txt"))
            .await
            .unwrap();

        let cloned = manager.clone_ref(&original).await.unwrap();
        assert_eq!(cloned.ref_count(), 2);

        manager.release(&cloned).await.unwrap();

        let stats = manager.stats().await.unwrap();
        assert_eq!(stats.total_refs, 1);
    }

    /// A released file with `max_age_days = 0` is removed by `cleanup`.
    #[tokio::test]
    async fn test_cleanup() {
        let temp_dir = TempDir::new().unwrap();
        let mut config = FileConfig::with_path(temp_dir.path().to_path_buf());
        config.cleanup.max_age_days = 0;
        let manager = FileManager::new(config).await.unwrap();

        let stored = manager
            .store(b"cleanup test", create_test_metadata("cleanup.txt"))
            .await
            .unwrap();

        manager.release(&stored).await.unwrap();

        let removed = manager.cleanup().await.unwrap();
        assert_eq!(removed, 1);

        let still_there = manager.exists(&stored.id).await;
        assert!(!still_there);
    }

    /// `exists` is true for a stored id and false for an unknown one.
    #[tokio::test]
    async fn test_exists() {
        let (manager, _temp) = create_test_manager().await;

        let stored = manager
            .store(b"exists test", create_test_metadata("exists.txt"))
            .await
            .unwrap();

        assert!(manager.exists(&stored.id).await);
        assert!(!manager.exists("sha256:nonexistent").await);
    }

    /// Data longer than `max_file_size` is rejected with `TooLarge`.
    #[tokio::test]
    async fn test_file_too_large() {
        let temp_dir = TempDir::new().unwrap();
        let mut config = FileConfig::with_path(temp_dir.path().to_path_buf());
        config.max_file_size = 10;
        let manager = FileManager::new(config).await.unwrap();

        let outcome = manager
            .store(b"this is too large", create_test_metadata("large.txt"))
            .await;
        assert!(matches!(outcome, Err(FileError::TooLarge(_, _))));
    }
}