graft_kernel/local/
fjall_storage.rs

1use std::{fmt::Debug, ops::RangeInclusive, path::Path, sync::Arc};
2
3use bytestring::ByteString;
4use fjall::{Batch, Instant, KvSeparationOptions, PartitionCreateOptions};
5use graft_core::{
6    PageCount, PageIdx, SegmentId, VolumeId,
7    checkpoints::CachedCheckpoints,
8    checksum::{Checksum, ChecksumBuilder},
9    commit::{Commit, SegmentIdx, SegmentRangeRef},
10    commit_hash::CommitHash,
11    lsn::{LSN, LSNRangeExt, LSNSet},
12    page::Page,
13    pageset::PageSet,
14    volume_ref::VolumeRef,
15};
16use parking_lot::{Mutex, MutexGuard};
17use tryiter::TryIteratorExt;
18
19use crate::{
20    LogicalErr,
21    graft::{Graft, PendingCommit, SyncPoint},
22    local::fjall_storage::{
23        keys::PageKey,
24        typed_partition::{TypedPartition, TypedPartitionSnapshot, fjall_batch_ext::FjallBatchExt},
25    },
26    snapshot::Snapshot,
27};
28
29use culprit::{Result, ResultExt};
30
31mod fjall_repr;
32pub mod keys;
33mod typed_partition;
34mod values;
35
/// Errors produced by [`FjallStorage`] and its read/write guards.
#[derive(Debug, thiserror::Error)]
pub enum FjallStorageErr {
    /// Error bubbled up from the fjall keyspace/partition layer.
    #[error("Fjall error: {0}")]
    FjallErr(#[from] fjall::Error),

    /// Error from the underlying LSM tree implementation.
    #[error("Fjall LSM Tree error: {0}")]
    LsmTreeErr(#[from] lsm_tree::Error),

    /// A stored key (or value) failed to decode into its typed representation.
    #[error("Failed to decode key: {0}")]
    DecodeErr(#[from] fjall_repr::DecodeErr),

    /// Filesystem-level I/O failure (e.g. creating the temporary directory).
    #[error("I/O Error: {0}")]
    IoErr(#[from] std::io::Error),

    /// A conditional batch commit found its precondition no longer holds.
    #[error("batch commit precondition failed")]
    BatchPreconditionErr,

    /// Application-level error; see [`LogicalErr`] for the variants.
    #[error(transparent)]
    LogicalErr(#[from] LogicalErr),
}
56
/// Durable local storage backed by a fjall keyspace.
///
/// State is split across typed partitions, one per logical namespace
/// (tags, grafts, checkpoints, the commit log, and pages).
pub struct FjallStorage {
    keyspace: fjall::Keyspace,

    /// This partition allows grafts to be identified by a tag.
    /// The graft a tag points at can be changed.
    tags: TypedPartition<ByteString, VolumeId>,

    /// This partition stores state regarding each `Graft`
    /// keyed by its Local Volume ID
    /// {`VolumeId`} -> `GraftState`
    grafts: TypedPartition<VolumeId, Graft>,

    /// This partition stores `CachedCheckpoints` for each Volume
    /// {vid} -> `CachedCheckpoints`
    checkpoints: TypedPartition<VolumeId, CachedCheckpoints>,

    /// This partition stores commits
    /// {vid} / {lsn} -> Commit
    log: TypedPartition<VolumeRef, Commit>,

    /// This partition stores Pages
    /// {sid} / {pageidx} -> Page
    pages: TypedPartition<PageKey, Page>,

    /// Must be held while performing read+write transactions.
    /// Read-only and write-only transactions don't need to hold the lock as
    /// long as they are safe:
    /// To make read-only txns safe, use the same snapshot for all reads
    /// To make write-only txns safe, they must be monotonic
    lock: Arc<Mutex<()>>,
}
88
89impl Debug for FjallStorage {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        f.debug_struct("FjallStorage").finish()
92    }
93}
94
impl FjallStorage {
    /// Open (or create) storage rooted at `path`.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, FjallStorageErr> {
        Self::open_config(fjall::Config::new(path))
    }

    /// Open storage in a freshly created temporary directory.
    ///
    /// `keep()` detaches the tempdir guard so the directory is not removed
    /// when it goes out of scope; cleanup is delegated to fjall via
    /// `temporary(true)` (presumably on keyspace drop — TODO confirm).
    pub fn open_temporary() -> Result<Self, FjallStorageErr> {
        let path = tempfile::tempdir()?.keep();
        Self::open_config(fjall::Config::new(path).temporary(true))
    }

    /// Open the keyspace and every typed partition used by this store.
    fn open_config(config: fjall::Config) -> Result<Self, FjallStorageErr> {
        let keyspace = config.open()?;
        let tags = TypedPartition::open(&keyspace, "tags", Default::default())?;
        let grafts = TypedPartition::open(&keyspace, "grafts", Default::default())?;
        let checkpoints = TypedPartition::open(&keyspace, "checkpoints", Default::default())?;
        let log = TypedPartition::open(&keyspace, "log", Default::default())?;
        // pages get key-value separation so large page values are stored
        // out-of-line from the LSM tree's keys
        let pages = TypedPartition::open(
            &keyspace,
            "pages",
            PartitionCreateOptions::default().with_kv_separation(KvSeparationOptions::default()),
        )?;

        Ok(Self {
            keyspace,
            tags,
            grafts,
            checkpoints,
            log,
            pages,
            lock: Default::default(),
        })
    }

    /// Open a read-only view pinned to a single consistent snapshot.
    pub(crate) fn read(&self) -> ReadGuard<'_> {
        ReadGuard::open(self)
    }

    /// Start a write-only batch; staged writes apply atomically on commit.
    pub(crate) fn batch(&self) -> WriteBatch<'_> {
        WriteBatch::open(self)
    }

    /// Open a read + write txn on storage.
    /// The returned object holds a lock, any subsequent calls to `read_write`
    /// will block.
    pub(crate) fn read_write(&self) -> ReadWriteGuard<'_> {
        ReadWriteGuard::open(self)
    }

    /// Write a single page belonging to segment `sid` at `pageidx`.
    pub fn write_page(
        &self,
        sid: SegmentId,
        pageidx: PageIdx,
        page: Page,
    ) -> Result<(), FjallStorageErr> {
        self.pages
            .insert(PageKey::new(sid, pageidx), page)
            .or_into_ctx()
    }

    /// Remove a single page belonging to segment `sid` at `pageidx`.
    pub fn remove_page(&self, sid: SegmentId, pageidx: PageIdx) -> Result<(), FjallStorageErr> {
        self.pages.remove(PageKey::new(sid, pageidx)).or_into_ctx()
    }

    /// Remove every stored page for `sid` within `pages`, applying all
    /// removals atomically in a single batch.
    pub fn remove_page_range(
        &self,
        sid: &SegmentId,
        pages: RangeInclusive<PageIdx>,
    ) -> Result<(), FjallStorageErr> {
        // PageKeys are stored in descending order
        let keyrange =
            PageKey::new(sid.clone(), *pages.end())..=PageKey::new(sid.clone(), *pages.start());
        let mut batch = self.keyspace.batch();
        let mut iter = self.pages.snapshot().range(keyrange);
        while let Some((key, _)) = iter.try_next()? {
            batch.remove_typed(&self.pages, key);
        }
        batch.commit()?;
        Ok(())
    }

    /// Delete a tag mapping.
    pub fn tag_delete(&self, tag: &str) -> Result<(), FjallStorageErr> {
        self.tags.remove(tag.into())
    }

    /// Delete a graft's record. Only the graft state is removed; its commit
    /// log entries and pages are untouched.
    pub fn graft_delete(&self, graft: &VolumeId) -> Result<(), FjallStorageErr> {
        self.grafts.remove(graft.clone())
    }

    /// Replace the cached checkpoints for `vid`.
    pub fn write_checkpoints(
        &self,
        vid: VolumeId,
        checkpoints: CachedCheckpoints,
    ) -> Result<(), FjallStorageErr> {
        self.checkpoints.insert(vid, checkpoints)
    }

    /// Create a brand-new graft (random local + remote vids) whose local
    /// volume contains a copy of every commit visible to `snapshot`,
    /// renumbered contiguously starting at `LSN::FIRST`.
    ///
    /// NOTE(review): this reads then writes without holding `self.lock`;
    /// it appears safe only because the writes target freshly generated
    /// random volume ids that no other txn can reference — confirm.
    pub fn graft_from_snapshot(&self, snapshot: &Snapshot) -> Result<Graft, FjallStorageErr> {
        let graft = Graft::new(VolumeId::random(), VolumeId::random(), None, None);
        // `commits` yields newest -> oldest (see `ReadGuard::commits`)
        let commits = self
            .read()
            .commits(snapshot)
            .collect::<Result<Vec<_>, _>>()?;
        // start one past the top LSN, decrementing before each write: the
        // newest commit lands at FIRST + len - 1, the oldest at FIRST
        let mut lsn = LSN::FIRST.checked_add(commits.len() as u64).unwrap();
        let mut batch = self.batch();
        for commit in commits {
            lsn = lsn.checked_prev().unwrap();
            batch.write_commit(commit.with_vid(graft.local.clone()).with_lsn(lsn));
        }
        batch.write_graft(graft.clone());
        batch.commit()?;
        Ok(graft)
    }
}
208
/// A consistent, read-only view of storage pinned at a single fjall instant.
///
/// Every read through the guard observes the same `seqno`, making multi-key
/// reads mutually consistent.
pub struct ReadGuard<'a> {
    storage: &'a FjallStorage,
    // the fjall sequence number this guard's reads are pinned to
    seqno: Instant,
}
213
impl Drop for ReadGuard<'_> {
    fn drop(&mut self) {
        // IMPORTANT: Decrement snapshot count
        // This must pair with the `snapshot_tracker.open(seqno)` call in
        // `ReadGuard::open`; an unbalanced open would pin `seqno` forever.
        self.storage.keyspace.snapshot_tracker.close(self.seqno);
    }
}
220
impl<'a> ReadGuard<'a> {
    /// Pin a read snapshot at the keyspace's current instant.
    fn open(storage: &'a FjallStorage) -> ReadGuard<'a> {
        let seqno = storage.keyspace.instant();
        // IMPORTANT: Increment snapshot count
        // (paired with the `close` in `Drop for ReadGuard`)
        storage.keyspace.snapshot_tracker.open(seqno);
        Self { storage, seqno }
    }

    // Each accessor below opens a partition snapshot pinned at `self.seqno`,
    // so all reads through this guard observe the same instant.

    #[inline]
    fn _tags(&self) -> TypedPartitionSnapshot<ByteString, VolumeId> {
        self.storage.tags.snapshot_at(self.seqno)
    }

    #[inline]
    fn _grafts(&self) -> TypedPartitionSnapshot<VolumeId, Graft> {
        self.storage.grafts.snapshot_at(self.seqno)
    }

    #[inline]
    fn _checkpoints(&self) -> TypedPartitionSnapshot<VolumeId, CachedCheckpoints> {
        self.storage.checkpoints.snapshot_at(self.seqno)
    }

    #[inline]
    fn _log(&self) -> TypedPartitionSnapshot<VolumeRef, Commit> {
        self.storage.log.snapshot_at(self.seqno)
    }

    #[inline]
    fn _pages(&self) -> TypedPartitionSnapshot<PageKey, Page> {
        self.storage.pages.snapshot_at(self.seqno)
    }

    /// Iterate over every (tag, graft vid) mapping.
    pub fn iter_tags(
        &self,
    ) -> impl Iterator<Item = Result<(ByteString, VolumeId), FjallStorageErr>> + use<> {
        self._tags().range(..)
    }

    /// Check whether `tag` exists.
    pub fn tag_exists(&self, tag: &str) -> Result<bool, FjallStorageErr> {
        self._tags().contains(tag)
    }

    /// Look up the graft vid a tag points at, if any.
    pub fn get_tag(&self, tag: &str) -> Result<Option<VolumeId>, FjallStorageErr> {
        self._tags().get(tag)
    }

    /// Lookup the latest LSN for a volume
    ///
    /// The log stores LSNs in reverse order, so the volume's first entry is
    /// its newest commit.
    pub fn latest_lsn(&self, vid: &VolumeId) -> Result<Option<LSN>, FjallStorageErr> {
        Ok(self._log().first(vid)?.map(|(vref, _)| vref.lsn))
    }

    /// Iterate over every stored graft.
    pub fn iter_grafts(&self) -> impl Iterator<Item = Result<Graft, FjallStorageErr>> + use<> {
        self._grafts().values()
    }

    /// Check whether a graft record exists for `graft`.
    pub fn graft_exists(&self, graft: &VolumeId) -> Result<bool, FjallStorageErr> {
        self._grafts().contains(graft)
    }

    /// Load a graft by its local volume id, failing with
    /// `LogicalErr::GraftNotFound` if it does not exist.
    pub fn graft(&self, vid: &VolumeId) -> Result<Graft, FjallStorageErr> {
        self._grafts()
            .get(vid)?
            .ok_or_else(|| LogicalErr::GraftNotFound(vid.clone()).into())
    }

    /// Check if the provided Snapshot is logically equal to the latest snapshot
    /// for the specified Graft.
    pub fn is_latest_snapshot(
        &self,
        graft: &VolumeId,
        snapshot: &Snapshot,
    ) -> Result<bool, FjallStorageErr> {
        let graft = self.graft(graft)?;
        let latest_local = self.latest_lsn(&graft.local)?;

        // The complexity here is that the snapshot may have been taken before
        // we pushed commits to a remote. When this happens, the snapshot will
        // be physically different but logically equivalent. We can use the
        // relationship setup by the SyncPoint to handle this case.
        Ok(match snapshot.head() {
            // head is on the local volume: it must be the newest local LSN
            Some((vid, lsn)) if vid == &graft.local => Some(lsn) == latest_local,

            // head is on the remote volume: it must match the sync point, and
            // everything local must already be below the watermark
            Some((vid, lsn)) if vid == &graft.remote => {
                if let Some(sync) = graft.sync {
                    lsn == sync.remote && sync.local_watermark == latest_local
                } else {
                    // if graft has no sync point, then a snapshot should not
                    // include a remote layer, thus this snapshot is from
                    // another graft
                    false
                }
            }

            // Snapshot from another graft
            Some(_) => false,

            // Snapshot is empty
            None => latest_local.is_none() && graft.sync().is_none(),
        })
    }

    /// Load the most recent Snapshot for a Graft.
    pub fn snapshot(&self, graft: &VolumeId) -> Result<Snapshot, FjallStorageErr> {
        let graft = self.graft(graft)?;

        let mut snapshot = Snapshot::EMPTY;

        if let Some(latest) = self.latest_lsn(&graft.local)? {
            if let Some(watermark) = graft.sync().and_then(|s| s.local_watermark) {
                // only layer in local commits above the synced watermark
                if watermark < latest {
                    snapshot.append(graft.local, watermark..=latest);
                }
            } else {
                // nothing synced yet: the entire local history is visible
                snapshot.append(graft.local, LSN::FIRST..=latest);
            }
        }

        // layer the remote volume underneath, up to the sync point
        if let Some(remote) = graft.sync.map(|s| s.remote) {
            snapshot.append(graft.remote, LSN::FIRST..=remote);
        }

        Ok(snapshot)
    }

    /// Retrieve a specific commit
    pub fn get_commit(&self, vid: &VolumeId, lsn: LSN) -> Result<Option<Commit>, FjallStorageErr> {
        self._log().get_owned(VolumeRef::new(vid.clone(), lsn))
    }

    /// Iterates through all of the commits reachable by the provided `Snapshot`
    /// from the newest to oldest commit.
    pub fn commits(
        &self,
        snapshot: &Snapshot,
    ) -> impl Iterator<Item = Result<Commit, FjallStorageErr>> {
        let log = self._log();

        snapshot.iter().flat_map(move |entry| {
            // the snapshot range is in the form `low..=high` but the log orders
            // LSNs in reverse. thus we need to flip the range when passing it
            // down to the underlying scan.
            let low = entry.start_ref();
            let high = entry.end_ref();
            let range = high..=low;
            log.range(range).map_ok(|(_, commit)| Ok(commit))
        })
    }

    /// Produce an iterator of `SegmentIdx`s along with the pages we need from the segment.
    /// Collectively provides full coverage of the pages visible to a snapshot.
    pub fn iter_visible_pages(
        &self,
        snapshot: &Snapshot,
    ) -> impl Iterator<Item = Result<(SegmentIdx, PageSet), FjallStorageErr>> {
        // the set of pages we are searching for.
        // we remove pages from this set as we iterate through commits.
        let mut pages = PageSet::FULL;
        // we keep track of the smallest page count as we iterate through commits
        let mut page_count = PageCount::MAX;

        self.commits(snapshot).try_filter_map(move |commit| {
            // if we have found all pages, we are done
            if pages.is_empty() {
                return Ok(None);
            }

            // if we encounter a smaller commit on our travels, we need to shrink
            // the page_count to ensure that truncation is respected
            if commit.page_count < page_count {
                page_count = commit.page_count;
                pages.truncate(page_count);
            }

            if let Some(idx) = commit.segment_idx {
                let mut commit_pages = idx.pageset.clone();

                if commit_pages.last().map(|idx| idx.pages()) > Some(page_count) {
                    // truncate any pages in this commit that extend beyond the page count
                    commit_pages.truncate(page_count);
                }

                // figure out which pages we need from this commit
                let outstanding = pages.cut(&commit_pages);

                if !outstanding.is_empty() {
                    return Ok(Some((idx, outstanding)));
                }
            }

            Ok(None)
        })
    }

    /// Given a range of LSNs for a particular volume, returns the set of LSNs
    /// we have
    pub fn lsns(
        &self,
        vid: &VolumeId,
        lsns: &RangeInclusive<LSN>,
    ) -> Result<LSNSet, FjallStorageErr> {
        // lsns is in the form `low..=high` but the log orders
        // LSNs in reverse. thus we need to flip the range
        let low = VolumeRef::new(vid.clone(), *lsns.start());
        let high = VolumeRef::new(vid.clone(), *lsns.end());
        let range = high..=low;
        self._log()
            .range_keys(range)
            .map_ok(|key| Ok(key.lsn()))
            .collect()
    }

    /// Find the newest commit visible to `snapshot` that contains `pageidx`,
    /// returning `None` if no visible commit wrote that page.
    pub fn search_page(
        &self,
        snapshot: &Snapshot,
        pageidx: PageIdx,
    ) -> Result<Option<Commit>, FjallStorageErr> {
        let mut commits = self.commits(snapshot);

        while let Some(commit) = commits.try_next()? {
            if !commit.page_count().contains(pageidx) {
                // the volume is smaller than the requested page idx.
                // this also handles the case that a volume is truncated and
                // then subsequently extended at a later time.
                break;
            }

            let Some(idx) = commit.segment_idx() else {
                // this commit contains no pages
                continue;
            };

            if !idx.contains(pageidx) {
                // this commit does not contain the requested pageidx
                continue;
            }

            return Ok(Some(commit));
        }
        Ok(None)
    }

    /// Check whether a page is stored locally for the given segment.
    pub fn has_page(&self, sid: SegmentId, pageidx: PageIdx) -> Result<bool, FjallStorageErr> {
        self._pages().contains(&PageKey::new(sid, pageidx))
    }

    /// Read a page for the given segment if it is stored locally.
    pub fn read_page(
        &self,
        sid: SegmentId,
        pageidx: PageIdx,
    ) -> Result<Option<Page>, FjallStorageErr> {
        self._pages()
            .get_owned(PageKey::new(sid, pageidx))
            .or_into_ctx()
    }

    /// Look up the page count recorded by a specific commit, if it exists.
    pub fn page_count(
        &self,
        vid: &VolumeId,
        lsn: LSN,
    ) -> Result<Option<PageCount>, FjallStorageErr> {
        Ok(self.get_commit(vid, lsn)?.map(|c| c.page_count()))
    }

    /// Load the cached checkpoints for a volume, if any.
    pub fn checkpoints(
        &self,
        vid: &VolumeId,
    ) -> Result<Option<CachedCheckpoints>, FjallStorageErr> {
        self._checkpoints().get(vid)
    }

    /// Compute a checksum over all pages visible to `snapshot`.
    ///
    /// Pages that are not stored locally are silently skipped, so the result
    /// only covers locally available data.
    pub fn checksum(&self, snapshot: &Snapshot) -> Result<Checksum, FjallStorageErr> {
        let pages = self._pages();
        let mut builder = ChecksumBuilder::new();
        let mut iter = self.iter_visible_pages(snapshot);
        while let Some((idx, pageset)) = iter.try_next()? {
            for pageidx in pageset.iter() {
                let key = PageKey::new(idx.sid.clone(), pageidx);
                if let Some(page) = pages.get(&key)? {
                    builder.write(&page);
                }
            }
        }
        Ok(builder.build())
    }

    /// Find all segment frames visible to `snapshot` whose pages are not yet
    /// stored locally (i.e. that still need to be downloaded).
    pub fn find_missing_frames(
        &self,
        snapshot: &Snapshot,
    ) -> Result<Vec<SegmentRangeRef>, FjallStorageErr> {
        let mut missing_frames = vec![];
        let pages = self._pages();
        let mut iter = self.iter_visible_pages(snapshot);
        while let Some((idx, pageset)) = iter.try_next()? {
            // find candidate frames (intersects with the visible pageset)
            let frames = idx.iter_frames(|pages| pageset.contains_any(pages));

            // find frames for which we are missing the first page.
            // since we always download entire segment frames, if we are missing
            // the first page, we are missing all the pages (in the frame)
            for frame in frames {
                if let Some(first_page) = frame.pageset.first()
                    && !pages.contains(&PageKey::new(frame.sid.clone(), first_page))?
                {
                    missing_frames.push(frame);
                }
            }
        }
        Ok(missing_frames)
    }
}
532
/// A write-only batch over storage.
///
/// Writes are staged in memory and applied all-or-nothing by
/// [`WriteBatch::commit`].
pub struct WriteBatch<'a> {
    storage: &'a FjallStorage,
    batch: Batch,
}
537
538impl<'a> WriteBatch<'a> {
539    fn open(storage: &'a FjallStorage) -> Self {
540        Self { storage, batch: storage.keyspace.batch() }
541    }
542
543    pub fn write_tag(&mut self, tag: &str, graft: VolumeId) {
544        self.batch
545            .insert_typed(&self.storage.tags, tag.into(), graft);
546    }
547
548    pub fn write_commit(&mut self, commit: Commit) {
549        self.batch
550            .insert_typed(&self.storage.log, commit.vref(), commit);
551    }
552
553    pub fn write_graft(&mut self, graft: Graft) {
554        self.batch
555            .insert_typed(&self.storage.grafts, graft.local.clone(), graft);
556    }
557
558    pub fn write_page(&mut self, sid: SegmentId, pageidx: PageIdx, page: Page) {
559        self.batch
560            .insert_typed(&self.storage.pages, PageKey::new(sid, pageidx), page);
561    }
562
563    pub fn commit(self) -> Result<(), FjallStorageErr> {
564        self.batch.commit().or_into_ctx()
565    }
566}
567
/// A read + write transaction over storage.
///
/// Holds the storage-wide mutex for its lifetime (serializing read+write
/// txns) plus a `ReadGuard` snapshot taken after the lock was acquired.
pub struct ReadWriteGuard<'a> {
    // held for the guard's lifetime; serializes read+write transactions
    _permit: MutexGuard<'a, ()>,
    read: ReadGuard<'a>,
}
572
impl<'a> ReadWriteGuard<'a> {
    /// Acquire the storage lock, then pin a read snapshot.
    fn open(storage: &'a FjallStorage) -> Self {
        // TODO: consider adding a lock timeout for deadlock detection
        let _permit = storage.lock.lock();
        // IMPORTANT: take the read snapshot after taking the lock
        let read = storage.read();
        Self { _permit, read }
    }

    /// Access the underlying storage (writes are safe while the lock is held).
    fn storage(&self) -> &'a FjallStorage {
        self.read.storage
    }

    /// Point `tag` at `graft`, returning the previous mapping (if any).
    pub fn tag_replace(
        &self,
        tag: &str,
        graft: VolumeId,
    ) -> Result<Option<VolumeId>, FjallStorageErr> {
        let out = self.read.get_tag(tag)?;
        self.storage().tags.insert(tag.into(), graft)?;
        Ok(out)
    }

    /// opens a graft. if either the graft's `VolumeId` or the remote's `VolumeId`
    /// are missing, they will be randomly generated. If the graft already
    /// exists, this function will fail if its remote doesn't match.
    pub fn graft_open(
        self,
        graft: Option<VolumeId>,
        remote: Option<VolumeId>,
    ) -> Result<Graft, FjallStorageErr> {
        // generate the local graft vid if it's not specified
        let local = graft.unwrap_or_else(VolumeId::random);

        // lookup the graft if specified
        if let Some(graft) = self.read._grafts().get(&local)? {
            if let Some(remote) = remote
                && graft.remote != remote
            {
                return Err(LogicalErr::GraftRemoteMismatch {
                    graft: graft.local,
                    expected: remote,
                    actual: graft.remote,
                }
                .into());
            }
            return Ok(graft);
        }

        // determine the remote vid
        let remote = remote.unwrap_or_else(VolumeId::random);

        // if the remote exists, set the sync point to start from the latest
        // remote lsn
        let sync = self
            .read
            .latest_lsn(&remote)?
            .map(|latest_remote| SyncPoint {
                remote: latest_remote,
                local_watermark: None,
            });

        // create the new graft
        let graft = Graft::new(local.clone(), remote, sync, None);
        self.storage().grafts.insert(local, graft.clone())?;

        tracing::debug!(
            local_vid = ?graft.local,
            remote_vid = ?graft.remote,
            "open graft"
        );

        Ok(graft)
    }

    /// Attempt to execute a local commit to the specified Graft's local volume.
    ///
    /// Returns the resulting `VolumeRef` on success
    pub fn commit(
        self,
        graft: &VolumeId,
        snapshot: Snapshot,
        page_count: PageCount,
        segment: SegmentIdx,
    ) -> Result<Snapshot, FjallStorageErr> {
        // Verify that the commit was constructed using the latest snapshot for
        // the volume.
        if !self.read.is_latest_snapshot(graft, &snapshot)? {
            return Err(LogicalErr::GraftConcurrentWrite(graft.clone()).into());
        }

        let graft = self.read.graft(graft)?;

        // the commit_lsn is the next lsn for the graft's local volume
        let commit_lsn = self
            .read
            .latest_lsn(&graft.local)?
            .map_or(LSN::FIRST, |lsn| lsn.next());

        tracing::debug!(vid=?graft.local, %commit_lsn, "local commit");

        let commit = Commit::new(graft.local.clone(), commit_lsn, page_count)
            .with_segment_idx(Some(segment));

        // write the commit to storage
        // (a direct insert is safe: we still hold the txn lock)
        self.read.storage.log.insert(commit.vref(), commit)?;

        // open a new ReadGuard to read an updated graft snapshot
        // (self.read was pinned before the insert and can't see it)
        ReadGuard::open(self.storage()).snapshot(&graft.local)
    }

    /// Verify we are ready to make a remote commit and update the graft
    /// with a `PendingCommit`
    pub fn remote_commit_prepare(
        self,
        graft: &VolumeId,
        pending_commit: PendingCommit,
    ) -> Result<(), FjallStorageErr> {
        let graft = self.read.graft(graft)?;

        // only one remote commit may be in flight at a time
        assert!(
            graft.pending_commit().is_none(),
            "BUG: pending commit is present"
        );

        // ensure LSN monotonicity
        if let Some(local_watermark) = graft.local_watermark() {
            assert!(
                local_watermark < pending_commit.local,
                "BUG: local_watermark monotonicity violation"
            );
        }
        let latest_remote = self.read.latest_lsn(&graft.remote)?;
        assert_eq!(
            latest_remote,
            pending_commit.commit.checked_prev(),
            "BUG: remote lsn monotonicity violation"
        );

        // remember to set the commit hash
        assert!(pending_commit.commit_hash != CommitHash::ZERO);

        // save the new pending commit
        let graft = graft.with_pending_commit(Some(pending_commit));
        self.storage().grafts.insert(graft.local.clone(), graft)?;

        Ok(())
    }

    /// Finish the remote commit process by writing out an updated graft
    /// and recording the remote commit locally
    pub fn remote_commit_success(
        self,
        graft: &VolumeId,
        remote_commit: Commit,
    ) -> Result<(), FjallStorageErr> {
        let graft = self.read.graft(graft)?;

        // verify the pending commit matches the remote commit
        let pending_commit = graft.pending_commit.unwrap();
        assert_eq!(remote_commit.lsn(), pending_commit.commit);
        assert_eq!(
            remote_commit.commit_hash(),
            Some(&pending_commit.commit_hash)
        );

        // fail if we somehow already know about this commit locally
        assert!(
            !self.read._log().contains(&remote_commit.vref())?,
            "BUG: remote commit already exists"
        );

        // update the graft with the new sync points and no pending_commit
        let updated_graft = Graft {
            sync: Some(pending_commit.into()),
            pending_commit: None,
            ..graft
        };

        // apply the commit + graft update atomically
        let mut batch = self.storage().batch();
        batch.write_commit(remote_commit);
        batch.write_graft(updated_graft);
        batch.commit()
    }

    /// Drop a pending commit without applying it. This should only be called
    /// after receiving a rejection from the remote.
    pub fn drop_pending_commit(self, graft: &VolumeId) -> Result<(), FjallStorageErr> {
        let graft = self.read.graft(graft)?;
        self.storage()
            .grafts
            .insert(graft.local.clone(), graft.with_pending_commit(None))
    }

    /// Fast-forward a graft's sync point to the latest remote commit.
    ///
    /// Fails with `GraftDiverged` if the local volume has commits above the
    /// watermark (i.e. local and remote histories have diverged).
    pub fn sync_remote_to_local(self, graft: VolumeId) -> Result<(), FjallStorageErr> {
        let graft = self.read.graft(&graft)?;

        // check to see if we have any changes to sync
        let latest_remote = self.read.latest_lsn(&graft.remote).or_into_ctx()?;
        let Some(remote_changes) = graft.remote_changes(latest_remote) else {
            // nothing to sync
            return Ok(());
        };

        // check for divergence
        let latest_local = self.read.latest_lsn(&graft.local).or_into_ctx()?;
        if graft.local_changes(latest_local).is_some() {
            // the remote and local volumes have diverged
            let status = graft.status(latest_local, latest_remote);
            tracing::debug!("graft {} has diverged; status=`{status}`", graft.local);
            return Err(LogicalErr::GraftDiverged(graft.local).into());
        }

        tracing::debug!(
            sync = ?graft.sync(),
            lsns = %remote_changes.to_string(),
            remote = ?graft.remote,
            local = ?graft.local,
            "fast-forwarding graft to its remote"
        );

        // to perform the sync, we simply need to update the graft's SyncPoint
        // to reference the latest remote_lsn
        let remote_lsn = *remote_changes.end();

        let new_sync = match graft.sync() {
            Some(sync) => {
                assert!(
                    remote_lsn > sync.remote,
                    "BUG: attempt to sync graft to older version of the remote"
                );
                SyncPoint {
                    remote: remote_lsn,
                    local_watermark: sync.local_watermark,
                }
            }
            None => SyncPoint {
                remote: remote_lsn,
                local_watermark: None,
            },
        };

        // update the sync point
        self.storage()
            .grafts
            .insert(graft.local.clone(), graft.with_sync(Some(new_sync)))
    }
}