1use std::sync::RwLock;
13use std::{
14 cmp::{max, min},
15 collections::HashMap,
16 mem::size_of_val,
17};
18
19use futures::StreamExt;
20
21use crate::errors::*;
22use crate::types::*;
23use crate::utils::Receiver;
24
25pub trait BlockStorage: Send + Sync {
26 fn get(&self, overlay: &OverlayId, id: &BlockId) -> Result<Block, StorageError>;
28
29 fn put(&self, overlay: &OverlayId, block: &Block, lazy: bool) -> Result<BlockId, StorageError>;
36
37 fn del(&self, overlay: &OverlayId, id: &BlockId) -> Result<usize, StorageError>;
39
40 fn len(&self) -> Result<usize, StorageError>;
42
43 fn has(&self, overlay: &OverlayId, id: &BlockId) -> Result<(), StorageError>;
44}
45
46const ONE_MEGA_BYTE: usize = 1024 * 1024;
71const DISK_BLOCK_SIZE: usize = 4096;
72const MAX_FACTOR: usize = 256;
76
77pub fn store_valid_value_size(size: usize) -> usize {
79 min(
80 max(1, (size + DISK_BLOCK_SIZE - 1) / DISK_BLOCK_SIZE),
81 MAX_FACTOR,
82 ) * DISK_BLOCK_SIZE
83}
84
85pub const fn store_max_value_size() -> usize {
87 ONE_MEGA_BYTE
88}
89
90pub struct HashMapBlockStorage {
92 blocks: RwLock<HashMap<BlockId, Block>>,
93}
94
95impl HashMapBlockStorage {
96 pub fn new() -> HashMapBlockStorage {
97 HashMapBlockStorage {
98 blocks: RwLock::new(HashMap::new()),
99 }
100 }
101
102 pub async fn from_block_stream(overlay: &OverlayId, mut blockstream: Receiver<Block>) -> Self {
103 let this = Self::new();
104 while let Some(block) = blockstream.next().await {
105 this.put(overlay, &block, false).unwrap();
106 }
107 this
108 }
109
110 pub fn get_len(&self) -> usize {
111 self.blocks.read().unwrap().len()
112 }
113
114 pub fn get_all(&self) -> Vec<Block> {
115 self.blocks
116 .read()
117 .unwrap()
118 .values()
119 .map(|x| x.clone())
120 .collect()
121 }
122 pub fn put_local(&self, block: &Block) -> Result<BlockId, StorageError> {
123 let overlay = OverlayId::nil();
124 self.put(&overlay, block, false)
125 }
126}
127
128impl BlockStorage for HashMapBlockStorage {
129 fn get(&self, _overlay: &OverlayId, id: &BlockId) -> Result<Block, StorageError> {
130 match self.blocks.read().unwrap().get(id) {
131 Some(block) => {
132 let mut b = block.clone();
133 let i = b.get_and_save_id();
134 if *id == i {
135 Ok(b)
136 } else {
137 Err(StorageError::DataCorruption)
138 }
139 }
140 None => Err(StorageError::NotFound),
141 }
142 }
143
144 fn has(&self, _overlay: &OverlayId, id: &BlockId) -> Result<(), StorageError> {
145 if !self.blocks.read().unwrap().contains_key(id) {
146 return Err(StorageError::NotFound);
147 }
148 Ok(())
149 }
150
151 fn len(&self) -> Result<usize, StorageError> {
152 Ok(self.get_len())
153 }
154
155 fn put(
156 &self,
157 _overlay: &OverlayId,
158 block: &Block,
159 _lazy: bool,
160 ) -> Result<BlockId, StorageError> {
161 let id = block.id();
162 let mut b = block.clone();
164 b.set_key(None);
165 self.blocks.write().unwrap().insert(id, b);
166 Ok(id)
167 }
168
169 fn del(&self, _overlay: &OverlayId, id: &BlockId) -> Result<usize, StorageError> {
170 let block = self
171 .blocks
172 .write()
173 .unwrap()
174 .remove(id)
175 .ok_or(StorageError::NotFound)?;
176 let size = size_of_val(&block);
177 Ok(size)
178 }
179}