1use crate::ant_protocol::XorName;
13use crate::error::{Error, Result};
14use std::path::{Path, PathBuf};
15use tokio::fs;
16use tokio::io::AsyncWriteExt;
17use tracing::{debug, trace, warn};
18
19#[derive(Debug, Clone)]
21pub struct DiskStorageConfig {
22 pub root_dir: PathBuf,
24 pub verify_on_read: bool,
26 pub max_chunks: usize,
28}
29
30impl Default for DiskStorageConfig {
31 fn default() -> Self {
32 Self {
33 root_dir: PathBuf::from(".saorsa/chunks"),
34 verify_on_read: true,
35 max_chunks: 0,
36 }
37 }
38}
39
40#[derive(Debug, Clone, Default)]
42pub struct StorageStats {
43 pub chunks_stored: u64,
45 pub chunks_retrieved: u64,
47 pub bytes_stored: u64,
49 pub bytes_retrieved: u64,
51 pub duplicates: u64,
53 pub verification_failures: u64,
55}
56
57pub struct DiskStorage {
64 config: DiskStorageConfig,
66 stats: parking_lot::RwLock<StorageStats>,
68}
69
70impl DiskStorage {
71 pub async fn new(config: DiskStorageConfig) -> Result<Self> {
77 let chunks_dir = config.root_dir.join("chunks");
79 fs::create_dir_all(&chunks_dir)
80 .await
81 .map_err(|e| Error::Storage(format!("Failed to create chunks directory: {e}")))?;
82
83 debug!("Initialized disk storage at {:?}", config.root_dir);
84
85 Ok(Self {
86 config,
87 stats: parking_lot::RwLock::new(StorageStats::default()),
88 })
89 }
90
91 pub async fn put(&self, address: &XorName, content: &[u8]) -> Result<bool> {
108 let computed = Self::compute_address(content);
110 if computed != *address {
111 return Err(Error::Storage(format!(
112 "Content address mismatch: expected {}, computed {}",
113 hex::encode(address),
114 hex::encode(computed)
115 )));
116 }
117
118 let chunk_path = self.chunk_path(address);
119
120 if chunk_path.exists() {
122 trace!("Chunk {} already exists", hex::encode(address));
123 {
124 let mut stats = self.stats.write();
125 stats.duplicates += 1;
126 }
127 return Ok(false);
128 }
129
130 if self.config.max_chunks > 0 {
132 let chunks_stored = self.stats.read().chunks_stored;
133 if chunks_stored >= self.config.max_chunks as u64 {
134 return Err(Error::Storage(format!(
135 "Storage capacity reached: {} chunks stored, max is {}",
136 chunks_stored, self.config.max_chunks
137 )));
138 }
139 }
140
141 if let Some(parent) = chunk_path.parent() {
143 fs::create_dir_all(parent)
144 .await
145 .map_err(|e| Error::Storage(format!("Failed to create shard directory: {e}")))?;
146 }
147
148 let temp_path = chunk_path.with_extension("tmp");
150 let mut file = fs::File::create(&temp_path)
151 .await
152 .map_err(|e| Error::Storage(format!("Failed to create temp file: {e}")))?;
153
154 file.write_all(content)
155 .await
156 .map_err(|e| Error::Storage(format!("Failed to write chunk: {e}")))?;
157
158 file.flush()
159 .await
160 .map_err(|e| Error::Storage(format!("Failed to flush chunk: {e}")))?;
161
162 fs::rename(&temp_path, &chunk_path)
164 .await
165 .map_err(|e| Error::Storage(format!("Failed to rename temp file: {e}")))?;
166
167 {
168 let mut stats = self.stats.write();
169 stats.chunks_stored += 1;
170 stats.bytes_stored += content.len() as u64;
171 }
172
173 debug!(
174 "Stored chunk {} ({} bytes)",
175 hex::encode(address),
176 content.len()
177 );
178
179 Ok(true)
180 }
181
182 pub async fn get(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
196 let chunk_path = self.chunk_path(address);
197
198 if !chunk_path.exists() {
199 trace!("Chunk {} not found", hex::encode(address));
200 return Ok(None);
201 }
202
203 let content = fs::read(&chunk_path)
204 .await
205 .map_err(|e| Error::Storage(format!("Failed to read chunk: {e}")))?;
206
207 if self.config.verify_on_read {
209 let computed = Self::compute_address(&content);
210 if computed != *address {
211 {
212 let mut stats = self.stats.write();
213 stats.verification_failures += 1;
214 }
215 warn!(
216 "Chunk verification failed: expected {}, computed {}",
217 hex::encode(address),
218 hex::encode(computed)
219 );
220 return Err(Error::Storage(format!(
221 "Chunk verification failed for {}",
222 hex::encode(address)
223 )));
224 }
225 }
226
227 {
228 let mut stats = self.stats.write();
229 stats.chunks_retrieved += 1;
230 stats.bytes_retrieved += content.len() as u64;
231 }
232
233 debug!(
234 "Retrieved chunk {} ({} bytes)",
235 hex::encode(address),
236 content.len()
237 );
238
239 Ok(Some(content))
240 }
241
242 #[must_use]
244 pub fn exists(&self, address: &XorName) -> bool {
245 self.chunk_path(address).exists()
246 }
247
248 pub async fn delete(&self, address: &XorName) -> Result<bool> {
254 let chunk_path = self.chunk_path(address);
255
256 if !chunk_path.exists() {
257 return Ok(false);
258 }
259
260 fs::remove_file(&chunk_path)
261 .await
262 .map_err(|e| Error::Storage(format!("Failed to delete chunk: {e}")))?;
263
264 debug!("Deleted chunk {}", hex::encode(address));
265
266 Ok(true)
267 }
268
269 #[must_use]
271 pub fn stats(&self) -> StorageStats {
272 self.stats.read().clone()
273 }
274
275 fn chunk_path(&self, address: &XorName) -> PathBuf {
277 let shard1 = format!("{:02x}", address[0]);
279 let shard2 = format!("{:02x}", address[1]);
280 let filename = format!("{}.chunk", hex::encode(address));
281
282 self.config
283 .root_dir
284 .join("chunks")
285 .join(shard1)
286 .join(shard2)
287 .join(filename)
288 }
289
290 #[must_use]
292 pub fn compute_address(content: &[u8]) -> XorName {
293 crate::client::compute_address(content)
294 }
295
296 #[must_use]
298 pub fn root_dir(&self) -> &Path {
299 &self.config.root_dir
300 }
301}
302
303#[cfg(test)]
304#[allow(clippy::unwrap_used, clippy::expect_used)]
305mod tests {
306 use super::*;
307 use tempfile::TempDir;
308
309 async fn create_test_storage() -> (DiskStorage, TempDir) {
310 let temp_dir = TempDir::new().expect("create temp dir");
311 let config = DiskStorageConfig {
312 root_dir: temp_dir.path().to_path_buf(),
313 verify_on_read: true,
314 max_chunks: 0,
315 };
316 let storage = DiskStorage::new(config).await.expect("create storage");
317 (storage, temp_dir)
318 }
319
320 #[tokio::test]
321 async fn test_put_and_get() {
322 let (storage, _temp) = create_test_storage().await;
323
324 let content = b"hello world";
325 let address = DiskStorage::compute_address(content);
326
327 let is_new = storage.put(&address, content).await.expect("put");
329 assert!(is_new);
330
331 let retrieved = storage.get(&address).await.expect("get");
333 assert_eq!(retrieved, Some(content.to_vec()));
334 }
335
336 #[tokio::test]
337 async fn test_put_duplicate() {
338 let (storage, _temp) = create_test_storage().await;
339
340 let content = b"test data";
341 let address = DiskStorage::compute_address(content);
342
343 let is_new1 = storage.put(&address, content).await.expect("put 1");
345 assert!(is_new1);
346
347 let is_new2 = storage.put(&address, content).await.expect("put 2");
349 assert!(!is_new2);
350
351 let stats = storage.stats();
353 assert_eq!(stats.chunks_stored, 1);
354 assert_eq!(stats.duplicates, 1);
355 }
356
357 #[tokio::test]
358 async fn test_get_not_found() {
359 let (storage, _temp) = create_test_storage().await;
360
361 let address = [0xAB; 32];
362 let result = storage.get(&address).await.expect("get");
363 assert!(result.is_none());
364 }
365
366 #[tokio::test]
367 async fn test_exists() {
368 let (storage, _temp) = create_test_storage().await;
369
370 let content = b"exists test";
371 let address = DiskStorage::compute_address(content);
372
373 assert!(!storage.exists(&address));
374
375 storage.put(&address, content).await.expect("put");
376
377 assert!(storage.exists(&address));
378 }
379
380 #[tokio::test]
381 async fn test_delete() {
382 let (storage, _temp) = create_test_storage().await;
383
384 let content = b"delete test";
385 let address = DiskStorage::compute_address(content);
386
387 storage.put(&address, content).await.expect("put");
389 assert!(storage.exists(&address));
390
391 let deleted = storage.delete(&address).await.expect("delete");
393 assert!(deleted);
394 assert!(!storage.exists(&address));
395
396 let deleted2 = storage.delete(&address).await.expect("delete 2");
398 assert!(!deleted2);
399 }
400
401 #[tokio::test]
402 async fn test_max_chunks_enforced() {
403 let temp_dir = TempDir::new().expect("create temp dir");
404 let config = DiskStorageConfig {
405 root_dir: temp_dir.path().to_path_buf(),
406 verify_on_read: true,
407 max_chunks: 2,
408 };
409 let storage = DiskStorage::new(config).await.expect("create storage");
410
411 let content1 = b"chunk one";
412 let content2 = b"chunk two";
413 let content3 = b"chunk three";
414 let addr1 = DiskStorage::compute_address(content1);
415 let addr2 = DiskStorage::compute_address(content2);
416 let addr3 = DiskStorage::compute_address(content3);
417
418 assert!(storage.put(&addr1, content1).await.is_ok());
420 assert!(storage.put(&addr2, content2).await.is_ok());
421
422 let result = storage.put(&addr3, content3).await;
424 assert!(result.is_err());
425 assert!(result.unwrap_err().to_string().contains("capacity reached"));
426 }
427
428 #[tokio::test]
429 async fn test_address_mismatch() {
430 let (storage, _temp) = create_test_storage().await;
431
432 let content = b"some content";
433 let wrong_address = [0xFF; 32]; let result = storage.put(&wrong_address, content).await;
436 assert!(result.is_err());
437 assert!(result.unwrap_err().to_string().contains("mismatch"));
438 }
439
440 #[tokio::test]
441 async fn test_chunk_path_sharding() {
442 let (storage, _temp) = create_test_storage().await;
443
444 let mut address = [0u8; 32];
446 address[0] = 0xAB;
447 address[1] = 0xCD;
448
449 let path = storage.chunk_path(&address);
450 let path_str = path.to_string_lossy();
451
452 assert!(path_str.contains("ab"));
454 assert!(path_str.contains("cd"));
455 assert!(path_str.ends_with(".chunk"));
456 }
457
458 #[test]
459 fn test_compute_address() {
460 let content = b"hello world";
462 let address = DiskStorage::compute_address(content);
463
464 let expected_hex = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9";
465 assert_eq!(hex::encode(address), expected_hex);
466 }
467
468 #[tokio::test]
469 async fn test_stats() {
470 let (storage, _temp) = create_test_storage().await;
471
472 let content1 = b"content 1";
473 let content2 = b"content 2";
474 let address1 = DiskStorage::compute_address(content1);
475 let address2 = DiskStorage::compute_address(content2);
476
477 storage.put(&address1, content1).await.expect("put 1");
479 storage.put(&address2, content2).await.expect("put 2");
480
481 storage.get(&address1).await.expect("get");
483
484 let stats = storage.stats();
485 assert_eq!(stats.chunks_stored, 2);
486 assert_eq!(stats.chunks_retrieved, 1);
487 assert_eq!(
488 stats.bytes_stored,
489 content1.len() as u64 + content2.len() as u64
490 );
491 assert_eq!(stats.bytes_retrieved, content1.len() as u64);
492 }
493}