fai_protocol/storage/
mod.rs

1//! Storage layer for FAI Protocol
2//!
3//! Handles content-addressed storage of AI models and metadata management.
4
5use anyhow::{anyhow, Result};
6use blake3::Hasher;
7use rusqlite::Connection;
8use serde::{Deserialize, Serialize};
9use std::fs;
10use std::path::PathBuf;
11use std::sync::{Arc, Mutex};
12
/// Chunk size for large files (1MB).
///
/// `StorageManager::store` keeps data up to this size as a single object;
/// anything larger is split into `CHUNK_SIZE`-byte chunks described by a
/// `FileManifest`.
const CHUNK_SIZE: usize = 1024 * 1024;
15
/// Manifest file structure for multi-chunk files.
///
/// Serialized to pretty-printed JSON and stored as a regular object; the
/// manifest's own BLAKE3 hash (of those JSON bytes) is what `store` returns
/// for large files. NOTE(review): because the hash covers the serialized
/// bytes, changing field order or serde formatting changes manifest hashes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileManifest {
    /// Total file size in bytes; used to pre-size the reconstruction buffer
    /// and to sanity-check the reassembled data.
    pub total_size: u64,
    /// BLAKE3 hex hashes of the chunks, in file order.
    pub chunks: Vec<String>,
    /// Original file name (optional)
    pub filename: Option<String>,
}
26
/// Metadata for a stored AI model.
///
/// Persisted as one row of the `models` table; `created_at` round-trips
/// through an RFC 3339 string (see `store_metadata` / `get_metadata`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelMetadata {
    /// Content hash using BLAKE3 (hex string; primary key in the DB)
    pub hash: String,
    /// Model name/identifier
    pub name: String,
    /// Model version
    pub version: String,
    /// Size in bytes
    pub size: u64,
    /// Creation timestamp
    pub created_at: chrono::DateTime<chrono::Utc>,
}
41
/// Information about a commit for P2P transfer.
///
/// Mirrors one row of the `commits` table plus the matching rows of
/// `commit_files`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommitInfo {
    /// Commit hash
    pub hash: String,
    /// Commit message
    pub message: String,
    /// Timestamp as Unix epoch — NOTE(review): presumably seconds; this
    /// module only round-trips the value, confirm units with the producer.
    pub timestamp: i64,
    /// List of file hashes included in this commit
    pub file_hashes: Vec<String>,
}
54
/// Storage manager for AI models.
///
/// Combines a content-addressed object store on disk (BLAKE3-named files
/// under `objects/`) with a SQLite database for model and commit metadata.
/// `Clone` is cheap: the connection is shared behind `Arc<Mutex<..>>`.
#[derive(Clone)]
pub struct StorageManager {
    /// Root path to .fai directory
    root_path: PathBuf,
    /// SQLite database connection for metadata
    db: Arc<Mutex<Connection>>,
}
63
64impl StorageManager {
65    /// Create a new storage manager instance with the specified root path
66    pub fn new(root: PathBuf) -> Result<Self> {
67        // Ensure the .fai directory exists
68        fs::create_dir_all(&root)?;
69
70        // Initialize metadata database
71        let db = Connection::open(root.join("metadata.db"))?;
72
73        // Create models table
74        db.execute(
75            "CREATE TABLE IF NOT EXISTS models (
76                hash TEXT PRIMARY KEY,
77                name TEXT NOT NULL,
78                version TEXT NOT NULL,
79                size INTEGER NOT NULL,
80                created_at TEXT NOT NULL
81            )",
82            [],
83        )?;
84
85        // Create commits table for version control
86        db.execute(
87            "CREATE TABLE IF NOT EXISTS commits (
88                hash TEXT PRIMARY KEY,
89                message TEXT NOT NULL,
90                timestamp INTEGER NOT NULL
91            )",
92            [],
93        )?;
94
95        // Create commit_files table to track files in each commit
96        db.execute(
97            "CREATE TABLE IF NOT EXISTS commit_files (
98                commit_hash TEXT NOT NULL,
99                file_hash TEXT NOT NULL,
100                PRIMARY KEY (commit_hash, file_hash),
101                FOREIGN KEY (commit_hash) REFERENCES commits(hash)
102            )",
103            [],
104        )?;
105
106        // Create staging table for files to be committed
107        db.execute(
108            "CREATE TABLE IF NOT EXISTS staging (
109                file_path TEXT PRIMARY KEY,
110                file_hash TEXT NOT NULL,
111                file_size INTEGER NOT NULL
112            )",
113            [],
114        )?;
115
116        Ok(Self {
117            root_path: root,
118            db: Arc::new(Mutex::new(db)),
119        })
120    }
121
122    /// Store data and return its content hash
123    ///
124    /// # Arguments
125    /// * `data` - The data to store
126    ///
127    /// # Returns
128    /// The BLAKE3 hash of the stored data as a hex string
129    /// For large files (>1MB), returns the manifest hash
130    pub fn store(&self, data: &[u8]) -> Result<String> {
131        println!(
132            "DEBUG: StorageManager::store called with {} bytes of data",
133            data.len()
134        );
135        println!("DEBUG: CHUNK_SIZE = {} bytes", CHUNK_SIZE);
136
137        // Check if file needs to be chunked
138        if data.len() > CHUNK_SIZE {
139            let total_chunks = (data.len() + CHUNK_SIZE - 1) / CHUNK_SIZE;
140            println!(
141                "SPLITTING: Large file detected ({} bytes > {} bytes)",
142                data.len(),
143                CHUNK_SIZE
144            );
145            println!(
146                "SPLITTING: Will create {} chunks of {} bytes each",
147                total_chunks, CHUNK_SIZE
148            );
149            println!("Splitting file into {} chunks...", total_chunks);
150
151            // Chunk the file
152            let chunks = self.chunk_file(data)?;
153            println!("SPLITTING: Created {} chunks total", chunks.len());
154
155            // Store each chunk
156            for (i, (chunk_hash, chunk_data)) in chunks.iter().enumerate() {
157                println!(
158                    "CHUNK {}: Storing chunk {}/{} (hash: {}, size: {} bytes)",
159                    i,
160                    i + 1,
161                    chunks.len(),
162                    &chunk_hash[..16],
163                    chunk_data.len()
164                );
165                let stored_hash = self.store_single_object(chunk_data)?;
166                println!(
167                    "Stored chunk {}: {} ({} bytes)",
168                    i,
169                    chunk_hash,
170                    chunk_data.len()
171                );
172                println!("CHUNK {}: Stored with hash: {}", i, &stored_hash[..16]);
173            }
174
175            // Create and store manifest
176            println!("MANIFEST: Creating manifest for {} chunks", chunks.len());
177            let manifest_hash = self.create_manifest(&chunks, None)?;
178            println!("Created manifest: {}", manifest_hash);
179            println!(
180                "MANIFEST: Created manifest with hash: {}",
181                &manifest_hash[..16]
182            );
183            println!(
184                "MANIFEST: Stored large file successfully ({} chunks -> manifest)",
185                chunks.len()
186            );
187
188            Ok(manifest_hash)
189        } else {
190            // Small file - store as single object
191            println!(
192                "SINGLE: Small file detected ({} bytes <= {} bytes)",
193                data.len(),
194                CHUNK_SIZE
195            );
196            println!("SINGLE: Storing as single object");
197            let hash = self.store_single_object(data)?;
198            println!("SINGLE: Stored with hash: {}", &hash[..16]);
199            Ok(hash)
200        }
201    }
202
203    /// Retrieve data by its content hash
204    ///
205    /// # Arguments
206    /// * `hash` - The BLAKE3 hash of the data to retrieve
207    ///
208    /// # Returns
209    /// The stored data as bytes
210    pub fn retrieve(&self, hash: &str) -> Result<Vec<u8>> {
211        println!("DEBUG: StorageManager::retrieve called with hash: {}", hash);
212
213        if hash.len() < 2 {
214            println!("DEBUG: Invalid hash length: {}", hash.len());
215            return Err(anyhow!("Invalid hash length"));
216        }
217
218        let prefix = &hash[..2];
219        let suffix = &hash[2..];
220        let object_path = self.root_path.join("objects").join(prefix).join(suffix);
221
222        println!("DEBUG: Looking for object at path: {:?}", object_path);
223        println!("DEBUG: Object exists: {}", object_path.exists());
224
225        match fs::read(&object_path) {
226            Ok(data) => {
227                println!(
228                    "DEBUG: Successfully retrieved {} bytes for hash: {}",
229                    data.len(),
230                    hash
231                );
232
233                // Check if this is a manifest file (JSON)
234                if let Ok(manifest_str) = std::str::from_utf8(&data) {
235                    if manifest_str.trim_start().starts_with('{') {
236                        println!("DEBUG: Detected manifest file, reconstructing from chunks");
237                        return self.reconstruct_from_manifest(manifest_str);
238                    }
239                }
240
241                // Regular file, return as-is
242                Ok(data)
243            }
244            Err(e) => {
245                println!("DEBUG: Failed to retrieve object {}: {}", hash, e);
246                Err(anyhow!("Object not found: {}", hash))
247            }
248        }
249    }
250
251    /// Reconstruct file data from manifest
252    ///
253    /// # Arguments
254    /// * `manifest_str` - JSON manifest string
255    ///
256    /// # Returns
257    /// Reconstructed file data
258    fn reconstruct_from_manifest(&self, manifest_str: &str) -> Result<Vec<u8>> {
259        println!("DEBUG: Starting manifest reconstruction");
260        println!("DEBUG: Manifest JSON: {}", manifest_str);
261
262        let manifest: FileManifest = serde_json::from_str(manifest_str)?;
263        println!(
264            "DEBUG: Parsed manifest: {} chunks, total size: {} bytes",
265            manifest.chunks.len(),
266            manifest.total_size
267        );
268
269        let mut reconstructed_data = Vec::with_capacity(manifest.total_size as usize);
270        println!(
271            "DEBUG: Allocated reconstruction buffer with capacity: {} bytes",
272            manifest.total_size
273        );
274
275        for (i, chunk_hash) in manifest.chunks.iter().enumerate() {
276            println!(
277                "DEBUG: Retrieving chunk {}/{} (hash: {})",
278                i + 1,
279                manifest.chunks.len(),
280                &chunk_hash[..16]
281            );
282
283            let chunk_data = self.retrieve_single_chunk(chunk_hash)?;
284            println!(
285                "DEBUG: Retrieved chunk {} (size: {} bytes)",
286                i + 1,
287                chunk_data.len()
288            );
289
290            reconstructed_data.extend_from_slice(&chunk_data);
291            println!(
292                "DEBUG: Reconstruction progress: {}/{} chunks, current size: {} bytes",
293                i + 1,
294                manifest.chunks.len(),
295                reconstructed_data.len()
296            );
297        }
298
299        println!(
300            "DEBUG: Successfully reconstructed {} bytes from {} chunks",
301            reconstructed_data.len(),
302            manifest.chunks.len()
303        );
304
305        if reconstructed_data.len() != manifest.total_size as usize {
306            println!(
307                "DEBUG: WARNING: Reconstructed size mismatch! Expected: {}, Got: {}",
308                manifest.total_size,
309                reconstructed_data.len()
310            );
311        }
312
313        Ok(reconstructed_data)
314    }
315
316    /// Retrieve a single chunk by hash
317    ///
318    /// # Arguments
319    /// * `hash` - The chunk hash
320    ///
321    /// # Returns
322    /// The chunk data
323    fn retrieve_single_chunk(&self, hash: &str) -> Result<Vec<u8>> {
324        if hash.len() < 2 {
325            return Err(anyhow!("Invalid chunk hash length"));
326        }
327
328        let prefix = &hash[..2];
329        let suffix = &hash[2..];
330        let object_path = self.root_path.join("objects").join(prefix).join(suffix);
331
332        match fs::read(&object_path) {
333            Ok(data) => Ok(data),
334            Err(e) => Err(anyhow!("Chunk not found: {} - {}", hash, e)),
335        }
336    }
337
338    /// Check if a hash exists in storage
339    ///
340    /// # Arguments
341    /// * `hash` - The BLAKE3 hash to check
342    ///
343    /// # Returns
344    /// true if the hash exists, false otherwise
345    pub fn exists(&self, hash: &str) -> bool {
346        if hash.len() < 2 {
347            return false;
348        }
349
350        let prefix = &hash[..2];
351        let suffix = &hash[2..];
352        let object_path = self.root_path.join("objects").join(prefix).join(suffix);
353
354        object_path.exists()
355    }
356
357    /// Chunk file data into smaller pieces
358    ///
359    /// # Arguments
360    /// * `data` - The file data to chunk
361    ///
362    /// # Returns
363    /// Vector of tuples containing (chunk_hash, chunk_data)
364    fn chunk_file(&self, data: &[u8]) -> Result<Vec<(String, Vec<u8>)>> {
365        let mut chunks = Vec::new();
366
367        for (i, chunk_data) in data.chunks(CHUNK_SIZE).enumerate() {
368            let mut hasher = Hasher::new();
369            hasher.update(chunk_data);
370            let hash = hasher.finalize().to_hex().to_string();
371            println!(
372                "DEBUG: Created chunk {} ({} bytes, hash: {})",
373                i,
374                chunk_data.len(),
375                &hash[..16]
376            );
377            chunks.push((hash, chunk_data.to_vec()));
378        }
379
380        println!("DEBUG: Chunked file into {} chunks", chunks.len());
381        Ok(chunks)
382    }
383
384    /// Create a manifest file for chunks
385    ///
386    /// # Arguments
387    /// * `chunks` - Vector of chunk tuples (hash, data)
388    /// * `filename` - Optional original filename
389    ///
390    /// # Returns
391    /// The manifest hash as a hex string
392    fn create_manifest(
393        &self,
394        chunks: &[(String, Vec<u8>)],
395        filename: Option<String>,
396    ) -> Result<String> {
397        let total_size: u64 = chunks.iter().map(|(_, data)| data.len() as u64).sum();
398        let chunk_hashes: Vec<String> = chunks.iter().map(|(hash, _)| hash.clone()).collect();
399
400        println!("MANIFEST: Building manifest with {} chunks", chunks.len());
401        for (i, (hash, data)) in chunks.iter().enumerate() {
402            println!(
403                "MANIFEST:   Chunk {} -> {} ({} bytes)",
404                i,
405                &hash[..16],
406                data.len()
407            );
408        }
409
410        let manifest = FileManifest {
411            total_size,
412            chunks: chunk_hashes,
413            filename,
414        };
415
416        // Serialize manifest to JSON
417        let manifest_json = serde_json::to_string_pretty(&manifest)?;
418        println!("MANIFEST: JSON size: {} bytes", manifest_json.len());
419        println!(
420            "MANIFEST: Total file size: {} bytes ({:.2} MB)",
421            total_size,
422            total_size as f64 / 1_048_576.0
423        );
424
425        // Store manifest as a regular object
426        self.store_single_object(manifest_json.as_bytes())
427    }
428
429    /// Store a single chunk/object
430    ///
431    /// # Arguments
432    /// * `hash` - The hash of the data
433    /// * `data` - The data to store
434    ///
435    /// # Returns
436    /// Ok(()) if successful
437    fn store_single_object(&self, data: &[u8]) -> Result<String> {
438        // Compute BLAKE3 hash
439        let mut hasher = Hasher::new();
440        hasher.update(data);
441        let hash = hasher.finalize().to_hex().to_string();
442
443        println!("DEBUG: Storing object with hash: {}", hash);
444
445        // Create directory structure: .fai/objects/[first-2-chars]/
446        if hash.len() < 2 {
447            return Err(anyhow!("Invalid hash length"));
448        }
449
450        let prefix = &hash[..2];
451        let suffix = &hash[2..];
452        let object_dir = self.root_path.join("objects").join(prefix);
453
454        println!("DEBUG: Creating object directory: {:?}", object_dir);
455        fs::create_dir_all(&object_dir)?;
456
457        // Write data to: .fai/objects/[first-2-chars]/[rest-of-hash]
458        let object_path = object_dir.join(suffix);
459
460        println!("DEBUG: Object path: {:?}", object_path);
461        println!("DEBUG: Object already exists: {}", object_path.exists());
462
463        // Only write if file doesn't already exist (idempotent operation)
464        if !object_path.exists() {
465            println!("DEBUG: Writing {} bytes to object file", data.len());
466            fs::write(&object_path, data)?;
467            println!("DEBUG: Successfully wrote object file");
468        } else {
469            println!("DEBUG: Object file already exists, skipping write");
470        }
471
472        Ok(hash)
473    }
474
475    /// Store metadata for a model
476    ///
477    /// # Arguments
478    /// * `metadata` - The metadata to store
479    ///
480    /// # Returns
481    /// Ok(()) if successful, Err otherwise
482    pub fn store_metadata(&self, metadata: &ModelMetadata) -> Result<()> {
483        let conn = self.db.lock().unwrap();
484        conn.execute(
485            "INSERT OR REPLACE INTO models (hash, name, version, size, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
486            [
487                &metadata.hash,
488                &metadata.name,
489                &metadata.version,
490                &metadata.size.to_string(),
491                &metadata.created_at.to_rfc3339(),
492            ],
493        )?;
494        Ok(())
495    }
496
497    /// Retrieve metadata for a model
498    ///
499    /// # Arguments
500    /// * `hash` - The BLAKE3 hash of the model
501    ///
502    /// # Returns
503    /// The metadata if found, None otherwise
504    pub fn get_metadata(&self, hash: &str) -> Result<Option<ModelMetadata>> {
505        let conn = self.db.lock().unwrap();
506        let mut stmt = conn
507            .prepare("SELECT hash, name, version, size, created_at FROM models WHERE hash = ?1")?;
508
509        let mut rows = stmt.query([hash])?;
510        if let Some(row) = rows.next()? {
511            Ok(Some(ModelMetadata {
512                hash: row.get(0)?,
513                name: row.get(1)?,
514                version: row.get(2)?,
515                size: row.get(3)?,
516                created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<_, String>(4)?)?
517                    .with_timezone(&chrono::Utc),
518            }))
519        } else {
520            Ok(None)
521        }
522    }
523
524    /// Get all commits from the database
525    ///
526    /// # Returns
527    /// Vector of CommitInfo for all commits
528    pub fn get_all_commits(&self) -> Result<Vec<CommitInfo>> {
529        let conn = self.db.lock().unwrap();
530
531        // Debug: Check what's in the database
532        println!("DEBUG: get_all_commits called, checking database contents");
533        let count: i64 = conn.query_row("SELECT COUNT(*) FROM commits", [], |row| row.get(0))?;
534        println!("DEBUG: Found {} commits in database", count);
535
536        let mut stmt = conn.prepare(
537            "SELECT c.hash, c.message, c.timestamp 
538             FROM commits c 
539             ORDER BY c.timestamp DESC",
540        )?;
541
542        let commit_iter = stmt.query_map([], |row| {
543            Ok(CommitInfo {
544                hash: row.get(0)?,
545                message: row.get(1)?,
546                timestamp: row.get(2)?,
547                file_hashes: Vec::new(), // Will be filled separately
548            })
549        })?;
550
551        let mut commits = Vec::new();
552        for commit_result in commit_iter {
553            let mut commit = commit_result?;
554
555            // Get file hashes for this commit
556            let mut file_stmt =
557                conn.prepare("SELECT file_hash FROM commit_files WHERE commit_hash = ?")?;
558            let file_hashes: Result<Vec<String>, _> = file_stmt
559                .query_map([&commit.hash], |row| row.get(0))?
560                .collect();
561            commit.file_hashes = file_hashes?;
562
563            commits.push(commit);
564        }
565
566        Ok(commits)
567    }
568
569    /// Get a specific commit by hash
570    ///
571    /// # Arguments
572    /// * `hash` - The commit hash
573    ///
574    /// # Returns
575    /// The CommitInfo if found, None otherwise
576    pub fn get_commit(&self, hash: &str) -> Result<Option<CommitInfo>> {
577        let conn = self.db.lock().unwrap();
578
579        // First get basic commit info
580        let mut stmt =
581            conn.prepare("SELECT hash, message, timestamp FROM commits WHERE hash = ?1")?;
582
583        let mut rows = stmt.query([hash])?;
584        if let Some(row) = rows.next()? {
585            let hash: String = row.get(0)?;
586            let message: String = row.get(1)?;
587            let timestamp: i64 = row.get(2)?;
588
589            // Now get file hashes for this commit
590            let mut file_stmt =
591                conn.prepare("SELECT file_hash FROM commit_files WHERE commit_hash = ?1")?;
592
593            let mut file_rows = file_stmt.query([&hash])?;
594            let mut file_hashes = Vec::new();
595            while let Some(row) = file_rows.next()? {
596                file_hashes.push(row.get(0)?);
597            }
598
599            Ok(Some(CommitInfo {
600                hash: hash.clone(),
601                message,
602                timestamp,
603                file_hashes,
604            }))
605        } else {
606            Ok(None)
607        }
608    }
609
610    /// Save a commit received from a remote peer
611    ///
612    /// # Arguments
613    /// * `commit` - The commit information to save
614    ///
615    /// # Returns
616    /// Ok(()) if successful
617    pub fn save_remote_commit(&self, commit: &CommitInfo) -> Result<()> {
618        let mut conn = self.db.lock().unwrap();
619
620        // Start transaction
621        let tx = conn.transaction()?;
622
623        // Insert commit (ignore if exists)
624        tx.execute(
625            "INSERT OR IGNORE INTO commits (hash, message, timestamp) VALUES (?1, ?2, ?3)",
626            [&commit.hash, &commit.message, &commit.timestamp.to_string()],
627        )?;
628
629        // Insert file associations
630        for file_hash in &commit.file_hashes {
631            tx.execute(
632                "INSERT OR IGNORE INTO commit_files (commit_hash, file_hash) VALUES (?1, ?2)",
633                [&commit.hash, file_hash],
634            )?;
635        }
636
637        // Commit transaction
638        tx.commit()?;
639
640        Ok(())
641    }
642}
643
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Spin up a StorageManager rooted in a fresh temporary directory.
    /// The TempDir guard is handed back so the directory outlives the test.
    fn create_temp_storage() -> (StorageManager, TempDir) {
        let dir = TempDir::new().unwrap();
        let manager = StorageManager::new(dir.path().to_path_buf()).unwrap();
        (manager, dir)
    }

    /// Round-trip: stored bytes come back unchanged under their hash.
    #[test]
    fn test_store_and_retrieve() {
        let (storage, _guard) = create_temp_storage();
        let payload = b"Hello, FAI Protocol!";

        let hash = storage.store(payload).unwrap();
        let round_tripped = storage.retrieve(&hash).unwrap();

        assert_eq!(round_tripped, payload.to_vec());
    }

    /// Storing identical bytes twice is idempotent and yields one hash.
    #[test]
    fn test_store_twice_same_hash() {
        let (storage, _guard) = create_temp_storage();
        let payload = b"Test data for idempotency";

        let first = storage.store(payload).unwrap();
        let second = storage.store(payload).unwrap();

        assert_eq!(first, second);
    }

    /// exists() reports true only for hashes that were actually stored.
    #[test]
    fn test_exists() {
        let (storage, _guard) = create_temp_storage();

        let hash = storage.store(b"Existence test data").unwrap();

        assert!(storage.exists(&hash));
        assert!(!storage.exists("nonexistenthash123456789"));
    }

    /// Looking up an unknown hash is an error, not a panic.
    #[test]
    fn test_retrieve_nonexistent() {
        let (storage, _guard) = create_temp_storage();

        assert!(storage.retrieve("nonexistenthash123456789").is_err());
    }

    /// Hashes shorter than the 2-char directory prefix are rejected.
    #[test]
    fn test_invalid_hash_length() {
        let (storage, _guard) = create_temp_storage();

        for bad in ["", "a"] {
            assert!(storage.retrieve(bad).is_err());
            assert!(!storage.exists(bad));
        }
    }
}