graft_client/runtime/storage.rs

use std::{
    collections::{HashMap, HashSet},
    fmt::Debug,
    io,
    ops::RangeInclusive,
    path::Path,
    sync::Arc,
};

use bytes::Bytes;
use changeset::ChangeSet;
use commit::CommitKey;
use culprit::{Culprit, ResultExt};
use fjall::{KvSeparationOptions, PartitionCreateOptions, Slice};
use graft_core::{
    PageIdx, VolumeId,
    byte_unit::ByteUnit,
    lsn::{LSN, LSNRangeExt},
    page::PageSizeErr,
    page_count::PageCount,
    page_idx::ConvertToPageIdxErr,
    zerocopy_ext::ZerocopyErr,
};
use memtable::Memtable;
use page::{PageKey, PageValue, PageValueConversionErr};
use parking_lot::{Mutex, MutexGuard};
use snapshot::{RemoteMapping, Snapshot};
use splinter_rs::{DecodeErr, Splinter, SplinterRef};
use tracing::field;
use tryiter::{TryIterator, TryIteratorExt};
use volume_state::{
    SyncDirection, VolumeConfig, VolumeQueryIter, VolumeState, VolumeStateKey, VolumeStateTag,
    VolumeStatus, Watermark, Watermarks,
};
use zerocopy::IntoBytes;

pub mod changeset;
pub(crate) mod commit;
pub(crate) mod memtable;
pub mod page;
pub mod snapshot;
pub mod volume_state;

type Result<T> = std::result::Result<T, Culprit<StorageErr>>;

#[derive(Debug, thiserror::Error)]
pub enum StorageErr {
    #[error("fjall error: {0}")]
    FjallErr(#[from] fjall::Error),

    #[error("io error: {0}")]
    IoErr(io::ErrorKind),

    #[error("Corrupt key: {0}")]
    CorruptKey(ZerocopyErr),

    #[error("Corrupt snapshot: {0}")]
    CorruptSnapshot(ZerocopyErr),

    #[error("Corrupt volume config: {0}")]
    CorruptVolumeConfig(ZerocopyErr),

    #[error("Volume state {0:?} is corrupt: {1}")]
    CorruptVolumeState(VolumeStateTag, ZerocopyErr),

    #[error("Corrupt page: {0}")]
    CorruptPage(#[from] PageValueConversionErr),

    #[error("Corrupt commit: {0}")]
    CorruptCommit(#[from] DecodeErr),

    #[error("Illegal concurrent write to volume")]
    ConcurrentWrite,

    #[error("Volume needs recovery")]
    VolumeIsSyncing,

    #[error(
        "The local Volume state is ahead of the remote state, refusing to accept remote changes"
    )]
    RemoteConflict,

    #[error("invalid page index")]
    ConvertToPageIdxErr(#[from] ConvertToPageIdxErr),
}

impl From<io::Error> for StorageErr {
    fn from(err: io::Error) -> Self {
        StorageErr::IoErr(err.kind())
    }
}

impl From<lsm_tree::Error> for StorageErr {
    fn from(err: lsm_tree::Error) -> Self {
        StorageErr::FjallErr(err.into())
    }
}

impl From<PageSizeErr> for StorageErr {
    fn from(err: PageSizeErr) -> Self {
        StorageErr::CorruptPage(err.into())
    }
}

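/// Client-side runtime storage for Graft volumes, layered on top of a fjall
/// keyspace.
///
/// A minimal usage sketch (error handling elided; the calls mirror the tests
/// at the bottom of this module):
///
/// ```ignore
/// let storage = Storage::open_temporary()?;
/// let vid = VolumeId::random();
/// storage.set_volume_config(&vid, VolumeConfig::new(SyncDirection::Pull))?;
///
/// // stage a page write and commit it locally
/// let mut memtable = Memtable::default();
/// memtable.insert(pageidx!(1), Page::test_filled(0x42));
/// let snapshot = storage.commit(&vid, None, 1, memtable)?;
///
/// // read the page back at the snapshot's local LSN
/// let (lsn, page) = storage.read(&vid, snapshot.local(), pageidx!(1))?;
/// ```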
pub struct Storage {
    keyspace: fjall::Keyspace,

    /// Used to store volume state broken out by tag.
    /// Keyed by `VolumeStateKey`.
    ///
    /// ```text
    /// {vid}/VolumeStateTag::Config -> VolumeConfig
    /// {vid}/VolumeStateTag::Status -> VolumeStatus
    /// {vid}/VolumeStateTag::Snapshot -> Snapshot
    /// {vid}/VolumeStateTag::Watermarks -> Watermarks
    /// ```
    volumes: fjall::Partition,

    /// Used to store page contents.
    /// Maps from (`VolumeId`, `PageIdx`, LSN) to `PageValue`.
    pages: fjall::Partition,

    /// Used to track changes made by local commits.
    /// Maps from (`VolumeId`, LSN) to a Graft (a Splinter of changed `PageIdx`s).
    commits: fjall::Partition,

    /// Must be held while performing read+write transactions.
    /// Read-only and write-only transactions don't need to hold the lock as
    /// long as they are safe:
    /// - read-only txns are safe when they always use fjall snapshots
    /// - write-only txns are safe when they are monotonic
    commit_lock: Arc<Mutex<()>>,

    /// Used to notify subscribers of new local commits
    local_changeset: ChangeSet<VolumeId>,

    /// Used to notify subscribers of new remote commits
    remote_changeset: ChangeSet<VolumeId>,
}

impl Storage {
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        tracing::debug!("opening runtime storage at {}", path.as_ref().display());
        Self::open_config(fjall::Config::new(path))
    }

    pub fn open_temporary() -> Result<Self> {
        let path = tempfile::tempdir()?.keep();
        tracing::debug!("opening temporary runtime storage at {}", path.display());
        Self::open_config(fjall::Config::new(path).temporary(true))
    }

    fn open_config(config: fjall::Config) -> Result<Self> {
        let keyspace = config.open()?;
        let volumes = keyspace.open_partition("volumes", Default::default())?;
        let pages = keyspace.open_partition(
            "pages",
            PartitionCreateOptions::default().with_kv_separation(KvSeparationOptions::default()),
        )?;
        let commits = keyspace.open_partition(
            "commits",
            PartitionCreateOptions::default().with_kv_separation(KvSeparationOptions::default()),
        )?;
        let storage = Storage {
            keyspace,
            volumes,
            pages,
            commits,
            commit_lock: Default::default(),
            local_changeset: Default::default(),
            remote_changeset: Default::default(),
        };
        storage.check_for_interrupted_push()?;
        Ok(storage)
    }

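    /// Detects volumes that were still syncing when the process last stopped
    /// and flags them with `VolumeStatus::InterruptedPush`. Runs once at
    /// startup from `open_config`.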
    fn check_for_interrupted_push(&self) -> Result<()> {
        let _permit = self.commit_lock.lock();
        let mut batch = self.keyspace.batch();
        batch = batch.durability(Some(fjall::PersistMode::SyncAll));

        let iter = self.volumes.snapshot().iter().err_into();
        let mut iter = VolumeQueryIter::new(iter);
        while let Some(state) = iter.try_next()? {
            if state.is_syncing() {
                tracing::warn!(?state, "detected interrupted push for volume");
                self.set_volume_status(&mut batch, state.vid(), VolumeStatus::InterruptedPush);
            }
        }
        Ok(batch.commit()?)
    }

    /// Access the local commit changeset. This `ChangeSet` is updated whenever a
    /// Volume receives a local commit.
    pub fn local_changeset(&self) -> &ChangeSet<VolumeId> {
        &self.local_changeset
    }

    /// Access the remote commit changeset. This `ChangeSet` is updated whenever a
    /// Volume receives a remote commit.
    pub fn remote_changeset(&self) -> &ChangeSet<VolumeId> {
        &self.remote_changeset
    }

    /// Set the specified Volume's config
    pub fn set_volume_config(&self, vid: &VolumeId, config: VolumeConfig) -> Result<()> {
        let key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Config);
        Ok(self.volumes.insert(key, config)?)
    }

    /// Update a Volume's config
    pub fn update_volume_config<F>(&self, vid: &VolumeId, mut f: F) -> Result<()>
    where
        F: FnMut(VolumeConfig) -> VolumeConfig,
    {
        let _permit = self.commit_lock.lock();
        let key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Config);
        let config = self
            .volumes
            .get(&key)?
            .map(|c| VolumeConfig::from_bytes(&c))
            .transpose()?
            .unwrap_or_default();
        Ok(self.volumes.insert(key, f(config))?)
    }

    fn set_volume_status(&self, batch: &mut fjall::Batch, vid: &VolumeId, status: VolumeStatus) {
        let key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Status);
        batch.insert(&self.volumes, key, status)
    }

    pub fn get_volume_status(&self, vid: &VolumeId) -> Result<VolumeStatus> {
        let key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Status);
        if let Some(value) = self.volumes.get(key)? {
            Ok(VolumeStatus::from_bytes(&value)?)
        } else {
            Ok(VolumeStatus::Ok)
        }
    }

    pub fn volume_state(&self, vid: &VolumeId) -> Result<VolumeState> {
        let mut state = VolumeState::new(vid.clone());
        let mut iter = self.volumes.snapshot().prefix(vid);
        while let Some((key, value)) = iter.try_next()? {
            let key = VolumeStateKey::ref_from_bytes(&key)?;
            debug_assert_eq!(key.vid(), vid, "vid mismatch");
            state.accumulate(key.tag(), value)?;
        }
        Ok(state)
    }

    pub fn snapshot(&self, vid: &VolumeId) -> Result<Option<Snapshot>> {
        let key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Snapshot);
        if let Some(snapshot) = self.volumes.get(key)? {
            Ok(Some(Snapshot::try_from_bytes(&snapshot)?))
        } else {
            Ok(None)
        }
    }

    pub fn iter_volumes(&self) -> impl TryIterator<Ok = VolumeState, Err = Culprit<StorageErr>> {
        let iter = self.volumes.snapshot().iter().err_into();
        VolumeQueryIter::new(iter)
    }

    pub fn volume_exists(&self, vid: VolumeId) -> Result<bool> {
        let key = VolumeStateKey::new(vid, VolumeStateTag::Config);
        Ok(self.volumes.contains_key(key)?)
    }

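    /// Iterate the state of every volume whose sync direction matches `sync`,
    /// optionally restricted to the given set of `VolumeId`s.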
    pub fn query_volumes(
        &self,
        sync: SyncDirection,
        vids: Option<HashSet<VolumeId>>,
    ) -> impl TryIterator<Ok = VolumeState, Err = Culprit<StorageErr>> {
        let iter = self.volumes.snapshot().iter().err_into();
        let iter = VolumeQueryIter::new(iter);
        iter.try_filter(move |state| {
            let matches_vid = vids.as_ref().is_none_or(|s| s.contains(state.vid()));
            let matches_dir = state.config().sync().matches(sync);
            Ok(matches_vid && matches_dir)
        })
    }

    /// Returns an iterator of `PageValue`s at an exact LSN for a volume.
    /// Notably, this function will not fall back to an earlier LSN: a page
    /// written at an earlier LSN is not returned, even if it is still visible
    /// at this LSN.
    pub fn query_pages<'a, I>(
        &self,
        vid: &'a VolumeId,
        lsn: LSN,
        pages: I,
    ) -> impl TryIterator<Ok = (PageIdx, Option<PageValue>), Err = Culprit<StorageErr>> + 'a
    where
        I: TryIterator<Ok = PageIdx, Err = Culprit<StorageErr>> + 'a,
    {
        let snapshot = self.pages.snapshot();
        pages.map_ok(move |pageidx| {
            let key = PageKey::new(vid.clone(), pageidx, lsn);
            if let Some(page) = snapshot.get(key)? {
                Ok((pageidx, Some(PageValue::try_from(page).or_into_ctx()?)))
            } else {
                Ok((pageidx, None))
            }
        })
    }

    /// Returns the most recent visible page in a volume by LSN at a particular
    /// `PageIdx`. Notably, this will return a page from an earlier LSN if the page
    /// hasn't changed since then.
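    ///
    /// For example (a sketch; values are illustrative): if the page at
    /// `pageidx` was last written at LSN 3 and the requested `lsn` is 7, this
    /// returns the page stored at LSN 3 along with LSN 3 itself.
    ///
    /// ```ignore
    /// let (found_lsn, value) = storage.read(&vid, LSN::new(7), pageidx!(1))?;
    /// assert!(found_lsn <= LSN::new(7));
    /// ```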
    pub fn read(&self, vid: &VolumeId, lsn: LSN, pageidx: PageIdx) -> Result<(LSN, PageValue)> {
        let first_key = PageKey::new(vid.clone(), pageidx, LSN::FIRST);
        let key = PageKey::new(vid.clone(), pageidx, lsn);
        let range = first_key..=key;

        // Search for the latest page between LSN::FIRST and the requested LSN,
        // returning PageValue::Pending if none is found.
        if let Some((key, page)) = self.pages.snapshot().range(range).next_back().transpose()? {
            let lsn = PageKey::try_ref_from_bytes(&key)?.lsn();
            let bytes: Bytes = page.into();
            Ok((lsn, PageValue::try_from(bytes).or_into_ctx()?))
        } else {
            Ok((lsn, PageValue::Pending))
        }
    }

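    /// Commit a `Memtable` of staged page writes to the volume at the next
    /// local LSN, returning the new snapshot.
    ///
    /// `snapshot` must be the snapshot the writer read from (or `None` for the
    /// volume's first commit); if the volume has advanced past it, the commit
    /// is rejected with `StorageErr::ConcurrentWrite`.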
    pub fn commit(
        &self,
        vid: &VolumeId,
        snapshot: Option<Snapshot>,
        pages: impl Into<PageCount>,
        memtable: Memtable,
    ) -> Result<Snapshot> {
        let pages = pages.into();
        let span = tracing::debug_span!(
            "volume_commit",
            ?vid,
            ?snapshot,
            %pages,
            result = field::Empty
        )
        .entered();

        let mut batch = self.keyspace.batch();
        batch = batch.durability(Some(fjall::PersistMode::SyncAll));

        let read_lsn = snapshot.as_ref().map(|s| s.local());
        let commit_lsn = read_lsn.map_or(LSN::FIRST, |lsn| lsn.next().expect("lsn overflow"));

        // this Splinter will contain all of the PageIdxs this commit changed
        let mut graft = Splinter::default();

        // persist the memtable
        let mut page_key = PageKey::new(vid.clone(), PageIdx::FIRST, commit_lsn);
        for (pageidx, page) in memtable {
            page_key = page_key.with_index(pageidx);
            graft.insert(pageidx.into());
            batch.insert(&self.pages, page_key.as_bytes(), PageValue::from(page));
        }

        // persist the new commit
        let commit_key = CommitKey::new(vid.clone(), commit_lsn);
        batch.insert(&self.commits, commit_key, graft.serialize_to_bytes());

        // acquire the commit lock
        let _permit = self.commit_lock.lock();

        // check to see if the read snapshot is the latest local snapshot while
        // holding the commit lock
        let latest = self.snapshot(vid)?;
        if latest.as_ref().map(|l| l.local()) != read_lsn {
            precept::expect_reachable!(
                "concurrent write to volume",
                {
                    "vid": vid,
                    "snapshot": snapshot,
                    "latest": latest,
                }
            );

            return Err(Culprit::new_with_note(
                StorageErr::ConcurrentWrite,
                format!("Illegal concurrent write to Volume {vid}"),
            ));
        }

        // persist the new volume snapshot
        let snapshot_key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Snapshot);
        let snapshot = Snapshot::new(
            commit_lsn,
            // don't change the remote mapping during a local commit
            latest
                .map(|l| l.remote_mapping().clone())
                .unwrap_or_default(),
            pages,
        );
        batch.insert(&self.volumes, snapshot_key, snapshot.as_bytes());

        // commit the changes
        batch.commit()?;

        // notify listeners of the new local commit
        self.local_changeset.mark_changed(vid);

        // log the result
        span.record("result", snapshot.to_string());

        // return the new snapshot
        Ok(snapshot)
    }

    /// Replicate a remote commit to local storage.
    pub fn receive_remote_commit(
        &self,
        vid: &VolumeId,
        remote_snapshot: graft_proto::Snapshot,
        changed: SplinterRef<Bytes>,
    ) -> Result<()> {
        self.receive_remote_commit_holding_lock(
            self.commit_lock.lock(),
            vid,
            remote_snapshot,
            changed,
        )
    }

    /// Receive a remote commit into storage; it's only safe to call this
    /// function while holding the commit lock
    fn receive_remote_commit_holding_lock(
        &self,
        _permit: MutexGuard<'_, ()>,
        vid: &VolumeId,
        remote_snapshot: graft_proto::Snapshot,
        graft: SplinterRef<Bytes>,
    ) -> Result<()> {
        // resolve the remote lsn and page count
        let remote_lsn = remote_snapshot.lsn().expect("invalid remote LSN");
        let remote_pages = remote_snapshot.pages();

        let span = tracing::debug_span!(
            "receive_remote_commit",
            ?vid,
            ?remote_lsn,
            result = field::Empty,
        )
        .entered();

        let mut batch = self.keyspace.batch();
        batch = batch.durability(Some(fjall::PersistMode::SyncAll));

        // retrieve the current volume state
        let state = self.volume_state(vid)?;
        let snapshot = state.snapshot();
        let watermarks = state.watermarks();

        // ensure that we can accept this remote commit
        if state.is_syncing() {
            return Err(Culprit::new_with_note(
                StorageErr::VolumeIsSyncing,
                format!("Volume {vid} is syncing, refusing to accept remote changes"),
            ));
        }
        if state.has_pending_commits() {
            precept::expect_reachable!(
                "volume has pending commits while receiving remote commit",
                { "vid": vid, "state": state }
            );

            // mark the volume as having a remote conflict
            self.set_volume_status(&mut batch, vid, VolumeStatus::Conflict);

            return Err(Culprit::new_with_note(
                StorageErr::RemoteConflict,
                format!("Volume {vid:?} has pending commits, refusing to accept remote changes"),
            ));
        }

        // compute the next local lsn
        let commit_lsn = snapshot.map_or(LSN::FIRST, |s| s.local().next().expect("lsn overflow"));
        let remote_mapping = RemoteMapping::new(remote_lsn, commit_lsn);

        // persist the new volume snapshot
        let new_snapshot = Snapshot::new(commit_lsn, remote_mapping, remote_pages);
        batch.insert(
            &self.volumes,
            VolumeStateKey::new(vid.clone(), VolumeStateTag::Snapshot),
            new_snapshot.as_bytes(),
        );

        // fast forward the pending sync watermark to ensure we don't roundtrip this
        // commit back to the server
        batch.insert(
            &self.volumes,
            VolumeStateKey::new(vid.clone(), VolumeStateTag::Watermarks),
            watermarks
                .clone()
                .with_pending_sync(Watermark::new(commit_lsn, remote_pages)),
        );

        // mark changed pages
        let mut key = PageKey::new(vid.clone(), PageIdx::FIRST, commit_lsn);
        let pending = Bytes::from(PageValue::Pending);
        for pageidx in graft.iter() {
            key = key.with_index(pageidx.try_into()?);
            batch.insert(&self.pages, key.as_ref(), pending.clone());
        }

        batch.commit()?;

        // notify listeners of the new remote commit
        self.remote_changeset.mark_changed(vid);

        // log the result
        span.record("result", new_snapshot.to_string());

        Ok(())
    }

    /// Write a set of `PageValue`s to storage.
    pub fn receive_pages(
        &self,
        vid: &VolumeId,
        pages: HashMap<PageIdx, (LSN, PageValue)>,
    ) -> Result<()> {
        let mut batch = self.keyspace.batch();
        batch = batch.durability(Some(fjall::PersistMode::SyncAll));

        for (pageidx, (lsn, pagevalue)) in pages {
            tracing::trace!("caching page {pageidx} into lsn {lsn} with value {pagevalue:?}");
            let key = PageKey::new(vid.clone(), pageidx, lsn);
            batch.insert(&self.pages, key.as_ref(), pagevalue);
        }
        Ok(batch.commit()?)
    }

    /// Prepare to sync a volume to the remote.
    /// Returns:
    /// - the last known remote LSN
    /// - the local page count we are syncing
    /// - the range of LSNs to sync
    /// - an iterator of commits to sync
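    ///
    /// A sketch of how a push job might consume the result:
    ///
    /// ```ignore
    /// let (remote_lsn, pages, lsns, mut commits) = storage.prepare_sync_to_remote(&vid)?;
    /// while let Some((lsn, graft)) = commits.try_next()? {
    ///     // push (lsn, graft) to the remote here
    /// }
    /// ```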
    #[allow(clippy::type_complexity)]
    pub fn prepare_sync_to_remote(
        &self,
        vid: &VolumeId,
    ) -> Result<(
        Option<LSN>,
        PageCount,
        RangeInclusive<LSN>,
        impl TryIterator<Ok = (LSN, SplinterRef<Slice>), Err = Culprit<StorageErr>>,
    )> {
        // acquire the commit lock
        let _permit = self.commit_lock.lock();

        // retrieve the current volume state
        let state = self.volume_state(vid)?;

        // ensure that we only run this job when we actually have commits to sync
        precept::expect_always_or_unreachable!(
            state.has_pending_commits(),
            "the sync push job only runs when we have local commits to push",
            { "vid": vid, "state": state }
        );

        // resolve the snapshot; we can expect it to be available because this
        // function should only run when we have local commits to sync
        let snapshot = state.snapshot().expect("volume snapshot missing").clone();
        let local_lsn = snapshot.local();

        // calculate the end of the sync range
        let (end_lsn, page_count) = if state.is_syncing() {
            // if we are resuming a previously interrupted sync, use the
            // existing pending_sync watermark
            let pending_sync = state.watermarks().pending_sync();
            tracing::debug!(
                ?vid,
                ?pending_sync,
                %snapshot,
                "resuming previously interrupted sync"
            );
            precept::expect_reachable!("resuming previously interrupted sync", state);
            pending_sync.splat().expect("pending sync must be mapped")
        } else {
            // update pending_sync to the local LSN
            self.volumes.insert(
                VolumeStateKey::new(vid.clone(), VolumeStateTag::Watermarks),
                state
                    .watermarks()
                    .clone()
                    .with_pending_sync(Watermark::new(local_lsn, snapshot.pages())),
            )?;
            (local_lsn, snapshot.pages())
        };

        // calculate the LSN range of commits to sync
        let start_lsn = state
            .snapshot()
            .and_then(|s| s.remote_local())
            .map_or(LSN::FIRST, |s| s.next().expect("LSN overflow"));
        let lsns = start_lsn..=end_lsn;

        // create a commit iterator
        let commit_start = CommitKey::new(vid.clone(), *lsns.start());
        let commit_end = CommitKey::new(vid.clone(), *lsns.end());
        let mut cursor = commit_start.lsn();
        let commits = self
            .commits
            .snapshot()
            .range(commit_start..=commit_end)
            .err_into()
            .map_ok(move |(k, v)| {
                let lsn = CommitKey::ref_from_bytes(&k)?.lsn();

                // detect missing commits
                assert_eq!(lsn, cursor, "missing commit detected");
                cursor = cursor.next().expect("lsn overflow");

                let splinter = SplinterRef::from_bytes(v).or_into_ctx()?;
                Ok((lsn, splinter))
            });

        Ok((snapshot.remote(), page_count, lsns, commits))
    }

    /// Update storage after a rejected sync
    pub fn rejected_sync_to_remote(&self, vid: &VolumeId) -> Result<()> {
        // acquire the commit lock
        let _permit = self.commit_lock.lock();
        let mut batch = self.keyspace.batch();
        batch = batch.durability(Some(fjall::PersistMode::SyncAll));

        // clear the pending sync watermark
        let watermarks_key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Watermarks);
        let watermarks = self
            .volumes
            .get(&watermarks_key)?
            .map(|w| Watermarks::from_bytes(&w))
            .transpose()?
            .unwrap_or_default()
            .with_pending_sync(Watermark::default());
        batch.insert(&self.volumes, watermarks_key, watermarks);

        // update the volume status
        self.set_volume_status(&mut batch, vid, VolumeStatus::RejectedCommit);

        Ok(batch.commit()?)
    }

    /// Complete a push operation by updating the volume snapshot and removing
    /// all synced commits.
    pub fn complete_sync_to_remote(
        &self,
        vid: &VolumeId,
        remote_snapshot: graft_proto::Snapshot,
        synced_lsns: RangeInclusive<LSN>,
    ) -> Result<()> {
        // acquire the commit lock and start a new batch
        let _permit = self.commit_lock.lock();
        let mut batch = self.keyspace.batch();
        batch = batch.durability(Some(fjall::PersistMode::SyncAll));

        let state = self.volume_state(vid)?;

        // resolve the snapshot; we can expect it to be available because this
        // function should only run after we have synced a local commit
        let snapshot = state.snapshot().expect("volume snapshot missing");

        let local_lsn = snapshot.local();
        let pages = snapshot.pages();
        let remote_lsn = remote_snapshot.lsn().expect("invalid remote LSN");
        let remote_local_lsn = synced_lsns.try_end().expect("lsn range is RangeInclusive");

        // check invariants
        assert!(
            snapshot.remote() < Some(remote_lsn),
            "remote LSN should be monotonically increasing"
        );
        assert_eq!(
            state.watermarks().pending_sync().lsn(),
            Some(remote_local_lsn),
            "the pending_sync watermark doesn't match the synced LSN range"
        );

        // persist the new remote mapping to the snapshot
        let remote_mapping = RemoteMapping::new(remote_lsn, remote_local_lsn);
        let new_snapshot = Snapshot::new(local_lsn, remote_mapping, pages);
        batch.insert(
            &self.volumes,
            VolumeStateKey::new(vid.clone(), VolumeStateTag::Snapshot),
            new_snapshot.as_bytes(),
        );

        // clear the pending_sync watermark
        batch.insert(
            &self.volumes,
            VolumeStateKey::new(vid.clone(), VolumeStateTag::Watermarks),
            state
                .watermarks()
                .clone()
                .with_pending_sync(Watermark::default()),
        );

        // if the status is interrupted push, clear the status
        if state.status() == VolumeStatus::InterruptedPush {
            batch.remove(
                &self.volumes,
                VolumeStateKey::new(vid.clone(), VolumeStateTag::Status),
            );
        }

        // remove all commits in the synced range
        let mut key = CommitKey::new(vid.clone(), LSN::FIRST);
        for lsn in synced_lsns.iter() {
            key = key.with_lsn(lsn);
            batch.remove(&self.commits, key.as_ref());
        }

        batch.commit()?;

        tracing::debug!(?synced_lsns, %remote_lsn, %new_snapshot, "completed sync to remote");

        Ok(())
    }

    /// Reset the volume to the provided remote snapshot.
    /// This will cause all pending commits to be rolled back and the volume
    /// status to be cleared.
    pub fn reset_volume_to_remote(
        &self,
        vid: &VolumeId,
        remote_snapshot: graft_proto::Snapshot,
        remote_graft: SplinterRef<Bytes>,
    ) -> Result<()> {
        // acquire the commit lock and start a new batch
        let permit = self.commit_lock.lock();

        let span = tracing::debug_span!(
            "reset_volume_to_remote",
            ?vid,
            local_lsn = field::Empty,
            reset_lsn = field::Empty,
            remote_lsn = field::Empty,
            commit_lsn = field::Empty,
            result = field::Empty,
        )
        .entered();

        // retrieve the current volume state
        let state = self.volume_state(vid)?;
        let snapshot = state.snapshot();

        // the last local lsn
        let local_lsn = snapshot.map(|s| s.local());
        // the local lsn we are resetting to
        let reset_lsn = snapshot.and_then(|s| s.remote_local());
        // the remote lsn to receive
        let remote_lsn = remote_snapshot.lsn().expect("invalid remote LSN");
        // the new local lsn to commit the remote into
        let commit_lsn = reset_lsn.map_or(LSN::FIRST, |lsn| lsn.next().expect("lsn overflow"));

        span.record("local_lsn", format!("{local_lsn:?}"));
        span.record("reset_lsn", format!("{reset_lsn:?}"));
        span.record("remote_lsn", format!("{remote_lsn:?}"));
        span.record("commit_lsn", format!("{commit_lsn:?}"));

        if local_lsn == reset_lsn {
            // if the local and remote LSNs are the same, we can just receive the
            // remote commit normally
            assert!(
                !state.has_pending_commits(),
                "bug: local lsn == reset lsn but state has pending commits"
            );
            span.record("result", format!("{snapshot:?}"));
            drop(span);
            return self.receive_remote_commit_holding_lock(
                permit,
                vid,
                remote_snapshot,
                remote_graft,
            );
        }

        // ensure we never reset into the future
        assert!(
            reset_lsn < local_lsn,
            "refusing to reset to a LSN larger than the current LSN; local={local_lsn:?}, target={reset_lsn:?}"
        );

        let mut batch = self.keyspace.batch();
        batch = batch.durability(Some(fjall::PersistMode::SyncAll));

        // persist the new volume snapshot
        let remote_mapping = RemoteMapping::new(remote_lsn, commit_lsn);
        let new_snapshot = Snapshot::new(commit_lsn, remote_mapping, remote_snapshot.pages());
        batch.insert(
            &self.volumes,
            VolumeStateKey::new(vid.clone(), VolumeStateTag::Snapshot),
            new_snapshot.as_bytes(),
        );

        // clear the volume status
        batch.remove(
            &self.volumes,
            VolumeStateKey::new(vid.clone(), VolumeStateTag::Status),
        );

        // clear the pending_sync watermark
        batch.insert(
            &self.volumes,
            VolumeStateKey::new(vid.clone(), VolumeStateTag::Watermarks),
            state
                .watermarks()
                .clone()
                .with_pending_sync(Watermark::default()),
        );

        // remove all pending commits
        let mut commits = self.commits.snapshot().prefix(vid);
        while let Some((key, graft)) = commits.try_next().or_into_ctx()? {
            batch.remove(&self.commits, key.clone());

            let key = CommitKey::ref_from_bytes(&key)?;
            assert_eq!(
                key.vid(),
                vid,
                "refusing to remove commit from another volume"
            );
            assert!(
                Some(key.lsn()) > reset_lsn,
                "invariant violation: no commits should exist at or below reset_lsn"
            );

            // remove the commit's changed PageIdxs
            let graft = SplinterRef::from_bytes(graft).or_into_ctx()?;

            let mut key = PageKey::new(vid.clone(), PageIdx::FIRST, key.lsn());
            for pageidx in graft.iter() {
                key = key.with_index(pageidx.try_into()?);
                batch.remove(&self.pages, key.as_ref());
            }
        }

        // mark remotely changed pages
        let mut key = PageKey::new(vid.clone(), PageIdx::FIRST, commit_lsn);
        let pending = Bytes::from(PageValue::Pending);
        for pageidx in remote_graft.iter() {
            key = key.with_index(pageidx.try_into()?);
            batch.insert(&self.pages, key.as_ref(), pending.clone());
        }

        // commit the changes
        batch.commit()?;

        // post reset invariants
        // these are expensive so we only run them when precept is enabled
        if precept::ENABLED {
            // scan all of the pages in the volume to verify two invariants:
            // 1. all pages at commit_lsn must be pending
            // 2. no pages exist at an lsn > commit_lsn
            let mut iter = self.pages.snapshot().prefix(vid);
            while let Some((key, val)) = iter.try_next().or_into_ctx()? {
                let key = PageKey::try_ref_from_bytes(&key)?;
                if key.lsn() == commit_lsn {
                    // invariant 1: all pages at commit_lsn must be pending
                    assert!(
                        PageValue::is_pending(&val),
                        "all pages at commit_lsn must be pending after reset"
                    );
                } else {
                    // invariant 2: no pages exist at an lsn > commit_lsn
                    assert!(
                        key.lsn() < commit_lsn,
                        "no pages should exist at an lsn > commit_lsn after reset"
                    );
                }
            }
        }

        // notify listeners of the new remote commit
        self.remote_changeset.mark_changed(vid);

        // log the result
        span.record("result", new_snapshot.to_string());

        Ok(())
    }
}

impl Debug for Storage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Storage")
            .field("disk usage", &ByteUnit::new(self.keyspace.disk_space()))
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use graft_core::{page::Page, pageidx};

    use super::*;

    #[graft_test::test]
    fn test_query_volumes() {
        let storage = Storage::open_temporary().unwrap();

        let mut memtable = Memtable::default();
        memtable.insert(pageidx!(1), Page::test_filled(0x42));

        let mut vids = [VolumeId::random(), VolumeId::random()];
        vids.sort();

        // first volume has two commits, and is configured to pull
        storage
            .set_volume_config(&vids[0], VolumeConfig::new(SyncDirection::Pull))
            .unwrap();
        let snapshot = storage.commit(&vids[0], None, 1, memtable.clone()).unwrap();
        storage
            .commit(&vids[0], Some(snapshot), 1, memtable.clone())
            .unwrap();

        // second volume has one commit, and is configured to push
        storage
            .set_volume_config(&vids[1], VolumeConfig::new(SyncDirection::Push))
            .unwrap();
        storage.commit(&vids[1], None, 1, memtable.clone()).unwrap();

        // ensure that we can query back out the snapshots
        let sync = SyncDirection::Both;
        let mut iter = storage.query_volumes(sync, None);

        // check the first volume
        let state = iter.try_next().unwrap().unwrap();
        assert_eq!(state.vid(), &vids[0]);
        assert_eq!(state.config().sync(), SyncDirection::Pull);
        let snapshot = state.snapshot().unwrap();
        assert_eq!(snapshot.local(), LSN::new(2));
        assert_eq!(snapshot.pages(), 1);

        // check the second volume
        let state = iter.try_next().unwrap().unwrap();
        assert_eq!(state.vid(), &vids[1]);
        assert_eq!(state.config().sync(), SyncDirection::Push);
        let snapshot = state.snapshot().unwrap();
        assert_eq!(snapshot.local(), LSN::new(1));
        assert_eq!(snapshot.pages(), 1);

        // iter is empty
        assert!(iter.next().is_none());

        // verify that the sync direction filter works
        let sync = SyncDirection::Push;
        let mut iter = storage.query_volumes(sync, None);

        // should be the second volume
        let state = iter.try_next().unwrap().unwrap();
        assert_eq!(state.vid(), &vids[1]);
        assert_eq!(state.config().sync(), SyncDirection::Push);
        let snapshot = state.snapshot().unwrap();
        assert_eq!(snapshot.local(), LSN::new(1));
        assert_eq!(snapshot.pages(), 1);

        // iter is empty
        assert!(iter.next().is_none());

        // verify that the volume id filter works
        let sync = SyncDirection::Both;
        let vid_set = HashSet::from_iter([vids[0].clone()]);
        let mut iter = storage.query_volumes(sync, Some(vid_set));

        // should be the first volume
        let state = iter.try_next().unwrap().unwrap();
        assert_eq!(state.vid(), &vids[0]);

        // iter is empty
        assert!(iter.next().is_none());
    }
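
    // A minimal sketch of the shadowing behavior documented on `Storage::read`:
    // a page written at an earlier LSN stays visible at later LSNs until it is
    // overwritten.
    #[graft_test::test]
    fn test_read_returns_latest_visible_page() {
        let storage = Storage::open_temporary().unwrap();
        let vid = VolumeId::random();
        storage
            .set_volume_config(&vid, VolumeConfig::new(SyncDirection::Pull))
            .unwrap();

        // first commit writes page 1 at LSN 1
        let mut memtable = Memtable::default();
        memtable.insert(pageidx!(1), Page::test_filled(0x01));
        let snapshot = storage.commit(&vid, None, 1, memtable).unwrap();

        // second commit (LSN 2) doesn't touch page 1
        let snapshot = storage
            .commit(&vid, Some(snapshot), 1, Memtable::default())
            .unwrap();
        assert_eq!(snapshot.local(), LSN::new(2));

        // reading page 1 at LSN 2 falls back to the version written at LSN 1
        let (lsn, _page) = storage.read(&vid, snapshot.local(), pageidx!(1)).unwrap();
        assert_eq!(lsn, LSN::new(1));
    }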
}