Skip to main content

graft_client/runtime/storage/
volume_state.rs

// rust-analyzer seems to have a bug that causes this lint to fire spuriously on
// camel-case types that also contain underscores, which is what zerocopy
// generates for enum struct variants.
4#![allow(non_camel_case_types)]
5
6use culprit::{Culprit, ResultExt};
7use fjall::{KvPair, Slice};
8use graft_core::{PageCount, VolumeId, lsn::LSN, zerocopy_ext::TryFromBytesExt};
9use serde::{Deserialize, Serialize};
10use std::{
11    fmt::{Debug, Display},
12    iter::FusedIterator,
13};
14use tryiter::TryIteratorExt;
15use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned};
16
17use super::{StorageErr, snapshot::Snapshot};
18
19#[derive(
20    Debug, KnownLayout, Immutable, TryFromBytes, IntoBytes, Unaligned, Clone, Copy, PartialEq, Eq,
21)]
22#[repr(u8)]
23pub enum VolumeStateTag {
24    Config = 1,
25    Status = 2,
26    Snapshot = 3,
27    Watermarks = 4,
28}
29
30#[derive(
31    Debug, KnownLayout, Immutable, TryFromBytes, IntoBytes, Unaligned, Clone, PartialEq, Eq,
32)]
33#[repr(C)]
34pub struct VolumeStateKey {
35    vid: VolumeId,
36    tag: VolumeStateTag,
37}
38
39impl From<VolumeStateKey> for Slice {
40    fn from(key: VolumeStateKey) -> Slice {
41        key.as_bytes().into()
42    }
43}
44
45impl AsRef<[u8]> for VolumeStateKey {
46    fn as_ref(&self) -> &[u8] {
47        self.as_bytes()
48    }
49}
50
51impl VolumeStateKey {
52    #[inline]
53    pub fn new(vid: VolumeId, tag: VolumeStateTag) -> Self {
54        Self { vid, tag }
55    }
56
57    pub(crate) fn ref_from_bytes(bytes: &[u8]) -> Result<&Self, Culprit<StorageErr>> {
58        Self::try_ref_from_unaligned_bytes(bytes).or_ctx(StorageErr::CorruptKey)
59    }
60
61    #[inline]
62    pub fn vid(&self) -> &VolumeId {
63        &self.vid
64    }
65
66    #[inline]
67    pub fn tag(&self) -> VolumeStateTag {
68        self.tag
69    }
70
71    #[inline]
72    pub fn with_tag(self, tag: VolumeStateTag) -> Self {
73        Self { tag, ..self }
74    }
75}
76
77#[derive(
78    Default,
79    Debug,
80    KnownLayout,
81    Immutable,
82    TryFromBytes,
83    IntoBytes,
84    Unaligned,
85    Clone,
86    Copy,
87    PartialEq,
88    Eq,
89    Serialize,
90)]
91#[repr(u8)]
92pub enum SyncDirection {
93    #[default]
94    Disabled = 0,
95    Push = 1,
96    Pull = 2,
97    Both = 3,
98}
99
100impl SyncDirection {
101    pub fn matches(self, other: SyncDirection) -> bool {
102        match (self, other) {
103            (SyncDirection::Disabled, SyncDirection::Disabled) => true,
104            (SyncDirection::Disabled, _) | (_, SyncDirection::Disabled) => false,
105            (SyncDirection::Both, _) | (_, SyncDirection::Both) => true,
106            (a, b) => a == b,
107        }
108    }
109}
110
111#[derive(
112    KnownLayout, Immutable, TryFromBytes, IntoBytes, Clone, PartialEq, Eq, Debug, Default, Serialize,
113)]
114#[repr(C)]
115pub struct VolumeConfig {
116    sync: SyncDirection,
117}
118
119impl VolumeConfig {
120    pub const DEFAULT: Self = Self { sync: SyncDirection::Disabled };
121
122    pub fn new(sync: SyncDirection) -> Self {
123        Self { sync }
124    }
125
126    pub(crate) fn from_bytes(bytes: &[u8]) -> Result<Self, Culprit<StorageErr>> {
127        Self::try_read_from_bytes(bytes)
128            .or_ctx(|e| StorageErr::CorruptVolumeState(VolumeStateTag::Config, e.into()))
129    }
130
131    pub fn sync(&self) -> SyncDirection {
132        self.sync
133    }
134
135    pub fn with_sync(self, sync: SyncDirection) -> Self {
136        Self { sync }
137    }
138}
139
140impl AsRef<[u8]> for VolumeConfig {
141    fn as_ref(&self) -> &[u8] {
142        self.as_bytes()
143    }
144}
145
146impl From<VolumeConfig> for Slice {
147    fn from(config: VolumeConfig) -> Slice {
148        config.as_bytes().into()
149    }
150}
151
152#[derive(
153    KnownLayout,
154    Immutable,
155    TryFromBytes,
156    IntoBytes,
157    Clone,
158    Copy,
159    PartialEq,
160    Eq,
161    Debug,
162    Serialize,
163    Default,
164)]
165#[repr(u8)]
166pub enum VolumeStatus {
167    #[default]
168    Ok = 0,
169
170    /// The last commit graft attempted to push to the server was rejected
171    RejectedCommit = 1,
172
173    /// The local and remote volume state have diverged
174    Conflict = 2,
175
176    /// The volume was interrupted in the middle of a push operation
177    InterruptedPush = 3,
178}
179
180impl VolumeStatus {
181    pub(crate) fn from_bytes(bytes: &[u8]) -> Result<Self, Culprit<StorageErr>> {
182        Self::try_read_from_bytes(bytes)
183            .or_ctx(|e| StorageErr::CorruptVolumeState(VolumeStateTag::Status, e.into()))
184    }
185}
186
187impl AsRef<[u8]> for VolumeStatus {
188    fn as_ref(&self) -> &[u8] {
189        self.as_bytes()
190    }
191}
192
193impl From<VolumeStatus> for Slice {
194    fn from(status: VolumeStatus) -> Slice {
195        status.as_bytes().into()
196    }
197}
198
199impl Display for VolumeStatus {
200    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201        match self {
202            VolumeStatus::Ok => write!(f, "ok"),
203            VolumeStatus::RejectedCommit => write!(f, "rejected commit"),
204            VolumeStatus::Conflict => write!(f, "conflict"),
205            VolumeStatus::InterruptedPush => write!(f, "interrupted push"),
206        }
207    }
208}
209
210#[derive(
211    KnownLayout,
212    Immutable,
213    TryFromBytes,
214    IntoBytes,
215    Clone,
216    Copy,
217    PartialEq,
218    Eq,
219    Serialize,
220    Deserialize,
221)]
222#[repr(u8)]
223pub enum Watermark {
224    Unmapped {
225        _padding: [u8; 15],
226    },
227    Mapped {
228        _padding: [u8; 3],
229        pages: PageCount,
230        lsn: LSN,
231    },
232}
233
234impl Watermark {
235    const UNMAPPED: Watermark = Watermark::Unmapped { _padding: [0; 15] };
236
237    pub fn new(lsn: LSN, pages: PageCount) -> Self {
238        Watermark::Mapped { _padding: [0; 3], pages, lsn }
239    }
240
241    #[inline]
242    pub fn pages(&self) -> Option<PageCount> {
243        match self {
244            Watermark::Mapped { pages, .. } => Some(*pages),
245            Watermark::Unmapped { .. } => None,
246        }
247    }
248
249    #[inline]
250    pub fn lsn(&self) -> Option<LSN> {
251        match self {
252            Watermark::Mapped { lsn, .. } => Some(*lsn),
253            Watermark::Unmapped { .. } => None,
254        }
255    }
256
257    #[inline]
258    pub fn splat(&self) -> Option<(LSN, PageCount)> {
259        match self {
260            Watermark::Mapped { lsn, pages, .. } => Some((*lsn, *pages)),
261            Watermark::Unmapped { .. } => None,
262        }
263    }
264}
265
266impl Debug for Watermark {
267    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268        match self {
269            Watermark::Unmapped { .. } => write!(f, "Watermark::Unmapped"),
270            Watermark::Mapped { pages, lsn, .. } => {
271                write!(f, "Watermark::Mapped(lsn: {lsn}, pages: {pages})")
272            }
273        }
274    }
275}
276
277impl Default for Watermark {
278    fn default() -> Self {
279        Self::UNMAPPED
280    }
281}
282
283#[derive(
284    Debug,
285    KnownLayout,
286    Immutable,
287    TryFromBytes,
288    IntoBytes,
289    Clone,
290    PartialEq,
291    Eq,
292    Serialize,
293    Deserialize,
294    Default,
295)]
296#[repr(C)]
297pub struct Watermarks {
298    /// `pending_sync` is a local snapshot that is in the process of syncing to the server
299    pending_sync: Watermark,
300
301    /// checkpoint is the last local snapshot that represents a volume checkpoint
302    checkpoint: Watermark,
303}
304
305impl Watermarks {
306    pub const DEFAULT: Self = Self {
307        pending_sync: Watermark::UNMAPPED,
308        checkpoint: Watermark::UNMAPPED,
309    };
310
311    pub(crate) fn from_bytes(bytes: &[u8]) -> Result<Self, Culprit<StorageErr>> {
312        Self::try_read_from_bytes(bytes)
313            .or_ctx(|e| StorageErr::CorruptVolumeState(VolumeStateTag::Watermarks, e.into()))
314    }
315
316    #[inline]
317    pub fn pending_sync(&self) -> Watermark {
318        self.pending_sync
319    }
320
321    #[inline]
322    pub fn with_pending_sync(self, pending_sync: Watermark) -> Self {
323        Self { pending_sync, ..self }
324    }
325
326    #[inline]
327    pub fn checkpoint(&self) -> Watermark {
328        self.checkpoint
329    }
330
331    #[inline]
332    pub fn with_checkpoint(self, checkpoint: Watermark) -> Self {
333        Self { checkpoint, ..self }
334    }
335}
336
337impl From<Watermarks> for Slice {
338    fn from(watermarks: Watermarks) -> Slice {
339        watermarks.as_bytes().into()
340    }
341}
342
343#[derive(Debug, Clone, Serialize)]
344pub struct VolumeState {
345    vid: VolumeId,
346    config: Option<VolumeConfig>,
347    status: Option<VolumeStatus>,
348    snapshot: Option<Snapshot>,
349    watermarks: Option<Watermarks>,
350}
351
352impl VolumeState {
353    pub(crate) fn new(vid: VolumeId) -> Self {
354        Self {
355            vid,
356            config: None,
357            status: None,
358            snapshot: None,
359            watermarks: None,
360        }
361    }
362
363    #[inline]
364    pub fn vid(&self) -> &VolumeId {
365        &self.vid
366    }
367
368    #[inline]
369    pub fn config(&self) -> &VolumeConfig {
370        precept::expect_always_or_unreachable!(
371            self.config.is_some(),
372            "volume config should always be present",
373            { "state": self }
374        );
375        debug_assert!(
376            self.config.is_some(),
377            "volume config should always be present; got {self:?}"
378        );
379        self.config.as_ref().unwrap_or(&VolumeConfig::DEFAULT)
380    }
381
382    #[inline]
383    pub fn status(&self) -> VolumeStatus {
384        self.status.unwrap_or(VolumeStatus::Ok)
385    }
386
387    #[inline]
388    pub fn snapshot(&self) -> Option<&Snapshot> {
389        self.snapshot.as_ref()
390    }
391
392    #[inline]
393    pub fn watermarks(&self) -> &Watermarks {
394        self.watermarks.as_ref().unwrap_or(&Watermarks::DEFAULT)
395    }
396
397    pub fn is_syncing(&self) -> bool {
398        if let Some(pending_sync) = self.watermarks().pending_sync().lsn() {
399            let last_sync = self.snapshot().and_then(|s| s.remote_local());
400            debug_assert!(
401                last_sync <= Some(pending_sync),
402                "invariant violation: last_sync should never be larger than pending_sync"
403            );
404            last_sync < Some(pending_sync)
405        } else {
406            false
407        }
408    }
409
410    pub fn has_pending_commits(&self) -> bool {
411        let last_sync = self.snapshot().and_then(|s| s.remote_local());
412        let local = self.snapshot().map(|s| s.local());
413        debug_assert!(
414            last_sync <= local,
415            "invariant violation: last_sync should never be larger than local"
416        );
417        last_sync < local
418    }
419
420    pub(crate) fn accumulate(
421        &mut self,
422        tag: VolumeStateTag,
423        value: Slice,
424    ) -> Result<(), Culprit<StorageErr>> {
425        match tag {
426            VolumeStateTag::Config => {
427                self.config = Some(VolumeConfig::from_bytes(&value)?);
428            }
429            VolumeStateTag::Status => {
430                self.status = Some(VolumeStatus::from_bytes(&value)?);
431            }
432            VolumeStateTag::Snapshot => {
433                self.snapshot = Some(Snapshot::try_from_bytes(&value)?);
434            }
435            VolumeStateTag::Watermarks => {
436                self.watermarks = Some(Watermarks::from_bytes(&value)?);
437            }
438        }
439        Ok(())
440    }
441}
442
443pub struct VolumeQueryIter<I> {
444    current: Option<VolumeState>,
445    inner: I,
446}
447
448impl<I> VolumeQueryIter<I> {
449    pub fn new(inner: I) -> Self {
450        Self { current: None, inner }
451    }
452}
453
454impl<I> VolumeQueryIter<I>
455where
456    I: Iterator<Item = lsm_tree::Result<KvPair>>,
457{
458    fn next_inner(&mut self) -> Result<Option<VolumeState>, Culprit<StorageErr>> {
459        // pull from our inner iterator until we see the next vid, then emit
460        while let Some((key, value)) = self.inner.try_next().or_into_ctx()? {
461            let key = VolumeStateKey::ref_from_bytes(&key)?;
462
463            let current = self
464                .current
465                .get_or_insert_with(|| VolumeState::new(key.vid.clone()));
466
467            if current.vid != key.vid {
468                assert!(
469                    key.vid > current.vid,
470                    "iterator must return volume ids in ascending order"
471                );
472
473                // this key corresponds to the next volume, so let's initialize
474                // a new volume state and return the current state
475                let mut next_state = VolumeState::new(key.vid.clone());
476                next_state.accumulate(key.tag, value)?;
477                let state = self.current.replace(next_state);
478                return Ok(state);
479            } else {
480                // this key corresponds to the current volume, so let's
481                // accumulate it into the state
482                current.accumulate(key.tag, value)?;
483            }
484        }
485
486        // we've exhausted the iterator, so return the current state if one
487        // exists. this will also fuse the iterator.
488        Ok(self.current.take())
489    }
490}
491
492impl<I> Iterator for VolumeQueryIter<I>
493where
494    I: Iterator<Item = lsm_tree::Result<KvPair>>,
495{
496    type Item = Result<VolumeState, Culprit<StorageErr>>;
497
498    fn next(&mut self) -> Option<Self::Item> {
499        self.next_inner().transpose()
500    }
501}
502
503// VolumeQueryIter fuses
504impl<I> FusedIterator for VolumeQueryIter<I> where I: Iterator<Item = lsm_tree::Result<KvPair>> {}