Skip to main content

saorsa_node/storage/
disk.rs

1//! Content-addressed disk storage with sharded directories.
2//!
3//! Provides persistent storage for chunks using a two-level directory structure
4//! to avoid large directory listings:
5//!
6//! ```text
7//! {root}/chunks/{xx}/{yy}/{address}.chunk
8//! ```
9//!
//! Here `xx` and `yy` are the first and second bytes of the address, each written as two lowercase hex digits.
11
12use crate::ant_protocol::XorName;
13use crate::error::{Error, Result};
14use std::path::{Path, PathBuf};
15use tokio::fs;
16use tokio::io::AsyncWriteExt;
17use tracing::{debug, trace, warn};
18
/// Settings that control where and how [`DiskStorage`] keeps chunks.
#[derive(Debug, Clone)]
pub struct DiskStorageConfig {
    /// Base directory. Chunk files are placed beneath `{root_dir}/chunks/`.
    pub root_dir: PathBuf,
    /// When `true`, every read re-hashes the bytes and rejects them if the
    /// hash differs from the requested address.
    pub verify_on_read: bool,
    /// Upper bound on the number of stored chunks; `0` disables the limit.
    pub max_chunks: usize,
}

impl Default for DiskStorageConfig {
    /// Relative `.saorsa/chunks` root, read verification on, no chunk limit.
    ///
    /// NOTE(review): `DiskStorage::new` appends another `chunks` component,
    /// so with this default the files land in `.saorsa/chunks/chunks/...`.
    fn default() -> Self {
        Self {
            max_chunks: 0,
            verify_on_read: true,
            root_dir: ".saorsa/chunks".into(),
        }
    }
}
39
/// Operation counters kept by a [`DiskStorage`] instance since it was created.
///
/// A fresh instance starts from `StorageStats::default()`, i.e. all zeros.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// How many chunks `put` actually wrote (calls that hit an existing chunk excluded).
    pub chunks_stored: u64,
    /// How many `get` calls returned chunk content.
    pub chunks_retrieved: u64,
    /// Combined size, in bytes, of every chunk `put` wrote.
    pub bytes_stored: u64,
    /// Combined size, in bytes, of every chunk `get` returned.
    pub bytes_retrieved: u64,
    /// How many `put` calls found the chunk already present and skipped the write.
    pub duplicates: u64,
    /// How many reads produced bytes whose hash did not match the requested address.
    pub verification_failures: u64,
}
56
57/// Content-addressed disk storage.
58///
59/// Uses a sharded directory structure for efficient storage:
60/// ```text
61/// {root}/chunks/{xx}/{yy}/{address}.chunk
62/// ```
63pub struct DiskStorage {
64    /// Storage configuration.
65    config: DiskStorageConfig,
66    /// Operation statistics.
67    stats: parking_lot::RwLock<StorageStats>,
68}
69
70impl DiskStorage {
71    /// Create a new disk storage instance.
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if the root directory cannot be created.
76    pub async fn new(config: DiskStorageConfig) -> Result<Self> {
77        // Ensure root directory exists
78        let chunks_dir = config.root_dir.join("chunks");
79        fs::create_dir_all(&chunks_dir)
80            .await
81            .map_err(|e| Error::Storage(format!("Failed to create chunks directory: {e}")))?;
82
83        debug!("Initialized disk storage at {:?}", config.root_dir);
84
85        Ok(Self {
86            config,
87            stats: parking_lot::RwLock::new(StorageStats::default()),
88        })
89    }
90
91    /// Store a chunk.
92    ///
93    /// Uses atomic write (temp file + rename) for crash safety.
94    ///
95    /// # Arguments
96    ///
97    /// * `address` - Content address (should be SHA256 of content)
98    /// * `content` - Chunk data
99    ///
100    /// # Returns
101    ///
102    /// Returns `true` if the chunk was newly stored, `false` if it already existed.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if the write fails or content doesn't match address.
107    pub async fn put(&self, address: &XorName, content: &[u8]) -> Result<bool> {
108        // Verify content address
109        let computed = Self::compute_address(content);
110        if computed != *address {
111            return Err(Error::Storage(format!(
112                "Content address mismatch: expected {}, computed {}",
113                hex::encode(address),
114                hex::encode(computed)
115            )));
116        }
117
118        let chunk_path = self.chunk_path(address);
119
120        // Check if already exists
121        if chunk_path.exists() {
122            trace!("Chunk {} already exists", hex::encode(address));
123            {
124                let mut stats = self.stats.write();
125                stats.duplicates += 1;
126            }
127            return Ok(false);
128        }
129
130        // Enforce max_chunks capacity limit (0 = unlimited)
131        if self.config.max_chunks > 0 {
132            let chunks_stored = self.stats.read().chunks_stored;
133            if chunks_stored >= self.config.max_chunks as u64 {
134                return Err(Error::Storage(format!(
135                    "Storage capacity reached: {} chunks stored, max is {}",
136                    chunks_stored, self.config.max_chunks
137                )));
138            }
139        }
140
141        // Ensure parent directories exist
142        if let Some(parent) = chunk_path.parent() {
143            fs::create_dir_all(parent)
144                .await
145                .map_err(|e| Error::Storage(format!("Failed to create shard directory: {e}")))?;
146        }
147
148        // Atomic write: temp file + rename
149        let temp_path = chunk_path.with_extension("tmp");
150        let mut file = fs::File::create(&temp_path)
151            .await
152            .map_err(|e| Error::Storage(format!("Failed to create temp file: {e}")))?;
153
154        file.write_all(content)
155            .await
156            .map_err(|e| Error::Storage(format!("Failed to write chunk: {e}")))?;
157
158        file.flush()
159            .await
160            .map_err(|e| Error::Storage(format!("Failed to flush chunk: {e}")))?;
161
162        // Rename for atomic commit
163        fs::rename(&temp_path, &chunk_path)
164            .await
165            .map_err(|e| Error::Storage(format!("Failed to rename temp file: {e}")))?;
166
167        {
168            let mut stats = self.stats.write();
169            stats.chunks_stored += 1;
170            stats.bytes_stored += content.len() as u64;
171        }
172
173        debug!(
174            "Stored chunk {} ({} bytes)",
175            hex::encode(address),
176            content.len()
177        );
178
179        Ok(true)
180    }
181
182    /// Retrieve a chunk.
183    ///
184    /// # Arguments
185    ///
186    /// * `address` - Content address to retrieve
187    ///
188    /// # Returns
189    ///
190    /// Returns `Some(content)` if found, `None` if not found.
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if read fails or verification fails.
195    pub async fn get(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
196        let chunk_path = self.chunk_path(address);
197
198        if !chunk_path.exists() {
199            trace!("Chunk {} not found", hex::encode(address));
200            return Ok(None);
201        }
202
203        let content = fs::read(&chunk_path)
204            .await
205            .map_err(|e| Error::Storage(format!("Failed to read chunk: {e}")))?;
206
207        // Verify content if configured
208        if self.config.verify_on_read {
209            let computed = Self::compute_address(&content);
210            if computed != *address {
211                {
212                    let mut stats = self.stats.write();
213                    stats.verification_failures += 1;
214                }
215                warn!(
216                    "Chunk verification failed: expected {}, computed {}",
217                    hex::encode(address),
218                    hex::encode(computed)
219                );
220                return Err(Error::Storage(format!(
221                    "Chunk verification failed for {}",
222                    hex::encode(address)
223                )));
224            }
225        }
226
227        {
228            let mut stats = self.stats.write();
229            stats.chunks_retrieved += 1;
230            stats.bytes_retrieved += content.len() as u64;
231        }
232
233        debug!(
234            "Retrieved chunk {} ({} bytes)",
235            hex::encode(address),
236            content.len()
237        );
238
239        Ok(Some(content))
240    }
241
242    /// Check if a chunk exists.
243    #[must_use]
244    pub fn exists(&self, address: &XorName) -> bool {
245        self.chunk_path(address).exists()
246    }
247
248    /// Delete a chunk.
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if deletion fails.
253    pub async fn delete(&self, address: &XorName) -> Result<bool> {
254        let chunk_path = self.chunk_path(address);
255
256        if !chunk_path.exists() {
257            return Ok(false);
258        }
259
260        fs::remove_file(&chunk_path)
261            .await
262            .map_err(|e| Error::Storage(format!("Failed to delete chunk: {e}")))?;
263
264        debug!("Deleted chunk {}", hex::encode(address));
265
266        Ok(true)
267    }
268
269    /// Get storage statistics.
270    #[must_use]
271    pub fn stats(&self) -> StorageStats {
272        self.stats.read().clone()
273    }
274
275    /// Get the path for a chunk.
276    fn chunk_path(&self, address: &XorName) -> PathBuf {
277        // Two-level sharding using first two bytes
278        let shard1 = format!("{:02x}", address[0]);
279        let shard2 = format!("{:02x}", address[1]);
280        let filename = format!("{}.chunk", hex::encode(address));
281
282        self.config
283            .root_dir
284            .join("chunks")
285            .join(shard1)
286            .join(shard2)
287            .join(filename)
288    }
289
290    /// Compute content address (SHA256 hash).
291    #[must_use]
292    pub fn compute_address(content: &[u8]) -> XorName {
293        crate::client::compute_address(content)
294    }
295
296    /// Get the root directory.
297    #[must_use]
298    pub fn root_dir(&self) -> &Path {
299        &self.config.root_dir
300    }
301}
302
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Storage rooted in a fresh temp dir, with read verification on and no
    /// chunk limit. The returned `TempDir` must outlive the storage.
    async fn create_test_storage() -> (DiskStorage, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = DiskStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            verify_on_read: true,
            max_chunks: 0,
        };
        let storage = DiskStorage::new(config).await.expect("create storage");
        (storage, temp_dir)
    }

    #[tokio::test]
    async fn test_put_and_get() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"hello world";
        let address = DiskStorage::compute_address(content);

        let is_new = storage.put(&address, content).await.expect("put");
        assert!(is_new);

        let retrieved = storage.get(&address).await.expect("get");
        assert_eq!(retrieved, Some(content.to_vec()));
    }

    #[tokio::test]
    async fn test_put_duplicate() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"test data";
        let address = DiskStorage::compute_address(content);

        let is_new1 = storage.put(&address, content).await.expect("put 1");
        assert!(is_new1);

        let is_new2 = storage.put(&address, content).await.expect("put 2");
        assert!(!is_new2);

        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 1);
        assert_eq!(stats.duplicates, 1);
    }

    #[tokio::test]
    async fn test_get_not_found() {
        let (storage, _temp) = create_test_storage().await;

        let address = [0xAB; 32];
        let result = storage.get(&address).await.expect("get");
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_exists() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"exists test";
        let address = DiskStorage::compute_address(content);

        assert!(!storage.exists(&address));

        storage.put(&address, content).await.expect("put");

        assert!(storage.exists(&address));
    }

    #[tokio::test]
    async fn test_delete() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"delete test";
        let address = DiskStorage::compute_address(content);

        storage.put(&address, content).await.expect("put");
        assert!(storage.exists(&address));

        let deleted = storage.delete(&address).await.expect("delete");
        assert!(deleted);
        assert!(!storage.exists(&address));

        // A second delete finds nothing to remove.
        let deleted2 = storage.delete(&address).await.expect("delete 2");
        assert!(!deleted2);
    }

    #[tokio::test]
    async fn test_max_chunks_enforced() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = DiskStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            verify_on_read: true,
            max_chunks: 2,
        };
        let storage = DiskStorage::new(config).await.expect("create storage");

        let content1 = b"chunk one";
        let content2 = b"chunk two";
        let content3 = b"chunk three";
        let addr1 = DiskStorage::compute_address(content1);
        let addr2 = DiskStorage::compute_address(content2);
        let addr3 = DiskStorage::compute_address(content3);

        assert!(storage.put(&addr1, content1).await.is_ok());
        assert!(storage.put(&addr2, content2).await.is_ok());

        let result = storage.put(&addr3, content3).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("capacity reached"));
    }

    #[tokio::test]
    async fn test_address_mismatch() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"some content";
        let wrong_address = [0xFF; 32];

        let result = storage.put(&wrong_address, content).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("mismatch"));
    }

    #[tokio::test]
    async fn test_chunk_path_sharding() {
        let (storage, _temp) = create_test_storage().await;

        let mut address = [0u8; 32];
        address[0] = 0xAB;
        address[1] = 0xCD;

        let path = storage.chunk_path(&address);

        // Compare the whole path. Substring checks for "ab"/"cd" would pass
        // even without sharding, since the hex filename contains "abcd".
        let expected = storage
            .root_dir()
            .join("chunks")
            .join("ab")
            .join("cd")
            .join(format!("{}.chunk", hex::encode(address)));
        assert_eq!(path, expected);
    }

    #[tokio::test]
    async fn test_put_leaves_only_chunk_file() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"no temp debris";
        let address = DiskStorage::compute_address(content);
        storage.put(&address, content).await.expect("put");

        // After a successful put, the shard dir should hold exactly the chunk
        // file: no leftover temp file.
        let chunk_path = storage.chunk_path(&address);
        let shard_dir = chunk_path.parent().expect("shard dir");
        let entries: Vec<std::path::PathBuf> = std::fs::read_dir(shard_dir)
            .expect("read shard dir")
            .map(|entry| entry.expect("dir entry").path())
            .collect();
        assert_eq!(entries, vec![chunk_path]);
    }

    #[tokio::test]
    async fn test_get_detects_corruption() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"original bytes";
        let address = DiskStorage::compute_address(content);
        storage.put(&address, content).await.expect("put");

        // Tamper with the stored file behind the storage's back.
        std::fs::write(storage.chunk_path(&address), b"tampered bytes").expect("corrupt chunk");

        let result = storage.get(&address).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("verification failed"));
        assert_eq!(storage.stats().verification_failures, 1);
    }

    #[test]
    fn test_compute_address() {
        // Known SHA256 hash of "hello world"
        let content = b"hello world";
        let address = DiskStorage::compute_address(content);

        let expected_hex = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9";
        assert_eq!(hex::encode(address), expected_hex);
    }

    #[tokio::test]
    async fn test_stats() {
        let (storage, _temp) = create_test_storage().await;

        let content1 = b"content 1";
        let content2 = b"content 2";
        let address1 = DiskStorage::compute_address(content1);
        let address2 = DiskStorage::compute_address(content2);

        storage.put(&address1, content1).await.expect("put 1");
        storage.put(&address2, content2).await.expect("put 2");

        storage.get(&address1).await.expect("get");

        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 2);
        assert_eq!(stats.chunks_retrieved, 1);
        assert_eq!(
            stats.bytes_stored,
            content1.len() as u64 + content2.len() as u64
        );
        assert_eq!(stats.bytes_retrieved, content1.len() as u64);
    }
}