ng_repo/
block_storage.rs

1// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
2// All rights reserved.
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
5// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
6// at your option. All files in the project carrying such
7// notice may not be copied, modified, or distributed except
8// according to those terms.
9
10//! Storage of Blocks
11
12use std::sync::RwLock;
13use std::{
14    cmp::{max, min},
15    collections::HashMap,
16    mem::size_of_val,
17};
18
19use futures::StreamExt;
20
21use crate::errors::*;
22use crate::types::*;
23use crate::utils::Receiver;
24
25pub trait BlockStorage: Send + Sync {
26    /// Load a block from the storage.
27    fn get(&self, overlay: &OverlayId, id: &BlockId) -> Result<Block, StorageError>;
28
29    // fetch a block from broker or core overlay
30    // pub async fn fetch(&self, id: &BlockId) -> Result<Block, StorageError> {
31    //     todo!();
32    // }
33
34    /// Save a block to the storage.
35    fn put(&self, overlay: &OverlayId, block: &Block, lazy: bool) -> Result<BlockId, StorageError>;
36
37    /// Delete a block from the storage.
38    fn del(&self, overlay: &OverlayId, id: &BlockId) -> Result<usize, StorageError>;
39
40    /// number of Blocks in the storage
41    fn len(&self) -> Result<usize, StorageError>;
42
43    fn has(&self, overlay: &OverlayId, id: &BlockId) -> Result<(), StorageError>;
44}
45
46/* LMDB values:
47
48const MIN_SIZE: usize = 4072;
49const PAGE_SIZE: usize = 4096;
50const HEADER: usize = PAGE_SIZE - MIN_SIZE;
51const MAX_FACTOR: usize = 512;
52
53/// Returns a valid/optimal value size for the entries of the storage backend.
54pub fn store_valid_value_size(size: usize) -> usize {
55    min(
56        ((size + HEADER) as f32 / PAGE_SIZE as f32).ceil() as usize,
57        MAX_FACTOR,
58    ) * PAGE_SIZE
59        - HEADER
60}
61
62/// Returns the maximum value size for the entries of the storage backend.
63pub const fn store_max_value_size() -> usize {
64    MAX_FACTOR * PAGE_SIZE - HEADER
65}
66*/
67
68// ROCKSDB values:
69
/// One mebibyte, the upper bound of a stored value.
const ONE_MEGA_BYTE: usize = 1024 * 1024;
/// Granularity (in bytes) to which value sizes are rounded up.
const DISK_BLOCK_SIZE: usize = 4096;
// HDD block size at 4096, SSD page size at 4096, on openbsd FFS default is 16384
// see Rocksdb integrated BlobDB https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html
// blob values should be multiple of 4096 because of the BlobCache of RocksDB that is in heap memory (so must align on mem page).
/// Maximum number of disk blocks per value (256 * 4096 bytes = 1 MiB).
const MAX_FACTOR: usize = 256;

/// Returns a valid/optimal value size for the entries of the storage backend.
///
/// `size` is rounded up to the next multiple of `DISK_BLOCK_SIZE`. The result
/// is never smaller than one disk block (so `0` yields `DISK_BLOCK_SIZE`) and
/// never larger than `MAX_FACTOR * DISK_BLOCK_SIZE`.
pub fn store_valid_value_size(size: usize) -> usize {
    // Ceiling division written as quotient + "has a remainder". The usual
    // `(size + DISK_BLOCK_SIZE - 1) / DISK_BLOCK_SIZE` form overflows when
    // `size` is close to `usize::MAX` (panic in debug builds, wrap-around to a
    // tiny result in release builds).
    let blocks = size / DISK_BLOCK_SIZE + usize::from(size % DISK_BLOCK_SIZE != 0);
    min(max(1, blocks), MAX_FACTOR) * DISK_BLOCK_SIZE
}

/// Returns the maximum value size for the entries of the storage backend.
pub const fn store_max_value_size() -> usize {
    ONE_MEGA_BYTE
}
89
90/// Store with a HashMap backend
91pub struct HashMapBlockStorage {
92    blocks: RwLock<HashMap<BlockId, Block>>,
93}
94
95impl HashMapBlockStorage {
96    pub fn new() -> HashMapBlockStorage {
97        HashMapBlockStorage {
98            blocks: RwLock::new(HashMap::new()),
99        }
100    }
101
102    pub async fn from_block_stream(overlay: &OverlayId, mut blockstream: Receiver<Block>) -> Self {
103        let this = Self::new();
104        while let Some(block) = blockstream.next().await {
105            this.put(overlay, &block, false).unwrap();
106        }
107        this
108    }
109
110    pub fn get_len(&self) -> usize {
111        self.blocks.read().unwrap().len()
112    }
113
114    pub fn get_all(&self) -> Vec<Block> {
115        self.blocks
116            .read()
117            .unwrap()
118            .values()
119            .map(|x| x.clone())
120            .collect()
121    }
122    pub fn put_local(&self, block: &Block) -> Result<BlockId, StorageError> {
123        let overlay = OverlayId::nil();
124        self.put(&overlay, block, false)
125    }
126}
127
128impl BlockStorage for HashMapBlockStorage {
129    fn get(&self, _overlay: &OverlayId, id: &BlockId) -> Result<Block, StorageError> {
130        match self.blocks.read().unwrap().get(id) {
131            Some(block) => {
132                let mut b = block.clone();
133                let i = b.get_and_save_id();
134                if *id == i {
135                    Ok(b)
136                } else {
137                    Err(StorageError::DataCorruption)
138                }
139            }
140            None => Err(StorageError::NotFound),
141        }
142    }
143
144    fn has(&self, _overlay: &OverlayId, id: &BlockId) -> Result<(), StorageError> {
145        if !self.blocks.read().unwrap().contains_key(id) {
146            return Err(StorageError::NotFound);
147        }
148        Ok(())
149    }
150
151    fn len(&self) -> Result<usize, StorageError> {
152        Ok(self.get_len())
153    }
154
155    fn put(
156        &self,
157        _overlay: &OverlayId,
158        block: &Block,
159        _lazy: bool,
160    ) -> Result<BlockId, StorageError> {
161        let id = block.id();
162        //log_debug!("PUTTING {}", id);
163        let mut b = block.clone();
164        b.set_key(None);
165        self.blocks.write().unwrap().insert(id, b);
166        Ok(id)
167    }
168
169    fn del(&self, _overlay: &OverlayId, id: &BlockId) -> Result<usize, StorageError> {
170        let block = self
171            .blocks
172            .write()
173            .unwrap()
174            .remove(id)
175            .ok_or(StorageError::NotFound)?;
176        let size = size_of_val(&block);
177        Ok(size)
178    }
179}