ipfrs_storage/
paritydb.rs1use crate::traits::BlockStore;
8use async_trait::async_trait;
9use ipfrs_core::{Block, Cid, Error, Result};
10use parity_db::{Db, Options};
11use std::path::PathBuf;
12use std::sync::Arc;
13
14const BLOCKS_COLUMN: u8 = 0;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum ParityDbPreset {
20 FastWrite,
22 Balanced,
24 LowMemory,
26}
27
28#[derive(Debug, Clone)]
30pub struct ParityDbConfig {
31 pub path: PathBuf,
33 pub preset: ParityDbPreset,
35 pub custom_options: Option<Options>,
37}
38
39impl ParityDbConfig {
40 pub fn new(path: PathBuf, preset: ParityDbPreset) -> Self {
42 Self {
43 path,
44 preset,
45 custom_options: None,
46 }
47 }
48
49 pub fn fast_write(path: PathBuf) -> Self {
51 Self::new(path, ParityDbPreset::FastWrite)
52 }
53
54 pub fn balanced(path: PathBuf) -> Self {
56 Self::new(path, ParityDbPreset::Balanced)
57 }
58
59 pub fn low_memory(path: PathBuf) -> Self {
61 Self::new(path, ParityDbPreset::LowMemory)
62 }
63
64 fn build_options(&self) -> Options {
66 if let Some(ref custom) = self.custom_options {
67 return custom.clone();
68 }
69
70 let mut options = Options::with_columns(&self.path, 1);
71
72 match self.preset {
73 ParityDbPreset::FastWrite => {
74 options.columns[BLOCKS_COLUMN as usize].btree_index = true;
77 options.columns[BLOCKS_COLUMN as usize].compression =
78 parity_db::CompressionType::Lz4;
79 options.sync_wal = false; options.sync_data = false; }
82 ParityDbPreset::Balanced => {
83 options.columns[BLOCKS_COLUMN as usize].btree_index = true;
85 options.columns[BLOCKS_COLUMN as usize].compression =
86 parity_db::CompressionType::Lz4;
87 options.sync_wal = true;
88 options.sync_data = false;
89 }
90 ParityDbPreset::LowMemory => {
91 options.columns[BLOCKS_COLUMN as usize].btree_index = false; options.columns[BLOCKS_COLUMN as usize].compression =
94 parity_db::CompressionType::Lz4;
95 options.sync_wal = true;
96 options.sync_data = true;
97 }
98 }
99
100 options
101 }
102}
103
104impl Default for ParityDbConfig {
105 fn default() -> Self {
106 Self::balanced(PathBuf::from(".ipfrs/blocks-paritydb"))
107 }
108}
109
110pub struct ParityDbBlockStore {
112 db: Arc<Db>,
113}
114
115impl ParityDbBlockStore {
116 pub fn new(config: ParityDbConfig) -> Result<Self> {
118 if let Some(parent) = config.path.parent() {
120 std::fs::create_dir_all(parent)
121 .map_err(|e| Error::Storage(format!("Failed to create directory: {e}")))?;
122 }
123
124 let options = config.build_options();
125
126 let db = Db::open_or_create(&options)
127 .map_err(|e| Error::Storage(format!("Failed to open ParityDB: {e}")))?;
128
129 Ok(Self { db: Arc::new(db) })
130 }
131
132 pub fn db(&self) -> &Arc<Db> {
134 &self.db
135 }
136}
137
138#[async_trait]
139impl BlockStore for ParityDbBlockStore {
140 async fn put(&self, block: &Block) -> Result<()> {
142 let key = block.cid().to_bytes();
143 let value = block.data().to_vec();
144
145 let transaction = vec![(BLOCKS_COLUMN, key, Some(value))];
146
147 self.db
148 .commit(transaction)
149 .map_err(|e| Error::Storage(format!("Failed to insert block: {e}")))?;
150
151 Ok(())
152 }
153
154 async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
156 let key = cid.to_bytes();
157
158 match self.db.get(BLOCKS_COLUMN, &key) {
159 Ok(Some(value)) => {
160 let data = bytes::Bytes::from(value);
161 Ok(Some(Block::from_parts(*cid, data)))
162 }
163 Ok(None) => Ok(None),
164 Err(e) => Err(Error::Storage(format!("Failed to get block: {e}"))),
165 }
166 }
167
168 async fn has(&self, cid: &Cid) -> Result<bool> {
170 let key = cid.to_bytes();
171 match self.db.get(BLOCKS_COLUMN, &key) {
172 Ok(Some(_)) => Ok(true),
173 Ok(None) => Ok(false),
174 Err(e) => Err(Error::Storage(format!("Failed to check block: {e}"))),
175 }
176 }
177
178 async fn delete(&self, cid: &Cid) -> Result<()> {
180 let key = cid.to_bytes();
181
182 let transaction = vec![(BLOCKS_COLUMN, key, None)];
183
184 self.db
185 .commit(transaction)
186 .map_err(|e| Error::Storage(format!("Failed to delete block: {e}")))?;
187
188 Ok(())
189 }
190
191 fn len(&self) -> usize {
193 0
198 }
199
200 fn is_empty(&self) -> bool {
202 false
205 }
206
207 fn list_cids(&self) -> Result<Vec<Cid>> {
209 let mut cids = Vec::new();
210
211 let mut iter = self
212 .db
213 .iter(BLOCKS_COLUMN)
214 .map_err(|e| Error::Storage(format!("Failed to create iterator: {e}")))?;
215
216 while let Some((key, _value)) = iter
217 .next()
218 .map_err(|e| Error::Storage(format!("Iterator error: {e}")))?
219 {
220 let cid = Cid::try_from(key.to_vec())
222 .map_err(|e| Error::Cid(format!("Failed to parse CID: {e}")))?;
223
224 cids.push(cid);
225 }
226
227 Ok(cids)
228 }
229
230 async fn put_many(&self, blocks: &[Block]) -> Result<()> {
232 let mut transaction = Vec::new();
233
234 for block in blocks {
235 let key = block.cid().to_bytes();
236 let value = block.data().to_vec();
237 transaction.push((BLOCKS_COLUMN, key, Some(value)));
238 }
239
240 self.db
241 .commit(transaction)
242 .map_err(|e| Error::Storage(format!("Failed to apply batch: {e}")))?;
243
244 Ok(())
245 }
246
247 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
249 let mut results = Vec::with_capacity(cids.len());
250
251 for cid in cids {
252 let key = cid.to_bytes();
253 match self.db.get(BLOCKS_COLUMN, &key) {
254 Ok(Some(value)) => {
255 let data = bytes::Bytes::from(value);
256 results.push(Some(Block::from_parts(*cid, data)));
257 }
258 Ok(None) => results.push(None),
259 Err(e) => return Err(Error::Storage(format!("Failed to get block: {e}"))),
260 }
261 }
262
263 Ok(results)
264 }
265
266 async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
268 let mut results = Vec::with_capacity(cids.len());
269
270 for cid in cids {
271 let key = cid.to_bytes();
272 match self.db.get(BLOCKS_COLUMN, &key) {
273 Ok(Some(_)) => results.push(true),
274 Ok(None) => results.push(false),
275 Err(e) => return Err(Error::Storage(format!("Failed to check block: {e}"))),
276 }
277 }
278
279 Ok(results)
280 }
281
282 async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
284 let mut transaction = Vec::new();
285
286 for cid in cids {
287 let key = cid.to_bytes();
288 transaction.push((BLOCKS_COLUMN, key, None));
289 }
290
291 self.db
292 .commit(transaction)
293 .map_err(|e| Error::Storage(format!("Failed to apply batch: {e}")))?;
294
295 Ok(())
296 }
297
298 async fn flush(&self) -> Result<()> {
300 Ok(())
303 }
304}
305
306#[cfg(test)]
307mod tests {
308 use super::*;
309 use bytes::Bytes;
310
311 #[tokio::test]
312 async fn test_paritydb_put_get_block() {
313 let config = ParityDbConfig::balanced(PathBuf::from("/tmp/ipfrs-test-paritydb"));
314
315 let _ = std::fs::remove_dir_all(&config.path);
317
318 let store = ParityDbBlockStore::new(config).unwrap();
319 let data = Bytes::from("hello paritydb");
320 let block = Block::new(data.clone()).unwrap();
321
322 store.put(&block).await.unwrap();
324
325 let retrieved = store.get(block.cid()).await.unwrap();
327 assert!(retrieved.is_some());
328 assert_eq!(retrieved.unwrap().data(), &data);
329
330 assert!(store.has(block.cid()).await.unwrap());
332
333 store.delete(block.cid()).await.unwrap();
335 assert!(!store.has(block.cid()).await.unwrap());
336 }
337
338 #[tokio::test]
339 async fn test_paritydb_batch_operations() {
340 let config = ParityDbConfig::fast_write(PathBuf::from("/tmp/ipfrs-test-paritydb-batch"));
341
342 let _ = std::fs::remove_dir_all(&config.path);
344
345 let store = ParityDbBlockStore::new(config).unwrap();
346
347 let blocks: Vec<Block> = (0..10)
349 .map(|i| {
350 let data = Bytes::from(format!("block {}", i));
351 Block::new(data).unwrap()
352 })
353 .collect();
354
355 store.put_many(&blocks).await.unwrap();
357
358 let cids: Vec<Cid> = blocks.iter().map(|b| *b.cid()).collect();
360 let exists = store.has_many(&cids).await.unwrap();
361 assert!(exists.iter().all(|&x| x));
362
363 let retrieved = store.get_many(&cids).await.unwrap();
365 assert_eq!(retrieved.len(), blocks.len());
366 for (i, opt_block) in retrieved.iter().enumerate() {
367 assert!(opt_block.is_some());
368 assert_eq!(opt_block.as_ref().unwrap().data(), blocks[i].data());
369 }
370
371 store.delete_many(&cids).await.unwrap();
373 let exists = store.has_many(&cids).await.unwrap();
374 assert!(exists.iter().all(|&x| !x));
375 }
376
377 #[tokio::test]
378 async fn test_paritydb_presets() {
379 let config1 = ParityDbConfig::fast_write(PathBuf::from("/tmp/ipfrs-test-paritydb-fast"));
381 let _ = std::fs::remove_dir_all(&config1.path);
382 assert_eq!(config1.preset, ParityDbPreset::FastWrite);
383 let _store1 = ParityDbBlockStore::new(config1).unwrap();
384
385 let config2 = ParityDbConfig::balanced(PathBuf::from("/tmp/ipfrs-test-paritydb-balanced"));
387 let _ = std::fs::remove_dir_all(&config2.path);
388 assert_eq!(config2.preset, ParityDbPreset::Balanced);
389 let _store2 = ParityDbBlockStore::new(config2).unwrap();
390
391 let config3 = ParityDbConfig::low_memory(PathBuf::from("/tmp/ipfrs-test-paritydb-lowmem"));
393 let _ = std::fs::remove_dir_all(&config3.path);
394 assert_eq!(config3.preset, ParityDbPreset::LowMemory);
395 let _store3 = ParityDbBlockStore::new(config3).unwrap();
396 }
397
398 #[tokio::test]
399 async fn test_paritydb_list_cids() {
400 let config = ParityDbConfig::balanced(PathBuf::from("/tmp/ipfrs-test-paritydb-list"));
401
402 let _ = std::fs::remove_dir_all(&config.path);
404
405 let store = ParityDbBlockStore::new(config).unwrap();
406
407 let blocks: Vec<Block> = (0..5)
409 .map(|i| {
410 let data = Bytes::from(format!("block {}", i));
411 Block::new(data).unwrap()
412 })
413 .collect();
414
415 store.put_many(&blocks).await.unwrap();
416
417 let cids = store.list_cids().unwrap();
419 assert_eq!(cids.len(), 5);
420
421 for block in &blocks {
423 assert!(cids.contains(block.cid()));
424 }
425 }
426}