ledger_kv/
lib.rs

1//! This module implements a key-value storage system called LedgerKV.
2//!
3//! The LedgerKV struct provides methods for inserting, deleting, and retrieving key-value entries.
//! It uses a journaling (append-only) approach: every upsert and delete is appended to a binary
//! file as a Borsh-serialized record prefixed with its length, so the file can be replayed from the
//! start to rebuild the current state.
6//!
//! The LedgerKV struct maintains an in-memory index of the entries for quick lookups: a hash map
//! (`AHashMap`) keyed by each entry's [`EntryLabel`], whose values are `IndexMap`s from key bytes to
//! the latest [`KvEntry`] for that key.
10//!
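//! The LedgerKV struct also maintains a metadata file that records the number of journal records,
//! the byte offset at which the next record will be written, and the hash of the most recently
//! appended record. Each new record's hash is SHA-256 over that parent hash, its key and its value,
//! forming a hash chain that is re-verified whenever the journal is replayed, so corrupted records
//! are detected.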
14//!
15//! The LedgerKV struct provides methods for inserting and deleting entries, as well as iterating over the entries
16//! by label or in raw form. It also supports refreshing the in-memory index and metadata from the binary file.
17//!
18//! Example usage:
19//!
20//! ```rust
21//! use std::path::PathBuf;
22//! use ledger_kv::{LedgerKV, EntryLabel, Operation};
23//!
24//! fn main() {
25//!     let data_dir = PathBuf::from("data");
26//!     let description = "example";
27//!
28//!     // Create a new LedgerKV instance
29//!     let mut ledger_kv = LedgerKV::new(data_dir, description);
30//!
31//!     // Insert a new entry
32//!     let label = EntryLabel::Unspecified;
33//!     let key = b"key".to_vec();
34//!     let value = b"value".to_vec();
35//!     ledger_kv.upsert(label.clone(), key.clone(), value.clone()).unwrap();
36//!
37//!     // Retrieve all entries
38//!     let entries = ledger_kv.iter(None).collect::<Vec<_>>();
39//!     println!("All entries: {:?}", entries);
40//!
41//!     // Delete an entry
42//!     ledger_kv.delete(label, key).unwrap();
43//! }
44//! ```
45use ahash::AHashMap;
46use borsh::{from_slice, to_vec, BorshDeserialize};
47use borsh_derive::{BorshDeserialize, BorshSerialize};
48use fs_err as fs;
49use fs_err::{File, OpenOptions};
50use indexmap::IndexMap;
51use log::{info, warn};
52use memmap2::Mmap;
53use sha2::{Digest, Sha256};
54use std::cell::RefCell;
55use std::io::{Cursor, Read, Seek, SeekFrom, Write};
56use std::path::PathBuf;
57
58/// Enum defining the different labels for entries.
59#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, Debug, Hash)]
60pub enum EntryLabel {
61    Unspecified,
62    NodeProvider,
63}
64
65/// Enum defining the different operations that can be performed on entries.
66#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, Debug)]
67pub enum Operation {
68    Upsert,
69    Delete,
70}
71
72/// Struct representing a key-value entry.
73#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, Debug)]
74pub struct KvEntry {
75    pub label: EntryLabel,
76    pub key: Vec<u8>,
77    pub value: Vec<u8>,
78    pub operation: Operation,
79    file_offset: usize,
80    hash: Vec<u8>,
81}
82
83impl KvEntry {
84    /// Creates a new `KvEntry` instance.
85    ///
86    /// # Arguments
87    ///
88    /// * `label` - The label of the entry.
89    /// * `key` - The key of the entry.
90    /// * `value` - The value of the entry.
91    /// * `operation` - The operation to be performed on the entry.
92    /// * `file_offset` - The file offset of the entry.
93    /// * `hash` - The hash of the entry.
94    ///
95    /// # Returns
96    ///
97    /// A new `KvEntry` instance.
98    pub fn new(
99        label: EntryLabel,
100        key: Vec<u8>,
101        value: Vec<u8>,
102        operation: Operation,
103        file_offset: usize,
104        hash: Vec<u8>,
105    ) -> Self {
106        KvEntry {
107            label,
108            key,
109            value,
110            operation,
111            file_offset,
112            hash,
113        }
114    }
115}
116
117/// Implements the `Display` trait for `KvEntry`.
118impl std::fmt::Display for KvEntry {
119    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
120        if let Ok(key) = String::from_utf8(self.key.to_owned()) {
121            if let Ok(value) = String::from_utf8(self.value.to_owned()) {
122                return write!(f, "@{} Key: {}, Value: {}", self.file_offset, key, value);
123            }
124        }
125        write!(
126            f,
127            "@{} Key: {}, Value: {}",
128            self.file_offset,
129            String::from_utf8_lossy(&self.key),
130            String::from_utf8_lossy(&self.value)
131        )
132    }
133}
134
135/// Struct representing the LedgerKV.
136pub struct LedgerKV {
137    pub file_path: PathBuf,
138    metadata: RefCell<Metadata>,
139    entries: AHashMap<EntryLabel, IndexMap<Vec<u8>, KvEntry>>,
140    entry_hash2offset: IndexMap<Vec<u8>, usize>,
141}
142
143impl LedgerKV {
144    /// Creates a new `LedgerKV` instance.
145    ///
146    /// # Arguments
147    ///
148    /// * `data_dir` - The directory where the ledger data is stored.
149    /// * `description` - A description of the ledger.
150    ///
151    /// # Returns
152    ///
153    /// A new `LedgerKV` instance.
154    pub fn new(data_dir: PathBuf, description: &str) -> Self {
155        fs::create_dir_all(&data_dir).unwrap();
156        let mut file_path = data_dir.join(description);
157        file_path.set_extension("bin");
158        let metadata = Metadata::new(data_dir.clone(), description);
159        let entries = AHashMap::new();
160        let entries_hashes = IndexMap::new();
161
162        LedgerKV {
163            file_path,
164            metadata: RefCell::new(metadata),
165            entries,
166            entry_hash2offset: entries_hashes,
167        }
168        .refresh_ledger()
169    }
170
171    fn _compute_cumulative_hash(parent_hash: &[u8], key: &[u8], value: &[u8]) -> Vec<u8> {
172        let mut hasher = Sha256::new();
173        hasher.update(parent_hash);
174        hasher.update(key);
175        hasher.update(value);
176        hasher.finalize().to_vec()
177    }
178
179    fn _get_append_journal_file(&self) -> anyhow::Result<File> {
180        OpenOptions::new()
181            .append(true)
182            .create(true)
183            .open(&self.file_path)
184            .map_err(|e| anyhow::format_err!("Open file failed: {}", e))
185    }
186
187    fn _journal_append_kv_entry(&self, entry: &KvEntry) -> anyhow::Result<()> {
188        // Prepare entry as serialized bytes
189        let serialized_data = to_vec(&entry)?;
190        // Prepare entry len, as bytes
191        let entry_len_bytes = serialized_data.len();
192        let serialized_data_len = to_vec(&entry_len_bytes).expect("failed to serialize entry len");
193
194        let mut file = self._get_append_journal_file()?;
195        // Append entry len, as bytes
196        file.write_all(&serialized_data_len)
197            .map_err(|e| anyhow::format_err!("Append file failed: {}", e))?;
198        // Append entry
199        file.write_all(&serialized_data)
200            .map_err(|e| anyhow::format_err!("Append file failed: {}", e))?;
201
202        println!("Entry hash: {:?}", entry.hash);
203        self.metadata.borrow_mut().num_entries += 1;
204        self.metadata.borrow_mut().parent_hash = entry.hash.clone();
205        self.metadata.borrow_mut().last_offset = file.stream_position()? as usize;
206        self.metadata.borrow_mut().save();
207        self.metadata.borrow_mut().refresh();
208        Ok(())
209    }
210
211    pub fn upsert(
212        &mut self,
213        label: EntryLabel,
214        key: Vec<u8>,
215        value: Vec<u8>,
216    ) -> anyhow::Result<()> {
217        let hash =
218            Self::_compute_cumulative_hash(&self.metadata.borrow().parent_hash, &key, &value);
219        let entry = KvEntry::new(
220            label.clone(),
221            key.clone(),
222            value.clone(),
223            Operation::Upsert,
224            self.metadata.borrow().last_offset,
225            hash,
226        );
227
228        self._journal_append_kv_entry(&entry)?;
229
230        match self.entries.get_mut(&label) {
231            Some(entries) => {
232                entries.insert(key, entry);
233            }
234            None => {
235                let mut new_map = IndexMap::new();
236                new_map.insert(key, entry);
237                self.entries.insert(label, new_map);
238            }
239        };
240
241        Ok(())
242    }
243
244    pub fn delete(&mut self, label: EntryLabel, key: Vec<u8>) -> anyhow::Result<()> {
245        let hash = Self::_compute_cumulative_hash(&self.metadata.borrow().parent_hash, &key, &[]);
246        let entry = KvEntry::new(
247            label.clone(),
248            key.clone(),
249            Vec::new(),
250            Operation::Delete,
251            0,
252            hash,
253        );
254
255        self._journal_append_kv_entry(&entry)?;
256
257        match self.entries.get_mut(&label) {
258            Some(entries) => {
259                entries.remove(&key);
260            }
261            None => {
262                warn!("Entry label {:?} not found", label);
263            }
264        };
265
266        Ok(())
267    }
268
269    pub fn refresh_ledger(mut self) -> Self {
270        self.entries.clear();
271        self.entry_hash2offset.clear();
272        self.metadata.borrow_mut().refresh();
273
274        // If the file does not exist, just return
275        if !self.file_path.exists() {
276            return self;
277        }
278
279        let mut entries_hash2offset = IndexMap::new();
280
281        for entry in self.iter_raw().collect::<Vec<_>>() {
282            // Update the in-memory IndexMap of entries, used for quick lookups
283
284            let entries = match self.entries.get_mut(&entry.label) {
285                Some(entries) => entries,
286                None => {
287                    let new_map = IndexMap::new();
288                    self.entries.insert(entry.label.clone(), new_map);
289                    self.entries.get_mut(&entry.label).unwrap()
290                }
291            };
292
293            match &entry.operation {
294                Operation::Upsert => {
295                    entries.insert(entry.key.clone(), entry.clone());
296                    entries_hash2offset.insert(entry.hash, entry.file_offset);
297                }
298                Operation::Delete => {
299                    entries.remove(&entry.key);
300                    entries_hash2offset.remove(&entry.hash);
301                }
302            }
303        }
304
305        self.entry_hash2offset = entries_hash2offset;
306
307        self
308    }
309
310    pub fn iter(&self, label: Option<EntryLabel>) -> impl Iterator<Item = &KvEntry> {
311        self.entries
312            .iter()
313            .filter(|(entry_label, _entry)| match &label {
314                Some(label) => entry_label == &label,
315                None => true,
316            })
317            .map(|(_, entry)| entry)
318            .flat_map(|entry| entry.values())
319            .collect::<Vec<_>>()
320            .into_iter()
321    }
322
323    pub fn iter_raw(&self) -> impl Iterator<Item = KvEntry> + '_ {
324        let file = OpenOptions::new()
325            .read(true)
326            .write(true)
327            .create(true)
328            .open(&self.file_path)
329            .expect("failed to open ledger file");
330        let mmap = unsafe { Mmap::map(&file).unwrap() };
331        self.metadata.borrow_mut().refresh();
332        let cursor = Cursor::new(mmap);
333
334        info!("Num entries: {}", self.metadata.borrow().num_entries);
335        // scan is used to build a lazy iterator
336        // it gives us a way to maintain state between calls to the iterator
337        // (in this case, the Cursor and parent_hash).
338        let iterator =
339            (0..self.metadata.borrow().num_entries).scan((cursor, Vec::new()), |state, _| {
340                let (cursor, parent_hash) = state;
341                let mut slice_begin = cursor.position() as usize;
342                let mut slice = &cursor.get_ref()[slice_begin..];
343
344                let entry_len_bytes = match usize::deserialize(&mut slice) {
345                    Ok(len) => len,
346                    Err(_) => panic!("Deserialize error"),
347                };
348
349                let size_of_usize = std::mem::size_of_val(&entry_len_bytes);
350                slice_begin = cursor.position() as usize + size_of_usize;
351                let mut slice = &cursor.get_ref()[slice_begin..];
352
353                let entry = match KvEntry::deserialize(&mut slice) {
354                    Ok(entry) => entry,
355                    Err(_) => panic!("Deserialize error"),
356                };
357
358                let expected_hash =
359                    Self::_compute_cumulative_hash(parent_hash, &entry.key, &entry.value);
360                assert_eq!(expected_hash, entry.hash);
361                parent_hash.clear();
362                parent_hash.extend_from_slice(&entry.hash);
363
364                let seek_offset = size_of_usize + entry_len_bytes;
365                cursor
366                    .seek(SeekFrom::Current(seek_offset as i64))
367                    .expect("Seek error");
368
369                Some(entry)
370            });
371
372        iterator
373    }
374}
375
376/// Struct representing the metadata of the ledger.
377#[derive(BorshSerialize, BorshDeserialize, Clone, Debug)]
378pub struct Metadata {
379    /// The name of the file associated with the metadata.
380    pub file_name: String,
381    #[borsh(skip)]
382    /// The path where the file is located.
383    pub file_path: PathBuf,
384    /// The number of entries in the ledger.
385    pub num_entries: u64,
386    /// The last offset in the file.
387    pub last_offset: usize,
388    /// The hash of the parent metadata.
389    pub parent_hash: Vec<u8>,
390}
391
392impl Metadata {
393    /// Creates a new instance of `Metadata`.
394    ///
395    /// # Arguments
396    ///
397    /// * `data_dir` - The directory where the data is stored.
398    /// * `description` - A description for the metadata.
399    ///
400    /// # Returns
401    ///
402    /// A new instance of `Metadata`.
403    pub fn new(data_dir: PathBuf, description: &str) -> Self {
404        let file_name = format!("{}.meta", description);
405        let mut file_path = data_dir.join(&file_name);
406        file_path.set_extension("meta");
407        let num_entries = 0;
408        let last_offset = 0;
409        let parent_hash = Vec::new();
410
411        Metadata {
412            file_name,
413            file_path,
414            num_entries,
415            last_offset,
416            parent_hash,
417        }
418    }
419
420    /// Saves the metadata to a file.
421    pub fn save(&self) {
422        let mut file = File::create(&self.file_path).unwrap();
423        let metadata_bytes = to_vec(self).unwrap();
424        file.write_all(&metadata_bytes).unwrap();
425    }
426
427    /// Refreshes the metadata by reading from the file.
428    pub fn refresh(&mut self) {
429        if !self.file_path.exists() {
430            warn!(
431                "Metadata refresh: file {} does not exist",
432                self.file_path.display()
433            );
434            return;
435        }
436        let mut file = File::open(&self.file_path).unwrap();
437        let mut metadata_bytes = Vec::new();
438        file.read_to_end(&mut metadata_bytes).unwrap_or_else(|_| {
439            panic!(
440                "Metadata refresh: failed to read file {}",
441                self.file_path.display()
442            )
443        });
444
445        let deserialized_metadata: Metadata = from_slice::<Metadata>(&metadata_bytes).unwrap();
446        self.num_entries = deserialized_metadata.num_entries;
447        self.last_offset = deserialized_metadata.last_offset;
448        self.parent_hash = deserialized_metadata.parent_hash;
449        info!(
450            "Read metadata of num_entries {} last_offset {}",
451            self.num_entries, self.last_offset
452        );
453    }
454}
455
456#[cfg(test)]
457mod tests {
458    use super::*;
459    use fs_err::File;
460    use std::io::{Read, Write};
461    use tempfile::tempdir;
462
463    #[test]
464    fn test_save() {
465        // Create a temporary file for testing
466        let temp_dir = tempfile::tempdir().unwrap();
467        let file_path = temp_dir.path().join("metadata.bin");
468
469        // Create a Metadata instance
470        let metadata = Metadata {
471            file_name: String::from("metadata.bin"),
472            file_path: file_path.clone(),
473            num_entries: 10,
474            last_offset: 0,
475            parent_hash: vec![0, 1, 2, 3],
476        };
477
478        // Call the save method
479        metadata.save();
480
481        // Read the contents of the file
482        let mut file = File::open(file_path).unwrap();
483        let mut metadata_bytes = Vec::new();
484        file.read_to_end(&mut metadata_bytes).unwrap();
485
486        // Deserialize the metadata bytes
487        let deserialized_metadata: Metadata = from_slice::<Metadata>(&metadata_bytes).unwrap();
488
489        // Assert that the deserialized metadata matches the original metadata
490        assert_eq!(deserialized_metadata.file_name, "metadata.bin");
491        assert_eq!(deserialized_metadata.num_entries, 10);
492        assert_eq!(deserialized_metadata.parent_hash, vec![0, 1, 2, 3]);
493    }
494
495    #[test]
496    fn test_refresh() {
497        // Create a temporary file for testing
498        let temp_dir = tempfile::tempdir().unwrap();
499        let file_path = temp_dir.path().join("metadata.bin");
500
501        // Create a Metadata instance
502        let mut metadata = Metadata {
503            file_name: String::from("metadata.bin"),
504            file_path: file_path.clone(),
505            num_entries: 0,
506            last_offset: 0,
507            parent_hash: Vec::new(),
508        };
509
510        // Write some metadata bytes to the file
511        let serialized_metadata: Vec<u8> = to_vec(&metadata).unwrap();
512        let mut file = File::create(&file_path).unwrap();
513        file.write_all(&serialized_metadata).unwrap();
514
515        // Call the refresh method
516        metadata.refresh();
517
518        // Assert that the metadata fields are correctly refreshed
519        assert_eq!(metadata.num_entries, 0);
520        assert_eq!(metadata.parent_hash, Vec::new());
521    }
522
523    fn new_temp_ledger() -> LedgerKV {
524        let data_dir = tempdir().unwrap().into_path();
525        let file_name = "test.bin";
526        LedgerKV::new(data_dir.clone(), file_name)
527    }
528
529    #[test]
530    fn test_compute_cumulative_hash() {
531        let parent_hash = vec![0, 1, 2, 3];
532        let key = vec![4, 5, 6, 7];
533        let value = vec![8, 9, 10, 11];
534        let cumulative_hash = LedgerKV::_compute_cumulative_hash(&parent_hash, &key, &value);
535
536        // Cumulative hash is a sha256 hash of the parent hash, key, and value
537        assert_eq!(
538            cumulative_hash,
539            vec![
540                255, 243, 169, 188, 221, 55, 54, 61, 112, 60, 28, 79, 149, 18, 83, 54, 134, 21,
541                120, 104, 240, 212, 241, 106, 15, 2, 208, 241, 218, 36, 249, 162
542            ]
543        );
544    }
545
546    #[test]
547    fn test_get_append_journal_file() {
548        let ledger_kv = new_temp_ledger();
549        let result = ledger_kv._get_append_journal_file();
550        assert!(result.is_ok());
551    }
552
553    #[test]
554    fn test_upsert() {
555        let mut ledger_kv = new_temp_ledger();
556
557        // Test upsert
558        let key = vec![1, 2, 3];
559        let value = vec![4, 5, 6];
560        ledger_kv
561            .upsert(EntryLabel::Unspecified, key.clone(), value.clone())
562            .unwrap();
563        let entries = ledger_kv.entries.get(&EntryLabel::Unspecified).unwrap();
564        assert_eq!(
565            entries.get(&key),
566            Some(&KvEntry::new(
567                EntryLabel::Unspecified,
568                key,
569                value,
570                Operation::Upsert,
571                0,
572                ledger_kv.metadata.borrow().parent_hash.clone()
573            ))
574        );
575        assert_eq!(ledger_kv.metadata.borrow().num_entries, 1);
576    }
577
578    #[test]
579    fn test_upsert_with_matching_entry_label() {
580        let mut ledger_kv = new_temp_ledger();
581
582        let key = vec![1, 2, 3];
583        let value = vec![4, 5, 6];
584        ledger_kv
585            .upsert(EntryLabel::NodeProvider, key.clone(), value.clone())
586            .unwrap();
587        let entries = ledger_kv.entries.get(&EntryLabel::NodeProvider).unwrap();
588        assert_eq!(
589            entries.get(&key),
590            Some(&KvEntry::new(
591                EntryLabel::NodeProvider,
592                key.clone(),
593                value.clone(),
594                Operation::Upsert,
595                0,
596                ledger_kv.metadata.borrow().parent_hash.clone()
597            ))
598        );
599    }
600
601    #[test]
602    fn test_upsert_with_mismatched_entry_type() {
603        let mut ledger_kv = new_temp_ledger();
604
605        let key = vec![1, 2, 3];
606        let value = vec![4, 5, 6];
607        ledger_kv
608            .upsert(EntryLabel::Unspecified, key.clone(), value.clone())
609            .unwrap();
610
611        // Ensure that the entry is not added to the NodeProvider ledger since the entry_type doesn't match
612        assert_eq!(ledger_kv.entries.get(&EntryLabel::NodeProvider), None);
613    }
614
615    #[test]
616    fn test_delete_with_matching_entry_type() {
617        let mut ledger_kv = new_temp_ledger();
618
619        let key = vec![1, 2, 3];
620        let value = vec![4, 5, 6];
621        ledger_kv
622            .upsert(EntryLabel::NodeProvider, key.clone(), value.clone())
623            .unwrap();
624        ledger_kv
625            .delete(EntryLabel::NodeProvider, key.clone())
626            .unwrap();
627
628        // Ensure that the entry is deleted from the ledger since the entry_type matches
629        let entries = ledger_kv.entries.get(&EntryLabel::NodeProvider).unwrap();
630        assert_eq!(entries.get(&key), None);
631    }
632
633    #[test]
634    fn test_delete_with_mismatched_entry_type() {
635        let mut ledger_kv = new_temp_ledger();
636
637        let key = vec![1, 2, 3];
638        let value = vec![4, 5, 6];
639        ledger_kv
640            .upsert(EntryLabel::NodeProvider, key.clone(), value.clone())
641            .unwrap();
642        let expected_hash = ledger_kv.metadata.borrow().parent_hash.clone();
643        ledger_kv
644            .delete(EntryLabel::Unspecified, key.clone())
645            .unwrap();
646
647        // Ensure that the entry is not deleted from the ledger since the entry_type doesn't match
648        let entries_np = ledger_kv.entries.get(&EntryLabel::NodeProvider).unwrap();
649        assert_eq!(
650            entries_np.get(&key),
651            Some(&KvEntry::new(
652                EntryLabel::NodeProvider,
653                key.clone(),
654                value.clone(),
655                Operation::Upsert,
656                0,
657                expected_hash
658            ))
659        );
660        assert_eq!(ledger_kv.entries.get(&EntryLabel::Unspecified), None);
661    }
662    #[test]
663    fn test_delete() {
664        let mut ledger_kv = new_temp_ledger();
665
666        // Test delete
667        let key = vec![1, 2, 3];
668        let value = vec![4, 5, 6];
669        ledger_kv
670            .upsert(EntryLabel::Unspecified, key.clone(), value.clone())
671            .unwrap();
672        ledger_kv
673            .delete(EntryLabel::Unspecified, key.clone())
674            .unwrap();
675        let entries = ledger_kv.entries.get(&EntryLabel::Unspecified).unwrap();
676        assert_eq!(entries.get(&key), None);
677    }
678
679    #[test]
680    fn test_refresh_ledger() {
681        let mut ledger_kv = new_temp_ledger();
682
683        // Test refresh_ledger
684        let key = vec![1, 2, 3];
685        let value = vec![4, 5, 6];
686        ledger_kv
687            .upsert(EntryLabel::Unspecified, key.clone(), value.clone())
688            .unwrap();
689        let parent_hash = ledger_kv.metadata.borrow().parent_hash.clone();
690        fs::remove_file(ledger_kv.file_path.clone()).unwrap();
691        ledger_kv = ledger_kv.refresh_ledger();
692
693        assert_eq!(ledger_kv.entries.get(&EntryLabel::Unspecified), None);
694        assert_eq!(ledger_kv.metadata.borrow().parent_hash, parent_hash);
695    }
696}