graft 0.2.1

The Graft storage engine.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
use std::{fmt::Debug, ops::RangeInclusive, path::Path, sync::Arc};

use crate::core::{
    LogId, PageCount, PageIdx, SegmentId, VolumeId,
    checkpoints::CachedCheckpoints,
    checksum::{Checksum, ChecksumBuilder},
    commit::{Commit, SegmentIdx, SegmentRangeRef},
    commit_hash::CommitHash,
    logref::LogRef,
    lsn::{LSN, LSNRangeExt, LSNSet},
    page::Page,
    pageset::PageSet,
};
use bytestring::ByteString;
use fjall::{Batch, Instant, KvSeparationOptions, PartitionCreateOptions};
use parking_lot::{Mutex, MutexGuard};
use tryiter::TryIteratorExt;

use crate::{
    LogicalErr,
    local::fjall_storage::{
        keys::PageKey,
        typed_partition::{TypedPartition, TypedPartitionSnapshot, fjall_batch_ext::FjallBatchExt},
    },
    snapshot::Snapshot,
    volume::{PendingCommit, SyncPoint, Volume},
};

use culprit::{Result, ResultExt};

mod fjall_repr;
pub mod keys;
mod typed_partition;
mod values;

/// Errors produced by [`FjallStorage`] and its read/write guards.
///
/// Wraps the underlying fjall / lsm-tree / IO failures alongside Graft's
/// own logical errors so callers get a single error type per operation.
#[derive(Debug, thiserror::Error)]
pub enum FjallStorageErr {
    /// Error bubbled up from the fjall keyspace / partition layer.
    #[error("Fjall error: {0}")]
    FjallErr(#[from] fjall::Error),

    /// Error bubbled up from fjall's underlying LSM tree implementation.
    #[error("Fjall LSM Tree error: {0}")]
    LsmTreeErr(#[from] lsm_tree::Error),

    /// A stored key (or value) could not be decoded back into its typed form.
    #[error("Failed to decode key: {0}")]
    DecodeErr(#[from] fjall_repr::DecodeErr),

    /// Filesystem-level I/O failure (e.g. while creating a temp dir).
    #[error("I/O Error: {0}")]
    IoErr(#[from] std::io::Error),

    /// A conditional batch commit found its precondition violated.
    #[error("batch commit precondition failed")]
    BatchPreconditionErr,

    /// A Graft-level logical error (volume not found, diverged, etc.).
    #[error(transparent)]
    LogicalErr(#[from] LogicalErr),
}

/// Local Graft storage backed by a fjall (LSM-tree) keyspace.
///
/// State is split across five typed partitions (tags, volumes, checkpoints,
/// the commit log, and pages). Concurrency is coordinated by the `lock`
/// field; see its doc comment for the transaction-safety rules.
pub struct FjallStorage {
    // the underlying fjall keyspace; also owns the snapshot tracker used by
    // `ReadGuard` to pin consistent read views
    keyspace: fjall::Keyspace,

    /// This partition allows volumes to be identified by a tag.
    /// The volume a tag points at can be changed.
    tags: TypedPartition<ByteString, VolumeId>,

    /// This partition stores state regarding each `Volume`
    /// keyed by its `VolumeId`
    /// {`VolumeId`} -> `Volume`
    volumes: TypedPartition<VolumeId, Volume>,

    /// This partition stores `CachedCheckpoints` for each Log
    /// {`LogId`} -> `CachedCheckpoints`
    checkpoints: TypedPartition<LogId, CachedCheckpoints>,

    /// This partition stores commits
    /// {`LogId`} / {lsn} -> Commit
    /// NOTE: keys sort LSNs in reverse (newest first); see `ReadGuard::commits`
    log: TypedPartition<LogRef, Commit>,

    /// This partition stores Pages
    /// {sid} / {pageidx} -> Page
    pages: TypedPartition<PageKey, Page>,

    /// Must be held while performing read+write transactions.
    /// Read-only and write-only transactions don't need to hold the lock as
    /// long as they are safe:
    /// To make read-only txns safe, use the same snapshot for all reads
    /// To make write-only txns safe, they must be monotonic
    lock: Arc<Mutex<()>>,
}

impl Debug for FjallStorage {
    /// Opaque debug representation: the fjall handles and partitions carry no
    /// useful printable state, so only the type name is emitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Identical output to `debug_struct("FjallStorage").finish()` with no
        // fields recorded.
        f.write_str("FjallStorage")
    }
}

impl FjallStorage {
    /// Open (or create) storage rooted at `path` using the default fjall
    /// configuration.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, FjallStorageErr> {
        Self::open_config(fjall::Config::new(path))
    }

    /// Open storage in a fresh temporary directory.
    ///
    /// `keep()` detaches the directory from tempfile's drop-time cleanup;
    /// fjall's `temporary(true)` takes over removal instead.
    pub fn open_temporary() -> Result<Self, FjallStorageErr> {
        let path = tempfile::tempdir()?.keep();
        Self::open_config(fjall::Config::new(path).temporary(true))
    }

    /// Open the keyspace and the five partitions Graft uses.
    fn open_config(config: fjall::Config) -> Result<Self, FjallStorageErr> {
        let keyspace = config.open()?;
        let tags = TypedPartition::open(&keyspace, "tags", Default::default())?;
        let volumes = TypedPartition::open(&keyspace, "volumes", Default::default())?;
        let checkpoints = TypedPartition::open(&keyspace, "checkpoints", Default::default())?;
        let log = TypedPartition::open(&keyspace, "log", Default::default())?;
        // pages hold large values, so key-value separation is enabled for
        // this partition only
        let pages = TypedPartition::open(
            &keyspace,
            "pages",
            PartitionCreateOptions::default().with_kv_separation(KvSeparationOptions::default()),
        )?;

        Ok(Self {
            keyspace,
            tags,
            volumes,
            checkpoints,
            log,
            pages,
            lock: Default::default(),
        })
    }

    /// Open a consistent read-only view at the current keyspace instant.
    pub(crate) fn read(&self) -> ReadGuard<'_> {
        ReadGuard::open(self)
    }

    /// Start a write batch; staged writes apply atomically on `commit`.
    pub(crate) fn batch(&self) -> WriteBatch<'_> {
        WriteBatch::open(self)
    }

    /// Open a read + write txn on storage.
    /// The returned object holds a lock, any subsequent calls to `read_write`
    /// will block.
    pub(crate) fn read_write(&self) -> ReadWriteGuard<'_> {
        ReadWriteGuard::open(self)
    }

    /// Write a single page keyed by (segment id, page index).
    ///
    /// Write-only operation; per the `lock` field's rules it does not need
    /// the read-write lock as long as callers keep writes monotonic.
    pub fn write_page(
        &self,
        sid: SegmentId,
        pageidx: PageIdx,
        page: Page,
    ) -> Result<(), FjallStorageErr> {
        self.pages
            .insert(PageKey::new(sid, pageidx), page)
            .or_into_ctx()
    }

    /// Remove a single page keyed by (segment id, page index).
    pub fn remove_page(&self, sid: SegmentId, pageidx: PageIdx) -> Result<(), FjallStorageErr> {
        self.pages.remove(PageKey::new(sid, pageidx)).or_into_ctx()
    }

    /// Remove every stored page for `sid` within the inclusive `pages` range,
    /// applying all removals in a single atomic batch.
    pub fn remove_page_range(
        &self,
        sid: &SegmentId,
        pages: RangeInclusive<PageIdx>,
    ) -> Result<(), FjallStorageErr> {
        // PageKeys are stored in descending order
        let keyrange =
            PageKey::new(sid.clone(), *pages.end())..=PageKey::new(sid.clone(), *pages.start());
        let mut batch = self.keyspace.batch();
        let mut iter = self.pages.snapshot().range(keyrange);
        while let Some((key, _)) = iter.try_next()? {
            batch.remove_typed(&self.pages, key);
        }
        batch.commit()?;
        Ok(())
    }

    /// Delete a tag; deleting a missing tag is not an error at this layer.
    pub fn tag_delete(&self, tag: &str) -> Result<(), FjallStorageErr> {
        self.tags.remove(tag.into())
    }

    /// Delete a volume record. Does not touch the volume's logs or pages.
    pub fn volume_delete(&self, vid: &VolumeId) -> Result<(), FjallStorageErr> {
        self.volumes.remove(vid.clone())
    }

    /// Overwrite the cached checkpoints for `log`.
    pub fn write_checkpoints(
        &self,
        log: LogId,
        checkpoints: CachedCheckpoints,
    ) -> Result<(), FjallStorageErr> {
        self.checkpoints.insert(log, checkpoints)
    }

    /// Materialize a brand-new volume whose local log replays every commit
    /// visible to `snapshot`, renumbered to a contiguous LSN range starting
    /// at `LSN::FIRST`.
    pub fn volume_from_snapshot(&self, snapshot: &Snapshot) -> Result<Volume, FjallStorageErr> {
        let volume = Volume::new_random();
        // commits() yields newest -> oldest
        let commits = self
            .read()
            .commits(snapshot)
            .collect::<Result<Vec<_>, _>>()?;
        // start one past the top so that decrementing before each write
        // assigns the newest commit FIRST+len-1 and the oldest commit FIRST
        let mut lsn = LSN::FIRST.checked_add(commits.len() as u64).unwrap();
        let mut batch = self.batch();
        for commit in commits {
            lsn = lsn.checked_prev().unwrap();
            batch.write_commit(commit.with_log_id(volume.local.clone()).with_lsn(lsn));
        }
        batch.write_volume(volume.clone());
        batch.commit()?;
        Ok(volume)
    }
}

/// A consistent read-only view of storage pinned at a single keyspace
/// instant. All partition reads through this guard observe the same
/// sequence number; the pin is released when the guard is dropped.
pub struct ReadGuard<'a> {
    storage: &'a FjallStorage,
    // fjall sequence number that every partition snapshot is taken at
    seqno: Instant,
}

impl Drop for ReadGuard<'_> {
    /// Release the snapshot pinned in `ReadGuard::open`; the open/close
    /// calls on the snapshot tracker must stay paired.
    fn drop(&mut self) {
        // IMPORTANT: Decrement snapshot count
        self.storage.keyspace.snapshot_tracker.close(self.seqno);
    }
}

impl<'a> ReadGuard<'a> {
    /// Capture the current keyspace instant and register it with the
    /// snapshot tracker so data visible at `seqno` is retained until this
    /// guard is dropped (see the `Drop` impl).
    fn open(storage: &'a FjallStorage) -> ReadGuard<'a> {
        let seqno = storage.keyspace.instant();
        // IMPORTANT: Increment snapshot count
        storage.keyspace.snapshot_tracker.open(seqno);
        Self { storage, seqno }
    }

    // The accessors below all snapshot their partition at the same `seqno`,
    // which is what makes reads through one guard mutually consistent.

    #[inline]
    fn _tags(&self) -> TypedPartitionSnapshot<ByteString, VolumeId> {
        self.storage.tags.snapshot_at(self.seqno)
    }

    #[inline]
    fn _volumes(&self) -> TypedPartitionSnapshot<VolumeId, Volume> {
        self.storage.volumes.snapshot_at(self.seqno)
    }

    #[inline]
    fn _checkpoints(&self) -> TypedPartitionSnapshot<LogId, CachedCheckpoints> {
        self.storage.checkpoints.snapshot_at(self.seqno)
    }

    #[inline]
    fn _log(&self) -> TypedPartitionSnapshot<LogRef, Commit> {
        self.storage.log.snapshot_at(self.seqno)
    }

    #[inline]
    fn _pages(&self) -> TypedPartitionSnapshot<PageKey, Page> {
        self.storage.pages.snapshot_at(self.seqno)
    }

    /// Iterate every (tag, volume id) pair.
    pub fn iter_tags(
        &self,
    ) -> impl Iterator<Item = Result<(ByteString, VolumeId), FjallStorageErr>> + use<> {
        self._tags().range(..)
    }

    /// Returns true if `tag` is present.
    pub fn tag_exists(&self, tag: &str) -> Result<bool, FjallStorageErr> {
        self._tags().contains(tag)
    }

    /// Resolve a tag to the `VolumeId` it currently points at, if any.
    pub fn get_tag(&self, tag: &str) -> Result<Option<VolumeId>, FjallStorageErr> {
        self._tags().get(tag)
    }

    /// Lookup the latest LSN for a Log
    ///
    /// The log partition orders LSNs in reverse, so the first entry under
    /// `log` is the newest one.
    pub fn latest_lsn(&self, log: &LogId) -> Result<Option<LSN>, FjallStorageErr> {
        Ok(self._log().first(log)?.map(|(logref, _)| logref.lsn))
    }

    /// Iterate every stored `Volume`.
    pub fn iter_volumes(&self) -> impl Iterator<Item = Result<Volume, FjallStorageErr>> + use<> {
        self._volumes().values()
    }

    /// Returns true if a volume record exists for `vid`.
    pub fn volume_exists(&self, vid: &VolumeId) -> Result<bool, FjallStorageErr> {
        self._volumes().contains(vid)
    }

    /// Load the `Volume` for `vid`, failing with `VolumeNotFound` if missing.
    pub fn volume(&self, vid: &VolumeId) -> Result<Volume, FjallStorageErr> {
        self._volumes()
            .get(vid)?
            .ok_or_else(|| LogicalErr::VolumeNotFound(vid.clone()).into())
    }

    /// Check if the provided Snapshot is logically equal to the latest snapshot
    /// for the specified Volume.
    pub fn is_latest_snapshot(
        &self,
        vid: &VolumeId,
        snapshot: &Snapshot,
    ) -> Result<bool, FjallStorageErr> {
        let volume = self.volume(vid)?;
        let latest_local = self.latest_lsn(&volume.local)?;

        // The complexity here is that the snapshot may have been taken before
        // we pushed commits to a remote. When this happens, the snapshot will
        // be physically different but logically equivalent. We can use the
        // relationship setup by the SyncPoint to handle this case.
        Ok(match snapshot.head() {
            // head is on our local log: latest iff it names the newest local LSN
            Some((log, lsn)) if log == &volume.local => Some(lsn) == latest_local,

            // head is on our remote log: latest iff it matches the sync point
            // and no local commits have landed past the synced watermark
            Some((log, lsn)) if log == &volume.remote => {
                if let Some(sync) = volume.sync {
                    lsn == sync.remote && sync.local_watermark == latest_local
                } else {
                    // if volume has no sync point, then a snapshot should not
                    // include a remote layer, thus this snapshot is from
                    // another volume
                    false
                }
            }

            // Snapshot from another volume
            Some(_) => false,

            // Snapshot is empty
            None => latest_local.is_none() && volume.sync().is_none(),
        })
    }

    /// Load the most recent Snapshot for a Volume.
    pub fn snapshot(&self, vid: &VolumeId) -> Result<Snapshot, FjallStorageErr> {
        let volume = self.volume(vid)?;

        let mut snapshot = Snapshot::EMPTY;

        // local layer: only the portion of the local log not yet covered by
        // the sync watermark (if one exists)
        if let Some(latest) = self.latest_lsn(&volume.local)? {
            if let Some(watermark) = volume.sync().and_then(|s| s.local_watermark) {
                if watermark < latest {
                    snapshot.append(volume.local, watermark..=latest);
                }
            } else {
                snapshot.append(volume.local, LSN::FIRST..=latest);
            }
        }

        // remote layer: everything up to the synced remote LSN
        if let Some(remote) = volume.sync.map(|s| s.remote) {
            snapshot.append(volume.remote, LSN::FIRST..=remote);
        }

        Ok(snapshot)
    }

    /// Retrieve a specific commit
    pub fn get_commit(&self, log: &LogId, lsn: LSN) -> Result<Option<Commit>, FjallStorageErr> {
        self._log().get_owned(LogRef::new(log.clone(), lsn))
    }

    /// Iterates through all of the commits reachable by the provided `Snapshot`
    /// from the newest to oldest commit.
    pub fn commits(
        &self,
        snapshot: &Snapshot,
    ) -> impl Iterator<Item = Result<Commit, FjallStorageErr>> {
        let log = self._log();

        snapshot.iter().flat_map(move |entry| {
            // the snapshot range is in the form `low..=high` but the log orders
            // LSNs in reverse. thus we need to flip the range when passing it
            // down to the underlying scan.
            let low = entry.start_ref();
            let high = entry.end_ref();
            let range = high..=low;
            log.range(range).map_ok(|(_, commit)| Ok(commit))
        })
    }

    /// Produce an iterator of `SegmentIdx`s along with the pages we need from the segment.
    /// Collectively provides full coverage of the pages visible to a snapshot.
    pub fn iter_visible_pages(
        &self,
        snapshot: &Snapshot,
    ) -> impl Iterator<Item = Result<(SegmentIdx, PageSet), FjallStorageErr>> {
        // the set of pages we are searching for.
        // we remove pages from this set as we iterate through commits.
        let mut pages = PageSet::FULL;
        // we keep track of the smallest page count as we iterate through commits
        let mut page_count = PageCount::MAX;

        self.commits(snapshot).try_filter_map(move |commit| {
            // if we have found all pages, we are done
            if pages.is_empty() {
                return Ok(None);
            }

            // if we encounter a smaller commit on our travels, we need to shrink
            // the page_count to ensure that truncation is respected
            if commit.page_count < page_count {
                page_count = commit.page_count;
                pages.truncate(page_count);
            }

            if let Some(idx) = commit.segment_idx {
                let mut commit_pages = idx.pageset.clone();

                if commit_pages.last().map(|idx| idx.pages()) > Some(page_count) {
                    // truncate any pages in this commit that extend beyond the page count
                    commit_pages.truncate(page_count);
                }

                // figure out which pages we need from this commit
                let outstanding = pages.cut(&commit_pages);

                if !outstanding.is_empty() {
                    return Ok(Some((idx, outstanding)));
                }
            }

            Ok(None)
        })
    }

    /// Given a range of LSNs for a particular Log, returns the set of LSNs we have
    pub fn lsns(&self, log: &LogId, lsns: &RangeInclusive<LSN>) -> Result<LSNSet, FjallStorageErr> {
        // lsns is in the form `low..=high` but the log orders
        // LSNs in reverse. thus we need to flip the range
        let low = LogRef::new(log.clone(), *lsns.start());
        let high = LogRef::new(log.clone(), *lsns.end());
        let range = high..=low;
        self._log()
            .range_keys(range)
            .map_ok(|key| Ok(key.lsn()))
            .collect()
    }

    /// Find the newest commit visible to `snapshot` that contains `pageidx`,
    /// respecting truncation along the way.
    pub fn search_page(
        &self,
        snapshot: &Snapshot,
        pageidx: PageIdx,
    ) -> Result<Option<Commit>, FjallStorageErr> {
        let mut commits = self.commits(snapshot);

        while let Some(commit) = commits.try_next()? {
            if !commit.page_count().contains(pageidx) {
                // the volume is smaller than the requested page idx.
                // this also handles the case that a volume is truncated and
                // then subsequently extended at a later time.
                break;
            }

            let Some(idx) = commit.segment_idx() else {
                // this commit contains no pages
                continue;
            };

            if !idx.contains(pageidx) {
                // this commit does not contain the requested pageidx
                continue;
            }

            return Ok(Some(commit));
        }
        Ok(None)
    }

    /// Returns true if the page at (sid, pageidx) is stored locally.
    pub fn has_page(&self, sid: SegmentId, pageidx: PageIdx) -> Result<bool, FjallStorageErr> {
        self._pages().contains(&PageKey::new(sid, pageidx))
    }

    /// Read the page stored at (sid, pageidx), if present.
    pub fn read_page(
        &self,
        sid: SegmentId,
        pageidx: PageIdx,
    ) -> Result<Option<Page>, FjallStorageErr> {
        self._pages()
            .get_owned(PageKey::new(sid, pageidx))
            .or_into_ctx()
    }

    /// Retrieve the `PageCount` of a Volume at a particular LSN.
    pub fn page_count(&self, log: &LogId, lsn: LSN) -> Result<Option<PageCount>, FjallStorageErr> {
        Ok(self.get_commit(log, lsn)?.map(|c| c.page_count()))
    }

    /// Load the cached checkpoints for `log`, if any.
    pub fn checkpoints(&self, log: &LogId) -> Result<Option<CachedCheckpoints>, FjallStorageErr> {
        self._checkpoints().get(log)
    }

    /// Compute a checksum over every locally-present page visible to
    /// `snapshot`. NOTE: pages missing from local storage are silently
    /// skipped rather than treated as an error.
    pub fn checksum(&self, snapshot: &Snapshot) -> Result<Checksum, FjallStorageErr> {
        let pages = self._pages();
        let mut builder = ChecksumBuilder::new();
        let mut iter = self.iter_visible_pages(snapshot);
        while let Some((idx, pageset)) = iter.try_next()? {
            for pageidx in pageset.iter() {
                let key = PageKey::new(idx.sid.clone(), pageidx);
                if let Some(page) = pages.get(&key)? {
                    builder.write(&page);
                }
            }
        }
        Ok(builder.build())
    }

    /// Find every segment frame visible to `snapshot` whose pages are not yet
    /// stored locally and thus still need to be downloaded.
    pub fn find_missing_frames(
        &self,
        snapshot: &Snapshot,
    ) -> Result<Vec<SegmentRangeRef>, FjallStorageErr> {
        let mut missing_frames = vec![];
        let pages = self._pages();
        let mut iter = self.iter_visible_pages(snapshot);
        while let Some((idx, pageset)) = iter.try_next()? {
            // find candidate frames (intersects with the visible pageset)
            let frames = idx.iter_frames(|pages| pageset.contains_any(pages));

            // find frames for which we are missing the first page.
            // since we always download entire segment frames, if we are missing
            // the first page, we are missing all the pages (in the frame)
            for frame in frames {
                if let Some(first_page) = frame.pageset.first()
                    && !pages.contains(&PageKey::new(frame.sid.clone(), first_page))?
                {
                    missing_frames.push(frame);
                }
            }
        }
        Ok(missing_frames)
    }
}

/// A typed wrapper around a fjall `Batch`: staged writes are buffered and
/// applied atomically when `commit` is called.
pub struct WriteBatch<'a> {
    storage: &'a FjallStorage,
    batch: Batch,
}

impl<'a> WriteBatch<'a> {
    /// Create an empty batch backed by `storage`'s keyspace.
    fn open(storage: &'a FjallStorage) -> Self {
        let batch = storage.keyspace.batch();
        Self { storage, batch }
    }

    /// Stage an upsert of `tag` -> `vid` into the tags partition.
    pub fn write_tag(&mut self, tag: &str, vid: VolumeId) {
        let partition = &self.storage.tags;
        self.batch.insert_typed(partition, tag.into(), vid);
    }

    /// Stage a commit into the log partition, keyed by its logref.
    pub fn write_commit(&mut self, commit: Commit) {
        let key = commit.logref();
        self.batch.insert_typed(&self.storage.log, key, commit);
    }

    /// Stage a volume record keyed by its `VolumeId`.
    pub fn write_volume(&mut self, volume: Volume) {
        let key = volume.vid.clone();
        self.batch.insert_typed(&self.storage.volumes, key, volume);
    }

    /// Stage a page write keyed by (segment id, page index).
    pub fn write_page(&mut self, sid: SegmentId, pageidx: PageIdx, page: Page) {
        let key = PageKey::new(sid, pageidx);
        self.batch.insert_typed(&self.storage.pages, key, page);
    }

    /// Atomically apply every staged operation to the keyspace.
    pub fn commit(self) -> Result<(), FjallStorageErr> {
        self.batch.commit().or_into_ctx()
    }
}

/// A read + write transaction: holds the storage-wide mutex for its entire
/// lifetime and carries a `ReadGuard` snapshot taken after the lock was
/// acquired, so reads and the writes derived from them are consistent.
pub struct ReadWriteGuard<'a> {
    // keeps the storage mutex held until the guard is dropped
    _permit: MutexGuard<'a, ()>,
    read: ReadGuard<'a>,
}

impl<'a> ReadWriteGuard<'a> {
    /// Acquire the storage lock, then snapshot. The ordering matters: taking
    /// the snapshot first could let a concurrent writer slip in between.
    fn open(storage: &'a FjallStorage) -> Self {
        // TODO: consider adding a lock timeout for deadlock detection
        let _permit = storage.lock.lock();
        // IMPORTANT: take the read snapshot after taking the lock
        let read = storage.read();
        Self { _permit, read }
    }

    /// The underlying storage this guard was opened against.
    fn storage(&self) -> &'a FjallStorage {
        self.read.storage
    }

    /// Point `tag` at `vid`, returning the `VolumeId` it previously pointed
    /// at (if any).
    pub fn tag_replace(
        &self,
        tag: &str,
        vid: VolumeId,
    ) -> Result<Option<VolumeId>, FjallStorageErr> {
        let out = self.read.get_tag(tag)?;
        self.storage().tags.insert(tag.into(), vid)?;
        Ok(out)
    }

    /// opens a volume. if any id is missing, it will be randomly
    /// generated. If the volume already exists, this function will fail if its
    /// remote Log doesn't match.
    pub fn volume_open(
        self,
        vid: Option<VolumeId>,
        local: Option<LogId>,
        remote: Option<LogId>,
    ) -> Result<Volume, FjallStorageErr> {
        // generate the local LogId if it's not specified
        let vid = vid.unwrap_or_else(VolumeId::random);

        // lookup the volume if specified
        if let Some(volume) = self.read._volumes().get(&vid)? {
            // an existing volume's remote must match the caller's expectation
            if let Some(remote) = remote
                && volume.remote != remote
            {
                return Err(LogicalErr::VolumeRemoteMismatch {
                    vid: volume.vid,
                    expected: remote,
                    actual: volume.remote,
                }
                .into());
            }
            return Ok(volume);
        }

        // determine the local and remote LogIds
        let local = local.unwrap_or_else(LogId::random);
        let remote = remote.unwrap_or_else(LogId::random);

        // if the remote exists, set the sync point to start from the latest
        // remote lsn
        let sync = self
            .read
            .latest_lsn(&remote)?
            .map(|latest_remote| SyncPoint {
                remote: latest_remote,
                local_watermark: None,
            });

        // create the new volume
        let volume = Volume::new(vid.clone(), local, remote, sync, None);
        self.storage().volumes.insert(vid, volume.clone())?;

        tracing::debug!(
            vid = ?volume.vid,
            local_log = ?volume.local,
            remote_log = ?volume.remote,
            "open volume"
        );

        Ok(volume)
    }

    /// Attempt to execute a local commit to the specified Volume's local Log.
    ///
    /// Returns the resulting `Snapshot` on success
    pub fn commit(
        self,
        vid: &VolumeId,
        snapshot: Snapshot,
        page_count: PageCount,
        segment: SegmentIdx,
    ) -> Result<Snapshot, FjallStorageErr> {
        // Verify that the commit was constructed using the latest snapshot for
        // the volume.
        if !self.read.is_latest_snapshot(vid, &snapshot)? {
            return Err(LogicalErr::VolumeConcurrentWrite(vid.clone()).into());
        }

        let volume = self.read.volume(vid)?;

        // the commit_lsn is the next lsn for the volume's local Log
        let commit_lsn = self
            .read
            .latest_lsn(&volume.local)?
            .map_or(LSN::FIRST, |lsn| lsn.next());

        tracing::debug!(vid=?volume.vid, log=?volume.local, %commit_lsn, "local commit");

        let commit = Commit::new(volume.local.clone(), commit_lsn, page_count)
            .with_segment_idx(Some(segment));

        // write the commit to storage
        self.read.storage.log.insert(commit.logref(), commit)?;

        // open a new ReadGuard to read an updated snapshot
        // (self.read's snapshot predates the commit we just wrote; the lock
        // is still held here, so the fresh guard sees our write)
        ReadGuard::open(self.storage()).snapshot(&volume.vid)
    }

    /// Verify we are ready to make a remote commit and update the volume
    /// with a `PendingCommit`
    pub fn remote_commit_prepare(
        self,
        vid: &VolumeId,
        pending_commit: PendingCommit,
    ) -> Result<(), FjallStorageErr> {
        let volume = self.read.volume(vid)?;

        // at most one remote commit may be in flight at a time
        assert!(
            volume.pending_commit().is_none(),
            "BUG: pending commit is present"
        );

        // ensure LSN monotonicity
        if let Some(local_watermark) = volume.local_watermark() {
            assert!(
                local_watermark < pending_commit.local,
                "BUG: local_watermark monotonicity violation"
            );
        }
        // the pending remote LSN must directly follow the latest remote LSN
        let latest_remote = self.read.latest_lsn(&volume.remote)?;
        assert_eq!(
            latest_remote,
            pending_commit.commit.checked_prev(),
            "BUG: remote lsn monotonicity violation"
        );

        // remember to set the commit hash
        assert!(pending_commit.commit_hash != CommitHash::ZERO);

        // save the new pending commit
        let volume = volume.with_pending_commit(Some(pending_commit));
        self.storage().volumes.insert(volume.vid.clone(), volume)?;

        Ok(())
    }

    /// Finish the remote commit process by writing out an updated volume
    /// and recording the remote commit locally
    pub fn remote_commit_success(
        self,
        vid: &VolumeId,
        remote_commit: Commit,
    ) -> Result<(), FjallStorageErr> {
        let volume = self.read.volume(vid)?;

        // verify the pending commit matches the remote commit
        let pending_commit = volume.pending_commit.unwrap();
        assert_eq!(remote_commit.lsn(), pending_commit.commit);
        assert_eq!(
            remote_commit.commit_hash(),
            Some(&pending_commit.commit_hash)
        );

        // fail if we somehow already know about this commit locally
        assert!(
            !self.read._log().contains(&remote_commit.logref())?,
            "BUG: remote commit already exists"
        );

        // update the volume with the new sync points and no pending_commit
        let volume = Volume {
            sync: Some(pending_commit.into()),
            pending_commit: None,
            ..volume
        };

        // write the remote commit and the updated volume atomically
        let mut batch = self.storage().batch();
        batch.write_commit(remote_commit);
        batch.write_volume(volume);
        batch.commit()
    }

    /// Drop a pending commit without applying it. This should only be called
    /// after receiving a rejection from the remote.
    pub fn drop_pending_commit(self, vid: &VolumeId) -> Result<(), FjallStorageErr> {
        let volume = self.read.volume(vid)?;
        self.storage()
            .volumes
            .insert(volume.vid.clone(), volume.with_pending_commit(None))
    }

    /// Fast-forward the volume's sync point to the latest remote LSN, failing
    /// with `VolumeDiverged` if there are unsynced local changes.
    pub fn sync_remote_to_local(self, vid: VolumeId) -> Result<(), FjallStorageErr> {
        let volume = self.read.volume(&vid)?;

        // check to see if we have any changes to sync
        let latest_remote = self.read.latest_lsn(&volume.remote).or_into_ctx()?;
        let Some(remote_changes) = volume.remote_changes(latest_remote) else {
            // nothing to sync
            return Ok(());
        };

        // check for divergence
        let latest_local = self.read.latest_lsn(&volume.local).or_into_ctx()?;
        if volume.local_changes(latest_local).is_some() {
            // the remote and local logs have diverged
            let status = volume.status(latest_local, latest_remote);
            tracing::debug!("volume {} has diverged; status=`{status}`", volume.vid);
            return Err(LogicalErr::VolumeDiverged(volume.vid).into());
        }

        tracing::debug!(
            vid = ?volume.vid,
            sync = ?volume.sync(),
            lsns = %remote_changes.to_string(),
            local = ?volume.local,
            remote = ?volume.remote,
            "fast-forwarding volume"
        );

        // to perform the sync, we simply need to update the volume's SyncPoint
        // to reference the latest remote_lsn
        let remote_lsn = *remote_changes.end();

        let new_sync = match volume.sync() {
            Some(sync) => {
                assert!(
                    remote_lsn > sync.remote,
                    "BUG: attempt to sync volume to older version of the remote"
                );
                SyncPoint {
                    remote: remote_lsn,
                    local_watermark: sync.local_watermark,
                }
            }
            None => SyncPoint {
                remote: remote_lsn,
                local_watermark: None,
            },
        };

        // update the sync point
        self.storage()
            .volumes
            .insert(volume.vid.clone(), volume.with_sync(Some(new_sync)))
    }
}