1use std::collections::HashMap;
2use std::hash::{BuildHasher, Hash};
3use std::io::{Error, ErrorKind, Read, Result as IoResult, Write};
4use std::ops::Deref;
5use std::path::{Path, PathBuf};
6use std::sync::{mpsc, Arc};
7use thiserror::Error as ThisError;
8
9use kindelia_common::{crypto, Name, U120};
10use kindelia_lang::ast::Func;
11
12use crate::bits::ProtoSerialize;
13use crate::node::{self, HashedBlock};
14use crate::runtime::functions::{compile_func, CompFunc};
15use crate::util::{self, bitvec_to_bytes, FileSystemError};
16
17pub trait DiskSer
23where
24 Self: Sized,
25{
26 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize>;
27 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>>;
28}
29
30impl DiskSer for u8 {
31 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
32 sink.write(&self.to_le_bytes())
33 }
34 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<u8>> {
35 let mut buf = [0; 1];
36 let bytes_read = source.read(&mut buf)?;
37 match bytes_read {
38 0 => Ok(None),
39 _ => Ok(Some(u8::from_le_bytes(buf))),
40 }
41 }
42}
43impl DiskSer for i128 {
44 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
45 sink.write(&self.to_le_bytes())
46 }
47 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<i128>> {
48 const BYTES: usize = (i128::BITS / 8) as usize;
49 const AT_MOST: usize = BYTES - 1;
50 let mut buf = [0; BYTES];
51 let bytes_read = source.read(&mut buf)?;
52 match bytes_read {
53 0 => Ok(None),
54 1..=AT_MOST => Err(Error::from(ErrorKind::UnexpectedEof)),
55 _ => Ok(Some(i128::from_le_bytes(buf))),
56 }
57 }
58}
59
60impl DiskSer for u128 {
64 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
65 sink.write(&self.to_le_bytes())
66 }
67 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<u128>> {
68 const BYTES: usize = (u128::BITS / 8) as usize;
69 const AT_MOST: usize = BYTES - 1;
70 let mut buf = [0; BYTES];
71 let bytes_read = source.read(&mut buf)?;
72 match bytes_read {
73 0 => Ok(None),
74 1..=AT_MOST => Err(Error::from(ErrorKind::UnexpectedEof)),
75 _ => Ok(Some(u128::from_le_bytes(buf))),
76 }
77 }
78}
79
80impl DiskSer for u64 {
81 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
82 sink.write(&self.to_le_bytes())
83 }
84 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<u64>> {
85 const BYTES: usize = (u64::BITS / 8) as usize;
86 const AT_MOST: usize = BYTES - 1;
87 let mut buf = [0; BYTES];
88 let bytes_read = source.read(&mut buf)?;
89 match bytes_read {
90 0 => Ok(None),
91 1..=AT_MOST => Err(Error::from(ErrorKind::UnexpectedEof)),
92 _ => Ok(Some(u64::from_le_bytes(buf))),
93 }
94 }
95}
96
97impl<K, V, H> DiskSer for HashMap<K, V, H>
100where
101 K: DiskSer + Eq + Hash,
102 V: DiskSer,
103 H: BuildHasher + Default,
104{
105 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
106 let mut total_written = 0;
107 for (k, v) in self {
108 let key_size = k.disk_serialize(sink)?;
109 let val_size = v.disk_serialize(sink)?;
110 total_written += key_size + val_size;
111 }
112 Ok(total_written)
113 }
114 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
115 let mut slf = HashMap::with_hasher(H::default());
116 while let Some(key) = K::disk_deserialize(source)? {
117 let val = V::disk_deserialize(source)?;
118 if let Some(val) = val {
119 slf.insert(key, val);
120 } else {
121 return Err(Error::from(ErrorKind::UnexpectedEof));
122 }
123 }
124 Ok(Some(slf))
125 }
126}
127
128impl<K> DiskSer for Vec<K>
129where
130 K: DiskSer,
131{
132 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
133 let mut total_written = 0;
134 for elem in self {
135 let elem_size = elem.disk_serialize(sink)?;
136 total_written += elem_size;
137 }
138 Ok(total_written)
139 }
140 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
141 let mut res = Vec::new();
142 while let Some(elem) = K::disk_deserialize(source)? {
143 res.push(elem);
144 }
145 Ok(Some(res))
146 }
147}
148
149impl<T> DiskSer for Arc<T>
150where
151 T: DiskSer,
152{
153 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
154 let t = Arc::deref(self);
155 t.disk_serialize(sink)
156 }
157 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
158 let t = T::disk_deserialize(source)?;
159 Ok(t.map(Arc::new))
160 }
161}
162
163impl DiskSer for CompFunc {
164 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
165 let func_buff = self.func.proto_serialized().to_bytes();
166 let size = func_buff.len() as u128;
167 let written1 = size.disk_serialize(sink)?;
168 let written2 = func_buff.disk_serialize(sink)?;
169 Ok(written1 + written2)
170 }
171 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
172 if let Some(len) = u128::disk_deserialize(source)? {
174 let len = len as usize;
175 let mut buf = vec![0; len];
176 let read_bytes = source.read(&mut buf)?;
177 if read_bytes != len {
178 return Err(Error::from(ErrorKind::UnexpectedEof));
179 }
180 let func = &Func::proto_deserialized(&bit_vec::BitVec::from_bytes(&buf))
181 .ok_or_else(|| Error::from(ErrorKind::InvalidData))?; let func = compile_func(func, false)
183 .map_err(|_| Error::from(ErrorKind::InvalidData))?; Ok(Some(func))
185 } else {
186 Ok(None)
187 }
188 }
189}
190
191impl<T: DiskSer + Default + std::marker::Copy, const N: usize> DiskSer
192 for [T; N]
193{
194 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
195 let mut total_written = 0;
196 for elem in self {
197 let elem_size = elem.disk_serialize(sink)?;
198 total_written += elem_size;
199 }
200 Ok(total_written)
201 }
202 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
203 let mut res: [T; N] = [T::default(); N];
204 for (i, e) in res.iter_mut().take(N).enumerate() {
205 let read = T::disk_deserialize(source)?;
206 match (i, read) {
207 (_, Some(elem)) => *e = elem,
208 (0, None) => return Ok(None),
209 (_, None) => return Err(Error::from(ErrorKind::UnexpectedEof)),
210 }
211 }
212 Ok(Some(res))
213 }
214}
215
216impl DiskSer for crypto::Hash {
217 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
218 self.0.disk_serialize(sink)
219 }
220 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
221 let hash = <[u8; 32]>::disk_deserialize(source)?;
222 Ok(hash.map(crypto::Hash))
223 }
224}
225
226impl DiskSer for crate::runtime::RawCell {
227 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
228 (**self).disk_serialize(sink)
229 }
230 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
231 let cell = u128::disk_deserialize(source)?;
232 match cell {
233 None => Ok(None),
234 Some(num) => {
235 let rawcell = crate::runtime::RawCell::new(num);
236 match rawcell {
237 Some(rawcell) => Ok(Some(rawcell)),
238 None => Err(Error::from(ErrorKind::InvalidData)),
239 }
240 }
241 }
242 }
243}
244
245impl DiskSer for crate::runtime::Loc {
246 fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
247 (**self).disk_serialize(sink)
248 }
249 fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
250 let loc = u64::disk_deserialize(source)?;
251 match loc {
252 None => Ok(None),
253 Some(num) => {
254 let loc = crate::runtime::Loc::new(num);
255 match loc {
256 Some(loc) => Ok(Some(loc)),
257 None => Err(Error::from(ErrorKind::InvalidData)),
258 }
259 }
260 }
261 }
262}
263
264impl DiskSer for U120 {
265 fn disk_serialize<W: std::io::Write>(
266 &self,
267 sink: &mut W,
268 ) -> std::io::Result<usize> {
269 self.0.disk_serialize(sink)
270 }
271 fn disk_deserialize<R: std::io::Read>(
272 source: &mut R,
273 ) -> std::io::Result<Option<Self>> {
274 let num = u128::disk_deserialize(source)?;
275 match num {
276 None => Ok(None),
277 Some(num) => {
278 if num >> 120 == 0 {
279 Ok(Some(U120(num)))
280 } else {
281 Err(std::io::Error::from(std::io::ErrorKind::InvalidData))
282 }
283 }
284 }
285 }
286}
287
288impl DiskSer for Name {
289 fn disk_serialize<W: std::io::Write>(
290 &self,
291 sink: &mut W,
292 ) -> std::io::Result<usize> {
293 self.0.disk_serialize(sink)
294 }
295 fn disk_deserialize<R: std::io::Read>(
296 source: &mut R,
297 ) -> std::io::Result<Option<Self>> {
298 let num = u128::disk_deserialize(source)?;
299 match num {
300 None => Ok(None),
301 Some(num) => Ok(Name::new(num)),
302 }
303 }
304}
305
306pub const BLOCKS_DIR: &str = "blocks";
310
311#[derive(ThisError, Debug)]
316pub enum BlockStorageError {
317 #[error("Block storage write error")]
318 Write { source: Box<dyn std::error::Error + Send + Sync + 'static> },
319 #[error("Block storage read error")]
320 Read { source: Box<dyn std::error::Error + Send + Sync + 'static> },
321 #[error("Block storage serialization error")]
322 Serialization {
323 source: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
324 },
325}
326
327pub trait BlockStorage
330where
331 Self: Clone,
332{
333 fn write_block(
334 &self,
335 height: u128,
336 block: HashedBlock,
337 ) -> Result<(), BlockStorageError>;
338 fn read_blocks<F: FnMut((node::Block, PathBuf))>(
339 &self,
340 then: F,
341 ) -> Result<(), BlockStorageError>;
342 fn disable(&mut self);
343 fn enable(&mut self);
344}
345
346type FileWritterChannelInfo = (u128, HashedBlock);
348
349#[derive(Clone)]
350pub struct SimpleFileStorage {
352 tx: mpsc::Sender<FileWritterChannelInfo>,
353 path: PathBuf,
354 enabled: bool,
355}
356
357impl SimpleFileStorage {
358 pub fn new(path: PathBuf) -> Result<Self, BlockStorageError> {
366 let (tx, rx) = mpsc::channel::<FileWritterChannelInfo>();
368 let blocks_path = path.join(BLOCKS_DIR);
370 std::fs::create_dir_all(&blocks_path).map_err(|e| {
371 BlockStorageError::Write {
372 source: Box::new(FileSystemError {
373 path: blocks_path.clone(),
374 source: e,
375 context: "creating blocks directory".to_string(),
376 }),
377 }
378 })?;
379
380 let moved_path = blocks_path.clone();
381 std::thread::spawn(move || {
383 while let Ok((height, block)) = rx.recv() {
385 let file_path =
387 moved_path.join(format!("{:0>16x}.kindelia_block.bin", height));
388 let file_buff = bitvec_to_bytes(&block.proto_serialized());
390 if let Err(e) = std::fs::write(&file_path, file_buff).map_err(|e| {
392 BlockStorageError::Write {
393 source: Box::new(FileSystemError {
394 path: file_path,
395 source: e,
396 context: "writing block to file".to_string(),
397 }),
398 }
399 }) {
400 eprintln!("Couldn't save block to disk.\n{}", e);
401 }
402 }
403 });
404
405 Ok(SimpleFileStorage { tx, path: blocks_path, enabled: true })
406 }
407}
408
409impl BlockStorage for SimpleFileStorage {
410 fn write_block(
411 &self,
412 height: u128,
413 block: HashedBlock,
414 ) -> Result<(), BlockStorageError> {
415 if self.enabled {
417 if let Err(err) = self.tx.send((height, block)) {
420 eprintln!("Could not save block of height {}: {}", height, err);
421 return Err(BlockStorageError::Write { source: Box::new(err) });
422 }
423 }
424 Ok(())
425 }
426 fn read_blocks<F: FnMut((node::Block, PathBuf))>(
427 &self,
428 then: F,
429 ) -> Result<(), BlockStorageError> {
430 let file_paths = get_ordered_blocks_path(&self.path)
431 .map_err(|e| BlockStorageError::Read { source: Box::new(e) })?;
432
433 let mut items = vec![];
434 for (_, file_path) in file_paths.into_iter() {
435 let buffer =
436 std::fs::read(&file_path).map_err(|e| BlockStorageError::Read {
437 source: Box::new(FileSystemError {
438 path: file_path.clone(),
439 source: e,
440 context: "reading block from file".to_string(),
441 }),
442 })?;
443 let block =
444 node::Block::proto_deserialized(&util::bytes_to_bitvec(&buffer))
445 .ok_or(BlockStorageError::Serialization { source: None })?;
446 items.push((block, file_path));
447 }
448 items.into_iter().for_each(then);
449 Ok(())
450 }
451 fn disable(&mut self) {
452 self.enabled = false;
453 }
454 fn enable(&mut self) {
455 self.enabled = true
456 }
457}
458
459#[derive(ThisError, Debug)]
461pub enum OrderedBlocksError {
462 #[error(transparent)]
463 ReadDir(#[from] FileSystemError),
464 #[error("Could not read filename in {0}")]
465 BadFileName(PathBuf),
466 #[error("Missing extension {0}")]
467 MissingExtension(PathBuf),
468 #[error("Filename is not numeric {path:?}")]
469 NotNumeric { path: PathBuf, source: std::num::ParseIntError },
470}
471
472pub fn get_ordered_blocks_path(
482 path: &Path,
483) -> Result<Vec<(u64, PathBuf)>, OrderedBlocksError> {
484 let mut file_paths = vec![];
485
486 for entry in std::fs::read_dir(path).map_err(|e| {
487 OrderedBlocksError::ReadDir(FileSystemError {
488 path: path.to_path_buf(),
489 source: e,
490 context: "reading blocks directory".to_string(),
491 })
492 })? {
493 let path = entry
495 .map_err(|e| {
496 OrderedBlocksError::ReadDir(FileSystemError {
497 path: path.to_path_buf(),
498 source: e,
499 context: "reading blocks directory".to_string(),
500 })
501 })?
502 .path();
503 let name = path
504 .file_name()
505 .ok_or_else(|| OrderedBlocksError::BadFileName(path.to_path_buf()))?
506 .to_str()
507 .ok_or_else(|| OrderedBlocksError::BadFileName(path.to_path_buf()))?;
508 let bnum = name
509 .split('.')
510 .next()
511 .ok_or_else(|| OrderedBlocksError::MissingExtension(path.join(name)))?;
512 let bnum = u64::from_str_radix(bnum, 16).map_err(|e| {
513 OrderedBlocksError::NotNumeric { path: path.join(name), source: e }
514 })?;
515 file_paths.push((bnum, path));
516 }
517
518 file_paths.sort_unstable();
519 Ok(file_paths)
520}
521
522#[derive(Clone)]
523pub struct EmptyStorage;
524
525impl BlockStorage for EmptyStorage {
526 fn enable(&mut self) {}
527 fn disable(&mut self) {}
528 fn read_blocks<F: FnMut((node::Block, PathBuf))>(
529 &self,
530 _: F,
531 ) -> Result<(), BlockStorageError> {
532 Ok(())
533 }
534 fn write_block(
535 &self,
536 _: u128,
537 _: HashedBlock,
538 ) -> Result<(), BlockStorageError> {
539 Ok(())
540 }
541}