Skip to main content

agent_diva_files/
backend.rs

1//! Storage backend trait and implementations
2//!
3//! This module provides the `StorageBackend` trait for pluggable storage backends.
4//! The default implementation is `LocalStorageBackend` which uses the local filesystem.
5//!
6//! # Example
7//! ```rust,no_run
8//! use agent_diva_files::backend::{StorageBackend, LocalStorageBackend};
9//! use std::path::PathBuf;
10//!
11//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
12//! let backend = LocalStorageBackend::new(PathBuf::from("./data"));
13//! backend.initialize().await?;
14//!
15//! let data = b"hello world";
16//! let path = backend.write("abc123", data).await?;
17//! let read_data = backend.read(&path).await?;
18//! # Ok(())
19//! # }
20//! ```
21
22use async_trait::async_trait;
23use std::path::{Path, PathBuf};
24
25use crate::{FileError, Result};
26
27/// Trait for storage backends
28///
29/// Implement this trait to add support for different storage backends
30/// such as S3, Azure Blob, Google Cloud Storage, etc.
31#[async_trait]
32pub trait StorageBackend: Send + Sync {
33    /// Initialize the storage backend
34    ///
35    /// This should create necessary directories, buckets, or connections.
36    async fn initialize(&self) -> Result<()>;
37
38    /// Write data to storage and return a relative path/identifier
39    ///
40    /// # Arguments
41    /// * `key` - A unique identifier for the data (typically a hash)
42    /// * `data` - The data to store
43    ///
44    /// # Returns
45    /// A relative path or identifier that can be used to read the data later
46    async fn write(&self, key: &str, data: &[u8]) -> Result<PathBuf>;
47
48    /// Read data from storage
49    ///
50    /// # Arguments
51    /// * `path` - The relative path returned by `write`
52    ///
53    /// # Returns
54    /// The stored data as a byte vector
55    async fn read(&self, path: &Path) -> Result<Vec<u8>>;
56
57    /// Delete data from storage
58    ///
59    /// # Arguments
60    /// * `path` - The relative path returned by `write`
61    async fn delete(&self, path: &Path) -> Result<()>;
62
63    /// Check if data exists
64    ///
65    /// # Arguments
66    /// * `key` - The unique identifier for the data
67    async fn exists(&self, key: &str) -> bool;
68
69    /// Get the full path/URI for a relative path
70    ///
71    /// This is used to construct absolute paths for local files
72    /// or URIs for remote storage.
73    fn full_path(&self, relative_path: &Path) -> PathBuf;
74
75    /// Get storage statistics
76    ///
77    /// Returns information about the storage usage.
78    async fn stats(&self) -> Result<BackendStats>;
79}
80
/// Usage summary reported by a storage backend.
///
/// `Default` yields an empty summary: zero objects, zero bytes, and no
/// available-space figure.
#[derive(Debug, Clone, Default)]
pub struct BackendStats {
    /// How many objects are currently stored
    pub total_objects: usize,
    /// Combined size of the stored objects, in bytes
    pub total_size: u64,
    /// Available space, for backends that can report it; `None` otherwise
    pub available_space: Option<u64>,
}
91
/// Local filesystem storage backend
///
/// This is the default storage backend. Objects are stored as plain files
/// below `data_dir`, sharded into subdirectories named after the first two
/// characters of their key (see `hash_to_path`).
pub struct LocalStorageBackend {
    /// Base directory for storage
    data_dir: PathBuf,
}

impl LocalStorageBackend {
    /// Create a new local storage backend rooted at `data_dir`.
    ///
    /// This only records the path; nothing is created on disk here.
    pub fn new(data_dir: PathBuf) -> Self {
        Self { data_dir }
    }

    /// Get the data directory
    pub fn data_dir(&self) -> &Path {
        &self.data_dir
    }

    /// Convert a hash to a relative storage path
    ///
    /// The first two bytes of the key become a shard directory and the
    /// remainder becomes the file name.
    ///
    /// Example: "abcdef123..." -> "ab/cdef123..."
    ///
    /// Keys are returned unchanged (flat layout, no shard directory) when:
    /// * they are shorter than four bytes, or
    /// * byte offset 2 is not a UTF-8 character boundary, so the key cannot
    ///   be split there without cutting a character in half.
    ///
    /// NOTE(review): keys are not validated. A key containing path separators
    /// or `..` components yields a path outside the two-level layout; confirm
    /// that callers only pass hash-like keys.
    fn hash_to_path(&self, hash: &str) -> PathBuf {
        // Slicing a `str` at a non-boundary offset panics, so check the
        // boundary up front instead of indexing blindly with `[0..2]`.
        if hash.len() < 4 || !hash.is_char_boundary(2) {
            return PathBuf::from(hash);
        }
        let (prefix, rest) = hash.split_at(2);
        PathBuf::from(prefix).join(rest)
    }
}
124
125#[async_trait]
126impl StorageBackend for LocalStorageBackend {
127    async fn initialize(&self) -> Result<()> {
128        use tokio::fs;
129
130        // Create data directory
131        fs::create_dir_all(&self.data_dir).await?;
132
133        // Create hash-based subdirectories (aa, ab, ac, ..., ff)
134        // This prevents having too many files in a single directory
135        for first in 'a'..='f' {
136            for second in '0'..='9' {
137                let subdir = self.data_dir.join(format!("{}{}", first, second));
138                fs::create_dir_all(&subdir).await?;
139            }
140            for second in 'a'..='f' {
141                let subdir = self.data_dir.join(format!("{}{}", first, second));
142                fs::create_dir_all(&subdir).await?;
143            }
144        }
145
146        tracing::info!("Local storage backend initialized at {:?}", self.data_dir);
147        Ok(())
148    }
149
150    async fn write(&self, key: &str, data: &[u8]) -> Result<PathBuf> {
151        use tokio::fs;
152
153        let relative_path = self.hash_to_path(key);
154        let full_path = self.data_dir.join(&relative_path);
155
156        // Check if file already exists (deduplication)
157        if full_path.exists() {
158            tracing::debug!("File with key {} already exists, skipping write", key);
159            return Ok(relative_path);
160        }
161
162        // Ensure parent directory exists
163        if let Some(parent) = full_path.parent() {
164            fs::create_dir_all(parent).await?;
165        }
166
167        // Write file atomically using temp file + rename
168        let temp_path = full_path.with_extension("tmp");
169        fs::write(&temp_path, data).await?;
170
171        // Atomic rename
172        fs::rename(&temp_path, &full_path).await?;
173
174        tracing::debug!("Stored file with key {} at {:?}", key, full_path);
175        Ok(relative_path)
176    }
177
178    async fn read(&self, relative_path: &Path) -> Result<Vec<u8>> {
179        use tokio::fs;
180
181        let full_path = self.data_dir.join(relative_path);
182
183        if !full_path.exists() {
184            return Err(FileError::NotFound(format!(
185                "File not found at {:?}",
186                full_path
187            )));
188        }
189
190        Ok(fs::read(&full_path).await?)
191    }
192
193    async fn delete(&self, relative_path: &Path) -> Result<()> {
194        use tokio::fs;
195
196        let full_path = self.data_dir.join(relative_path);
197
198        if full_path.exists() {
199            fs::remove_file(&full_path).await?;
200            tracing::debug!("Deleted file at {:?}", full_path);
201        }
202
203        Ok(())
204    }
205
206    async fn exists(&self, key: &str) -> bool {
207        let path = self.data_dir.join(self.hash_to_path(key));
208        path.exists()
209    }
210
211    fn full_path(&self, relative_path: &Path) -> PathBuf {
212        self.data_dir.join(relative_path)
213    }
214
215    async fn stats(&self) -> Result<BackendStats> {
216        let mut stats = BackendStats::default();
217
218        // Walk the directory and count files
219        let mut entries = tokio::fs::read_dir(&self.data_dir).await?;
220
221        while let Some(entry) = entries.next_entry().await? {
222            let path = entry.path();
223            if path.is_dir() {
224                // Count files in subdirectories
225                let mut sub_entries = tokio::fs::read_dir(&path).await?;
226                while let Some(sub_entry) = sub_entries.next_entry().await? {
227                    let sub_path = sub_entry.path();
228                    if sub_path.is_file() {
229                        stats.total_objects += 1;
230                        if let Ok(metadata) = sub_entry.metadata().await {
231                            stats.total_size += metadata.len();
232                        }
233                    }
234                }
235            } else if path.is_file() {
236                stats.total_objects += 1;
237                if let Ok(metadata) = entry.metadata().await {
238                    stats.total_size += metadata.len();
239                }
240            }
241        }
242
243        Ok(stats)
244    }
245}
246
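// S3-compatible storage backend (placeholder for future implementation).
//
// The commented-out stub below sketches how a remote storage backend could
// implement `StorageBackend`. Uncomment it and fill in the `todo!()` bodies
// when needed. Plain `//` comments are used here on purpose: a `///` doc
// comment would attach to the next real item instead of to this stub.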
251/*
252pub struct S3StorageBackend {
253    bucket: String,
254    prefix: String,
255    client: aws_sdk_s3::Client,
256}
257
258#[async_trait]
259impl StorageBackend for S3StorageBackend {
260    async fn initialize(&self) -> Result<()> {
261        // Ensure bucket exists or create it
262        todo!("Implement S3 initialization")
263    }
264
265    async fn write(&self, key: &str, data: &[u8]) -> Result<PathBuf> {
266        // Upload to S3
267        let path = PathBuf::from(format!("{}/{}", self.prefix, key));
268        todo!("Implement S3 upload")
269    }
270
271    async fn read(&self, path: &Path) -> Result<Vec<u8>> {
272        // Download from S3
273        todo!("Implement S3 download")
274    }
275
276    async fn delete(&self, path: &Path) -> Result<()> {
277        // Delete from S3
278        todo!("Implement S3 delete")
279    }
280
281    async fn exists(&self, key: &str) -> bool {
282        // Check S3 head object
283        todo!("Implement S3 exists check")
284    }
285
286    fn full_path(&self, relative_path: &Path) -> PathBuf {
287        // Return S3 URI
288        PathBuf::from(format!("s3://{}/{}", self.bucket, relative_path.display()))
289    }
290
291    async fn stats(&self) -> Result<BackendStats> {
292        // Get S3 bucket stats
293        todo!("Implement S3 stats")
294    }
295}
296*/
297
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Runs the whole write / exists / read / full_path / delete cycle
    /// against a throwaway directory.
    #[tokio::test]
    async fn test_local_storage_backend() {
        let root = TempDir::new().unwrap();
        let store = LocalStorageBackend::new(root.path().to_path_buf());
        store.initialize().await.unwrap();

        // Writing returns the sharded relative path.
        let payload = b"test content";
        let stored_at = store.write("abc123", payload).await.unwrap();
        assert_eq!(stored_at, PathBuf::from("ab/c123"));

        // Only the key that was written is reported as present.
        assert!(store.exists("abc123").await);
        assert!(!store.exists("xyz789").await);

        // Reading gives back exactly what was written.
        let round_tripped = store.read(&stored_at).await.unwrap();
        assert_eq!(round_tripped, payload);

        // The full path contains both the shard and the file name.
        let absolute = store.full_path(&stored_at);
        let absolute_text = absolute.to_string_lossy();
        assert!(absolute_text.contains("ab"));
        assert!(absolute_text.contains("c123"));

        // After deletion the key is gone.
        store.delete(&stored_at).await.unwrap();
        assert!(!store.exists("abc123").await);
    }

    /// Writing the same key twice must yield one path and one stored object.
    #[tokio::test]
    async fn test_deduplication() {
        let root = TempDir::new().unwrap();
        let store = LocalStorageBackend::new(root.path().to_path_buf());
        store.initialize().await.unwrap();

        let payload = b"duplicate content";

        let first = store.write("same_key", payload).await.unwrap();
        let second = store.write("same_key", payload).await.unwrap();
        assert_eq!(first, second);

        let usage = store.stats().await.unwrap();
        assert_eq!(usage.total_objects, 1);
    }

    /// Checks the sharded layout for a long key and the flat layout for
    /// short and empty keys.
    #[test]
    fn test_hash_to_path() {
        let backend = LocalStorageBackend::new(PathBuf::from("/tmp"));

        let cases = [
            ("abcdef123456", "ab/cdef123456"),
            ("ab", "ab"),
            ("a", "a"),
            ("", ""),
        ];
        for &(hash, expected) in &cases {
            assert_eq!(backend.hash_to_path(hash), PathBuf::from(expected));
        }
    }
}
366}