1use std::{fmt::Debug, ops::RangeInclusive, path::Path, sync::Arc};
2
3use bytestring::ByteString;
4use fjall::{Batch, Instant, KvSeparationOptions, PartitionCreateOptions};
5use graft_core::{
6 PageCount, PageIdx, SegmentId, VolumeId,
7 checkpoints::CachedCheckpoints,
8 checksum::{Checksum, ChecksumBuilder},
9 commit::{Commit, SegmentIdx, SegmentRangeRef},
10 commit_hash::CommitHash,
11 lsn::{LSN, LSNRangeExt, LSNSet},
12 page::Page,
13 pageset::PageSet,
14 volume_ref::VolumeRef,
15};
16use parking_lot::{Mutex, MutexGuard};
17use tryiter::TryIteratorExt;
18
19use crate::{
20 LogicalErr,
21 graft::{Graft, PendingCommit, SyncPoint},
22 local::fjall_storage::{
23 keys::PageKey,
24 typed_partition::{TypedPartition, TypedPartitionSnapshot, fjall_batch_ext::FjallBatchExt},
25 },
26 snapshot::Snapshot,
27};
28
29use culprit::{Result, ResultExt};
30
31mod fjall_repr;
32pub mod keys;
33mod typed_partition;
34mod values;
35
/// Errors produced by the fjall-backed local storage layer.
#[derive(Debug, thiserror::Error)]
pub enum FjallStorageErr {
    /// Error bubbled up from the fjall keyspace/partition APIs.
    #[error("Fjall error: {0}")]
    FjallErr(#[from] fjall::Error),

    /// Error from the underlying LSM tree implementation.
    #[error("Fjall LSM Tree error: {0}")]
    LsmTreeErr(#[from] lsm_tree::Error),

    /// A stored key or value failed to decode into its typed representation.
    #[error("Failed to decode key: {0}")]
    DecodeErr(#[from] fjall_repr::DecodeErr),

    /// Underlying filesystem I/O failure.
    #[error("I/O Error: {0}")]
    IoErr(#[from] std::io::Error),

    /// A write batch was rejected because a precondition no longer held.
    #[error("batch commit precondition failed")]
    BatchPreconditionErr,

    /// An application-level error; see [`LogicalErr`].
    #[error(transparent)]
    LogicalErr(#[from] LogicalErr),
}
56
/// Local storage engine backed by a fjall keyspace.
///
/// Each partition below wraps a distinct fjall partition with typed
/// keys and values (see [`TypedPartition`]).
pub struct FjallStorage {
    keyspace: fjall::Keyspace,

    // Maps tag strings to volume ids.
    tags: TypedPartition<ByteString, VolumeId>,

    // Maps a graft's local volume id to its `Graft` state.
    grafts: TypedPartition<VolumeId, Graft>,

    // Cached checkpoint metadata per volume.
    checkpoints: TypedPartition<VolumeId, CachedCheckpoints>,

    // The commit log, keyed by (volume, LSN) references.
    log: TypedPartition<VolumeRef, Commit>,

    // Page contents, keyed by (segment id, page index).
    pages: TypedPartition<PageKey, Page>,

    // Serializes read-modify-write transactions; see `ReadWriteGuard`.
    lock: Arc<Mutex<()>>,
}
88
89impl Debug for FjallStorage {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 f.debug_struct("FjallStorage").finish()
92 }
93}
94
impl FjallStorage {
    /// Opens (or creates) storage rooted at `path`.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, FjallStorageErr> {
        Self::open_config(fjall::Config::new(path))
    }

    /// Opens storage in a fresh temporary directory.
    ///
    /// `keep()` disables tempfile's own cleanup; deletion is instead owned
    /// by fjall via `temporary(true)`.
    pub fn open_temporary() -> Result<Self, FjallStorageErr> {
        let path = tempfile::tempdir()?.keep();
        Self::open_config(fjall::Config::new(path).temporary(true))
    }

    /// Opens the keyspace and all typed partitions.
    fn open_config(config: fjall::Config) -> Result<Self, FjallStorageErr> {
        let keyspace = config.open()?;
        let tags = TypedPartition::open(&keyspace, "tags", Default::default())?;
        let grafts = TypedPartition::open(&keyspace, "grafts", Default::default())?;
        let checkpoints = TypedPartition::open(&keyspace, "checkpoints", Default::default())?;
        let log = TypedPartition::open(&keyspace, "log", Default::default())?;
        // Pages are large values: enable key-value separation so they live
        // in a blob log rather than inline in the LSM tree.
        let pages = TypedPartition::open(
            &keyspace,
            "pages",
            PartitionCreateOptions::default().with_kv_separation(KvSeparationOptions::default()),
        )?;

        Ok(Self {
            keyspace,
            tags,
            grafts,
            checkpoints,
            log,
            pages,
            lock: Default::default(),
        })
    }

    /// Opens a consistent point-in-time read view over all partitions.
    pub(crate) fn read(&self) -> ReadGuard<'_> {
        ReadGuard::open(self)
    }

    /// Starts an atomic write batch.
    pub(crate) fn batch(&self) -> WriteBatch<'_> {
        WriteBatch::open(self)
    }

    /// Acquires the write lock and opens a read snapshot, serializing
    /// read-modify-write operations against each other.
    pub(crate) fn read_write(&self) -> ReadWriteGuard<'_> {
        ReadWriteGuard::open(self)
    }

    /// Stores `page` under (`sid`, `pageidx`), replacing any existing page.
    pub fn write_page(
        &self,
        sid: SegmentId,
        pageidx: PageIdx,
        page: Page,
    ) -> Result<(), FjallStorageErr> {
        self.pages
            .insert(PageKey::new(sid, pageidx), page)
            .or_into_ctx()
    }

    /// Removes the page stored under (`sid`, `pageidx`), if any.
    pub fn remove_page(&self, sid: SegmentId, pageidx: PageIdx) -> Result<(), FjallStorageErr> {
        self.pages.remove(PageKey::new(sid, pageidx)).or_into_ctx()
    }

    /// Removes every stored page for `sid` whose index falls in `pages`,
    /// as a single atomic batch.
    pub fn remove_page_range(
        &self,
        sid: &SegmentId,
        pages: RangeInclusive<PageIdx>,
    ) -> Result<(), FjallStorageErr> {
        // NOTE(review): bounds are deliberately end..=start — this mirrors
        // the high..=low ranging used on the log (see `ReadGuard::lsns`),
        // which suggests keys sort in descending order; confirm against
        // `PageKey`'s fjall encoding.
        let keyrange =
            PageKey::new(sid.clone(), *pages.end())..=PageKey::new(sid.clone(), *pages.start());
        let mut batch = self.keyspace.batch();
        let mut iter = self.pages.snapshot().range(keyrange);
        while let Some((key, _)) = iter.try_next()? {
            batch.remove_typed(&self.pages, key);
        }
        batch.commit()?;
        Ok(())
    }

    /// Deletes a tag mapping (no-op if absent).
    pub fn tag_delete(&self, tag: &str) -> Result<(), FjallStorageErr> {
        self.tags.remove(tag.into())
    }

    /// Deletes a graft record by its local volume id (no-op if absent).
    pub fn graft_delete(&self, graft: &VolumeId) -> Result<(), FjallStorageErr> {
        self.grafts.remove(graft.clone())
    }

    /// Stores (or replaces) the cached checkpoints for `vid`.
    pub fn write_checkpoints(
        &self,
        vid: VolumeId,
        checkpoints: CachedCheckpoints,
    ) -> Result<(), FjallStorageErr> {
        self.checkpoints.insert(vid, checkpoints)
    }

    /// Forks the history visible through `snapshot` into a brand new graft
    /// with freshly generated local and remote volume ids.
    ///
    /// The snapshot's commits are re-addressed into the new local volume
    /// with contiguous LSNs counting down from `FIRST + len` so the
    /// last-written commit receives `LSN::FIRST`. (Commits appear to be
    /// iterated newest-first — see the NOTE on `ReadGuard::commits`.)
    pub fn graft_from_snapshot(&self, snapshot: &Snapshot) -> Result<Graft, FjallStorageErr> {
        let graft = Graft::new(VolumeId::random(), VolumeId::random(), None, None);
        let commits = self
            .read()
            .commits(snapshot)
            .collect::<Result<Vec<_>, _>>()?;
        // Start one past the final LSN; each iteration first decrements.
        let mut lsn = LSN::FIRST.checked_add(commits.len() as u64).unwrap();
        let mut batch = self.batch();
        for commit in commits {
            lsn = lsn.checked_prev().unwrap();
            batch.write_commit(commit.with_vid(graft.local.clone()).with_lsn(lsn));
        }
        batch.write_graft(graft.clone());
        batch.commit()?;
        Ok(graft)
    }
}
208
/// A consistent point-in-time read view over all partitions.
///
/// Pins a keyspace sequence number for its lifetime (released on drop).
pub struct ReadGuard<'a> {
    storage: &'a FjallStorage,
    // Keyspace sequence number pinned while this guard is alive.
    seqno: Instant,
}
213
impl Drop for ReadGuard<'_> {
    fn drop(&mut self) {
        // Unpin our sequence number so the keyspace's snapshot tracker can
        // garbage collect versions no open snapshot can still observe.
        self.storage.keyspace.snapshot_tracker.close(self.seqno);
    }
}
220
impl<'a> ReadGuard<'a> {
    /// Pins the current keyspace sequence number so every partition snapshot
    /// taken through this guard observes the same point in time.
    fn open(storage: &'a FjallStorage) -> ReadGuard<'a> {
        let seqno = storage.keyspace.instant();
        storage.keyspace.snapshot_tracker.open(seqno);
        Self { storage, seqno }
    }

    // Per-partition snapshots, all anchored at this guard's seqno.

    #[inline]
    fn _tags(&self) -> TypedPartitionSnapshot<ByteString, VolumeId> {
        self.storage.tags.snapshot_at(self.seqno)
    }

    #[inline]
    fn _grafts(&self) -> TypedPartitionSnapshot<VolumeId, Graft> {
        self.storage.grafts.snapshot_at(self.seqno)
    }

    #[inline]
    fn _checkpoints(&self) -> TypedPartitionSnapshot<VolumeId, CachedCheckpoints> {
        self.storage.checkpoints.snapshot_at(self.seqno)
    }

    #[inline]
    fn _log(&self) -> TypedPartitionSnapshot<VolumeRef, Commit> {
        self.storage.log.snapshot_at(self.seqno)
    }

    #[inline]
    fn _pages(&self) -> TypedPartitionSnapshot<PageKey, Page> {
        self.storage.pages.snapshot_at(self.seqno)
    }

    /// Iterates every (tag, volume) pair.
    pub fn iter_tags(
        &self,
    ) -> impl Iterator<Item = Result<(ByteString, VolumeId), FjallStorageErr>> + use<> {
        self._tags().range(..)
    }

    /// Returns true if `tag` is mapped to a volume.
    pub fn tag_exists(&self, tag: &str) -> Result<bool, FjallStorageErr> {
        self._tags().contains(tag)
    }

    /// Looks up the volume mapped to `tag`.
    pub fn get_tag(&self, tag: &str) -> Result<Option<VolumeId>, FjallStorageErr> {
        self._tags().get(tag)
    }

    /// Returns the most recent LSN recorded in the log for `vid`, if any.
    ///
    /// NOTE(review): relies on `first(vid)` yielding the *newest* commit,
    /// i.e. the log key encoding orders higher LSNs first — confirm against
    /// `VolumeRef`'s fjall representation.
    pub fn latest_lsn(&self, vid: &VolumeId) -> Result<Option<LSN>, FjallStorageErr> {
        Ok(self._log().first(vid)?.map(|(vref, _)| vref.lsn))
    }

    /// Iterates every stored graft.
    pub fn iter_grafts(&self) -> impl Iterator<Item = Result<Graft, FjallStorageErr>> + use<> {
        self._grafts().values()
    }

    /// Returns true if a graft exists for this local volume id.
    pub fn graft_exists(&self, graft: &VolumeId) -> Result<bool, FjallStorageErr> {
        self._grafts().contains(graft)
    }

    /// Loads the graft for `vid`, failing with `GraftNotFound` if absent.
    pub fn graft(&self, vid: &VolumeId) -> Result<Graft, FjallStorageErr> {
        self._grafts()
            .get(vid)?
            .ok_or_else(|| LogicalErr::GraftNotFound(vid.clone()).into())
    }

    /// Returns true if `snapshot` still reflects the graft's latest state,
    /// i.e. no commit has landed since the snapshot was taken.
    pub fn is_latest_snapshot(
        &self,
        graft: &VolumeId,
        snapshot: &Snapshot,
    ) -> Result<bool, FjallStorageErr> {
        let graft = self.graft(graft)?;
        let latest_local = self.latest_lsn(&graft.local)?;

        Ok(match snapshot.head() {
            // Snapshot heads at the local volume: it must point at the
            // latest local commit.
            Some((vid, lsn)) if vid == &graft.local => Some(lsn) == latest_local,

            // Snapshot heads at the remote volume: it must match the sync
            // point exactly, and no local commit may exist past the
            // watermark.
            Some((vid, lsn)) if vid == &graft.remote => {
                if let Some(sync) = graft.sync {
                    lsn == sync.remote && sync.local_watermark == latest_local
                } else {
                    false
                }
            }

            // Snapshot heads at some unrelated volume.
            Some(_) => false,

            // An empty snapshot is only "latest" for a graft with no
            // local history and no sync point.
            None => latest_local.is_none() && graft.sync().is_none(),
        })
    }

    /// Builds a [`Snapshot`] of the graft's visible history: local commits
    /// from the sync watermark upward, layered over the synced remote range.
    pub fn snapshot(&self, graft: &VolumeId) -> Result<Snapshot, FjallStorageErr> {
        let graft = self.graft(graft)?;

        let mut snapshot = Snapshot::EMPTY;

        if let Some(latest) = self.latest_lsn(&graft.local)? {
            if let Some(watermark) = graft.sync().and_then(|s| s.local_watermark) {
                // Only include local history above the watermark; older
                // local commits are already reflected in the remote.
                if watermark < latest {
                    snapshot.append(graft.local, watermark..=latest);
                }
            } else {
                snapshot.append(graft.local, LSN::FIRST..=latest);
            }
        }

        if let Some(remote) = graft.sync.map(|s| s.remote) {
            snapshot.append(graft.remote, LSN::FIRST..=remote);
        }

        Ok(snapshot)
    }

    /// Fetches a single commit by (volume, LSN).
    pub fn get_commit(&self, vid: &VolumeId, lsn: LSN) -> Result<Option<Commit>, FjallStorageErr> {
        self._log().get_owned(VolumeRef::new(vid.clone(), lsn))
    }

    /// Iterates the commits covered by `snapshot`.
    ///
    /// NOTE(review): each entry is ranged `end_ref..=start_ref`; combined
    /// with the log key encoding this appears to walk commits
    /// newest-to-oldest (which `iter_visible_pages` and `search_page`
    /// depend on) — confirm against `VolumeRef`'s encoding.
    pub fn commits(
        &self,
        snapshot: &Snapshot,
    ) -> impl Iterator<Item = Result<Commit, FjallStorageErr>> {
        let log = self._log();

        snapshot.iter().flat_map(move |entry| {
            let low = entry.start_ref();
            let high = entry.end_ref();
            let range = high..=low;
            log.range(range).map_ok(|(_, commit)| Ok(commit))
        })
    }

    /// For each segment referenced by `snapshot`, yields the set of pages
    /// for which that segment holds the visible (most recent) version.
    ///
    /// Walks commits newest-to-oldest, claiming pages out of a shrinking
    /// "unresolved" set as each segment is encountered.
    pub fn iter_visible_pages(
        &self,
        snapshot: &Snapshot,
    ) -> impl Iterator<Item = Result<(SegmentIdx, PageSet), FjallStorageErr>> {
        // Pages not yet attributed to a segment; starts as "everything".
        let mut pages = PageSet::FULL;
        // Tightest page count seen so far; walking back in time it can only
        // shrink (a later truncation hides older pages).
        let mut page_count = PageCount::MAX;

        self.commits(snapshot).try_filter_map(move |commit| {
            // All pages already resolved; nothing older can be visible.
            if pages.is_empty() {
                return Ok(None);
            }

            if commit.page_count < page_count {
                page_count = commit.page_count;
                pages.truncate(page_count);
            }

            if let Some(idx) = commit.segment_idx {
                let mut commit_pages = idx.pageset.clone();

                // Drop any of this segment's pages past the truncation
                // point. (Option comparison: None < Some(_), so an empty
                // pageset never triggers a truncate.)
                if commit_pages.last().map(|idx| idx.pages()) > Some(page_count) {
                    commit_pages.truncate(page_count);
                }

                // Claim the still-unresolved pages this segment provides.
                let outstanding = pages.cut(&commit_pages);

                if !outstanding.is_empty() {
                    return Ok(Some((idx, outstanding)));
                }
            }

            Ok(None)
        })
    }

    /// Returns the set of LSNs actually present in the log for `vid`
    /// within the inclusive range `lsns`.
    pub fn lsns(
        &self,
        vid: &VolumeId,
        lsns: &RangeInclusive<LSN>,
    ) -> Result<LSNSet, FjallStorageErr> {
        // Bounds reversed (end..=start) to match the log's apparent
        // descending key order; see the NOTE on `commits`.
        let low = VolumeRef::new(vid.clone(), *lsns.start());
        let high = VolumeRef::new(vid.clone(), *lsns.end());
        let range = high..=low;
        self._log()
            .range_keys(range)
            .map_ok(|key| Ok(key.lsn()))
            .collect()
    }

    /// Finds the newest commit in `snapshot` whose segment contains
    /// `pageidx`, or `None` if the page was never written (or was hidden
    /// by a truncation).
    pub fn search_page(
        &self,
        snapshot: &Snapshot,
        pageidx: PageIdx,
    ) -> Result<Option<Commit>, FjallStorageErr> {
        let mut commits = self.commits(snapshot);

        while let Some(commit) = commits.try_next()? {
            // The volume was truncated below this page at this point in
            // history; any older version of the page is invisible.
            if !commit.page_count().contains(pageidx) {
                break;
            }

            let Some(idx) = commit.segment_idx() else {
                continue;
            };

            if !idx.contains(pageidx) {
                continue;
            }

            return Ok(Some(commit));
        }
        Ok(None)
    }

    /// Returns true if the page (`sid`, `pageidx`) is stored locally.
    pub fn has_page(&self, sid: SegmentId, pageidx: PageIdx) -> Result<bool, FjallStorageErr> {
        self._pages().contains(&PageKey::new(sid, pageidx))
    }

    /// Reads the page stored under (`sid`, `pageidx`), if present.
    pub fn read_page(
        &self,
        sid: SegmentId,
        pageidx: PageIdx,
    ) -> Result<Option<Page>, FjallStorageErr> {
        self._pages()
            .get_owned(PageKey::new(sid, pageidx))
            .or_into_ctx()
    }

    /// Returns the page count recorded by the commit at (`vid`, `lsn`).
    pub fn page_count(
        &self,
        vid: &VolumeId,
        lsn: LSN,
    ) -> Result<Option<PageCount>, FjallStorageErr> {
        Ok(self.get_commit(vid, lsn)?.map(|c| c.page_count()))
    }

    /// Loads the cached checkpoints for `vid`, if any.
    pub fn checkpoints(
        &self,
        vid: &VolumeId,
    ) -> Result<Option<CachedCheckpoints>, FjallStorageErr> {
        self._checkpoints().get(vid)
    }

    /// Computes a checksum over the visible page contents of `snapshot`.
    /// Pages missing from local storage are silently skipped.
    pub fn checksum(&self, snapshot: &Snapshot) -> Result<Checksum, FjallStorageErr> {
        let pages = self._pages();
        let mut builder = ChecksumBuilder::new();
        let mut iter = self.iter_visible_pages(snapshot);
        while let Some((idx, pageset)) = iter.try_next()? {
            for pageidx in pageset.iter() {
                let key = PageKey::new(idx.sid.clone(), pageidx);
                if let Some(page) = pages.get(&key)? {
                    builder.write(&page);
                }
            }
        }
        Ok(builder.build())
    }

    /// Returns the segment frames needed by `snapshot` whose pages are not
    /// present in local storage.
    ///
    /// NOTE(review): a frame is judged present by checking only its first
    /// page — presumably frames are stored all-or-nothing, making the first
    /// page a valid proxy for the whole frame; confirm against the frame
    /// write path.
    pub fn find_missing_frames(
        &self,
        snapshot: &Snapshot,
    ) -> Result<Vec<SegmentRangeRef>, FjallStorageErr> {
        let mut missing_frames = vec![];
        let pages = self._pages();
        let mut iter = self.iter_visible_pages(snapshot);
        while let Some((idx, pageset)) = iter.try_next()? {
            // Only consider frames overlapping pages we actually need.
            let frames = idx.iter_frames(|pages| pageset.contains_any(pages));

            for frame in frames {
                if let Some(first_page) = frame.pageset.first()
                    && !pages.contains(&PageKey::new(frame.sid.clone(), first_page))?
                {
                    missing_frames.push(frame);
                }
            }
        }
        Ok(missing_frames)
    }
}
532
/// An atomic write batch: operations are staged in memory and applied
/// all-or-nothing by [`WriteBatch::commit`].
pub struct WriteBatch<'a> {
    storage: &'a FjallStorage,
    batch: Batch,
}
537
538impl<'a> WriteBatch<'a> {
539 fn open(storage: &'a FjallStorage) -> Self {
540 Self { storage, batch: storage.keyspace.batch() }
541 }
542
543 pub fn write_tag(&mut self, tag: &str, graft: VolumeId) {
544 self.batch
545 .insert_typed(&self.storage.tags, tag.into(), graft);
546 }
547
548 pub fn write_commit(&mut self, commit: Commit) {
549 self.batch
550 .insert_typed(&self.storage.log, commit.vref(), commit);
551 }
552
553 pub fn write_graft(&mut self, graft: Graft) {
554 self.batch
555 .insert_typed(&self.storage.grafts, graft.local.clone(), graft);
556 }
557
558 pub fn write_page(&mut self, sid: SegmentId, pageidx: PageIdx, page: Page) {
559 self.batch
560 .insert_typed(&self.storage.pages, PageKey::new(sid, pageidx), page);
561 }
562
563 pub fn commit(self) -> Result<(), FjallStorageErr> {
564 self.batch.commit().or_into_ctx()
565 }
566}
567
/// A serialized read-modify-write transaction: holds the storage write lock
/// together with a read snapshot taken after the lock was acquired.
pub struct ReadWriteGuard<'a> {
    // Held for the guard's lifetime to exclude other writers.
    _permit: MutexGuard<'a, ()>,
    read: ReadGuard<'a>,
}
572
impl<'a> ReadWriteGuard<'a> {
    /// Acquires the write lock, then opens the read snapshot — in that
    /// order, so the snapshot reflects every previously-committed writer.
    fn open(storage: &'a FjallStorage) -> Self {
        let _permit = storage.lock.lock();
        let read = storage.read();
        Self { _permit, read }
    }

    /// The underlying storage (borrowed for the full `'a` lifetime).
    fn storage(&self) -> &'a FjallStorage {
        self.read.storage
    }

    /// Points `tag` at `graft`, returning the previous mapping if any.
    pub fn tag_replace(
        &self,
        tag: &str,
        graft: VolumeId,
    ) -> Result<Option<VolumeId>, FjallStorageErr> {
        let out = self.read.get_tag(tag)?;
        self.storage().tags.insert(tag.into(), graft)?;
        Ok(out)
    }

    /// Opens an existing graft or creates a new one.
    ///
    /// If `graft` names an existing graft it is returned as-is (after
    /// verifying `remote` matches, when one is provided). Otherwise a new
    /// graft is created; if the remote volume already has commits, the new
    /// graft starts with a sync point at the remote's latest LSN.
    pub fn graft_open(
        self,
        graft: Option<VolumeId>,
        remote: Option<VolumeId>,
    ) -> Result<Graft, FjallStorageErr> {
        let local = graft.unwrap_or_else(VolumeId::random);

        if let Some(graft) = self.read._grafts().get(&local)? {
            // An existing graft must agree with the caller's expected remote.
            if let Some(remote) = remote
                && graft.remote != remote
            {
                return Err(LogicalErr::GraftRemoteMismatch {
                    graft: graft.local,
                    expected: remote,
                    actual: graft.remote,
                }
                .into());
            }
            return Ok(graft);
        }

        let remote = remote.unwrap_or_else(VolumeId::random);

        // If the remote already has history, start synced to its latest LSN.
        let sync = self
            .read
            .latest_lsn(&remote)?
            .map(|latest_remote| SyncPoint {
                remote: latest_remote,
                local_watermark: None,
            });

        let graft = Graft::new(local.clone(), remote, sync, None);
        self.storage().grafts.insert(local, graft.clone())?;

        tracing::debug!(
            local_vid = ?graft.local,
            remote_vid = ?graft.remote,
            "open graft"
        );

        Ok(graft)
    }

    /// Appends a local commit on top of `snapshot`, returning the graft's
    /// post-commit snapshot.
    ///
    /// Fails with `GraftConcurrentWrite` if another commit landed after
    /// `snapshot` was taken.
    pub fn commit(
        self,
        graft: &VolumeId,
        snapshot: Snapshot,
        page_count: PageCount,
        segment: SegmentIdx,
    ) -> Result<Snapshot, FjallStorageErr> {
        if !self.read.is_latest_snapshot(graft, &snapshot)? {
            return Err(LogicalErr::GraftConcurrentWrite(graft.clone()).into());
        }

        let graft = self.read.graft(graft)?;

        // Next LSN in sequence, or FIRST for an empty local volume.
        let commit_lsn = self
            .read
            .latest_lsn(&graft.local)?
            .map_or(LSN::FIRST, |lsn| lsn.next());

        tracing::debug!(vid=?graft.local, %commit_lsn, "local commit");

        let commit = Commit::new(graft.local.clone(), commit_lsn, page_count)
            .with_segment_idx(Some(segment));

        self.read.storage.log.insert(commit.vref(), commit)?;

        // Open a fresh read guard: `self.read`'s snapshot predates the
        // insert above and would not observe the new commit.
        ReadGuard::open(self.storage()).snapshot(&graft.local)
    }

    /// Records a pending remote commit on the graft before pushing it.
    ///
    /// # Panics
    /// Asserts internal invariants: no pending commit may already exist,
    /// the local watermark must strictly advance, the pending remote LSN
    /// must directly follow the latest known remote LSN, and the commit
    /// hash must be non-zero.
    pub fn remote_commit_prepare(
        self,
        graft: &VolumeId,
        pending_commit: PendingCommit,
    ) -> Result<(), FjallStorageErr> {
        let graft = self.read.graft(graft)?;

        assert!(
            graft.pending_commit().is_none(),
            "BUG: pending commit is present"
        );

        if let Some(local_watermark) = graft.local_watermark() {
            assert!(
                local_watermark < pending_commit.local,
                "BUG: local_watermark monotonicity violation"
            );
        }
        let latest_remote = self.read.latest_lsn(&graft.remote)?;
        assert_eq!(
            latest_remote,
            pending_commit.commit.checked_prev(),
            "BUG: remote lsn monotonicity violation"
        );

        assert!(pending_commit.commit_hash != CommitHash::ZERO);

        let graft = graft.with_pending_commit(Some(pending_commit));
        self.storage().grafts.insert(graft.local.clone(), graft)?;

        Ok(())
    }

    /// Finalizes a previously-prepared remote commit: atomically stores the
    /// remote commit in the log and advances the graft's sync point,
    /// clearing the pending commit.
    ///
    /// # Panics
    /// Asserts the pending commit exists and matches `remote_commit`'s LSN
    /// and hash, and that the commit is not already in the log.
    pub fn remote_commit_success(
        self,
        graft: &VolumeId,
        remote_commit: Commit,
    ) -> Result<(), FjallStorageErr> {
        let graft = self.read.graft(graft)?;

        let pending_commit = graft.pending_commit.unwrap();
        assert_eq!(remote_commit.lsn(), pending_commit.commit);
        assert_eq!(
            remote_commit.commit_hash(),
            Some(&pending_commit.commit_hash)
        );

        assert!(
            !self.read._log().contains(&remote_commit.vref())?,
            "BUG: remote commit already exists"
        );

        // The pending commit becomes the new sync point.
        let updated_graft = Graft {
            sync: Some(pending_commit.into()),
            pending_commit: None,
            ..graft
        };

        let mut batch = self.storage().batch();
        batch.write_commit(remote_commit);
        batch.write_graft(updated_graft);
        batch.commit()
    }

    /// Clears the graft's pending commit (e.g. after a failed remote push).
    pub fn drop_pending_commit(self, graft: &VolumeId) -> Result<(), FjallStorageErr> {
        let graft = self.read.graft(graft)?;
        self.storage()
            .grafts
            .insert(graft.local.clone(), graft.with_pending_commit(None))
    }

    /// Fast-forwards the graft's sync point to the remote's latest LSN.
    ///
    /// Returns `Ok(())` without changes when the remote has nothing new;
    /// fails with `GraftDiverged` when local commits exist above the
    /// watermark (fast-forwarding would lose them).
    ///
    /// # Panics
    /// Asserts the new remote LSN is strictly newer than the current sync
    /// point.
    pub fn sync_remote_to_local(self, graft: VolumeId) -> Result<(), FjallStorageErr> {
        let graft = self.read.graft(&graft)?;

        let latest_remote = self.read.latest_lsn(&graft.remote).or_into_ctx()?;
        let Some(remote_changes) = graft.remote_changes(latest_remote) else {
            // Nothing new on the remote.
            return Ok(());
        };

        let latest_local = self.read.latest_lsn(&graft.local).or_into_ctx()?;
        if graft.local_changes(latest_local).is_some() {
            let status = graft.status(latest_local, latest_remote);
            tracing::debug!("graft {} has diverged; status=`{status}`", graft.local);
            return Err(LogicalErr::GraftDiverged(graft.local).into());
        }

        tracing::debug!(
            sync = ?graft.sync(),
            lsns = %remote_changes.to_string(),
            remote = ?graft.remote,
            local = ?graft.local,
            "fast-forwarding graft to its remote"
        );

        let remote_lsn = *remote_changes.end();

        // Advance (or establish) the sync point, preserving the existing
        // local watermark when one is set.
        let new_sync = match graft.sync() {
            Some(sync) => {
                assert!(
                    remote_lsn > sync.remote,
                    "BUG: attempt to sync graft to older version of the remote"
                );
                SyncPoint {
                    remote: remote_lsn,
                    local_watermark: sync.local_watermark,
                }
            }
            None => SyncPoint {
                remote: remote_lsn,
                local_watermark: None,
            },
        };

        self.storage()
            .grafts
            .insert(graft.local.clone(), graft.with_sync(Some(new_sync)))
    }
}
820}