1use ahash::AHashMap;
46use borsh::{from_slice, to_vec, BorshDeserialize};
47use borsh_derive::{BorshDeserialize, BorshSerialize};
48use fs_err as fs;
49use fs_err::{File, OpenOptions};
50use indexmap::IndexMap;
51use log::{info, warn};
52use memmap2::Mmap;
53use sha2::{Digest, Sha256};
54use std::cell::RefCell;
55use std::io::{Cursor, Read, Seek, SeekFrom, Write};
56use std::path::PathBuf;
57
58#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, Debug, Hash)]
60pub enum EntryLabel {
61 Unspecified,
62 NodeProvider,
63}
64
65#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, Debug)]
67pub enum Operation {
68 Upsert,
69 Delete,
70}
71
72#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, Debug)]
74pub struct KvEntry {
75 pub label: EntryLabel,
76 pub key: Vec<u8>,
77 pub value: Vec<u8>,
78 pub operation: Operation,
79 file_offset: usize,
80 hash: Vec<u8>,
81}
82
83impl KvEntry {
84 pub fn new(
99 label: EntryLabel,
100 key: Vec<u8>,
101 value: Vec<u8>,
102 operation: Operation,
103 file_offset: usize,
104 hash: Vec<u8>,
105 ) -> Self {
106 KvEntry {
107 label,
108 key,
109 value,
110 operation,
111 file_offset,
112 hash,
113 }
114 }
115}
116
117impl std::fmt::Display for KvEntry {
119 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
120 if let Ok(key) = String::from_utf8(self.key.to_owned()) {
121 if let Ok(value) = String::from_utf8(self.value.to_owned()) {
122 return write!(f, "@{} Key: {}, Value: {}", self.file_offset, key, value);
123 }
124 }
125 write!(
126 f,
127 "@{} Key: {}, Value: {}",
128 self.file_offset,
129 String::from_utf8_lossy(&self.key),
130 String::from_utf8_lossy(&self.value)
131 )
132 }
133}
134
135pub struct LedgerKV {
137 pub file_path: PathBuf,
138 metadata: RefCell<Metadata>,
139 entries: AHashMap<EntryLabel, IndexMap<Vec<u8>, KvEntry>>,
140 entry_hash2offset: IndexMap<Vec<u8>, usize>,
141}
142
143impl LedgerKV {
144 pub fn new(data_dir: PathBuf, description: &str) -> Self {
155 fs::create_dir_all(&data_dir).unwrap();
156 let mut file_path = data_dir.join(description);
157 file_path.set_extension("bin");
158 let metadata = Metadata::new(data_dir.clone(), description);
159 let entries = AHashMap::new();
160 let entries_hashes = IndexMap::new();
161
162 LedgerKV {
163 file_path,
164 metadata: RefCell::new(metadata),
165 entries,
166 entry_hash2offset: entries_hashes,
167 }
168 .refresh_ledger()
169 }
170
171 fn _compute_cumulative_hash(parent_hash: &[u8], key: &[u8], value: &[u8]) -> Vec<u8> {
172 let mut hasher = Sha256::new();
173 hasher.update(parent_hash);
174 hasher.update(key);
175 hasher.update(value);
176 hasher.finalize().to_vec()
177 }
178
179 fn _get_append_journal_file(&self) -> anyhow::Result<File> {
180 OpenOptions::new()
181 .append(true)
182 .create(true)
183 .open(&self.file_path)
184 .map_err(|e| anyhow::format_err!("Open file failed: {}", e))
185 }
186
187 fn _journal_append_kv_entry(&self, entry: &KvEntry) -> anyhow::Result<()> {
188 let serialized_data = to_vec(&entry)?;
190 let entry_len_bytes = serialized_data.len();
192 let serialized_data_len = to_vec(&entry_len_bytes).expect("failed to serialize entry len");
193
194 let mut file = self._get_append_journal_file()?;
195 file.write_all(&serialized_data_len)
197 .map_err(|e| anyhow::format_err!("Append file failed: {}", e))?;
198 file.write_all(&serialized_data)
200 .map_err(|e| anyhow::format_err!("Append file failed: {}", e))?;
201
202 println!("Entry hash: {:?}", entry.hash);
203 self.metadata.borrow_mut().num_entries += 1;
204 self.metadata.borrow_mut().parent_hash = entry.hash.clone();
205 self.metadata.borrow_mut().last_offset = file.stream_position()? as usize;
206 self.metadata.borrow_mut().save();
207 self.metadata.borrow_mut().refresh();
208 Ok(())
209 }
210
211 pub fn upsert(
212 &mut self,
213 label: EntryLabel,
214 key: Vec<u8>,
215 value: Vec<u8>,
216 ) -> anyhow::Result<()> {
217 let hash =
218 Self::_compute_cumulative_hash(&self.metadata.borrow().parent_hash, &key, &value);
219 let entry = KvEntry::new(
220 label.clone(),
221 key.clone(),
222 value.clone(),
223 Operation::Upsert,
224 self.metadata.borrow().last_offset,
225 hash,
226 );
227
228 self._journal_append_kv_entry(&entry)?;
229
230 match self.entries.get_mut(&label) {
231 Some(entries) => {
232 entries.insert(key, entry);
233 }
234 None => {
235 let mut new_map = IndexMap::new();
236 new_map.insert(key, entry);
237 self.entries.insert(label, new_map);
238 }
239 };
240
241 Ok(())
242 }
243
244 pub fn delete(&mut self, label: EntryLabel, key: Vec<u8>) -> anyhow::Result<()> {
245 let hash = Self::_compute_cumulative_hash(&self.metadata.borrow().parent_hash, &key, &[]);
246 let entry = KvEntry::new(
247 label.clone(),
248 key.clone(),
249 Vec::new(),
250 Operation::Delete,
251 0,
252 hash,
253 );
254
255 self._journal_append_kv_entry(&entry)?;
256
257 match self.entries.get_mut(&label) {
258 Some(entries) => {
259 entries.remove(&key);
260 }
261 None => {
262 warn!("Entry label {:?} not found", label);
263 }
264 };
265
266 Ok(())
267 }
268
269 pub fn refresh_ledger(mut self) -> Self {
270 self.entries.clear();
271 self.entry_hash2offset.clear();
272 self.metadata.borrow_mut().refresh();
273
274 if !self.file_path.exists() {
276 return self;
277 }
278
279 let mut entries_hash2offset = IndexMap::new();
280
281 for entry in self.iter_raw().collect::<Vec<_>>() {
282 let entries = match self.entries.get_mut(&entry.label) {
285 Some(entries) => entries,
286 None => {
287 let new_map = IndexMap::new();
288 self.entries.insert(entry.label.clone(), new_map);
289 self.entries.get_mut(&entry.label).unwrap()
290 }
291 };
292
293 match &entry.operation {
294 Operation::Upsert => {
295 entries.insert(entry.key.clone(), entry.clone());
296 entries_hash2offset.insert(entry.hash, entry.file_offset);
297 }
298 Operation::Delete => {
299 entries.remove(&entry.key);
300 entries_hash2offset.remove(&entry.hash);
301 }
302 }
303 }
304
305 self.entry_hash2offset = entries_hash2offset;
306
307 self
308 }
309
310 pub fn iter(&self, label: Option<EntryLabel>) -> impl Iterator<Item = &KvEntry> {
311 self.entries
312 .iter()
313 .filter(|(entry_label, _entry)| match &label {
314 Some(label) => entry_label == &label,
315 None => true,
316 })
317 .map(|(_, entry)| entry)
318 .flat_map(|entry| entry.values())
319 .collect::<Vec<_>>()
320 .into_iter()
321 }
322
323 pub fn iter_raw(&self) -> impl Iterator<Item = KvEntry> + '_ {
324 let file = OpenOptions::new()
325 .read(true)
326 .write(true)
327 .create(true)
328 .open(&self.file_path)
329 .expect("failed to open ledger file");
330 let mmap = unsafe { Mmap::map(&file).unwrap() };
331 self.metadata.borrow_mut().refresh();
332 let cursor = Cursor::new(mmap);
333
334 info!("Num entries: {}", self.metadata.borrow().num_entries);
335 let iterator =
339 (0..self.metadata.borrow().num_entries).scan((cursor, Vec::new()), |state, _| {
340 let (cursor, parent_hash) = state;
341 let mut slice_begin = cursor.position() as usize;
342 let mut slice = &cursor.get_ref()[slice_begin..];
343
344 let entry_len_bytes = match usize::deserialize(&mut slice) {
345 Ok(len) => len,
346 Err(_) => panic!("Deserialize error"),
347 };
348
349 let size_of_usize = std::mem::size_of_val(&entry_len_bytes);
350 slice_begin = cursor.position() as usize + size_of_usize;
351 let mut slice = &cursor.get_ref()[slice_begin..];
352
353 let entry = match KvEntry::deserialize(&mut slice) {
354 Ok(entry) => entry,
355 Err(_) => panic!("Deserialize error"),
356 };
357
358 let expected_hash =
359 Self::_compute_cumulative_hash(parent_hash, &entry.key, &entry.value);
360 assert_eq!(expected_hash, entry.hash);
361 parent_hash.clear();
362 parent_hash.extend_from_slice(&entry.hash);
363
364 let seek_offset = size_of_usize + entry_len_bytes;
365 cursor
366 .seek(SeekFrom::Current(seek_offset as i64))
367 .expect("Seek error");
368
369 Some(entry)
370 });
371
372 iterator
373 }
374}
375
376#[derive(BorshSerialize, BorshDeserialize, Clone, Debug)]
378pub struct Metadata {
379 pub file_name: String,
381 #[borsh(skip)]
382 pub file_path: PathBuf,
384 pub num_entries: u64,
386 pub last_offset: usize,
388 pub parent_hash: Vec<u8>,
390}
391
392impl Metadata {
393 pub fn new(data_dir: PathBuf, description: &str) -> Self {
404 let file_name = format!("{}.meta", description);
405 let mut file_path = data_dir.join(&file_name);
406 file_path.set_extension("meta");
407 let num_entries = 0;
408 let last_offset = 0;
409 let parent_hash = Vec::new();
410
411 Metadata {
412 file_name,
413 file_path,
414 num_entries,
415 last_offset,
416 parent_hash,
417 }
418 }
419
420 pub fn save(&self) {
422 let mut file = File::create(&self.file_path).unwrap();
423 let metadata_bytes = to_vec(self).unwrap();
424 file.write_all(&metadata_bytes).unwrap();
425 }
426
427 pub fn refresh(&mut self) {
429 if !self.file_path.exists() {
430 warn!(
431 "Metadata refresh: file {} does not exist",
432 self.file_path.display()
433 );
434 return;
435 }
436 let mut file = File::open(&self.file_path).unwrap();
437 let mut metadata_bytes = Vec::new();
438 file.read_to_end(&mut metadata_bytes).unwrap_or_else(|_| {
439 panic!(
440 "Metadata refresh: failed to read file {}",
441 self.file_path.display()
442 )
443 });
444
445 let deserialized_metadata: Metadata = from_slice::<Metadata>(&metadata_bytes).unwrap();
446 self.num_entries = deserialized_metadata.num_entries;
447 self.last_offset = deserialized_metadata.last_offset;
448 self.parent_hash = deserialized_metadata.parent_hash;
449 info!(
450 "Read metadata of num_entries {} last_offset {}",
451 self.num_entries, self.last_offset
452 );
453 }
454}
455
#[cfg(test)]
mod tests {
    // Unit tests for Metadata persistence and LedgerKV's live and replay paths.
    use super::*;
    use fs_err::File;
    use std::io::{Read, Write};
    use tempfile::tempdir;

    #[test]
    fn test_save() {
        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("metadata.bin");

        let metadata = Metadata {
            file_name: String::from("metadata.bin"),
            file_path: file_path.clone(),
            num_entries: 10,
            last_offset: 0,
            parent_hash: vec![0, 1, 2, 3],
        };

        metadata.save();

        let mut file = File::open(file_path).unwrap();
        let mut metadata_bytes = Vec::new();
        file.read_to_end(&mut metadata_bytes).unwrap();

        let deserialized_metadata: Metadata = from_slice::<Metadata>(&metadata_bytes).unwrap();

        assert_eq!(deserialized_metadata.file_name, "metadata.bin");
        assert_eq!(deserialized_metadata.num_entries, 10);
        assert_eq!(deserialized_metadata.parent_hash, vec![0, 1, 2, 3]);
    }

    #[test]
    fn test_refresh() {
        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("metadata.bin");

        let mut metadata = Metadata {
            file_name: String::from("metadata.bin"),
            file_path: file_path.clone(),
            num_entries: 0,
            last_offset: 0,
            parent_hash: Vec::new(),
        };

        let serialized_metadata: Vec<u8> = to_vec(&metadata).unwrap();
        let mut file = File::create(&file_path).unwrap();
        file.write_all(&serialized_metadata).unwrap();

        metadata.refresh();

        assert_eq!(metadata.num_entries, 0);
        assert_eq!(metadata.parent_hash, Vec::new());
    }

    fn new_temp_ledger() -> LedgerKV {
        let data_dir = tempdir().unwrap().into_path();
        let file_name = "test.bin";
        LedgerKV::new(data_dir.clone(), file_name)
    }

    #[test]
    fn test_compute_cumulative_hash() {
        let parent_hash = vec![0, 1, 2, 3];
        let key = vec![4, 5, 6, 7];
        let value = vec![8, 9, 10, 11];
        let cumulative_hash = LedgerKV::_compute_cumulative_hash(&parent_hash, &key, &value);

        assert_eq!(
            cumulative_hash,
            vec![
                255, 243, 169, 188, 221, 55, 54, 61, 112, 60, 28, 79, 149, 18, 83, 54, 134, 21,
                120, 104, 240, 212, 241, 106, 15, 2, 208, 241, 218, 36, 249, 162
            ]
        );
    }

    #[test]
    fn test_get_append_journal_file() {
        let ledger_kv = new_temp_ledger();
        let result = ledger_kv._get_append_journal_file();
        assert!(result.is_ok());
    }

    #[test]
    fn test_upsert() {
        let mut ledger_kv = new_temp_ledger();

        let key = vec![1, 2, 3];
        let value = vec![4, 5, 6];
        ledger_kv
            .upsert(EntryLabel::Unspecified, key.clone(), value.clone())
            .unwrap();
        let entries = ledger_kv.entries.get(&EntryLabel::Unspecified).unwrap();
        assert_eq!(
            entries.get(&key),
            Some(&KvEntry::new(
                EntryLabel::Unspecified,
                key,
                value,
                Operation::Upsert,
                0,
                ledger_kv.metadata.borrow().parent_hash.clone()
            ))
        );
        assert_eq!(ledger_kv.metadata.borrow().num_entries, 1);
    }

    #[test]
    fn test_upsert_with_matching_entry_label() {
        let mut ledger_kv = new_temp_ledger();

        let key = vec![1, 2, 3];
        let value = vec![4, 5, 6];
        ledger_kv
            .upsert(EntryLabel::NodeProvider, key.clone(), value.clone())
            .unwrap();
        let entries = ledger_kv.entries.get(&EntryLabel::NodeProvider).unwrap();
        assert_eq!(
            entries.get(&key),
            Some(&KvEntry::new(
                EntryLabel::NodeProvider,
                key.clone(),
                value.clone(),
                Operation::Upsert,
                0,
                ledger_kv.metadata.borrow().parent_hash.clone()
            ))
        );
    }

    #[test]
    fn test_upsert_with_mismatched_entry_type() {
        let mut ledger_kv = new_temp_ledger();

        let key = vec![1, 2, 3];
        let value = vec![4, 5, 6];
        ledger_kv
            .upsert(EntryLabel::Unspecified, key.clone(), value.clone())
            .unwrap();

        assert_eq!(ledger_kv.entries.get(&EntryLabel::NodeProvider), None);
    }

    #[test]
    fn test_delete_with_matching_entry_type() {
        let mut ledger_kv = new_temp_ledger();

        let key = vec![1, 2, 3];
        let value = vec![4, 5, 6];
        ledger_kv
            .upsert(EntryLabel::NodeProvider, key.clone(), value.clone())
            .unwrap();
        ledger_kv
            .delete(EntryLabel::NodeProvider, key.clone())
            .unwrap();

        let entries = ledger_kv.entries.get(&EntryLabel::NodeProvider).unwrap();
        assert_eq!(entries.get(&key), None);
    }

    #[test]
    fn test_delete_with_mismatched_entry_type() {
        let mut ledger_kv = new_temp_ledger();

        let key = vec![1, 2, 3];
        let value = vec![4, 5, 6];
        ledger_kv
            .upsert(EntryLabel::NodeProvider, key.clone(), value.clone())
            .unwrap();
        let expected_hash = ledger_kv.metadata.borrow().parent_hash.clone();
        ledger_kv
            .delete(EntryLabel::Unspecified, key.clone())
            .unwrap();

        let entries_np = ledger_kv.entries.get(&EntryLabel::NodeProvider).unwrap();
        assert_eq!(
            entries_np.get(&key),
            Some(&KvEntry::new(
                EntryLabel::NodeProvider,
                key.clone(),
                value.clone(),
                Operation::Upsert,
                0,
                expected_hash
            ))
        );
        assert_eq!(ledger_kv.entries.get(&EntryLabel::Unspecified), None);
    }

    #[test]
    fn test_delete() {
        let mut ledger_kv = new_temp_ledger();

        let key = vec![1, 2, 3];
        let value = vec![4, 5, 6];
        ledger_kv
            .upsert(EntryLabel::Unspecified, key.clone(), value.clone())
            .unwrap();
        ledger_kv
            .delete(EntryLabel::Unspecified, key.clone())
            .unwrap();
        let entries = ledger_kv.entries.get(&EntryLabel::Unspecified).unwrap();
        assert_eq!(entries.get(&key), None);
    }

    #[test]
    fn test_refresh_ledger() {
        let mut ledger_kv = new_temp_ledger();

        let key = vec![1, 2, 3];
        let value = vec![4, 5, 6];
        ledger_kv
            .upsert(EntryLabel::Unspecified, key.clone(), value.clone())
            .unwrap();
        let parent_hash = ledger_kv.metadata.borrow().parent_hash.clone();
        fs::remove_file(ledger_kv.file_path.clone()).unwrap();
        ledger_kv = ledger_kv.refresh_ledger();

        assert_eq!(ledger_kv.entries.get(&EntryLabel::Unspecified), None);
        assert_eq!(ledger_kv.metadata.borrow().parent_hash, parent_hash);
    }

    #[test]
    fn test_reopen_replays_journal() {
        let temp_dir = tempdir().unwrap();
        let data_dir = temp_dir.path().to_path_buf();

        let mut ledger_kv = LedgerKV::new(data_dir.clone(), "test.bin");
        ledger_kv
            .upsert(EntryLabel::NodeProvider, vec![1], vec![10])
            .unwrap();
        ledger_kv
            .upsert(EntryLabel::NodeProvider, vec![2], vec![20])
            .unwrap();
        ledger_kv
            .upsert(EntryLabel::NodeProvider, vec![1], vec![11])
            .unwrap();
        ledger_kv
            .delete(EntryLabel::NodeProvider, vec![2])
            .unwrap();
        let expected_hash = ledger_kv.metadata.borrow().parent_hash.clone();

        // A fresh instance must rebuild the same state purely from disk.
        let reopened = LedgerKV::new(data_dir, "test.bin");
        assert_eq!(reopened.metadata.borrow().num_entries, 4);
        assert_eq!(reopened.metadata.borrow().parent_hash, expected_hash);
        assert_eq!(reopened.iter_raw().count(), 4);

        let live: Vec<(Vec<u8>, Vec<u8>)> = reopened
            .iter(Some(EntryLabel::NodeProvider))
            .map(|entry| (entry.key.clone(), entry.value.clone()))
            .collect();
        assert_eq!(live, vec![(vec![1u8], vec![11u8])]);
        assert_eq!(
            reopened.entries.get(&EntryLabel::NodeProvider),
            ledger_kv.entries.get(&EntryLabel::NodeProvider)
        );
    }

    #[test]
    fn test_iter_filters_by_label() {
        let mut ledger_kv = new_temp_ledger();
        ledger_kv
            .upsert(EntryLabel::Unspecified, vec![1], vec![10])
            .unwrap();
        ledger_kv
            .upsert(EntryLabel::NodeProvider, vec![2], vec![20])
            .unwrap();

        let node_provider_keys: Vec<Vec<u8>> = ledger_kv
            .iter(Some(EntryLabel::NodeProvider))
            .map(|entry| entry.key.clone())
            .collect();
        assert_eq!(node_provider_keys, vec![vec![2u8]]);
        assert_eq!(ledger_kv.iter(None).count(), 2);
    }

    #[test]
    fn test_display() {
        let entry = KvEntry::new(
            EntryLabel::Unspecified,
            b"key".to_vec(),
            b"value".to_vec(),
            Operation::Upsert,
            0,
            Vec::new(),
        );
        assert_eq!(entry.to_string(), "@0 Key: key, Value: value");

        let entry = KvEntry::new(
            EntryLabel::Unspecified,
            b"key".to_vec(),
            vec![0xff],
            Operation::Upsert,
            7,
            Vec::new(),
        );
        assert_eq!(entry.to_string(), "@7 Key: key, Value: \u{FFFD}");
    }
}