graft_kernel/
graft.rs

1use std::{fmt::Display, ops::RangeInclusive};
2
3use bilrost::Message;
4
5use graft_core::{VolumeId, commit_hash::CommitHash, lsn::LSN};
6
7#[derive(Debug, Clone, Message, PartialEq, Eq)]
8pub struct SyncPoint {
9    /// This Graft is attached to the Remote Volume at this LSN
10    #[bilrost(1)]
11    pub remote: LSN,
12
13    /// All commits up to this watermark in the local volume have been written
14    /// to the remote.
15    #[bilrost(2)]
16    pub local_watermark: Option<LSN>,
17}
18
19#[derive(Debug, Clone, Message, PartialEq, Eq)]
20pub struct PendingCommit {
21    /// The LSN we are syncing from the local Volume
22    #[bilrost(1)]
23    pub local: LSN,
24
25    /// The LSN we are creating in the remote Volume
26    #[bilrost(2)]
27    pub commit: LSN,
28
29    /// The pending remote commit hash. This is used to determine whether or not
30    /// the commit has landed in the remote, in the case that we are interrupted
31    /// while attempting to push.
32    #[bilrost(3)]
33    pub commit_hash: CommitHash,
34}
35
36impl From<PendingCommit> for SyncPoint {
37    fn from(value: PendingCommit) -> Self {
38        Self {
39            remote: value.commit,
40            local_watermark: Some(value.local),
41        }
42    }
43}
44
45#[derive(Debug, Clone, Message, PartialEq, Eq, Default)]
46pub struct Graft {
47    /// The local Volume backing this Graft
48    #[bilrost(1)]
49    pub local: VolumeId,
50
51    /// The remote Volume backing this Graft.
52    #[bilrost(2)]
53    pub remote: VolumeId,
54
55    /// Metadata keeping track of which portion of the local and remote volume
56    /// this Graft cares about.
57    #[bilrost(3)]
58    pub sync: Option<SyncPoint>,
59
60    /// Presence of the `pending_commit` field means that the Push operation is in
61    /// the process of committing to the remote. If no such Push job is currently
62    /// running (i.e. it was interrupted), this field must be used to resume or
63    /// abort the commit process.
64    #[bilrost(4)]
65    pub pending_commit: Option<PendingCommit>,
66}
67
68impl Graft {
69    pub fn new(
70        local: VolumeId,
71        remote: VolumeId,
72        sync: Option<SyncPoint>,
73        pending_commit: Option<PendingCommit>,
74    ) -> Self {
75        Self { local, remote, sync, pending_commit }
76    }
77
78    pub fn with_sync(self, sync: Option<SyncPoint>) -> Self {
79        Self { sync, ..self }
80    }
81
82    pub fn sync(&self) -> Option<&SyncPoint> {
83        self.sync.as_ref()
84    }
85
86    pub fn with_pending_commit(self, pending_commit: Option<PendingCommit>) -> Self {
87        Self { pending_commit, ..self }
88    }
89
90    pub fn pending_commit(&self) -> Option<&PendingCommit> {
91        self.pending_commit.as_ref()
92    }
93
94    pub fn local_watermark(&self) -> Option<LSN> {
95        self.sync().and_then(|s| s.local_watermark)
96    }
97
98    pub fn remote_commit(&self) -> Option<LSN> {
99        self.sync().map(|s| s.remote)
100    }
101
102    pub fn local_changes(&self, head: Option<LSN>) -> Option<RangeInclusive<LSN>> {
103        AheadStatus { head, base: self.local_watermark() }.changes()
104    }
105
106    pub fn remote_changes(&self, head: Option<LSN>) -> Option<RangeInclusive<LSN>> {
107        AheadStatus {
108            head,
109            base: self.sync().map(|s| s.remote),
110        }
111        .changes()
112    }
113
114    pub fn status(&self, latest_local: Option<LSN>, latest_remote: Option<LSN>) -> GraftStatus {
115        GraftStatus {
116            local: self.local.clone(),
117            local_status: AheadStatus {
118                head: latest_local,
119                base: self.local_watermark(),
120            },
121            remote: self.remote.clone(),
122            remote_status: AheadStatus {
123                head: latest_remote,
124                base: self.sync().map(|s| s.remote),
125            },
126        }
127    }
128}
129
130#[derive(Debug, PartialEq, Eq)]
131pub struct AheadStatus {
132    pub head: Option<LSN>,
133    pub base: Option<LSN>,
134}
135
136impl AheadStatus {
137    pub fn new(head: Option<LSN>, base: Option<LSN>) -> Self {
138        Self { head, base }
139    }
140
141    pub fn changes(&self) -> Option<RangeInclusive<LSN>> {
142        match (self.base, self.head) {
143            (None, None) => None,
144            (None, Some(head)) => Some(LSN::FIRST..=head),
145            (Some(base), Some(head)) => (base < head).then(|| base.next()..=head),
146
147            (Some(_), None) => unreachable!("BUG: snapshot behind sync point"),
148        }
149    }
150}
151
152impl Display for AheadStatus {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        match (self.base, self.head) {
155            (Some(base), Some(head)) => {
156                let ahead = head.since(base).expect("BUG: monotonicity violation");
157                if ahead == 0 {
158                    write!(f, "{head}")
159                } else {
160                    write!(f, "{head}+{ahead}")
161                }
162            }
163            (None, Some(head)) => write!(f, "{head}"),
164            (None, None) => write!(f, "_"),
165
166            (Some(_), None) => unreachable!("BUG: snapshot behind sync point"),
167        }
168    }
169}
170
171#[derive(Debug)]
172pub struct GraftStatus {
173    pub local: VolumeId,
174    pub local_status: AheadStatus,
175    pub remote: VolumeId,
176    pub remote_status: AheadStatus,
177}
178
179/// Output a human readable concise description of the status of this named
180/// volume.
181///
182/// # Output examples:
183///  - `_ r_`: empty volume
184///  - `123 r_`: never synced
185///  - `123 r130`: remote and local in sync
186///  - `_ r130+7`: remote is 7 commits ahead, local is empty
187///  - `123+3 r130`: local is 3 commits ahead
188///  - `123 r130+3`: remote is 3 commits ahead
189///  - `123+2 r130+3`: local and remote have diverged
190impl Display for GraftStatus {
191    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192        write!(f, "{} r{}", self.local_status, self.remote_status)
193    }
194}
195
196impl GraftStatus {
197    pub fn has_diverged(&self) -> bool {
198        self.local_status.changes().is_some() && self.remote_status.changes().is_some()
199    }
200}