1use crate::config::EncryptionMode;
8use crate::FecError;
9use anyhow::Result;
10use async_trait::async_trait;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, RwLock};
15use tokio::fs;
16use tokio::io::{AsyncReadExt, AsyncWriteExt};
17
/// Content identifier: a 32-byte BLAKE3 digest that uniquely addresses a shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Cid([u8; 32]);
22
23impl Cid {
24 pub fn new(bytes: [u8; 32]) -> Self {
26 Self(bytes)
27 }
28
29 pub fn from_data(data: &[u8]) -> Self {
31 let hash = blake3::hash(data);
32 Self(*hash.as_bytes())
33 }
34
35 pub fn as_bytes(&self) -> &[u8; 32] {
37 &self.0
38 }
39
40 pub fn to_hex(&self) -> String {
42 hex::encode(self.0)
43 }
44}
45
46impl From<[u8; 32]> for Cid {
47 fn from(bytes: [u8; 32]) -> Self {
48 Self(bytes)
49 }
50}
51
52impl From<blake3::Hash> for Cid {
53 fn from(hash: blake3::Hash) -> Self {
54 Self(*hash.as_bytes())
55 }
56}
57
/// Fixed-size header prepended to every shard payload when serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardHeader {
    /// Header format version (set to 1 by `ShardHeader::new`).
    pub version: u8,
    /// Encryption mode applied to the shard's payload.
    pub encryption_mode: EncryptionMode,
    /// FEC parameters — presumably (data_shards, parity_shards); confirm
    /// against the encoder that fills this in.
    pub nspec: (u8, u8),
    /// Size in bytes of the associated data — assumed to be the original
    /// pre-encoding length; TODO confirm with the writer.
    pub data_size: u32,
    /// 32-byte per-shard nonce (opaque here; produced by the encryption layer).
    pub nonce: [u8; 32],
    /// Zero padding reserved for future fields; keeps serialized size stable.
    #[serde(with = "serde_bytes")]
    pub reserved: Vec<u8>,
}
78
79impl ShardHeader {
80 const SIZE: usize = 106; pub fn new(
84 encryption_mode: EncryptionMode,
85 nspec: (u8, u8),
86 data_size: u32,
87 nonce: [u8; 32],
88 ) -> Self {
89 Self {
90 version: 1,
91 encryption_mode,
92 nspec,
93 data_size,
94 nonce,
95 reserved: vec![0u8; 55],
96 }
97 }
98
99 pub fn to_bytes(&self) -> Result<[u8; Self::SIZE], FecError> {
101 let serialized = postcard::to_stdvec(self)
102 .map_err(|e| FecError::Backend(format!("Failed to serialize header: {}", e)))?;
103
104 let mut result = [0u8; Self::SIZE];
106 if serialized.len() > Self::SIZE {
107 return Err(FecError::Backend(format!(
108 "Header too large: {} > {}",
109 serialized.len(),
110 Self::SIZE
111 )));
112 }
113 result[..serialized.len()].copy_from_slice(&serialized);
114 result[Self::SIZE - 1] = serialized.len() as u8;
116 Ok(result)
117 }
118
119 pub fn from_bytes(bytes: &[u8]) -> Result<Self, FecError> {
121 if bytes.len() != Self::SIZE {
122 return Err(FecError::Backend(format!(
123 "Invalid header size: expected {}, got {}",
124 Self::SIZE,
125 bytes.len()
126 )));
127 }
128 let actual_len = bytes[Self::SIZE - 1] as usize;
130 if actual_len == 0 || actual_len > Self::SIZE - 1 {
131 return postcard::from_bytes(bytes)
133 .map_err(|e| FecError::Backend(format!("Failed to deserialize header: {}", e)));
134 }
135 postcard::from_bytes(&bytes[..actual_len])
136 .map_err(|e| FecError::Backend(format!("Failed to deserialize header: {}", e)))
137 }
138}
139
/// A stored unit: a fixed-size header plus opaque payload bytes.
#[derive(Debug, Clone)]
pub struct Shard {
    /// Metadata describing the payload's encoding.
    pub header: ShardHeader,
    /// The shard payload (opaque to this module).
    pub data: Vec<u8>,
}
148
149impl Shard {
150 pub fn new(header: ShardHeader, data: Vec<u8>) -> Self {
152 Self { header, data }
153 }
154
155 pub fn cid(&self) -> Result<Cid, FecError> {
157 let header_bytes = self.header.to_bytes()?;
158 let mut hasher = blake3::Hasher::new();
159 hasher.update(&header_bytes);
160 hasher.update(&self.data);
161 Ok(Cid::from(hasher.finalize()))
162 }
163
164 pub fn to_bytes(&self) -> Result<Vec<u8>, FecError> {
166 let header_bytes = self.header.to_bytes()?;
167 let mut result = Vec::with_capacity(ShardHeader::SIZE + self.data.len());
168 result.extend_from_slice(&header_bytes);
169 result.extend_from_slice(&self.data);
170 Ok(result)
171 }
172
173 pub fn from_bytes(bytes: &[u8]) -> Result<Self, FecError> {
175 if bytes.len() < ShardHeader::SIZE {
176 return Err(FecError::Backend(
177 "Insufficient data for shard header".to_string(),
178 ));
179 }
180
181 let header = ShardHeader::from_bytes(&bytes[..ShardHeader::SIZE])?;
182 let data = bytes[ShardHeader::SIZE..].to_vec();
183
184 Ok(Self { header, data })
185 }
186}
187
/// Describes how one chunk of a file was encoded into shards.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMeta {
    /// FEC parameters — presumably (data, parity) shard counts; confirm
    /// against the encoder.
    pub nspec: (u8, u8),
    /// Encryption mode applied to this chunk's shards.
    pub mode: EncryptionMode,
    /// Hex-encoded CIDs of the chunk's shards (decoded back to `Cid` by the
    /// stats/GC scans).
    pub shard_ids: Vec<String>,
}
198
199impl ChunkMeta {
200 pub fn new(nspec: (u8, u8), mode: EncryptionMode, shard_ids: Vec<String>) -> Self {
202 Self {
203 nspec,
204 mode,
205 shard_ids,
206 }
207 }
208}
209
/// Record describing one stored file and the shards that compose it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileMetadata {
    /// 32-byte identifier of the file; used as the storage key.
    pub file_id: [u8; 32],
    /// File size in bytes.
    pub file_size: u64,
    /// Per-chunk shard layout, in order.
    pub chunks: Vec<ChunkMeta>,
    /// Unix timestamp (seconds) when this record was created.
    pub created_at: u64,
    /// Metadata format version (set to 1 by `FileMetadata::new`).
    pub version: u8,
}
224
225impl FileMetadata {
226 pub fn new(file_id: [u8; 32], file_size: u64, chunks: Vec<ChunkMeta>) -> Self {
228 let created_at = std::time::SystemTime::now()
229 .duration_since(std::time::UNIX_EPOCH)
230 .map(|d| d.as_secs())
231 .unwrap_or(0);
232
233 Self {
234 file_id,
235 file_size,
236 chunks,
237 created_at,
238 version: 1,
239 }
240 }
241}
242
/// Abstract persistence layer for shards and file metadata.
///
/// Implementations must be shareable across tasks (`Send + Sync`).
#[async_trait]
pub trait StorageBackend: Send + Sync {
    /// Stores `shard` under `cid`, replacing any existing entry.
    async fn put_shard(&self, cid: &Cid, shard: &Shard) -> Result<(), FecError>;

    /// Retrieves the shard stored under `cid`.
    async fn get_shard(&self, cid: &Cid) -> Result<Shard, FecError>;

    /// Removes the shard stored under `cid`; the provided implementations
    /// treat a missing shard as a no-op.
    async fn delete_shard(&self, cid: &Cid) -> Result<(), FecError>;

    /// Reports whether a shard exists under `cid`.
    async fn has_shard(&self, cid: &Cid) -> Result<bool, FecError>;

    /// Lists the CIDs of all stored shards.
    async fn list_shards(&self) -> Result<Vec<Cid>, FecError>;

    /// Stores (or replaces) a file's metadata record, keyed by its file id.
    async fn put_metadata(&self, metadata: &FileMetadata) -> Result<(), FecError>;

    /// Retrieves the metadata record for `file_id`.
    async fn get_metadata(&self, file_id: &[u8; 32]) -> Result<FileMetadata, FecError>;

    /// Removes the metadata record for `file_id`.
    async fn delete_metadata(&self, file_id: &[u8; 32]) -> Result<(), FecError>;

    /// Lists all stored metadata records.
    async fn list_metadata(&self) -> Result<Vec<FileMetadata>, FecError>;

    /// Computes aggregate statistics for this backend.
    async fn stats(&self) -> Result<StorageStats, FecError>;

    /// Deletes shards not referenced by any metadata and reports what was
    /// reclaimed.
    async fn garbage_collect(&self) -> Result<GcReport, FecError>;
}
279
/// Aggregate counters describing a backend's contents.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageStats {
    /// Number of shards stored.
    pub total_shards: u64,
    /// Total bytes of shard data, counting header size per shard.
    pub total_size: u64,
    /// Number of metadata records stored.
    pub metadata_count: u64,
    /// Shards not referenced by any metadata record (GC candidates).
    pub unreferenced_shards: u64,
}
292
/// Summary of one garbage-collection pass.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GcReport {
    /// Number of unreferenced shards removed.
    pub shards_deleted: u64,
    /// Bytes reclaimed (payload plus header size per deleted shard).
    pub bytes_freed: u64,
    /// Wall-clock duration of the pass in milliseconds.
    pub duration_ms: u64,
}
303
/// Filesystem-backed storage: shards in a hex-prefix fan-out tree under
/// `<base>/shards`, metadata as one file per record under `<base>/metadata`.
pub struct LocalStorage {
    /// Root directory of the store.
    base_path: PathBuf,
    /// `<base_path>/metadata`, created on construction.
    metadata_path: PathBuf,
    /// Number of two-hex-character directory levels used to fan out shards.
    shard_levels: usize,
}
314
315impl LocalStorage {
316 pub async fn new(base_path: PathBuf) -> Result<Self, FecError> {
318 let metadata_path = base_path.join("metadata");
319
320 fs::create_dir_all(&base_path).await.map_err(FecError::Io)?;
321 fs::create_dir_all(&metadata_path)
322 .await
323 .map_err(FecError::Io)?;
324
325 Ok(Self {
326 base_path,
327 metadata_path,
328 shard_levels: 2, })
330 }
331
332 fn shard_path(&self, cid: &Cid) -> PathBuf {
334 let hex = cid.to_hex();
335
336 let mut path = self.base_path.join("shards");
338
339 for level in 0..self.shard_levels {
340 if hex.len() > level * 2 + 2 {
341 path = path.join(&hex[level * 2..level * 2 + 2]);
342 }
343 }
344
345 path.join(format!("{}.shard", hex))
346 }
347
348 fn metadata_file_path(&self, file_id: &[u8; 32]) -> PathBuf {
350 let hex = hex::encode(file_id);
351 self.metadata_path.join(format!("{}.meta", hex))
352 }
353
354 async fn ensure_parent(&self, path: &Path) -> Result<(), FecError> {
356 if let Some(parent) = path.parent() {
357 fs::create_dir_all(parent).await.map_err(FecError::Io)?;
358 }
359 Ok(())
360 }
361}
362
#[async_trait]
impl StorageBackend for LocalStorage {
    /// Writes a shard atomically: serialize to a `.tmp` sibling, fsync, then
    /// rename into place so readers never observe a partial file.
    async fn put_shard(&self, cid: &Cid, shard: &Shard) -> Result<(), FecError> {
        let path = self.shard_path(cid);

        self.ensure_parent(&path).await?;

        let shard_bytes = shard.to_bytes()?;

        // Temp-write + rename gives atomic replacement on the same filesystem.
        let temp_path = path.with_extension("tmp");

        let mut file = fs::File::create(&temp_path).await.map_err(FecError::Io)?;

        file.write_all(&shard_bytes).await.map_err(FecError::Io)?;

        // Flush to disk before renaming so a crash cannot leave a renamed but
        // partially written shard.
        file.sync_all().await.map_err(FecError::Io)?;

        fs::rename(temp_path, path).await.map_err(FecError::Io)?;

        Ok(())
    }

    /// Reads and parses the shard file for `cid`.
    async fn get_shard(&self, cid: &Cid) -> Result<Shard, FecError> {
        let path = self.shard_path(cid);

        let mut file = fs::File::open(&path).await.map_err(|e| {
            FecError::Backend(format!("Failed to open shard file {:?}: {}", path, e))
        })?;

        let mut data = Vec::new();
        file.read_to_end(&mut data).await.map_err(FecError::Io)?;

        Shard::from_bytes(&data)
    }

    /// Deletes the shard file if present; a missing shard is a no-op.
    async fn delete_shard(&self, cid: &Cid) -> Result<(), FecError> {
        let path = self.shard_path(cid);

        // NOTE(review): `Path::exists` is a blocking syscall on the async
        // executor thread, and check-then-remove is racy; consider removing
        // unconditionally and ignoring `NotFound`.
        if path.exists() {
            fs::remove_file(path).await.map_err(FecError::Io)?;
        }

        Ok(())
    }

    /// Reports whether a shard file exists for `cid`.
    async fn has_shard(&self, cid: &Cid) -> Result<bool, FecError> {
        let path = self.shard_path(cid);
        // NOTE(review): blocking `exists()` call in async context (see above).
        Ok(path.exists())
    }

    /// Walks the fan-out directory tree and collects CIDs parsed from
    /// `<hex>.shard` file names; files with malformed names are skipped.
    async fn list_shards(&self) -> Result<Vec<Cid>, FecError> {
        let mut shards = Vec::new();
        let shards_dir = self.base_path.join("shards");

        // Iterative depth-first traversal of the nested prefix directories.
        let mut stack = vec![shards_dir];

        while let Some(dir) = stack.pop() {
            if !dir.exists() {
                continue;
            }

            let mut entries = fs::read_dir(&dir).await.map_err(|e| {
                FecError::Backend(format!("Failed to read directory {:?}: {}", dir, e))
            })?;

            while let Some(entry) = entries.next_entry().await.map_err(FecError::Io)? {
                let path = entry.path();

                if path.is_dir() {
                    stack.push(path);
                } else if let Some(name) = path.file_name() {
                    if let Some(name_str) = name.to_str() {
                        if name_str.ends_with(".shard") {
                            // Recover the CID from the hex file stem; only
                            // exact 32-byte digests are accepted.
                            let hex = name_str.trim_end_matches(".shard");
                            if let Ok(cid_bytes) = hex::decode(hex) {
                                if cid_bytes.len() == 32 {
                                    let mut cid_array = [0u8; 32];
                                    cid_array.copy_from_slice(&cid_bytes);
                                    shards.push(Cid::new(cid_array));
                                }
                            }
                        }
                    }
                }
            }
        }
        Ok(shards)
    }

    /// Writes a metadata record atomically (temp file, fsync, rename), named
    /// by the hex-encoded file id.
    async fn put_metadata(&self, metadata: &FileMetadata) -> Result<(), FecError> {
        let path = self.metadata_file_path(&metadata.file_id);

        let serialized = postcard::to_stdvec(metadata)
            .map_err(|e| FecError::Backend(format!("Failed to serialize metadata: {}", e)))?;

        let temp_path = path.with_extension("tmp");

        let mut file = fs::File::create(&temp_path).await.map_err(FecError::Io)?;

        file.write_all(&serialized).await.map_err(FecError::Io)?;

        file.sync_all().await.map_err(FecError::Io)?;

        fs::rename(temp_path, path).await.map_err(FecError::Io)?;

        Ok(())
    }

    /// Reads and decodes the metadata record for `file_id`.
    async fn get_metadata(&self, file_id: &[u8; 32]) -> Result<FileMetadata, FecError> {
        let path = self.metadata_file_path(file_id);

        let data = fs::read(&path).await.map_err(|e| {
            FecError::Backend(format!("Failed to read metadata file {:?}: {}", path, e))
        })?;

        postcard::from_bytes(&data)
            .map_err(|e| FecError::Backend(format!("Failed to deserialize metadata: {}", e)))
    }

    /// Deletes the metadata file if present; a missing record is a no-op.
    async fn delete_metadata(&self, file_id: &[u8; 32]) -> Result<(), FecError> {
        let path = self.metadata_file_path(file_id);

        if path.exists() {
            fs::remove_file(path).await.map_err(FecError::Io)?;
        }

        Ok(())
    }

    /// Reads every `.meta` file in the metadata directory; records that fail
    /// to decode are silently skipped.
    async fn list_metadata(&self) -> Result<Vec<FileMetadata>, FecError> {
        let mut metadata_list = Vec::new();

        let mut entries = fs::read_dir(&self.metadata_path)
            .await
            .map_err(FecError::Io)?;

        while let Some(entry) = entries.next_entry().await.map_err(FecError::Io)? {
            let path = entry.path();
            if let Some(name) = path.file_name() {
                if let Some(name_str) = name.to_str() {
                    if name_str.ends_with(".meta") {
                        let data = fs::read(&path).await.map_err(FecError::Io)?;
                        if let Ok(metadata) = postcard::from_bytes::<FileMetadata>(&data) {
                            metadata_list.push(metadata);
                        }
                    }
                }
            }
        }

        Ok(metadata_list)
    }

    /// Scans the whole store to compute statistics.
    ///
    /// NOTE(review): reads every shard in full just to measure it — O(total
    /// bytes); file-size metadata would suffice.
    async fn stats(&self) -> Result<StorageStats, FecError> {
        let shards = self.list_shards().await?;
        let metadata = self.list_metadata().await?;

        let mut total_size = 0u64;
        for cid in &shards {
            if let Ok(shard) = self.get_shard(cid).await {
                total_size += shard.data.len() as u64 + ShardHeader::SIZE as u64;
            }
        }

        // Decode every shard id mentioned in metadata into a CID set;
        // malformed ids are skipped.
        let mut referenced_cids = std::collections::HashSet::new();
        for meta in &metadata {
            for chunk in &meta.chunks {
                for shard_id in &chunk.shard_ids {
                    if let Ok(cid_bytes) = hex::decode(shard_id) {
                        if cid_bytes.len() == 32 {
                            let mut cid_array = [0u8; 32];
                            cid_array.copy_from_slice(&cid_bytes);
                            referenced_cids.insert(Cid::new(cid_array));
                        }
                    }
                }
            }
        }

        let unreferenced_shards = shards
            .iter()
            .filter(|cid| !referenced_cids.contains(cid))
            .count() as u64;

        Ok(StorageStats {
            total_shards: shards.len() as u64,
            total_size,
            metadata_count: metadata.len() as u64,
            unreferenced_shards,
        })
    }

    /// Deletes every shard file not referenced by any metadata record.
    async fn garbage_collect(&self) -> Result<GcReport, FecError> {
        let start_time = std::time::Instant::now();
        let mut shards_deleted = 0u64;
        let mut bytes_freed = 0u64;

        let shards = self.list_shards().await?;
        let metadata = self.list_metadata().await?;

        // Same referenced-set construction as in `stats`.
        let mut referenced_cids = std::collections::HashSet::new();
        for meta in &metadata {
            for chunk in &meta.chunks {
                for shard_id in &chunk.shard_ids {
                    if let Ok(cid_bytes) = hex::decode(shard_id) {
                        if cid_bytes.len() == 32 {
                            let mut cid_array = [0u8; 32];
                            cid_array.copy_from_slice(&cid_bytes);
                            referenced_cids.insert(Cid::new(cid_array));
                        }
                    }
                }
            }
        }

        for cid in shards {
            if !referenced_cids.contains(&cid) {
                // Read the shard first so the freed size can be reported.
                if let Ok(shard) = self.get_shard(&cid).await {
                    let shard_size = shard.data.len() as u64 + ShardHeader::SIZE as u64;
                    if self.delete_shard(&cid).await.is_ok() {
                        shards_deleted += 1;
                        bytes_freed += shard_size;
                    }
                }
            }
        }

        let duration_ms = start_time.elapsed().as_millis() as u64;

        Ok(GcReport {
            shards_deleted,
            bytes_freed,
            duration_ms,
        })
    }
}
610
/// Non-persistent, thread-safe storage backend (useful for tests).
pub struct MemoryStorage {
    /// Shards keyed by content identifier.
    shards: Arc<RwLock<HashMap<Cid, Shard>>>,
    /// Metadata records keyed by file id.
    metadata: Arc<RwLock<HashMap<[u8; 32], FileMetadata>>>,
}
619
620impl MemoryStorage {
621 pub fn new() -> Self {
623 Self {
624 shards: Arc::new(RwLock::new(HashMap::new())),
625 metadata: Arc::new(RwLock::new(HashMap::new())),
626 }
627 }
628
629 pub fn clear(&self) {
631 match self.shards.write() {
633 Ok(mut guard) => guard.clear(),
634 Err(poisoned) => poisoned.into_inner().clear(),
635 }
636 match self.metadata.write() {
637 Ok(mut guard) => guard.clear(),
638 Err(poisoned) => poisoned.into_inner().clear(),
639 }
640 }
641
642 pub fn shard_count(&self) -> usize {
644 match self.shards.read() {
645 Ok(guard) => guard.len(),
646 Err(poisoned) => poisoned.into_inner().len(),
647 }
648 }
649
650 pub fn metadata_count(&self) -> usize {
652 match self.metadata.read() {
653 Ok(guard) => guard.len(),
654 Err(poisoned) => poisoned.into_inner().len(),
655 }
656 }
657}
658
659impl Default for MemoryStorage {
660 fn default() -> Self {
661 Self::new()
662 }
663}
664
665#[async_trait]
666impl StorageBackend for MemoryStorage {
667 async fn put_shard(&self, cid: &Cid, shard: &Shard) -> Result<(), FecError> {
668 let mut shards = match self.shards.write() {
669 Ok(guard) => guard,
670 Err(poisoned) => poisoned.into_inner(),
671 };
672 shards.insert(*cid, shard.clone());
673 Ok(())
674 }
675
676 async fn get_shard(&self, cid: &Cid) -> Result<Shard, FecError> {
677 let shards = match self.shards.read() {
678 Ok(guard) => guard,
679 Err(poisoned) => poisoned.into_inner(),
680 };
681 shards
682 .get(cid)
683 .cloned()
684 .ok_or_else(|| FecError::Backend(format!("Shard not found: {}", cid.to_hex())))
685 }
686
687 async fn delete_shard(&self, cid: &Cid) -> Result<(), FecError> {
688 let mut shards = match self.shards.write() {
689 Ok(guard) => guard,
690 Err(poisoned) => poisoned.into_inner(),
691 };
692 shards.remove(cid);
693 Ok(())
694 }
695
696 async fn has_shard(&self, cid: &Cid) -> Result<bool, FecError> {
697 let shards = match self.shards.read() {
698 Ok(guard) => guard,
699 Err(poisoned) => poisoned.into_inner(),
700 };
701 Ok(shards.contains_key(cid))
702 }
703
704 async fn list_shards(&self) -> Result<Vec<Cid>, FecError> {
705 let shards = match self.shards.read() {
706 Ok(guard) => guard,
707 Err(poisoned) => poisoned.into_inner(),
708 };
709 Ok(shards.keys().copied().collect())
710 }
711
712 async fn put_metadata(&self, metadata: &FileMetadata) -> Result<(), FecError> {
713 let mut metadata_store = match self.metadata.write() {
714 Ok(guard) => guard,
715 Err(poisoned) => poisoned.into_inner(),
716 };
717 metadata_store.insert(metadata.file_id, metadata.clone());
718 Ok(())
719 }
720
721 async fn get_metadata(&self, file_id: &[u8; 32]) -> Result<FileMetadata, FecError> {
722 let metadata_store = match self.metadata.read() {
723 Ok(guard) => guard,
724 Err(poisoned) => poisoned.into_inner(),
725 };
726 metadata_store.get(file_id).cloned().ok_or_else(|| {
727 FecError::Backend(format!("Metadata not found: {}", hex::encode(file_id)))
728 })
729 }
730
731 async fn delete_metadata(&self, file_id: &[u8; 32]) -> Result<(), FecError> {
732 let mut metadata_store = match self.metadata.write() {
733 Ok(guard) => guard,
734 Err(poisoned) => poisoned.into_inner(),
735 };
736 metadata_store.remove(file_id);
737 Ok(())
738 }
739
740 async fn list_metadata(&self) -> Result<Vec<FileMetadata>, FecError> {
741 let metadata_store = match self.metadata.read() {
742 Ok(guard) => guard,
743 Err(poisoned) => poisoned.into_inner(),
744 };
745 Ok(metadata_store.values().cloned().collect())
746 }
747
748 async fn stats(&self) -> Result<StorageStats, FecError> {
749 let shards = match self.shards.read() {
750 Ok(guard) => guard,
751 Err(poisoned) => poisoned.into_inner(),
752 };
753 let metadata = match self.metadata.read() {
754 Ok(guard) => guard,
755 Err(poisoned) => poisoned.into_inner(),
756 };
757
758 let total_size: u64 = shards
759 .values()
760 .map(|shard| shard.data.len() as u64 + ShardHeader::SIZE as u64)
761 .sum();
762
763 let mut referenced_cids = std::collections::HashSet::new();
765 for meta in metadata.values() {
766 for chunk in &meta.chunks {
767 for shard_id in &chunk.shard_ids {
768 if let Ok(cid_bytes) = hex::decode(shard_id) {
769 if cid_bytes.len() == 32 {
770 let mut cid_array = [0u8; 32];
771 cid_array.copy_from_slice(&cid_bytes);
772 referenced_cids.insert(Cid::new(cid_array));
773 }
774 }
775 }
776 }
777 }
778
779 let unreferenced_shards = shards
780 .keys()
781 .filter(|cid| !referenced_cids.contains(cid))
782 .count() as u64;
783
784 Ok(StorageStats {
785 total_shards: shards.len() as u64,
786 total_size,
787 metadata_count: metadata.len() as u64,
788 unreferenced_shards,
789 })
790 }
791
792 async fn garbage_collect(&self) -> Result<GcReport, FecError> {
793 let start_time = std::time::Instant::now();
794 let mut shards_deleted = 0u64;
795 let mut bytes_freed = 0u64;
796
797 let shards = match self.shards.read() {
799 Ok(guard) => guard.clone(),
800 Err(poisoned) => poisoned.into_inner().clone(),
801 };
802 let metadata = match self.metadata.read() {
803 Ok(guard) => guard.clone(),
804 Err(poisoned) => poisoned.into_inner().clone(),
805 };
806
807 let mut referenced_cids = std::collections::HashSet::new();
809 for meta in metadata.values() {
810 for chunk in &meta.chunks {
811 for shard_id in &chunk.shard_ids {
812 if let Ok(cid_bytes) = hex::decode(shard_id) {
813 if cid_bytes.len() == 32 {
814 let mut cid_array = [0u8; 32];
815 cid_array.copy_from_slice(&cid_bytes);
816 referenced_cids.insert(Cid::new(cid_array));
817 }
818 }
819 }
820 }
821 }
822
823 let mut shards_write = match self.shards.write() {
825 Ok(guard) => guard,
826 Err(poisoned) => poisoned.into_inner(),
827 };
828 for (cid, shard) in shards {
829 if !referenced_cids.contains(&cid) {
830 let shard_size = shard.data.len() as u64 + ShardHeader::SIZE as u64;
831 shards_write.remove(&cid);
832 shards_deleted += 1;
833 bytes_freed += shard_size;
834 }
835 }
836 drop(shards_write);
837
838 let duration_ms = start_time.elapsed().as_millis() as u64;
839
840 Ok(GcReport {
841 shards_deleted,
842 bytes_freed,
843 duration_ms,
844 })
845 }
846}
847
/// Address of a remote storage node.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct NodeEndpoint {
    /// Host name or IP address.
    pub address: String,
    /// Port to connect to.
    pub port: u16,
    /// Optional 32-byte node identifier — presumably the node's identity
    /// key; confirm with the network layer.
    pub node_id: Option<[u8; 32]>,
}
858
/// Storage backend that maps shards onto a set of remote nodes.
pub struct NetworkStorage {
    /// Known peer endpoints.
    nodes: Vec<NodeEndpoint>,
    /// Desired number of replicas per shard (capped by the node count).
    replication: usize,
}
866
impl NetworkStorage {
    /// Creates a backend over `nodes`, targeting `replication` copies of
    /// each shard (effectively capped at the number of nodes).
    pub fn new(nodes: Vec<NodeEndpoint>, replication: usize) -> Self {
        Self { nodes, replication }
    }

    /// Deterministically picks up to `replication` distinct nodes for a
    /// shard: the same shard id always maps to the same node set.
    fn select_nodes(&self, shard_id: &[u8; 32]) -> Vec<&NodeEndpoint> {
        let mut selected = Vec::new();
        let target_count = self.replication.min(self.nodes.len());

        for i in 0..target_count {
            // Derive an index for replica `i` from bytes [4i, 4i+4) of the id.
            let hash_offset = i * 4;
            let index = if hash_offset + 3 < shard_id.len() {
                u32::from_le_bytes([
                    shard_id[hash_offset],
                    shard_id[hash_offset + 1],
                    shard_id[hash_offset + 2],
                    shard_id[hash_offset + 3],
                ]) as usize
            } else {
                // Fallback mix for replica indices past the id length; with a
                // 32-byte id this branch is only reached when i >= 8, i.e.
                // replication (and node count) above 8.
                shard_id
                    .iter()
                    .enumerate()
                    .map(|(j, &b)| (j + i) * b as usize)
                    .sum::<usize>()
            };

            let mut node_index = index % self.nodes.len();
            let mut attempts = 0;

            // Linear-probe past nodes already chosen for earlier replicas.
            while selected.iter().any(|n| *n == &self.nodes[node_index])
                && attempts < self.nodes.len()
            {
                node_index = (node_index + 1) % self.nodes.len();
                attempts += 1;
            }

            // A full cycle of the node list without finding a fresh node
            // means every node is already selected; skip this replica.
            if attempts < self.nodes.len() {
                selected.push(&self.nodes[node_index]);
            }
        }

        selected
    }
}
917
#[async_trait]
impl StorageBackend for NetworkStorage {
    /// Stores a shard on the selected replica nodes.
    ///
    /// NOTE(review): the network transfer is not implemented — each selected
    /// node is only logged and unconditionally counted as a success.
    async fn put_shard(&self, cid: &Cid, _shard: &Shard) -> Result<(), FecError> {
        let nodes = self.select_nodes(cid.as_bytes());

        if nodes.is_empty() {
            return Err(FecError::Backend(
                "No nodes available for storage".to_string(),
            ));
        }

        let mut success_count = 0;

        for node in nodes {
            tracing::debug!(
                "Storing shard {} to node: {}:{}",
                cid.to_hex(),
                node.address,
                node.port
            );
            success_count += 1;
        }

        // Unreachable today (every iteration increments), kept as a guard
        // for when real transfers can fail.
        if success_count == 0 {
            return Err(FecError::Backend(
                "Failed to store shard to any node".to_string(),
            ));
        }

        Ok(())
    }

    /// Retrieves a shard from the first selected node.
    ///
    /// NOTE(review): stub — returns a fabricated zero-filled 1024-byte shard
    /// whenever any node is selected, not real remote data.
    async fn get_shard(&self, cid: &Cid) -> Result<Shard, FecError> {
        let nodes = self.select_nodes(cid.as_bytes());

        if let Some(node) = nodes.into_iter().next() {
            tracing::debug!(
                "Retrieving shard {} from node: {}:{}",
                cid.to_hex(),
                node.address,
                node.port
            );

            let header = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 1024, [0u8; 32]);
            let shard = Shard::new(header, vec![0u8; 1024]);
            return Ok(shard);
        }

        Err(FecError::Backend("Shard not found on any node".to_string()))
    }

    /// Logs a delete request against each replica node.
    ///
    /// NOTE(review): stub — no network call is made.
    async fn delete_shard(&self, cid: &Cid) -> Result<(), FecError> {
        let nodes = self.select_nodes(cid.as_bytes());

        for node in nodes {
            tracing::debug!(
                "Deleting shard {} from node: {}:{}",
                cid.to_hex(),
                node.address,
                node.port
            );
        }

        Ok(())
    }

    /// NOTE(review): stub — reports `true` whenever at least one node is
    /// selected, without contacting it.
    async fn has_shard(&self, cid: &Cid) -> Result<bool, FecError> {
        let nodes = self.select_nodes(cid.as_bytes());

        if let Some(node) = nodes.into_iter().next() {
            tracing::debug!(
                "Checking shard {} on node: {}:{}",
                cid.to_hex(),
                node.address,
                node.port
            );
            return Ok(true);
        }

        Ok(false)
    }

    /// Remote enumeration is not implemented; always returns an empty list.
    async fn list_shards(&self) -> Result<Vec<Cid>, FecError> {
        Ok(Vec::new())
    }

    /// Remote metadata storage is not implemented; silently succeeds.
    async fn put_metadata(&self, _metadata: &FileMetadata) -> Result<(), FecError> {
        Ok(())
    }

    /// Remote metadata retrieval is not implemented.
    async fn get_metadata(&self, _file_id: &[u8; 32]) -> Result<FileMetadata, FecError> {
        Err(FecError::Backend(
            "Network metadata retrieval not implemented".to_string(),
        ))
    }

    /// Remote metadata deletion is not implemented; silently succeeds.
    async fn delete_metadata(&self, _file_id: &[u8; 32]) -> Result<(), FecError> {
        Ok(())
    }

    /// Remote enumeration is not implemented; always returns an empty list.
    async fn list_metadata(&self) -> Result<Vec<FileMetadata>, FecError> {
        Ok(Vec::new())
    }

    /// Remote statistics are not implemented; all counters are zero.
    async fn stats(&self) -> Result<StorageStats, FecError> {
        Ok(StorageStats {
            total_shards: 0,
            total_size: 0,
            metadata_count: 0,
            unreferenced_shards: 0,
        })
    }

    /// Remote garbage collection is not implemented; reports nothing freed.
    async fn garbage_collect(&self) -> Result<GcReport, FecError> {
        Ok(GcReport {
            shards_deleted: 0,
            bytes_freed: 0,
            duration_ms: 0,
        })
    }
}
1057
/// Composite backend that fans operations out to several inner backends.
pub struct MultiStorage {
    /// Inner backends, in priority order (relevant for `Failover`).
    backends: Vec<Arc<dyn StorageBackend>>,
    /// How writes are distributed across `backends`.
    strategy: MultiStorageStrategy,
}
1066
/// Write-distribution policy for [`MultiStorage`].
#[derive(Debug, Clone)]
pub enum MultiStorageStrategy {
    /// Write to every backend; succeed if at least one write succeeds.
    Redundant,
    /// Deterministically pick a single backend per key.
    LoadBalance,
    /// Try backends in order until one succeeds.
    Failover,
}
1077
1078impl MultiStorage {
1079 pub fn new(backends: Vec<Arc<dyn StorageBackend>>) -> Self {
1081 Self {
1082 backends,
1083 strategy: MultiStorageStrategy::Redundant,
1084 }
1085 }
1086
1087 pub fn with_strategy(
1089 backends: Vec<Arc<dyn StorageBackend>>,
1090 strategy: MultiStorageStrategy,
1091 ) -> Self {
1092 Self { backends, strategy }
1093 }
1094
1095 pub fn add_backend(&mut self, backend: Arc<dyn StorageBackend>) {
1097 self.backends.push(backend);
1098 }
1099
1100 pub fn remove_backend(&mut self, index: usize) -> Option<Arc<dyn StorageBackend>> {
1102 if index < self.backends.len() {
1103 Some(self.backends.remove(index))
1104 } else {
1105 None
1106 }
1107 }
1108
1109 pub fn backend_count(&self) -> usize {
1111 self.backends.len()
1112 }
1113}
1114
1115#[async_trait]
1116impl StorageBackend for MultiStorage {
1117 async fn put_shard(&self, cid: &Cid, shard: &Shard) -> Result<(), FecError> {
1118 match self.strategy {
1119 MultiStorageStrategy::Redundant => {
1120 let mut success_count = 0;
1122 let mut last_error = None;
1123
1124 for backend in &self.backends {
1125 match backend.put_shard(cid, shard).await {
1126 Ok(()) => success_count += 1,
1127 Err(e) => {
1128 tracing::warn!("Failed to store shard in backend: {}", e);
1129 last_error = Some(e);
1130 }
1131 }
1132 }
1133
1134 if success_count > 0 {
1135 Ok(())
1136 } else if let Some(e) = last_error {
1137 Err(e)
1138 } else {
1139 Err(FecError::Backend("No backends available".to_string()))
1140 }
1141 }
1142 MultiStorageStrategy::LoadBalance => {
1143 let index = cid.as_bytes()[0] as usize % self.backends.len();
1145 self.backends[index].put_shard(cid, shard).await
1146 }
1147 MultiStorageStrategy::Failover => {
1148 for backend in &self.backends {
1150 match backend.put_shard(cid, shard).await {
1151 Ok(()) => return Ok(()),
1152 Err(e) => {
1153 tracing::warn!("Backend failed, trying next: {}", e);
1154 }
1155 }
1156 }
1157 Err(FecError::Backend("All backends failed".to_string()))
1158 }
1159 }
1160 }
1161
1162 async fn get_shard(&self, cid: &Cid) -> Result<Shard, FecError> {
1163 for backend in &self.backends {
1165 match backend.get_shard(cid).await {
1166 Ok(shard) => return Ok(shard),
1167 Err(e) => {
1168 tracing::debug!("Backend failed to get shard: {}", e);
1169 }
1170 }
1171 }
1172
1173 Err(FecError::Backend(
1174 "Shard not found in any backend".to_string(),
1175 ))
1176 }
1177
1178 async fn delete_shard(&self, cid: &Cid) -> Result<(), FecError> {
1179 for backend in &self.backends {
1181 if let Err(e) = backend.delete_shard(cid).await {
1182 tracing::warn!("Failed to delete shard from backend: {}", e);
1183 }
1184 }
1185 Ok(())
1186 }
1187
1188 async fn has_shard(&self, cid: &Cid) -> Result<bool, FecError> {
1189 for backend in &self.backends {
1191 if backend.has_shard(cid).await? {
1192 return Ok(true);
1193 }
1194 }
1195 Ok(false)
1196 }
1197
1198 async fn list_shards(&self) -> Result<Vec<Cid>, FecError> {
1199 let mut all_shards = std::collections::HashSet::new();
1200
1201 for backend in &self.backends {
1203 if let Ok(shards) = backend.list_shards().await {
1204 all_shards.extend(shards);
1205 }
1206 }
1207
1208 Ok(all_shards.into_iter().collect())
1209 }
1210
1211 async fn put_metadata(&self, metadata: &FileMetadata) -> Result<(), FecError> {
1212 match self.strategy {
1213 MultiStorageStrategy::Redundant => {
1214 let mut success_count = 0;
1216 let mut last_error = None;
1217
1218 for backend in &self.backends {
1219 match backend.put_metadata(metadata).await {
1220 Ok(()) => success_count += 1,
1221 Err(e) => {
1222 tracing::warn!("Failed to store metadata in backend: {}", e);
1223 last_error = Some(e);
1224 }
1225 }
1226 }
1227
1228 if success_count > 0 {
1229 Ok(())
1230 } else if let Some(e) = last_error {
1231 Err(e)
1232 } else {
1233 Err(FecError::Backend("No backends available".to_string()))
1234 }
1235 }
1236 MultiStorageStrategy::LoadBalance => {
1237 let index = metadata.file_id[0] as usize % self.backends.len();
1239 self.backends[index].put_metadata(metadata).await
1240 }
1241 MultiStorageStrategy::Failover => {
1242 for backend in &self.backends {
1244 match backend.put_metadata(metadata).await {
1245 Ok(()) => return Ok(()),
1246 Err(e) => {
1247 tracing::warn!("Backend failed, trying next: {}", e);
1248 }
1249 }
1250 }
1251 Err(FecError::Backend("All backends failed".to_string()))
1252 }
1253 }
1254 }
1255
1256 async fn get_metadata(&self, file_id: &[u8; 32]) -> Result<FileMetadata, FecError> {
1257 for backend in &self.backends {
1259 match backend.get_metadata(file_id).await {
1260 Ok(metadata) => return Ok(metadata),
1261 Err(e) => {
1262 tracing::debug!("Backend failed to get metadata: {}", e);
1263 }
1264 }
1265 }
1266
1267 Err(FecError::Backend(
1268 "Metadata not found in any backend".to_string(),
1269 ))
1270 }
1271
1272 async fn delete_metadata(&self, file_id: &[u8; 32]) -> Result<(), FecError> {
1273 for backend in &self.backends {
1275 if let Err(e) = backend.delete_metadata(file_id).await {
1276 tracing::warn!("Failed to delete metadata from backend: {}", e);
1277 }
1278 }
1279 Ok(())
1280 }
1281
1282 async fn list_metadata(&self) -> Result<Vec<FileMetadata>, FecError> {
1283 let mut all_metadata = std::collections::HashMap::new();
1284
1285 for backend in &self.backends {
1287 if let Ok(metadata_list) = backend.list_metadata().await {
1288 for metadata in metadata_list {
1289 all_metadata.insert(metadata.file_id, metadata);
1290 }
1291 }
1292 }
1293
1294 Ok(all_metadata.into_values().collect())
1295 }
1296
1297 async fn stats(&self) -> Result<StorageStats, FecError> {
1298 let mut combined_stats = StorageStats {
1299 total_shards: 0,
1300 total_size: 0,
1301 metadata_count: 0,
1302 unreferenced_shards: 0,
1303 };
1304
1305 for backend in &self.backends {
1307 if let Ok(stats) = backend.stats().await {
1308 combined_stats.total_shards += stats.total_shards;
1309 combined_stats.total_size += stats.total_size;
1310 combined_stats.metadata_count += stats.metadata_count;
1311 combined_stats.unreferenced_shards += stats.unreferenced_shards;
1312 }
1313 }
1314
1315 Ok(combined_stats)
1316 }
1317
1318 async fn garbage_collect(&self) -> Result<GcReport, FecError> {
1319 let mut combined_report = GcReport {
1320 shards_deleted: 0,
1321 bytes_freed: 0,
1322 duration_ms: 0,
1323 };
1324
1325 let start_time = std::time::Instant::now();
1326
1327 for backend in &self.backends {
1329 if let Ok(report) = backend.garbage_collect().await {
1330 combined_report.shards_deleted += report.shards_deleted;
1331 combined_report.bytes_freed += report.bytes_freed;
1332 }
1333 }
1334
1335 combined_report.duration_ms = start_time.elapsed().as_millis() as u64;
1336
1337 Ok(combined_report)
1338 }
1339}
1340
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Store → check presence → read back → delete on the local filesystem backend.
    #[tokio::test]
    async fn test_local_storage_roundtrip() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalStorage::new(temp_dir.path().to_path_buf())
            .await
            .unwrap();

        let header = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 13, [1u8; 32]);
        let shard = Shard::new(header, b"Hello, World!".to_vec());
        let cid = shard.cid().unwrap();

        storage.put_shard(&cid, &shard).await.unwrap();

        assert!(storage.has_shard(&cid).await.unwrap());

        let retrieved = storage.get_shard(&cid).await.unwrap();
        assert_eq!(retrieved.data, shard.data);

        storage.delete_shard(&cid).await.unwrap();
        assert!(!storage.has_shard(&cid).await.unwrap());
    }

    /// Listing shards returns every stored CID.
    #[tokio::test]
    async fn test_local_storage_list() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalStorage::new(temp_dir.path().to_path_buf())
            .await
            .unwrap();

        // Store three shards distinguished only by nonce; remember their CIDs.
        let mut cids = Vec::new();

        for i in 1..=3 {
            let header = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 4, [i; 32]);
            let shard = Shard::new(header, b"data".to_vec());
            let cid = shard.cid().unwrap();
            storage.put_shard(&cid, &shard).await.unwrap();
            cids.push(cid);
        }

        let listed = storage.list_shards().await.unwrap();
        assert_eq!(listed.len(), 3);

        for cid in cids {
            assert!(listed.contains(&cid));
        }
    }

    /// Node selection must be deterministic for a given shard id and return
    /// exactly `replication` nodes.
    #[test]
    fn test_network_storage_node_selection() {
        let nodes = vec![
            NodeEndpoint {
                address: "node1".to_string(),
                port: 8080,
                node_id: None,
            },
            NodeEndpoint {
                address: "node2".to_string(),
                port: 8080,
                node_id: None,
            },
            NodeEndpoint {
                address: "node3".to_string(),
                port: 8080,
                node_id: None,
            },
        ];

        let storage = NetworkStorage::new(nodes, 2);

        let shard_id = [42u8; 32];
        let selected = storage.select_nodes(&shard_id);

        assert_eq!(selected.len(), 2);

        // Same shard id → same node set (determinism).
        let selected2 = storage.select_nodes(&shard_id);
        assert_eq!(selected, selected2);

        // A different shard id still yields the requested replication count.
        let shard_id2 = [99u8; 32];
        let selected3 = storage.select_nodes(&shard_id2);
        assert_eq!(selected3.len(), 2);
    }

    /// A redundant MultiStorage writes to all backends and can still read a
    /// shard after one backend loses its copy.
    #[tokio::test]
    async fn test_multi_storage() {
        let temp_dir1 = TempDir::new().unwrap();
        let temp_dir2 = TempDir::new().unwrap();

        let backend1 = Arc::new(
            LocalStorage::new(temp_dir1.path().to_path_buf())
                .await
                .unwrap(),
        );
        let backend2 = Arc::new(
            LocalStorage::new(temp_dir2.path().to_path_buf())
                .await
                .unwrap(),
        );

        let multi = MultiStorage::new(vec![backend1.clone(), backend2.clone()]);

        let header = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 9, [42u8; 32]);
        let shard = Shard::new(header, b"Test data".to_vec());
        let cid = shard.cid().unwrap();

        multi.put_shard(&cid, &shard).await.unwrap();

        // Both backends hold a copy after the redundant write.
        assert!(backend1.has_shard(&cid).await.unwrap());
        assert!(backend2.has_shard(&cid).await.unwrap());

        // Drop one copy; the multi-backend read falls through to backend2.
        backend1.delete_shard(&cid).await.unwrap();

        let retrieved = multi.get_shard(&cid).await.unwrap();
        assert_eq!(retrieved.data, shard.data);
    }

    /// Shard and metadata round-trips against the in-memory backend,
    /// including counters and `clear()`.
    #[tokio::test]
    async fn test_memory_storage() {
        let storage = MemoryStorage::new();

        let header = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 11, [1u8; 32]);
        let shard = Shard::new(header, b"Memory test".to_vec());
        let cid = shard.cid().unwrap();

        storage.put_shard(&cid, &shard).await.unwrap();

        assert!(storage.has_shard(&cid).await.unwrap());
        assert_eq!(storage.shard_count(), 1);

        let retrieved = storage.get_shard(&cid).await.unwrap();
        assert_eq!(retrieved.data, shard.data);

        let metadata = FileMetadata::new(
            [1u8; 32],
            1024,
            vec![ChunkMeta::new(
                (16, 4),
                EncryptionMode::Convergent,
                vec![cid.to_hex()],
            )],
        );

        storage.put_metadata(&metadata).await.unwrap();
        assert_eq!(storage.metadata_count(), 1);

        let retrieved_meta = storage.get_metadata(&metadata.file_id).await.unwrap();
        assert_eq!(retrieved_meta.file_id, metadata.file_id);

        // clear() wipes both shards and metadata.
        storage.clear();
        assert_eq!(storage.shard_count(), 0);
        assert_eq!(storage.metadata_count(), 0);
    }

    /// GC deletes only shards that no metadata references.
    #[tokio::test]
    async fn test_garbage_collection() {
        let storage = MemoryStorage::new();

        // Shard 1: stored but never referenced by any metadata → GC candidate.
        let header1 = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 10, [1u8; 32]);
        let shard1 = Shard::new(header1, b"Unreferenced".to_vec());
        let cid1 = shard1.cid().unwrap();
        storage.put_shard(&cid1, &shard1).await.unwrap();

        // Shard 2: referenced by the metadata below → must survive GC.
        let header2 = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 10, [2u8; 32]);
        let shard2 = Shard::new(header2, b"Referenced".to_vec());
        let cid2 = shard2.cid().unwrap();
        storage.put_shard(&cid2, &shard2).await.unwrap();

        let metadata = FileMetadata::new(
            [1u8; 32],
            1024,
            vec![ChunkMeta::new(
                (16, 4),
                EncryptionMode::Convergent,
                vec![cid2.to_hex()],
            )],
        );
        storage.put_metadata(&metadata).await.unwrap();

        let gc_report = storage.garbage_collect().await.unwrap();

        assert_eq!(gc_report.shards_deleted, 1);
        assert!(gc_report.bytes_freed > 0);
        assert!(!storage.has_shard(&cid1).await.unwrap());
        assert!(storage.has_shard(&cid2).await.unwrap());
    }

    /// Header serializes to the fixed on-disk size and round-trips all fields.
    #[test]
    fn test_shard_header_serialization() {
        let header = ShardHeader::new(
            EncryptionMode::ConvergentWithSecret,
            (20, 5),
            2048,
            [42u8; 32],
        );

        let bytes = header.to_bytes().unwrap();
        assert_eq!(bytes.len(), ShardHeader::SIZE);

        let deserialized = ShardHeader::from_bytes(&bytes).unwrap();
        assert_eq!(deserialized.version, header.version);
        assert_eq!(deserialized.encryption_mode, header.encryption_mode);
        assert_eq!(deserialized.nspec, header.nspec);
        assert_eq!(deserialized.data_size, header.data_size);
        assert_eq!(deserialized.nonce, header.nonce);
    }

    /// CID derivation is deterministic and sensitive to payload changes.
    #[test]
    fn test_shard_cid_calculation() {
        let header = ShardHeader::new(EncryptionMode::RandomKey, (16, 4), 1024, [0u8; 32]);
        let shard = Shard::new(header, vec![1, 2, 3, 4, 5]);

        let cid1 = shard.cid().unwrap();
        let cid2 = shard.cid().unwrap();

        assert_eq!(cid1, cid2);

        // A one-byte payload change must produce a different CID.
        let shard2 = Shard::new(shard.header.clone(), vec![1, 2, 3, 4, 6]);
        let cid3 = shard2.cid().unwrap();
        assert_ne!(cid1, cid3);
    }

    /// Every MultiStorage strategy preserves the configured backend count.
    #[test]
    fn test_multi_storage_strategies() {
        let backend1 = Arc::new(MemoryStorage::new());
        let backend2 = Arc::new(MemoryStorage::new());

        let redundant = MultiStorage::with_strategy(
            vec![backend1.clone(), backend2.clone()],
            MultiStorageStrategy::Redundant,
        );

        let load_balance = MultiStorage::with_strategy(
            vec![backend1.clone(), backend2.clone()],
            MultiStorageStrategy::LoadBalance,
        );

        let failover =
            MultiStorage::with_strategy(vec![backend1, backend2], MultiStorageStrategy::Failover);

        assert_eq!(redundant.backend_count(), 2);
        assert_eq!(load_balance.backend_count(), 2);
        assert_eq!(failover.backend_count(), 2);
    }

    /// Cid hashing, hex encoding, and byte-level reconstruction.
    #[test]
    fn test_cid_operations() {
        let data = b"test data";
        let cid1 = Cid::from_data(data);
        let cid2 = Cid::from_data(data);

        assert_eq!(cid1, cid2);

        let cid3 = Cid::from_data(b"different data");
        assert_ne!(cid1, cid3);

        // 32 bytes → 64 hex characters.
        let hex = cid1.to_hex();
        assert_eq!(hex.len(), 64);

        // Rebuilding a Cid from its raw bytes yields an equal value.
        let bytes = cid1.as_bytes();
        let cid4 = Cid::new(*bytes);
        assert_eq!(cid1, cid4);
    }
}