kindelia_core/
persistence.rs

1use std::collections::HashMap;
2use std::hash::{BuildHasher, Hash};
3use std::io::{Error, ErrorKind, Read, Result as IoResult, Write};
4use std::ops::Deref;
5use std::path::{Path, PathBuf};
6use std::sync::{mpsc, Arc};
7use thiserror::Error as ThisError;
8
9use kindelia_common::{crypto, Name, U120};
10use kindelia_lang::ast::Func;
11
12use crate::bits::ProtoSerialize;
13use crate::node::{self, HashedBlock};
14use crate::runtime::functions::{compile_func, CompFunc};
15use crate::util::{self, bitvec_to_bytes, FileSystemError};
16
/// Trait for types that can be written to, and read back from, a byte
/// stream (in practice: files on disk).
///
/// - `disk_serialize` writes `self` into `sink` and returns the number of
///   bytes written.
/// - `disk_deserialize` reads one value from `source` and returns:
///   - `Ok(Some(obj))` when a value was successfully decoded;
///   - `Ok(None)` when `source` was already empty (no bytes at all);
///   - `Err(_)` on I/O failure. The implementations in this module use
///     `ErrorKind::UnexpectedEof` for input that ends part-way through a
///     value and `ErrorKind::InvalidData` for bytes that decode to an
///     invalid value.
///
/// Collections (`Vec`, `HashMap`) carry no length prefix and read until the
/// source returns `Ok(None)`, so an implementation must only return
/// `Ok(None)` for a genuinely empty source.
pub trait DiskSer: Sized {
  /// Writes `self` into `sink`, returning how many bytes were written.
  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize>;
  /// Reads one value from `source`; `Ok(None)` means `source` was empty.
  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>>;
}
29
30impl DiskSer for u8 {
31  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
32    sink.write(&self.to_le_bytes())
33  }
34  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<u8>> {
35    let mut buf = [0; 1];
36    let bytes_read = source.read(&mut buf)?;
37    match bytes_read {
38      0 => Ok(None),
39      _ => Ok(Some(u8::from_le_bytes(buf))),
40    }
41  }
42}
43impl DiskSer for i128 {
44  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
45    sink.write(&self.to_le_bytes())
46  }
47  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<i128>> {
48    const BYTES: usize = (i128::BITS / 8) as usize;
49    const AT_MOST: usize = BYTES - 1;
50    let mut buf = [0; BYTES];
51    let bytes_read = source.read(&mut buf)?;
52    match bytes_read {
53      0 => Ok(None),
54      1..=AT_MOST => Err(Error::from(ErrorKind::UnexpectedEof)),
55      _ => Ok(Some(i128::from_le_bytes(buf))),
56    }
57  }
58}
59
60// All numeric serializations are just this `u128` boilerplate
61// We could write this for any Type that implements
62// the function `from_le_bytes`.
63impl DiskSer for u128 {
64  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
65    sink.write(&self.to_le_bytes())
66  }
67  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<u128>> {
68    const BYTES: usize = (u128::BITS / 8) as usize;
69    const AT_MOST: usize = BYTES - 1;
70    let mut buf = [0; BYTES];
71    let bytes_read = source.read(&mut buf)?;
72    match bytes_read {
73      0 => Ok(None),
74      1..=AT_MOST => Err(Error::from(ErrorKind::UnexpectedEof)),
75      _ => Ok(Some(u128::from_le_bytes(buf))),
76    }
77  }
78}
79
80impl DiskSer for u64 {
81  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
82    sink.write(&self.to_le_bytes())
83  }
84  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<u64>> {
85    const BYTES: usize = (u64::BITS / 8) as usize;
86    const AT_MOST: usize = BYTES - 1;
87    let mut buf = [0; BYTES];
88    let bytes_read = source.read(&mut buf)?;
89    match bytes_read {
90      0 => Ok(None),
91      1..=AT_MOST => Err(Error::from(ErrorKind::UnexpectedEof)),
92      _ => Ok(Some(u64::from_le_bytes(buf))),
93    }
94  }
95}
96
97// We assume that every map will be stored in a whole file.
98// because of that, it will consume all of the file while reading it.
99impl<K, V, H> DiskSer for HashMap<K, V, H>
100where
101  K: DiskSer + Eq + Hash,
102  V: DiskSer,
103  H: BuildHasher + Default,
104{
105  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
106    let mut total_written = 0;
107    for (k, v) in self {
108      let key_size = k.disk_serialize(sink)?;
109      let val_size = v.disk_serialize(sink)?;
110      total_written += key_size + val_size;
111    }
112    Ok(total_written)
113  }
114  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
115    let mut slf = HashMap::with_hasher(H::default());
116    while let Some(key) = K::disk_deserialize(source)? {
117      let val = V::disk_deserialize(source)?;
118      if let Some(val) = val {
119        slf.insert(key, val);
120      } else {
121        return Err(Error::from(ErrorKind::UnexpectedEof));
122      }
123    }
124    Ok(Some(slf))
125  }
126}
127
128impl<K> DiskSer for Vec<K>
129where
130  K: DiskSer,
131{
132  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
133    let mut total_written = 0;
134    for elem in self {
135      let elem_size = elem.disk_serialize(sink)?;
136      total_written += elem_size;
137    }
138    Ok(total_written)
139  }
140  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
141    let mut res = Vec::new();
142    while let Some(elem) = K::disk_deserialize(source)? {
143      res.push(elem);
144    }
145    Ok(Some(res))
146  }
147}
148
149impl<T> DiskSer for Arc<T>
150where
151  T: DiskSer,
152{
153  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
154    let t = Arc::deref(self);
155    t.disk_serialize(sink)
156  }
157  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
158    let t = T::disk_deserialize(source)?;
159    Ok(t.map(Arc::new))
160  }
161}
162
163impl DiskSer for CompFunc {
164  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
165    let func_buff = self.func.proto_serialized().to_bytes();
166    let size = func_buff.len() as u128;
167    let written1 = size.disk_serialize(sink)?;
168    let written2 = func_buff.disk_serialize(sink)?;
169    Ok(written1 + written2)
170  }
171  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
172    // let compfunc = CompFunc {};
173    if let Some(len) = u128::disk_deserialize(source)? {
174      let len = len as usize;
175      let mut buf = vec![0; len];
176      let read_bytes = source.read(&mut buf)?;
177      if read_bytes != len {
178        return Err(Error::from(ErrorKind::UnexpectedEof));
179      }
180      let func = &Func::proto_deserialized(&bit_vec::BitVec::from_bytes(&buf))
181        .ok_or_else(|| Error::from(ErrorKind::InvalidData))?; // invalid data? which error is better?
182      let func = compile_func(func, false)
183        .map_err(|_| Error::from(ErrorKind::InvalidData))?; // TODO: return error in deserialization?
184      Ok(Some(func))
185    } else {
186      Ok(None)
187    }
188  }
189}
190
191impl<T: DiskSer + Default + std::marker::Copy, const N: usize> DiskSer
192  for [T; N]
193{
194  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
195    let mut total_written = 0;
196    for elem in self {
197      let elem_size = elem.disk_serialize(sink)?;
198      total_written += elem_size;
199    }
200    Ok(total_written)
201  }
202  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
203    let mut res: [T; N] = [T::default(); N];
204    for (i, e) in res.iter_mut().take(N).enumerate() {
205      let read = T::disk_deserialize(source)?;
206      match (i, read) {
207        (_, Some(elem)) => *e = elem,
208        (0, None) => return Ok(None),
209        (_, None) => return Err(Error::from(ErrorKind::UnexpectedEof)),
210      }
211    }
212    Ok(Some(res))
213  }
214}
215
216impl DiskSer for crypto::Hash {
217  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
218    self.0.disk_serialize(sink)
219  }
220  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
221    let hash = <[u8; 32]>::disk_deserialize(source)?;
222    Ok(hash.map(crypto::Hash))
223  }
224}
225
226impl DiskSer for crate::runtime::RawCell {
227  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
228    (**self).disk_serialize(sink)
229  }
230  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
231    let cell = u128::disk_deserialize(source)?;
232    match cell {
233      None => Ok(None),
234      Some(num) => {
235        let rawcell = crate::runtime::RawCell::new(num);
236        match rawcell {
237          Some(rawcell) => Ok(Some(rawcell)),
238          None => Err(Error::from(ErrorKind::InvalidData)),
239        }
240      }
241    }
242  }
243}
244
245impl DiskSer for crate::runtime::Loc {
246  fn disk_serialize<W: Write>(&self, sink: &mut W) -> IoResult<usize> {
247    (**self).disk_serialize(sink)
248  }
249  fn disk_deserialize<R: Read>(source: &mut R) -> IoResult<Option<Self>> {
250    let loc = u64::disk_deserialize(source)?;
251    match loc {
252      None => Ok(None),
253      Some(num) => {
254        let loc = crate::runtime::Loc::new(num);
255        match loc {
256          Some(loc) => Ok(Some(loc)),
257          None => Err(Error::from(ErrorKind::InvalidData)),
258        }
259      }
260    }
261  }
262}
263
264impl DiskSer for U120 {
265  fn disk_serialize<W: std::io::Write>(
266    &self,
267    sink: &mut W,
268  ) -> std::io::Result<usize> {
269    self.0.disk_serialize(sink)
270  }
271  fn disk_deserialize<R: std::io::Read>(
272    source: &mut R,
273  ) -> std::io::Result<Option<Self>> {
274    let num = u128::disk_deserialize(source)?;
275    match num {
276      None => Ok(None),
277      Some(num) => {
278        if num >> 120 == 0 {
279          Ok(Some(U120(num)))
280        } else {
281          Err(std::io::Error::from(std::io::ErrorKind::InvalidData))
282        }
283      }
284    }
285  }
286}
287
288impl DiskSer for Name {
289  fn disk_serialize<W: std::io::Write>(
290    &self,
291    sink: &mut W,
292  ) -> std::io::Result<usize> {
293    self.0.disk_serialize(sink)
294  }
295  fn disk_deserialize<R: std::io::Read>(
296    source: &mut R,
297  ) -> std::io::Result<Option<Self>> {
298    let num = u128::disk_deserialize(source)?;
299    match num {
300      None => Ok(None),
301      Some(num) => Ok(Name::new(num)),
302    }
303  }
304}
305
306// Node persistence
307// ================
308
/// Name of the sub-directory, inside the storage root passed to
/// `SimpleFileStorage::new`, that holds the block files.
pub const BLOCKS_DIR: &str = "blocks";
310
311/// Errors associated with the BlockStorage trait.
312///
313/// Source error types are dynamic to enable trait implementors
314/// to provide their own error type if needed.
315#[derive(ThisError, Debug)]
316pub enum BlockStorageError {
317  #[error("Block storage write error")]
318  Write { source: Box<dyn std::error::Error + Send + Sync + 'static> },
319  #[error("Block storage read error")]
320  Read { source: Box<dyn std::error::Error + Send + Sync + 'static> },
321  #[error("Block storage serialization error")]
322  Serialization {
323    source: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
324  },
325}
326
327/// A block writter interface, used to tell the node
328/// how it should write a block (in file system, in a mocked container, etc).
329pub trait BlockStorage
330where
331  Self: Clone,
332{
333  fn write_block(
334    &self,
335    height: u128,
336    block: HashedBlock,
337  ) -> Result<(), BlockStorageError>;
338  fn read_blocks<F: FnMut((node::Block, PathBuf))>(
339    &self,
340    then: F,
341  ) -> Result<(), BlockStorageError>;
342  fn disable(&mut self);
343  fn enable(&mut self);
344}
345
346/// Represents the information passed in the FileWritter channels.
347type FileWritterChannelInfo = (u128, HashedBlock);
348
349#[derive(Clone)]
350/// A file system writter for the node
351pub struct SimpleFileStorage {
352  tx: mpsc::Sender<FileWritterChannelInfo>,
353  path: PathBuf,
354  enabled: bool,
355}
356
357impl SimpleFileStorage {
358  /// This function spawns a thread that will receive the blocks
359  /// from the node and will write them in the filesystem. As the
360  /// thread is not joined here, it will become detached, only ending
361  /// when the process execution ends.
362  ///
363  /// But this function is only used in `node start` function, therefore
364  /// this thread will be terminated together with the other node threads (mining, events, etc).
365  pub fn new(path: PathBuf) -> Result<Self, BlockStorageError> {
366    // create channel
367    let (tx, rx) = mpsc::channel::<FileWritterChannelInfo>();
368    // blocks are stored in `blocks` dir
369    let blocks_path = path.join(BLOCKS_DIR);
370    std::fs::create_dir_all(&blocks_path).map_err(|e| {
371      BlockStorageError::Write {
372        source: Box::new(FileSystemError {
373          path: blocks_path.clone(),
374          source: e,
375          context: "creating blocks directory".to_string(),
376        }),
377      }
378    })?;
379
380    let moved_path = blocks_path.clone();
381    // spawn thread for write the blocks files
382    std::thread::spawn(move || {
383      // for each message received
384      while let Ok((height, block)) = rx.recv() {
385        // create file path
386        let file_path =
387          moved_path.join(format!("{:0>16x}.kindelia_block.bin", height));
388        // create file buffer
389        let file_buff = bitvec_to_bytes(&block.proto_serialized());
390        // write file
391        if let Err(e) = std::fs::write(&file_path, file_buff).map_err(|e| {
392          BlockStorageError::Write {
393            source: Box::new(FileSystemError {
394              path: file_path,
395              source: e,
396              context: "writing block to file".to_string(),
397            }),
398          }
399        }) {
400          eprintln!("Couldn't save block to disk.\n{}", e);
401        }
402      }
403    });
404
405    Ok(SimpleFileStorage { tx, path: blocks_path, enabled: true })
406  }
407}
408
409impl BlockStorage for SimpleFileStorage {
410  fn write_block(
411    &self,
412    height: u128,
413    block: HashedBlock,
414  ) -> Result<(), BlockStorageError> {
415    // if file storage is enabled
416    if self.enabled {
417      // try to send the info for the file writter
418      // if an error occurr, print it
419      if let Err(err) = self.tx.send((height, block)) {
420        eprintln!("Could not save block of height {}: {}", height, err);
421        return Err(BlockStorageError::Write { source: Box::new(err) });
422      }
423    }
424    Ok(())
425  }
426  fn read_blocks<F: FnMut((node::Block, PathBuf))>(
427    &self,
428    then: F,
429  ) -> Result<(), BlockStorageError> {
430    let file_paths = get_ordered_blocks_path(&self.path)
431      .map_err(|e| BlockStorageError::Read { source: Box::new(e) })?;
432
433    let mut items = vec![];
434    for (_, file_path) in file_paths.into_iter() {
435      let buffer =
436        std::fs::read(&file_path).map_err(|e| BlockStorageError::Read {
437          source: Box::new(FileSystemError {
438            path: file_path.clone(),
439            source: e,
440            context: "reading block from file".to_string(),
441          }),
442        })?;
443      let block =
444        node::Block::proto_deserialized(&util::bytes_to_bitvec(&buffer))
445          .ok_or(BlockStorageError::Serialization { source: None })?;
446      items.push((block, file_path));
447    }
448    items.into_iter().for_each(then);
449    Ok(())
450  }
451  fn disable(&mut self) {
452    self.enabled = false;
453  }
454  fn enable(&mut self) {
455    self.enabled = true
456  }
457}
458
459/// Errors associated with get_ordered_blocks_path
460#[derive(ThisError, Debug)]
461pub enum OrderedBlocksError {
462  #[error(transparent)]
463  ReadDir(#[from] FileSystemError),
464  #[error("Could not read filename in {0}")]
465  BadFileName(PathBuf),
466  #[error("Missing extension {0}")]
467  MissingExtension(PathBuf),
468  #[error("Filename is not numeric {path:?}")]
469  NotNumeric { path: PathBuf, source: std::num::ParseIntError },
470}
471
472/// Get all block entries in the `path`, sort it by the name/height and
473/// returns a vector containing ordered (height, paths).
474///
475/// Expects all the dir entries to be a kindelia block, named as
476/// <block_height>.kindelia_block.bin.
477///
478/// note: presently we error out immediately if any file naming
479/// abnormality is detected.  We could just ignore that
480/// entry instead.
481pub fn get_ordered_blocks_path(
482  path: &Path,
483) -> Result<Vec<(u64, PathBuf)>, OrderedBlocksError> {
484  let mut file_paths = vec![];
485
486  for entry in std::fs::read_dir(path).map_err(|e| {
487    OrderedBlocksError::ReadDir(FileSystemError {
488      path: path.to_path_buf(),
489      source: e,
490      context: "reading blocks directory".to_string(),
491    })
492  })? {
493    // Extract block height from block file path for fast sort
494    let path = entry
495      .map_err(|e| {
496        OrderedBlocksError::ReadDir(FileSystemError {
497          path: path.to_path_buf(),
498          source: e,
499          context: "reading blocks directory".to_string(),
500        })
501      })?
502      .path();
503    let name = path
504      .file_name()
505      .ok_or_else(|| OrderedBlocksError::BadFileName(path.to_path_buf()))?
506      .to_str()
507      .ok_or_else(|| OrderedBlocksError::BadFileName(path.to_path_buf()))?;
508    let bnum = name
509      .split('.')
510      .next()
511      .ok_or_else(|| OrderedBlocksError::MissingExtension(path.join(name)))?;
512    let bnum = u64::from_str_radix(bnum, 16).map_err(|e| {
513      OrderedBlocksError::NotNumeric { path: path.join(name), source: e }
514    })?;
515    file_paths.push((bnum, path));
516  }
517
518  file_paths.sort_unstable();
519  Ok(file_paths)
520}
521
522#[derive(Clone)]
523pub struct EmptyStorage;
524
525impl BlockStorage for EmptyStorage {
526  fn enable(&mut self) {}
527  fn disable(&mut self) {}
528  fn read_blocks<F: FnMut((node::Block, PathBuf))>(
529    &self,
530    _: F,
531  ) -> Result<(), BlockStorageError> {
532    Ok(())
533  }
534  fn write_block(
535    &self,
536    _: u128,
537    _: HashedBlock,
538  ) -> Result<(), BlockStorageError> {
539    Ok(())
540  }
541}