ipfrs_storage/
paritydb.rs

1//! Block storage implementation using ParityDB
2//!
3//! ParityDB is optimized for SSD storage with better write amplification
4//! compared to Sled. It uses column-based storage layout and is designed
5//! for high-throughput workloads.
6
7use crate::traits::BlockStore;
8use async_trait::async_trait;
9use ipfrs_core::{Block, Cid, Error, Result};
10use parity_db::{Db, Options};
11use std::path::PathBuf;
12use std::sync::Arc;
13
/// Index of the ParityDB column that holds block data, keyed by the byte
/// encoding of each block's CID.
///
/// The preset configurations in this file open the database with exactly one
/// column, so this is the only valid column index for them.
const BLOCKS_COLUMN: u8 = 0;
16
/// Named tuning profiles that can be selected when opening a ParityDB store.
///
/// Each variant is translated into concrete database options when the store
/// configuration is built.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ParityDbPreset {
    /// Profile aimed at write-heavy ingestion.
    FastWrite,
    /// General-purpose profile; this is what the default configuration uses.
    Balanced,
    /// Profile aimed at devices with little memory.
    LowMemory,
}
27
28/// ParityDB block store configuration
29#[derive(Debug, Clone)]
30pub struct ParityDbConfig {
31    /// Path to the database directory
32    pub path: PathBuf,
33    /// Configuration preset
34    pub preset: ParityDbPreset,
35    /// Custom column options (overrides preset if provided)
36    pub custom_options: Option<Options>,
37}
38
39impl ParityDbConfig {
40    /// Create a new configuration with a preset
41    pub fn new(path: PathBuf, preset: ParityDbPreset) -> Self {
42        Self {
43            path,
44            preset,
45            custom_options: None,
46        }
47    }
48
49    /// Create configuration optimized for fast writes
50    pub fn fast_write(path: PathBuf) -> Self {
51        Self::new(path, ParityDbPreset::FastWrite)
52    }
53
54    /// Create configuration for balanced workloads
55    pub fn balanced(path: PathBuf) -> Self {
56        Self::new(path, ParityDbPreset::Balanced)
57    }
58
59    /// Create configuration for low memory usage
60    pub fn low_memory(path: PathBuf) -> Self {
61        Self::new(path, ParityDbPreset::LowMemory)
62    }
63
64    /// Build ParityDB Options from configuration
65    fn build_options(&self) -> Options {
66        if let Some(ref custom) = self.custom_options {
67            return custom.clone();
68        }
69
70        let mut options = Options::with_columns(&self.path, 1);
71
72        match self.preset {
73            ParityDbPreset::FastWrite => {
74                // Optimize for write throughput
75                // Larger write buffer, more aggressive compression
76                options.columns[BLOCKS_COLUMN as usize].btree_index = true;
77                options.columns[BLOCKS_COLUMN as usize].compression =
78                    parity_db::CompressionType::Lz4;
79                options.sync_wal = false; // Async WAL for better write performance
80                options.sync_data = false; // Async data writes
81            }
82            ParityDbPreset::Balanced => {
83                // Balanced settings
84                options.columns[BLOCKS_COLUMN as usize].btree_index = true;
85                options.columns[BLOCKS_COLUMN as usize].compression =
86                    parity_db::CompressionType::Lz4;
87                options.sync_wal = true;
88                options.sync_data = false;
89            }
90            ParityDbPreset::LowMemory => {
91                // Minimize memory usage
92                options.columns[BLOCKS_COLUMN as usize].btree_index = false; // No index to save memory
93                options.columns[BLOCKS_COLUMN as usize].compression =
94                    parity_db::CompressionType::Lz4;
95                options.sync_wal = true;
96                options.sync_data = true;
97            }
98        }
99
100        options
101    }
102}
103
104impl Default for ParityDbConfig {
105    fn default() -> Self {
106        Self::balanced(PathBuf::from(".ipfrs/blocks-paritydb"))
107    }
108}
109
/// Block storage using ParityDB
///
/// Blocks are stored in a single column, keyed by the byte encoding of their
/// CID.
pub struct ParityDbBlockStore {
    /// Reference-counted handle to the open database.
    db: Arc<Db>,
}
114
115impl ParityDbBlockStore {
116    /// Create a new ParityDB block store
117    pub fn new(config: ParityDbConfig) -> Result<Self> {
118        // Create parent directory if it doesn't exist
119        if let Some(parent) = config.path.parent() {
120            std::fs::create_dir_all(parent)
121                .map_err(|e| Error::Storage(format!("Failed to create directory: {e}")))?;
122        }
123
124        let options = config.build_options();
125
126        let db = Db::open_or_create(&options)
127            .map_err(|e| Error::Storage(format!("Failed to open ParityDB: {e}")))?;
128
129        Ok(Self { db: Arc::new(db) })
130    }
131
132    /// Get reference to underlying database
133    pub fn db(&self) -> &Arc<Db> {
134        &self.db
135    }
136}
137
138#[async_trait]
139impl BlockStore for ParityDbBlockStore {
140    /// Store a block
141    async fn put(&self, block: &Block) -> Result<()> {
142        let key = block.cid().to_bytes();
143        let value = block.data().to_vec();
144
145        let transaction = vec![(BLOCKS_COLUMN, key, Some(value))];
146
147        self.db
148            .commit(transaction)
149            .map_err(|e| Error::Storage(format!("Failed to insert block: {e}")))?;
150
151        Ok(())
152    }
153
154    /// Retrieve a block by CID
155    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
156        let key = cid.to_bytes();
157
158        match self.db.get(BLOCKS_COLUMN, &key) {
159            Ok(Some(value)) => {
160                let data = bytes::Bytes::from(value);
161                Ok(Some(Block::from_parts(*cid, data)))
162            }
163            Ok(None) => Ok(None),
164            Err(e) => Err(Error::Storage(format!("Failed to get block: {e}"))),
165        }
166    }
167
168    /// Check if a block exists
169    async fn has(&self, cid: &Cid) -> Result<bool> {
170        let key = cid.to_bytes();
171        match self.db.get(BLOCKS_COLUMN, &key) {
172            Ok(Some(_)) => Ok(true),
173            Ok(None) => Ok(false),
174            Err(e) => Err(Error::Storage(format!("Failed to check block: {e}"))),
175        }
176    }
177
178    /// Delete a block
179    async fn delete(&self, cid: &Cid) -> Result<()> {
180        let key = cid.to_bytes();
181
182        let transaction = vec![(BLOCKS_COLUMN, key, None)];
183
184        self.db
185            .commit(transaction)
186            .map_err(|e| Error::Storage(format!("Failed to delete block: {e}")))?;
187
188        Ok(())
189    }
190
191    /// Get the number of blocks stored
192    fn len(&self) -> usize {
193        // ParityDB doesn't have a direct len() method
194        // We need to iterate to count (expensive operation)
195        // For performance, return 0 and users should track this separately
196        // or use a separate counter if needed
197        0
198    }
199
200    /// Check if the store is empty
201    fn is_empty(&self) -> bool {
202        // Since len() is not efficient, we can't reliably check emptiness
203        // Return false as a safe default
204        false
205    }
206
207    /// Get all CIDs in the store
208    fn list_cids(&self) -> Result<Vec<Cid>> {
209        let mut cids = Vec::new();
210
211        let mut iter = self
212            .db
213            .iter(BLOCKS_COLUMN)
214            .map_err(|e| Error::Storage(format!("Failed to create iterator: {e}")))?;
215
216        while let Some((key, _value)) = iter
217            .next()
218            .map_err(|e| Error::Storage(format!("Iterator error: {e}")))?
219        {
220            // Parse CID from key bytes
221            let cid = Cid::try_from(key.to_vec())
222                .map_err(|e| Error::Cid(format!("Failed to parse CID: {e}")))?;
223
224            cids.push(cid);
225        }
226
227        Ok(cids)
228    }
229
230    /// Store multiple blocks atomically
231    async fn put_many(&self, blocks: &[Block]) -> Result<()> {
232        let mut transaction = Vec::new();
233
234        for block in blocks {
235            let key = block.cid().to_bytes();
236            let value = block.data().to_vec();
237            transaction.push((BLOCKS_COLUMN, key, Some(value)));
238        }
239
240        self.db
241            .commit(transaction)
242            .map_err(|e| Error::Storage(format!("Failed to apply batch: {e}")))?;
243
244        Ok(())
245    }
246
247    /// Retrieve multiple blocks efficiently
248    async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
249        let mut results = Vec::with_capacity(cids.len());
250
251        for cid in cids {
252            let key = cid.to_bytes();
253            match self.db.get(BLOCKS_COLUMN, &key) {
254                Ok(Some(value)) => {
255                    let data = bytes::Bytes::from(value);
256                    results.push(Some(Block::from_parts(*cid, data)));
257                }
258                Ok(None) => results.push(None),
259                Err(e) => return Err(Error::Storage(format!("Failed to get block: {e}"))),
260            }
261        }
262
263        Ok(results)
264    }
265
266    /// Check if multiple blocks exist efficiently
267    async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
268        let mut results = Vec::with_capacity(cids.len());
269
270        for cid in cids {
271            let key = cid.to_bytes();
272            match self.db.get(BLOCKS_COLUMN, &key) {
273                Ok(Some(_)) => results.push(true),
274                Ok(None) => results.push(false),
275                Err(e) => return Err(Error::Storage(format!("Failed to check block: {e}"))),
276            }
277        }
278
279        Ok(results)
280    }
281
282    /// Delete multiple blocks atomically
283    async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
284        let mut transaction = Vec::new();
285
286        for cid in cids {
287            let key = cid.to_bytes();
288            transaction.push((BLOCKS_COLUMN, key, None));
289        }
290
291        self.db
292            .commit(transaction)
293            .map_err(|e| Error::Storage(format!("Failed to apply batch: {e}")))?;
294
295        Ok(())
296    }
297
298    /// Flush pending writes to disk
299    async fn flush(&self) -> Result<()> {
300        // ParityDB commits are already durable
301        // No explicit flush needed
302        Ok(())
303    }
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Scratch database directory under the platform temp dir.
    ///
    /// The directory name includes the current process id so concurrent test
    /// processes do not share a database. Any stale directory is removed on
    /// creation, and the directory is removed again when the guard is dropped.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        fn new(name: &str) -> Self {
            let dir = std::env::temp_dir().join(format!("{name}-{}", std::process::id()));
            // Best effort: the directory usually does not exist yet.
            let _ = std::fs::remove_dir_all(&dir);
            Self(dir)
        }

        fn path(&self) -> PathBuf {
            self.0.clone()
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort cleanup; a leftover directory must not fail a test.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Build `count` distinct blocks with predictable contents.
    fn sample_blocks(count: usize) -> Vec<Block> {
        (0..count)
            .map(|i| Block::new(Bytes::from(format!("block {i}"))).unwrap())
            .collect()
    }

    #[tokio::test]
    async fn test_paritydb_put_get_block() {
        // Declared before the store so the store is dropped (and the database
        // closed) before the directory is removed.
        let scratch = ScratchDir::new("ipfrs-test-paritydb");
        let store = ParityDbBlockStore::new(ParityDbConfig::balanced(scratch.path())).unwrap();

        let data = Bytes::from("hello paritydb");
        let block = Block::new(data.clone()).unwrap();

        // Put block
        store.put(&block).await.unwrap();

        // Get block
        let retrieved = store
            .get(block.cid())
            .await
            .unwrap()
            .expect("block should be retrievable after put");
        assert_eq!(retrieved.data(), &data);

        // Check has
        assert!(store.has(block.cid()).await.unwrap());

        // Delete block
        store.delete(block.cid()).await.unwrap();
        assert!(!store.has(block.cid()).await.unwrap());
    }

    #[tokio::test]
    async fn test_paritydb_batch_operations() {
        let scratch = ScratchDir::new("ipfrs-test-paritydb-batch");
        let store = ParityDbBlockStore::new(ParityDbConfig::fast_write(scratch.path())).unwrap();

        // Create multiple blocks
        let blocks = sample_blocks(10);

        // Batch put
        store.put_many(&blocks).await.unwrap();

        // Check all exist
        let cids: Vec<Cid> = blocks.iter().map(|b| *b.cid()).collect();
        let exists = store.has_many(&cids).await.unwrap();
        assert_eq!(exists.len(), cids.len());
        assert!(exists.iter().all(|&x| x));

        // Batch get
        let retrieved = store.get_many(&cids).await.unwrap();
        assert_eq!(retrieved.len(), blocks.len());
        for (expected, found) in blocks.iter().zip(&retrieved) {
            let found = found
                .as_ref()
                .expect("every stored block should be returned");
            assert_eq!(found.data(), expected.data());
        }

        // Batch delete
        store.delete_many(&cids).await.unwrap();
        let exists = store.has_many(&cids).await.unwrap();
        assert!(exists.iter().all(|&x| !x));
    }

    #[tokio::test]
    async fn test_paritydb_presets() {
        let cases: [(fn(PathBuf) -> ParityDbConfig, ParityDbPreset, &str); 3] = [
            (
                ParityDbConfig::fast_write,
                ParityDbPreset::FastWrite,
                "ipfrs-test-paritydb-fast",
            ),
            (
                ParityDbConfig::balanced,
                ParityDbPreset::Balanced,
                "ipfrs-test-paritydb-balanced",
            ),
            (
                ParityDbConfig::low_memory,
                ParityDbPreset::LowMemory,
                "ipfrs-test-paritydb-lowmem",
            ),
        ];

        for &(make_config, expected_preset, name) in &cases {
            let scratch = ScratchDir::new(name);
            let config = make_config(scratch.path());
            assert_eq!(config.preset, expected_preset);

            // Each preset must produce options the database accepts.
            let _store = ParityDbBlockStore::new(config).unwrap();
        }
    }

    #[tokio::test]
    async fn test_paritydb_list_cids() {
        let scratch = ScratchDir::new("ipfrs-test-paritydb-list");
        let store = ParityDbBlockStore::new(ParityDbConfig::balanced(scratch.path())).unwrap();

        // Create and store blocks
        let blocks = sample_blocks(5);
        store.put_many(&blocks).await.unwrap();

        // List CIDs
        let cids = store.list_cids().unwrap();
        assert_eq!(cids.len(), 5);

        // Verify all CIDs are present
        for block in &blocks {
            assert!(cids.contains(block.cid()));
        }
    }
}