1use std::{
2 collections::{HashMap, HashSet},
3 fmt::Debug,
4 io,
5 ops::RangeInclusive,
6 path::Path,
7 sync::Arc,
8};
9
10use bytes::Bytes;
11use changeset::ChangeSet;
12use commit::CommitKey;
13use culprit::{Culprit, ResultExt};
14use fjall::{KvSeparationOptions, PartitionCreateOptions, Slice};
15use graft_core::{
16 PageIdx, VolumeId,
17 byte_unit::ByteUnit,
18 lsn::{LSN, LSNRangeExt},
19 page::PageSizeErr,
20 page_count::PageCount,
21 page_idx::ConvertToPageIdxErr,
22 zerocopy_ext::ZerocopyErr,
23};
24use memtable::Memtable;
25use page::{PageKey, PageValue, PageValueConversionErr};
26use parking_lot::{Mutex, MutexGuard};
27use snapshot::{RemoteMapping, Snapshot};
28use splinter_rs::{DecodeErr, Splinter, SplinterRef};
29use tracing::field;
30use tryiter::{TryIterator, TryIteratorExt};
31use volume_state::{
32 SyncDirection, VolumeConfig, VolumeQueryIter, VolumeState, VolumeStateKey, VolumeStateTag,
33 VolumeStatus, Watermark, Watermarks,
34};
35use zerocopy::IntoBytes;
36
37pub mod changeset;
38pub(crate) mod commit;
39pub(crate) mod memtable;
40pub mod page;
41pub mod snapshot;
42pub mod volume_state;
43
44type Result<T> = std::result::Result<T, Culprit<StorageErr>>;
45
46#[derive(Debug, thiserror::Error)]
47pub enum StorageErr {
48 #[error("fjall error: {0}")]
49 FjallErr(#[from] fjall::Error),
50
51 #[error("io error: {0}")]
52 IoErr(io::ErrorKind),
53
54 #[error("Corrupt key: {0}")]
55 CorruptKey(ZerocopyErr),
56
57 #[error("Corrupt snapshot: {0}")]
58 CorruptSnapshot(ZerocopyErr),
59
60 #[error("Corrupt volume config: {0}")]
61 CorruptVolumeConfig(ZerocopyErr),
62
63 #[error("Volume state {0:?} is corrupt: {1}")]
64 CorruptVolumeState(VolumeStateTag, ZerocopyErr),
65
66 #[error("Corrupt page: {0}")]
67 CorruptPage(#[from] PageValueConversionErr),
68
69 #[error("Corrupt commit: {0}")]
70 CorruptCommit(#[from] DecodeErr),
71
72 #[error("Illegal concurrent write to volume")]
73 ConcurrentWrite,
74
75 #[error("Volume needs recovery")]
76 VolumeIsSyncing,
77
78 #[error(
79 "The local Volume state is ahead of the remote state, refusing to accept remote changes"
80 )]
81 RemoteConflict,
82
83 #[error("invalid page index")]
84 ConvertToPageIdxErr(#[from] ConvertToPageIdxErr),
85}
86
87impl From<io::Error> for StorageErr {
88 fn from(err: io::Error) -> Self {
89 StorageErr::IoErr(err.kind())
90 }
91}
92
93impl From<lsm_tree::Error> for StorageErr {
94 fn from(err: lsm_tree::Error) -> Self {
95 StorageErr::FjallErr(err.into())
96 }
97}
98
99impl From<PageSizeErr> for StorageErr {
100 fn from(err: PageSizeErr) -> Self {
101 StorageErr::CorruptPage(err.into())
102 }
103}
104
105pub struct Storage {
106 keyspace: fjall::Keyspace,
107
108 volumes: fjall::Partition,
118
119 pages: fjall::Partition,
122
123 commits: fjall::Partition,
126
127 commit_lock: Arc<Mutex<()>>,
133
134 local_changeset: ChangeSet<VolumeId>,
136
137 remote_changeset: ChangeSet<VolumeId>,
139}
140
141impl Storage {
142 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
143 tracing::debug!("opening runtime storage at {}", path.as_ref().display());
144 Self::open_config(fjall::Config::new(path))
145 }
146
147 pub fn open_temporary() -> Result<Self> {
148 let path = tempfile::tempdir()?.keep();
149 tracing::debug!("opening temporary runtime storage at {}", path.display());
150 Self::open_config(fjall::Config::new(path).temporary(true))
151 }
152
153 fn open_config(config: fjall::Config) -> Result<Self> {
154 let keyspace = config.open()?;
155 let volumes = keyspace.open_partition("volumes", Default::default())?;
156 let pages = keyspace.open_partition(
157 "pages",
158 PartitionCreateOptions::default().with_kv_separation(KvSeparationOptions::default()),
159 )?;
160 let commits = keyspace.open_partition(
161 "commits",
162 PartitionCreateOptions::default().with_kv_separation(KvSeparationOptions::default()),
163 )?;
164 let storage = Storage {
165 keyspace,
166 volumes,
167 pages,
168 commits,
169 commit_lock: Default::default(),
170 local_changeset: Default::default(),
171 remote_changeset: Default::default(),
172 };
173 storage.check_for_interrupted_push()?;
174 Ok(storage)
175 }
176
177 fn check_for_interrupted_push(&self) -> Result<()> {
178 let _permit = self.commit_lock.lock();
179 let mut batch = self.keyspace.batch();
180 batch = batch.durability(Some(fjall::PersistMode::SyncAll));
181
182 let iter = self.volumes.snapshot().iter().err_into();
183 let mut iter = VolumeQueryIter::new(iter);
184 while let Some(state) = iter.try_next()? {
185 if state.is_syncing() {
186 tracing::warn!(?state, "detected interrupted push for volume");
187 self.set_volume_status(&mut batch, state.vid(), VolumeStatus::InterruptedPush);
188 }
189 }
190 Ok(batch.commit()?)
191 }
192
193 pub fn local_changeset(&self) -> &ChangeSet<VolumeId> {
196 &self.local_changeset
197 }
198
199 pub fn remote_changeset(&self) -> &ChangeSet<VolumeId> {
202 &self.remote_changeset
203 }
204
205 pub fn set_volume_config(&self, vid: &VolumeId, config: VolumeConfig) -> Result<()> {
207 let key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Config);
208 Ok(self.volumes.insert(key, config)?)
209 }
210
211 pub fn update_volume_config<F>(&self, vid: &VolumeId, mut f: F) -> Result<()>
213 where
214 F: FnMut(VolumeConfig) -> VolumeConfig,
215 {
216 let _permit = self.commit_lock.lock();
217 let key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Config);
218 let config = self
219 .volumes
220 .get(&key)?
221 .map(|c| VolumeConfig::from_bytes(&c))
222 .transpose()?
223 .unwrap_or_default();
224 Ok(self.volumes.insert(key, f(config))?)
225 }
226
227 fn set_volume_status(&self, batch: &mut fjall::Batch, vid: &VolumeId, status: VolumeStatus) {
228 let key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Status);
229 batch.insert(&self.volumes, key, status)
230 }
231
232 pub fn get_volume_status(&self, vid: &VolumeId) -> Result<VolumeStatus> {
233 let key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Status);
234 if let Some(value) = self.volumes.get(key)? {
235 Ok(VolumeStatus::from_bytes(&value)?)
236 } else {
237 Ok(VolumeStatus::Ok)
238 }
239 }
240
241 pub fn volume_state(&self, vid: &VolumeId) -> Result<VolumeState> {
242 let mut state = VolumeState::new(vid.clone());
243 let mut iter = self.volumes.snapshot().prefix(vid);
244 while let Some((key, value)) = iter.try_next()? {
245 let key = VolumeStateKey::ref_from_bytes(&key)?;
246 debug_assert_eq!(key.vid(), vid, "vid mismatch");
247 state.accumulate(key.tag(), value)?;
248 }
249 Ok(state)
250 }
251
252 pub fn snapshot(&self, vid: &VolumeId) -> Result<Option<Snapshot>> {
253 let key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Snapshot);
254 if let Some(snapshot) = self.volumes.get(key)? {
255 Ok(Some(Snapshot::try_from_bytes(&snapshot)?))
256 } else {
257 Ok(None)
258 }
259 }
260
261 pub fn iter_volumes(&self) -> impl TryIterator<Ok = VolumeState, Err = Culprit<StorageErr>> {
262 let iter = self.volumes.snapshot().iter().err_into();
263 VolumeQueryIter::new(iter)
264 }
265
266 pub fn volume_exists(&self, vid: VolumeId) -> Result<bool> {
267 let key = VolumeStateKey::new(vid, VolumeStateTag::Config);
268 Ok(self.volumes.contains_key(key)?)
269 }
270
271 pub fn query_volumes(
272 &self,
273 sync: SyncDirection,
274 vids: Option<HashSet<VolumeId>>,
275 ) -> impl TryIterator<Ok = VolumeState, Err = Culprit<StorageErr>> {
276 let iter = self.volumes.snapshot().iter().err_into();
277 let iter = VolumeQueryIter::new(iter);
278 iter.try_filter(move |state| {
279 let matches_vid = vids.as_ref().is_none_or(|s| s.contains(state.vid()));
280 let matches_dir = state.config().sync().matches(sync);
281 Ok(matches_vid && matches_dir)
282 })
283 }
284
285 pub fn query_pages<'a, I>(
289 &self,
290 vid: &'a VolumeId,
291 lsn: LSN,
292 pages: I,
293 ) -> impl TryIterator<Ok = (PageIdx, Option<PageValue>), Err = Culprit<StorageErr>> + 'a
294 where
295 I: TryIterator<Ok = PageIdx, Err = Culprit<StorageErr>> + 'a,
296 {
297 let snapshot = self.pages.snapshot();
298 pages.map_ok(move |pageidx| {
299 let key = PageKey::new(vid.clone(), pageidx, lsn);
300 if let Some(page) = snapshot.get(key)? {
301 Ok((pageidx, Some(PageValue::try_from(page).or_into_ctx()?)))
302 } else {
303 Ok((pageidx, None))
304 }
305 })
306 }
307
308 pub fn read(&self, vid: &VolumeId, lsn: LSN, pageidx: PageIdx) -> Result<(LSN, PageValue)> {
312 let first_key = PageKey::new(vid.clone(), pageidx, LSN::FIRST);
313 let key = PageKey::new(vid.clone(), pageidx, lsn);
314 let range = first_key..=key;
315
316 if let Some((key, page)) = self.pages.snapshot().range(range).next_back().transpose()? {
319 let lsn = PageKey::try_ref_from_bytes(&key)?.lsn();
320 let bytes: Bytes = page.into();
321 Ok((lsn, PageValue::try_from(bytes).or_into_ctx()?))
322 } else {
323 Ok((lsn, PageValue::Pending))
324 }
325 }
326
327 pub fn commit(
328 &self,
329 vid: &VolumeId,
330 snapshot: Option<Snapshot>,
331 pages: impl Into<PageCount>,
332 memtable: Memtable,
333 ) -> Result<Snapshot> {
334 let pages = pages.into();
335 let span = tracing::debug_span!(
336 "volume_commit",
337 ?vid,
338 ?snapshot,
339 %pages,
340 result = field::Empty
341 )
342 .entered();
343
344 let mut batch = self.keyspace.batch();
345 batch = batch.durability(Some(fjall::PersistMode::SyncAll));
346
347 let read_lsn = snapshot.as_ref().map(|s| s.local());
348 let commit_lsn = read_lsn.map_or(LSN::FIRST, |lsn| lsn.next().expect("lsn overflow"));
349
350 let mut graft = Splinter::default();
352
353 let mut page_key = PageKey::new(vid.clone(), PageIdx::FIRST, commit_lsn);
355 for (pageidx, page) in memtable {
356 page_key = page_key.with_index(pageidx);
357 graft.insert(pageidx.into());
358 batch.insert(&self.pages, page_key.as_bytes(), PageValue::from(page));
359 }
360
361 let commit_key = CommitKey::new(vid.clone(), commit_lsn);
363 batch.insert(&self.commits, commit_key, graft.serialize_to_bytes());
364
365 let _permit = self.commit_lock.lock();
367
368 let latest = self.snapshot(vid)?;
371 if latest.as_ref().map(|l| l.local()) != read_lsn {
372 precept::expect_reachable!(
373 "concurrent write to volume",
374 {
375 "vid": vid,
376 "snapshot": snapshot,
377 "latest": latest,
378 }
379 );
380
381 return Err(Culprit::new_with_note(
382 StorageErr::ConcurrentWrite,
383 format!("Illegal concurrent write to Volume {vid}"),
384 ));
385 }
386
387 let snapshot_key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Snapshot);
389 let snapshot = Snapshot::new(
390 commit_lsn,
391 latest
393 .map(|l| l.remote_mapping().clone())
394 .unwrap_or_default(),
395 pages,
396 );
397 batch.insert(&self.volumes, snapshot_key, snapshot.as_bytes());
398
399 batch.commit()?;
401
402 self.local_changeset.mark_changed(vid);
404
405 span.record("result", snapshot.to_string());
407
408 Ok(snapshot)
410 }
411
412 pub fn receive_remote_commit(
414 &self,
415 vid: &VolumeId,
416 remote_snapshot: graft_proto::Snapshot,
417 changed: SplinterRef<Bytes>,
418 ) -> Result<()> {
419 self.receive_remote_commit_holding_lock(
420 self.commit_lock.lock(),
421 vid,
422 remote_snapshot,
423 changed,
424 )
425 }
426
427 fn receive_remote_commit_holding_lock(
430 &self,
431 _permit: MutexGuard<'_, ()>,
432 vid: &VolumeId,
433 remote_snapshot: graft_proto::Snapshot,
434 graft: SplinterRef<Bytes>,
435 ) -> Result<()> {
436 let remote_lsn = remote_snapshot.lsn().expect("invalid remote LSN");
438 let remote_pages = remote_snapshot.pages();
439
440 let span = tracing::debug_span!(
441 "receive_remote_commit",
442 ?vid,
443 ?remote_lsn,
444 result = field::Empty,
445 )
446 .entered();
447
448 let mut batch = self.keyspace.batch();
449 batch = batch.durability(Some(fjall::PersistMode::SyncAll));
450
451 let state = self.volume_state(vid)?;
453 let snapshot = state.snapshot();
454 let watermarks = state.watermarks();
455
456 if state.is_syncing() {
458 return Err(Culprit::new_with_note(
459 StorageErr::VolumeIsSyncing,
460 format!("Volume {vid} is syncing, refusing to accept remote changes"),
461 ));
462 }
463 if state.has_pending_commits() {
464 precept::expect_reachable!(
465 "volume has pending commits while receiving remote commit",
466 { "vid": vid, "state": state }
467 );
468
469 self.set_volume_status(&mut batch, vid, VolumeStatus::Conflict);
471
472 return Err(Culprit::new_with_note(
473 StorageErr::RemoteConflict,
474 format!("Volume {vid:?} has pending commits, refusing to accept remote changes"),
475 ));
476 }
477
478 let commit_lsn = snapshot.map_or(LSN::FIRST, |s| s.local().next().expect("lsn overflow"));
480 let remote_mapping = RemoteMapping::new(remote_lsn, commit_lsn);
481
482 let new_snapshot = Snapshot::new(commit_lsn, remote_mapping, remote_pages);
484 batch.insert(
485 &self.volumes,
486 VolumeStateKey::new(vid.clone(), VolumeStateTag::Snapshot),
487 new_snapshot.as_bytes(),
488 );
489
490 batch.insert(
493 &self.volumes,
494 VolumeStateKey::new(vid.clone(), VolumeStateTag::Watermarks),
495 watermarks
496 .clone()
497 .with_pending_sync(Watermark::new(commit_lsn, remote_pages)),
498 );
499
500 let mut key = PageKey::new(vid.clone(), PageIdx::FIRST, commit_lsn);
502 let pending = Bytes::from(PageValue::Pending);
503 for pageidx in graft.iter() {
504 key = key.with_index(pageidx.try_into()?);
505 batch.insert(&self.pages, key.as_ref(), pending.clone());
506 }
507
508 batch.commit()?;
509
510 self.remote_changeset.mark_changed(vid);
512
513 span.record("result", new_snapshot.to_string());
515
516 Ok(())
517 }
518
519 pub fn receive_pages(
521 &self,
522 vid: &VolumeId,
523 pages: HashMap<PageIdx, (LSN, PageValue)>,
524 ) -> Result<()> {
525 let mut batch = self.keyspace.batch();
526 batch = batch.durability(Some(fjall::PersistMode::SyncAll));
527
528 for (pageidx, (lsn, pagevalue)) in pages {
529 tracing::trace!("caching page {pageidx} into lsn {lsn} with value {pagevalue:?}");
530 let key = PageKey::new(vid.clone(), pageidx, lsn);
531 batch.insert(&self.pages, key.as_ref(), pagevalue);
532 }
533 Ok(batch.commit()?)
534 }
535
536 #[allow(clippy::type_complexity)]
543 pub fn prepare_sync_to_remote(
544 &self,
545 vid: &VolumeId,
546 ) -> Result<(
547 Option<LSN>,
548 PageCount,
549 RangeInclusive<LSN>,
550 impl TryIterator<Ok = (LSN, SplinterRef<Slice>), Err = Culprit<StorageErr>>,
551 )> {
552 let _permit = self.commit_lock.lock();
554
555 let state = self.volume_state(vid)?;
557
558 precept::expect_always_or_unreachable!(
560 state.has_pending_commits(),
561 "the sync push job only runs when we have local commits to push",
562 { "vid": vid, "state": state }
563 );
564
565 let snapshot = state.snapshot().expect("volume snapshot missing").clone();
568 let local_lsn = snapshot.local();
569
570 let (end_lsn, page_count) = if state.is_syncing() {
572 let pending_sync = state.watermarks().pending_sync();
575 tracing::debug!(
576 ?vid,
577 ?pending_sync,
578 %snapshot,
579 "resuming previously interrupted sync"
580 );
581 precept::expect_reachable!("resuming previously interrupted sync", state);
582 pending_sync.splat().expect("pending sync must be mapped")
583 } else {
584 self.volumes.insert(
586 VolumeStateKey::new(vid.clone(), VolumeStateTag::Watermarks),
587 state
588 .watermarks()
589 .clone()
590 .with_pending_sync(Watermark::new(local_lsn, snapshot.pages())),
591 )?;
592 (local_lsn, snapshot.pages())
593 };
594
595 let start_lsn = state
597 .snapshot()
598 .and_then(|s| s.remote_local())
599 .map_or(LSN::FIRST, |s| s.next().expect("LSN overflow"));
600 let lsns = start_lsn..=end_lsn;
601
602 let commit_start = CommitKey::new(vid.clone(), *lsns.start());
604 let commit_end = CommitKey::new(vid.clone(), *lsns.end());
605 let mut cursor = commit_start.lsn();
606 let commits = self
607 .commits
608 .snapshot()
609 .range(commit_start..=commit_end)
610 .err_into()
611 .map_ok(move |(k, v)| {
612 let lsn = CommitKey::ref_from_bytes(&k)?.lsn();
613
614 assert_eq!(lsn, cursor, "missing commit detected");
616 cursor = cursor.next().expect("lsn overflow");
617
618 let splinter = SplinterRef::from_bytes(v).or_into_ctx()?;
619 Ok((lsn, splinter))
620 });
621
622 Ok((snapshot.remote(), page_count, lsns, commits))
623 }
624
625 pub fn rejected_sync_to_remote(&self, vid: &VolumeId) -> Result<()> {
627 let _permit = self.commit_lock.lock();
629 let mut batch = self.keyspace.batch();
630 batch = batch.durability(Some(fjall::PersistMode::SyncAll));
631
632 let watermarks_key = VolumeStateKey::new(vid.clone(), VolumeStateTag::Watermarks);
634 let watermarks = self
635 .volumes
636 .get(&watermarks_key)?
637 .map(|w| Watermarks::from_bytes(&w))
638 .transpose()?
639 .unwrap_or_default()
640 .with_pending_sync(Watermark::default());
641 batch.insert(&self.volumes, watermarks_key, watermarks);
642
643 self.set_volume_status(&mut batch, vid, VolumeStatus::RejectedCommit);
645
646 Ok(batch.commit()?)
647 }
648
649 pub fn complete_sync_to_remote(
652 &self,
653 vid: &VolumeId,
654 remote_snapshot: graft_proto::Snapshot,
655 synced_lsns: RangeInclusive<LSN>,
656 ) -> Result<()> {
657 let _permit = self.commit_lock.lock();
659 let mut batch = self.keyspace.batch();
660 batch = batch.durability(Some(fjall::PersistMode::SyncAll));
661
662 let state = self.volume_state(vid)?;
663
664 let snapshot = state.snapshot().expect("volume snapshot missing");
667
668 let local_lsn = snapshot.local();
669 let pages = snapshot.pages();
670 let remote_lsn = remote_snapshot.lsn().expect("invalid remote LSN");
671 let remote_local_lsn = synced_lsns.try_end().expect("lsn range is RangeInclusive");
672
673 assert!(
675 snapshot.remote() < Some(remote_lsn),
676 "remote LSN should be monotonically increasing"
677 );
678 assert_eq!(
679 state.watermarks().pending_sync().lsn(),
680 Some(remote_local_lsn),
681 "the pending_sync watermark doesn't match the synced LSN range"
682 );
683
684 let remote_mapping = RemoteMapping::new(remote_lsn, remote_local_lsn);
686 let new_snapshot = Snapshot::new(local_lsn, remote_mapping, pages);
687 batch.insert(
688 &self.volumes,
689 VolumeStateKey::new(vid.clone(), VolumeStateTag::Snapshot),
690 new_snapshot.as_bytes(),
691 );
692
693 batch.insert(
695 &self.volumes,
696 VolumeStateKey::new(vid.clone(), VolumeStateTag::Watermarks),
697 state
698 .watermarks()
699 .clone()
700 .with_pending_sync(Watermark::default()),
701 );
702
703 if state.status() == VolumeStatus::InterruptedPush {
705 batch.remove(
706 &self.volumes,
707 VolumeStateKey::new(vid.clone(), VolumeStateTag::Status),
708 );
709 }
710
711 let mut key = CommitKey::new(vid.clone(), LSN::FIRST);
713 for lsn in synced_lsns.iter() {
714 key = key.with_lsn(lsn);
715 batch.remove(&self.commits, key.as_ref());
716 }
717
718 batch.commit()?;
719
720 tracing::debug!(?synced_lsns, %remote_lsn, %new_snapshot, "completed sync to remote");
721
722 Ok(())
723 }
724
725 pub fn reset_volume_to_remote(
729 &self,
730 vid: &VolumeId,
731 remote_snapshot: graft_proto::Snapshot,
732 remote_graft: SplinterRef<Bytes>,
733 ) -> Result<()> {
734 let permit = self.commit_lock.lock();
736
737 let span = tracing::debug_span!(
738 "reset_volume_to_remote",
739 ?vid,
740 local_lsn = field::Empty,
741 reset_lsn = field::Empty,
742 remote_lsn = field::Empty,
743 commit_lsn = field::Empty,
744 result = field::Empty,
745 )
746 .entered();
747
748 let state = self.volume_state(vid)?;
750 let snapshot = state.snapshot();
751
752 let local_lsn = snapshot.map(|s| s.local());
754 let reset_lsn = snapshot.and_then(|s| s.remote_local());
756 let remote_lsn = remote_snapshot.lsn().expect("invalid remote LSN");
758 let commit_lsn = reset_lsn.map_or(LSN::FIRST, |lsn| lsn.next().expect("lsn overflow"));
760
761 span.record("local_lsn", format!("{local_lsn:?}"));
762 span.record("reset_lsn", format!("{reset_lsn:?}"));
763 span.record("remote_lsn", format!("{remote_lsn:?}"));
764 span.record("commit_lsn", format!("{commit_lsn:?}"));
765
766 if local_lsn == reset_lsn {
767 assert!(
770 !state.has_pending_commits(),
771 "bug: local lsn == reset lsn but state has pending commits"
772 );
773 span.record("result", format!("{snapshot:?}"));
774 drop(span);
775 return self.receive_remote_commit_holding_lock(
776 permit,
777 vid,
778 remote_snapshot,
779 remote_graft,
780 );
781 }
782
783 assert!(
785 reset_lsn < local_lsn,
786 "refusing to reset to a LSN larger than the current LSN; local={local_lsn:?}, target={reset_lsn:?}"
787 );
788
789 let mut batch = self.keyspace.batch();
790 batch = batch.durability(Some(fjall::PersistMode::SyncAll));
791
792 let remote_mapping = RemoteMapping::new(remote_lsn, commit_lsn);
794 let new_snapshot = Snapshot::new(commit_lsn, remote_mapping, remote_snapshot.pages());
795 batch.insert(
796 &self.volumes,
797 VolumeStateKey::new(vid.clone(), VolumeStateTag::Snapshot),
798 new_snapshot.as_bytes(),
799 );
800
801 batch.remove(
803 &self.volumes,
804 VolumeStateKey::new(vid.clone(), VolumeStateTag::Status),
805 );
806
807 batch.insert(
809 &self.volumes,
810 VolumeStateKey::new(vid.clone(), VolumeStateTag::Watermarks),
811 state
812 .watermarks()
813 .clone()
814 .with_pending_sync(Watermark::default()),
815 );
816
817 let mut commits = self.commits.snapshot().prefix(vid);
819 while let Some((key, graft)) = commits.try_next().or_into_ctx()? {
820 batch.remove(&self.commits, key.clone());
821
822 let key = CommitKey::ref_from_bytes(&key)?;
823 assert_eq!(
824 key.vid(),
825 vid,
826 "refusing to remove commit from another volume"
827 );
828 assert!(
829 Some(key.lsn()) > reset_lsn,
830 "invariant violation: no commits should exist at or below reset_lsn"
831 );
832
833 let graft = SplinterRef::from_bytes(graft).or_into_ctx()?;
835
836 let mut key = PageKey::new(vid.clone(), PageIdx::FIRST, key.lsn());
837 for pageidx in graft.iter() {
838 key = key.with_index(pageidx.try_into()?);
839 batch.remove(&self.pages, key.as_ref());
840 }
841 }
842
843 let mut key = PageKey::new(vid.clone(), PageIdx::FIRST, commit_lsn);
845 let pending = Bytes::from(PageValue::Pending);
846 for pageidx in remote_graft.iter() {
847 key = key.with_index(pageidx.try_into()?);
848 batch.insert(&self.pages, key.as_ref(), pending.clone());
849 }
850
851 batch.commit()?;
853
854 if precept::ENABLED {
857 let mut iter = self.pages.snapshot().prefix(vid);
861 while let Some((key, val)) = iter.try_next().or_into_ctx()? {
862 let key = PageKey::try_ref_from_bytes(&key)?;
863 if key.lsn() == commit_lsn {
864 assert!(
866 PageValue::is_pending(&val),
867 "all pages at commit_lsn must be pending after reset"
868 );
869 } else {
870 assert!(
872 key.lsn() < commit_lsn,
873 "no pages should exist at a lsn > commit_lsn after reset"
874 );
875 }
876 }
877 }
878
879 self.remote_changeset.mark_changed(vid);
881
882 span.record("result", new_snapshot.to_string());
884
885 Ok(())
886 }
887}
888
889impl Debug for Storage {
890 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
891 f.debug_struct("Storage")
892 .field("disk usage", &ByteUnit::new(self.keyspace.disk_space()))
893 .finish()
894 }
895}
896
#[cfg(test)]
mod tests {
    use graft_core::{page::Page, pageidx};

    use super::*;

    /// Asserts that `state` belongs to `vid`, is configured with `sync`, and
    /// has a one-page snapshot at local LSN `local`.
    #[track_caller]
    fn assert_volume(state: &VolumeState, vid: &VolumeId, sync: SyncDirection, local: LSN) {
        assert_eq!(state.vid(), vid);
        assert_eq!(state.config().sync(), sync);
        let snapshot = state.snapshot().unwrap();
        assert_eq!(snapshot.local(), local);
        assert_eq!(snapshot.pages(), 1);
    }

    /// Two volumes (one pull, one push) are created and then queried by sync
    /// direction and by explicit volume id.
    #[graft_test::test]
    fn test_query_volumes() {
        let storage = Storage::open_temporary().unwrap();

        let mut memtable = Memtable::default();
        memtable.insert(pageidx!(1), Page::test_filled(0x42));

        // Sorted so the expected iteration order below is known.
        let mut vids = [VolumeId::random(), VolumeId::random()];
        vids.sort();
        let [pull_vid, push_vid] = &vids;

        // First volume: pull-only, two commits.
        storage
            .set_volume_config(pull_vid, VolumeConfig::new(SyncDirection::Pull))
            .unwrap();
        let first = storage.commit(pull_vid, None, 1, memtable.clone()).unwrap();
        storage
            .commit(pull_vid, Some(first), 1, memtable.clone())
            .unwrap();

        // Second volume: push-only, one commit.
        storage
            .set_volume_config(push_vid, VolumeConfig::new(SyncDirection::Push))
            .unwrap();
        storage.commit(push_vid, None, 1, memtable.clone()).unwrap();

        // Querying for both directions returns both volumes in id order.
        let mut iter = storage.query_volumes(SyncDirection::Both, None);
        let state = iter.try_next().unwrap().unwrap();
        assert_volume(&state, pull_vid, SyncDirection::Pull, LSN::new(2));
        let state = iter.try_next().unwrap().unwrap();
        assert_volume(&state, push_vid, SyncDirection::Push, LSN::new(1));
        assert!(iter.next().is_none());

        // Querying for push returns only the push volume.
        let mut iter = storage.query_volumes(SyncDirection::Push, None);
        let state = iter.try_next().unwrap().unwrap();
        assert_volume(&state, push_vid, SyncDirection::Push, LSN::new(1));
        assert!(iter.next().is_none());

        // Restricting by volume id returns only that volume.
        let vid_set = HashSet::from([pull_vid.clone()]);
        let mut iter = storage.query_volumes(SyncDirection::Both, Some(vid_set));
        let state = iter.try_next().unwrap().unwrap();
        assert_eq!(state.vid(), pull_vid);
        assert!(iter.next().is_none());
    }
}