use anyhow::{anyhow, Result};
use blake3::Hasher;
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

/// Files larger than this are split into 1 MiB chunks.
const CHUNK_SIZE: usize = 1024 * 1024;

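/// Manifest for a file that was split into chunks: the ordered chunk hashes,
/// the total size in bytes, and an optional original filename. Serialized as
/// JSON and stored in the object store like any other object.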
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileManifest {
    pub total_size: u64,
    pub chunks: Vec<String>,
    pub filename: Option<String>,
}

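/// Metadata for a stored model, persisted in the `models` table.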
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelMetadata {
    pub hash: String,
    pub name: String,
    pub version: String,
    pub size: u64,
    pub created_at: chrono::DateTime<chrono::Utc>,
}

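/// A commit record plus the content hashes of the files it references,
/// backed by the `commits` and `commit_files` tables.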
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommitInfo {
    pub hash: String,
    pub message: String,
    pub timestamp: i64,
    pub file_hashes: Vec<String>,
}

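/// Content-addressed object store rooted at a directory, with model and
/// commit metadata kept in a SQLite database (`metadata.db`). Cloning is
/// cheap: clones share the same connection behind an `Arc<Mutex<_>>`.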
#[derive(Clone)]
pub struct StorageManager {
    root_path: PathBuf,
    db: Arc<Mutex<Connection>>,
}

impl StorageManager {
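    /// Creates the storage root if needed, opens `metadata.db` inside it, and
    /// ensures the `models`, `commits`, `commit_files`, and `staging` tables exist.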
    pub fn new(root: PathBuf) -> Result<Self> {
        fs::create_dir_all(&root)?;

        let db = Connection::open(root.join("metadata.db"))?;

        // Metadata for stored models.
        db.execute(
            "CREATE TABLE IF NOT EXISTS models (
                hash TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                version TEXT NOT NULL,
                size INTEGER NOT NULL,
                created_at TEXT NOT NULL
            )",
            [],
        )?;

        // Commit history.
        db.execute(
            "CREATE TABLE IF NOT EXISTS commits (
                hash TEXT PRIMARY KEY,
                message TEXT NOT NULL,
                timestamp INTEGER NOT NULL
            )",
            [],
        )?;

        // Which file hashes belong to which commit.
        db.execute(
            "CREATE TABLE IF NOT EXISTS commit_files (
                commit_hash TEXT NOT NULL,
                file_hash TEXT NOT NULL,
                PRIMARY KEY (commit_hash, file_hash),
                FOREIGN KEY (commit_hash) REFERENCES commits(hash)
            )",
            [],
        )?;

        // Files staged for the next commit.
        db.execute(
            "CREATE TABLE IF NOT EXISTS staging (
                file_path TEXT PRIMARY KEY,
                file_hash TEXT NOT NULL,
                file_size INTEGER NOT NULL
            )",
            [],
        )?;

        Ok(Self {
            root_path: root,
            db: Arc::new(Mutex::new(db)),
        })
    }

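    /// Stores a blob and returns its content hash. Data larger than
    /// `CHUNK_SIZE` is split into chunks and the returned hash refers to a
    /// JSON manifest listing those chunks.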
    pub fn store(&self, data: &[u8]) -> Result<String> {
        println!(
            "DEBUG: StorageManager::store called with {} bytes of data",
            data.len()
        );
        println!("DEBUG: CHUNK_SIZE = {} bytes", CHUNK_SIZE);

        if data.len() > CHUNK_SIZE {
            let total_chunks = (data.len() + CHUNK_SIZE - 1) / CHUNK_SIZE;
            println!(
                "SPLITTING: Large file detected ({} bytes > {} bytes)",
                data.len(),
                CHUNK_SIZE
            );
            println!(
                "SPLITTING: Will create {} chunks of {} bytes each",
                total_chunks, CHUNK_SIZE
            );
            println!("Splitting file into {} chunks...", total_chunks);

            let chunks = self.chunk_file(data)?;
            println!("SPLITTING: Created {} chunks total", chunks.len());

            for (i, (chunk_hash, chunk_data)) in chunks.iter().enumerate() {
                println!(
                    "CHUNK {}: Storing chunk {}/{} (hash: {}, size: {} bytes)",
                    i,
                    i + 1,
                    chunks.len(),
                    &chunk_hash[..16],
                    chunk_data.len()
                );
                let stored_hash = self.store_single_object(chunk_data)?;
                println!(
                    "Stored chunk {}: {} ({} bytes)",
                    i,
                    chunk_hash,
                    chunk_data.len()
                );
                println!("CHUNK {}: Stored with hash: {}", i, &stored_hash[..16]);
            }

            println!("MANIFEST: Creating manifest for {} chunks", chunks.len());
            let manifest_hash = self.create_manifest(&chunks, None)?;
            println!("Created manifest: {}", manifest_hash);
            println!(
                "MANIFEST: Created manifest with hash: {}",
                &manifest_hash[..16]
            );
            println!(
                "MANIFEST: Stored large file successfully ({} chunks -> manifest)",
                chunks.len()
            );

            Ok(manifest_hash)
        } else {
            println!(
                "SINGLE: Small file detected ({} bytes <= {} bytes)",
                data.len(),
                CHUNK_SIZE
            );
            println!("SINGLE: Storing as single object");
            let hash = self.store_single_object(data)?;
            println!("SINGLE: Stored with hash: {}", &hash[..16]);
            Ok(hash)
        }
    }

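    /// Retrieves a blob by hash. Objects whose contents are UTF-8 and start
    /// with `{` are treated as chunk manifests and reassembled from their chunks.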
    pub fn retrieve(&self, hash: &str) -> Result<Vec<u8>> {
        println!("DEBUG: StorageManager::retrieve called with hash: {}", hash);

        if hash.len() < 2 {
            println!("DEBUG: Invalid hash length: {}", hash.len());
            return Err(anyhow!("Invalid hash length"));
        }

        let prefix = &hash[..2];
        let suffix = &hash[2..];
        let object_path = self.root_path.join("objects").join(prefix).join(suffix);

        println!("DEBUG: Looking for object at path: {:?}", object_path);
        println!("DEBUG: Object exists: {}", object_path.exists());

        match fs::read(&object_path) {
            Ok(data) => {
                println!(
                    "DEBUG: Successfully retrieved {} bytes for hash: {}",
                    data.len(),
                    hash
                );

                // Heuristic: an object whose contents look like JSON is treated
                // as a chunk manifest and reassembled from its chunks.
                if let Ok(manifest_str) = std::str::from_utf8(&data) {
                    if manifest_str.trim_start().starts_with('{') {
                        println!("DEBUG: Detected manifest file, reconstructing from chunks");
                        return self.reconstruct_from_manifest(manifest_str);
                    }
                }

                Ok(data)
            }
            Err(e) => {
                println!("DEBUG: Failed to retrieve object {}: {}", hash, e);
                Err(anyhow!("Object not found: {}", hash))
            }
        }
    }

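    /// Parses a `FileManifest` and concatenates its chunks in order; a size
    /// mismatch against `total_size` is logged but does not fail the call.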
    fn reconstruct_from_manifest(&self, manifest_str: &str) -> Result<Vec<u8>> {
        println!("DEBUG: Starting manifest reconstruction");
        println!("DEBUG: Manifest JSON: {}", manifest_str);

        let manifest: FileManifest = serde_json::from_str(manifest_str)?;
        println!(
            "DEBUG: Parsed manifest: {} chunks, total size: {} bytes",
            manifest.chunks.len(),
            manifest.total_size
        );

        let mut reconstructed_data = Vec::with_capacity(manifest.total_size as usize);
        println!(
            "DEBUG: Allocated reconstruction buffer with capacity: {} bytes",
            manifest.total_size
        );

        for (i, chunk_hash) in manifest.chunks.iter().enumerate() {
            println!(
                "DEBUG: Retrieving chunk {}/{} (hash: {})",
                i + 1,
                manifest.chunks.len(),
                &chunk_hash[..16]
            );

            let chunk_data = self.retrieve_single_chunk(chunk_hash)?;
            println!(
                "DEBUG: Retrieved chunk {} (size: {} bytes)",
                i + 1,
                chunk_data.len()
            );

            reconstructed_data.extend_from_slice(&chunk_data);
            println!(
                "DEBUG: Reconstruction progress: {}/{} chunks, current size: {} bytes",
                i + 1,
                manifest.chunks.len(),
                reconstructed_data.len()
            );
        }

        println!(
            "DEBUG: Successfully reconstructed {} bytes from {} chunks",
            reconstructed_data.len(),
            manifest.chunks.len()
        );

        if reconstructed_data.len() != manifest.total_size as usize {
            println!(
                "DEBUG: WARNING: Reconstructed size mismatch! Expected: {}, Got: {}",
                manifest.total_size,
                reconstructed_data.len()
            );
        }

        Ok(reconstructed_data)
    }

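    /// Reads a single chunk object from disk, without any manifest detection.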
    fn retrieve_single_chunk(&self, hash: &str) -> Result<Vec<u8>> {
        if hash.len() < 2 {
            return Err(anyhow!("Invalid chunk hash length"));
        }

        let prefix = &hash[..2];
        let suffix = &hash[2..];
        let object_path = self.root_path.join("objects").join(prefix).join(suffix);

        match fs::read(&object_path) {
            Ok(data) => Ok(data),
            Err(e) => Err(anyhow!("Chunk not found: {} - {}", hash, e)),
        }
    }

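    /// Returns true if an object with the given hash exists on disk.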
    pub fn exists(&self, hash: &str) -> bool {
        if hash.len() < 2 {
            return false;
        }

        let prefix = &hash[..2];
        let suffix = &hash[2..];
        let object_path = self.root_path.join("objects").join(prefix).join(suffix);

        object_path.exists()
    }

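    /// Splits `data` into `CHUNK_SIZE` pieces and returns `(BLAKE3 hash, bytes)`
    /// pairs in order.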
    fn chunk_file(&self, data: &[u8]) -> Result<Vec<(String, Vec<u8>)>> {
        let mut chunks = Vec::new();

        for (i, chunk_data) in data.chunks(CHUNK_SIZE).enumerate() {
            let mut hasher = Hasher::new();
            hasher.update(chunk_data);
            let hash = hasher.finalize().to_hex().to_string();
            println!(
                "DEBUG: Created chunk {} ({} bytes, hash: {})",
                i,
                chunk_data.len(),
                &hash[..16]
            );
            chunks.push((hash, chunk_data.to_vec()));
        }

        println!("DEBUG: Chunked file into {} chunks", chunks.len());
        Ok(chunks)
    }

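    /// Builds a `FileManifest` for the given chunks, serializes it to JSON,
    /// and stores it as a regular object, returning the manifest's hash.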
    fn create_manifest(
        &self,
        chunks: &[(String, Vec<u8>)],
        filename: Option<String>,
    ) -> Result<String> {
        let total_size: u64 = chunks.iter().map(|(_, data)| data.len() as u64).sum();
        let chunk_hashes: Vec<String> = chunks.iter().map(|(hash, _)| hash.clone()).collect();

        println!("MANIFEST: Building manifest with {} chunks", chunks.len());
        for (i, (hash, data)) in chunks.iter().enumerate() {
            println!(
                "MANIFEST: Chunk {} -> {} ({} bytes)",
                i,
                &hash[..16],
                data.len()
            );
        }

        let manifest = FileManifest {
            total_size,
            chunks: chunk_hashes,
            filename,
        };

        let manifest_json = serde_json::to_string_pretty(&manifest)?;
        println!("MANIFEST: JSON size: {} bytes", manifest_json.len());
        println!(
            "MANIFEST: Total file size: {} bytes ({:.2} MB)",
            total_size,
            total_size as f64 / 1_048_576.0
        );

        self.store_single_object(manifest_json.as_bytes())
    }

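    /// Hashes `data` with BLAKE3 and writes it to
    /// `objects/<first two hex chars>/<remaining chars>`, skipping the write
    /// if an object with that hash is already present.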
    fn store_single_object(&self, data: &[u8]) -> Result<String> {
        let mut hasher = Hasher::new();
        hasher.update(data);
        let hash = hasher.finalize().to_hex().to_string();

        println!("DEBUG: Storing object with hash: {}", hash);

        if hash.len() < 2 {
            return Err(anyhow!("Invalid hash length"));
        }

        let prefix = &hash[..2];
        let suffix = &hash[2..];
        let object_dir = self.root_path.join("objects").join(prefix);

        println!("DEBUG: Creating object directory: {:?}", object_dir);
        fs::create_dir_all(&object_dir)?;

        let object_path = object_dir.join(suffix);

        println!("DEBUG: Object path: {:?}", object_path);
        println!("DEBUG: Object already exists: {}", object_path.exists());

        if !object_path.exists() {
            println!("DEBUG: Writing {} bytes to object file", data.len());
            fs::write(&object_path, data)?;
            println!("DEBUG: Successfully wrote object file");
        } else {
            println!("DEBUG: Object file already exists, skipping write");
        }

        Ok(hash)
    }

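    /// Inserts or replaces the model's row in the `models` table.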
    pub fn store_metadata(&self, metadata: &ModelMetadata) -> Result<()> {
        let conn = self.db.lock().unwrap();
        conn.execute(
            "INSERT OR REPLACE INTO models (hash, name, version, size, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
            [
                &metadata.hash,
                &metadata.name,
                &metadata.version,
                &metadata.size.to_string(),
                &metadata.created_at.to_rfc3339(),
            ],
        )?;
        Ok(())
    }

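    /// Looks up model metadata by hash, returning `None` if none is recorded.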
    pub fn get_metadata(&self, hash: &str) -> Result<Option<ModelMetadata>> {
        let conn = self.db.lock().unwrap();
        let mut stmt = conn
            .prepare("SELECT hash, name, version, size, created_at FROM models WHERE hash = ?1")?;

        let mut rows = stmt.query([hash])?;
        if let Some(row) = rows.next()? {
            Ok(Some(ModelMetadata {
                hash: row.get(0)?,
                name: row.get(1)?,
                version: row.get(2)?,
                size: row.get(3)?,
                created_at: chrono::DateTime::parse_from_rfc3339(&row.get::<_, String>(4)?)?
                    .with_timezone(&chrono::Utc),
            }))
        } else {
            Ok(None)
        }
    }

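    /// Returns all commits, newest first, each populated with its file hashes.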
    pub fn get_all_commits(&self) -> Result<Vec<CommitInfo>> {
        let conn = self.db.lock().unwrap();

        println!("DEBUG: get_all_commits called, checking database contents");
        let count: i64 = conn.query_row("SELECT COUNT(*) FROM commits", [], |row| row.get(0))?;
        println!("DEBUG: Found {} commits in database", count);

        let mut stmt = conn.prepare(
            "SELECT c.hash, c.message, c.timestamp
             FROM commits c
             ORDER BY c.timestamp DESC",
        )?;

        let commit_iter = stmt.query_map([], |row| {
            Ok(CommitInfo {
                hash: row.get(0)?,
                message: row.get(1)?,
                timestamp: row.get(2)?,
                file_hashes: Vec::new(), // populated below
            })
        })?;

        let mut commits = Vec::new();
        for commit_result in commit_iter {
            let mut commit = commit_result?;

            let mut file_stmt =
                conn.prepare("SELECT file_hash FROM commit_files WHERE commit_hash = ?")?;
            let file_hashes: Result<Vec<String>, _> = file_stmt
                .query_map([&commit.hash], |row| row.get(0))?
                .collect();
            commit.file_hashes = file_hashes?;

            commits.push(commit);
        }

        Ok(commits)
    }

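    /// Looks up a single commit by hash together with its file hashes.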
    pub fn get_commit(&self, hash: &str) -> Result<Option<CommitInfo>> {
        let conn = self.db.lock().unwrap();

        let mut stmt =
            conn.prepare("SELECT hash, message, timestamp FROM commits WHERE hash = ?1")?;

        let mut rows = stmt.query([hash])?;
        if let Some(row) = rows.next()? {
            let hash: String = row.get(0)?;
            let message: String = row.get(1)?;
            let timestamp: i64 = row.get(2)?;

            let mut file_stmt =
                conn.prepare("SELECT file_hash FROM commit_files WHERE commit_hash = ?1")?;

            let mut file_rows = file_stmt.query([&hash])?;
            let mut file_hashes = Vec::new();
            while let Some(row) = file_rows.next()? {
                file_hashes.push(row.get(0)?);
            }

            Ok(Some(CommitInfo {
                hash: hash.clone(),
                message,
                timestamp,
                file_hashes,
            }))
        } else {
            Ok(None)
        }
    }

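    /// Records a commit received from a remote: inserts the commit row and its
    /// file associations in one transaction, ignoring rows that already exist.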
    pub fn save_remote_commit(&self, commit: &CommitInfo) -> Result<()> {
        let mut conn = self.db.lock().unwrap();

        let tx = conn.transaction()?;

        tx.execute(
            "INSERT OR IGNORE INTO commits (hash, message, timestamp) VALUES (?1, ?2, ?3)",
            [&commit.hash, &commit.message, &commit.timestamp.to_string()],
        )?;

        for file_hash in &commit.file_hashes {
            tx.execute(
                "INSERT OR IGNORE INTO commit_files (commit_hash, file_hash) VALUES (?1, ?2)",
                [&commit.hash, file_hash],
            )?;
        }

        tx.commit()?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    fn create_temp_storage() -> (StorageManager, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let storage = StorageManager::new(temp_dir.path().to_path_buf()).unwrap();
        (storage, temp_dir)
    }

    #[test]
    fn test_store_and_retrieve() {
        let (storage, _temp_dir) = create_temp_storage();
        let data = b"Hello, FAI Protocol!";

        let hash = storage.store(data).unwrap();
        let retrieved = storage.retrieve(&hash).unwrap();

        assert_eq!(data.to_vec(), retrieved);
    }

    #[test]
    fn test_store_twice_same_hash() {
        let (storage, _temp_dir) = create_temp_storage();
        let data = b"Test data for idempotency";

        let hash1 = storage.store(data).unwrap();
        let hash2 = storage.store(data).unwrap();

        assert_eq!(hash1, hash2);
    }

    #[test]
    fn test_exists() {
        let (storage, _temp_dir) = create_temp_storage();
        let data = b"Existence test data";

        let hash = storage.store(data).unwrap();

        assert!(storage.exists(&hash));
        assert!(!storage.exists("nonexistenthash123456789"));
    }

    #[test]
    fn test_retrieve_nonexistent() {
        let (storage, _temp_dir) = create_temp_storage();

        let result = storage.retrieve("nonexistenthash123456789");
        assert!(result.is_err());
    }

    #[test]
    fn test_invalid_hash_length() {
        let (storage, _temp_dir) = create_temp_storage();

        let result = storage.retrieve("");
        assert!(result.is_err());

        let result = storage.retrieve("a");
        assert!(result.is_err());

        assert!(!storage.exists(""));
        assert!(!storage.exists("a"));
    }
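
    // Round-trip check for the chunked path (a sketch relying only on the
    // store/retrieve API above): data larger than CHUNK_SIZE is split into
    // chunks behind a manifest and should come back byte-for-byte.
    #[test]
    fn test_store_and_retrieve_chunked() {
        let (storage, _temp_dir) = create_temp_storage();

        // Two full chunks plus a partial third one.
        let data: Vec<u8> = (0..CHUNK_SIZE * 2 + 123).map(|i| (i % 251) as u8).collect();

        let hash = storage.store(&data).unwrap();
        let retrieved = storage.retrieve(&hash).unwrap();

        assert_eq!(data, retrieved);
    }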
}