1use std::{fmt::Display, ops::RangeInclusive};
2
3use bilrost::Message;
4
5use graft_core::{VolumeId, commit_hash::CommitHash, lsn::LSN};
6
/// A remote LSN paired with an optional local watermark LSN.
///
/// `Graft` stores one of these in its `sync` field and uses it as the
/// baseline for change detection: `remote` is the base used by
/// `Graft::remote_changes`, and `local_watermark` is the base used by
/// `Graft::local_changes`.
///
/// NOTE(review): the `#[bilrost(N)]` attributes look like the field tags of
/// the derived `Message` encoding — confirm before renumbering them.
#[derive(Debug, Clone, Message, PartialEq, Eq)]
pub struct SyncPoint {
    /// LSN on the remote side of this sync point. Remote heads greater than
    /// this value are reported as remote changes.
    ///
    /// NOTE(review): presumably the remote LSN the graft last synced with —
    /// confirm.
    #[bilrost(1)]
    pub remote: LSN,

    /// LSN on the local side of this sync point. When `None`,
    /// `Graft::local_changes` starts its range at `LSN::FIRST`.
    ///
    /// NOTE(review): presumably every local commit up to this LSN has already
    /// reached the remote — confirm.
    #[bilrost(2)]
    pub local_watermark: Option<LSN>,
}
18
/// A commit described by a local LSN, a commit LSN, and a commit hash.
///
/// Converting a `PendingCommit` into a `SyncPoint` maps `commit` to
/// `SyncPoint::remote` and `local` to `SyncPoint::local_watermark`; the hash
/// is not carried over.
///
/// NOTE(review): presumably this tracks a push that is still in flight —
/// confirm against the code that writes `Graft::pending_commit`.
#[derive(Debug, Clone, Message, PartialEq, Eq)]
pub struct PendingCommit {
    /// Local LSN; becomes the local watermark once converted to a
    /// `SyncPoint`.
    #[bilrost(1)]
    pub local: LSN,

    /// Commit LSN; becomes the remote LSN once converted to a `SyncPoint`.
    ///
    /// NOTE(review): presumably the LSN the commit will occupy in the remote
    /// volume — confirm.
    #[bilrost(2)]
    pub commit: LSN,

    /// Hash identifying the commit.
    ///
    /// NOTE(review): presumably used to recognise whether the commit already
    /// landed remotely — confirm.
    #[bilrost(3)]
    pub commit_hash: CommitHash,
}
35
36impl From<PendingCommit> for SyncPoint {
37 fn from(value: PendingCommit) -> Self {
38 Self {
39 remote: value.commit,
40 local_watermark: Some(value.local),
41 }
42 }
43}
44
/// Pairs a local volume with a remote volume, together with optional sync
/// state.
///
/// NOTE(review): the `#[bilrost(N)]` attributes look like the field tags of
/// the derived `Message` encoding — confirm before renumbering them.
#[derive(Debug, Clone, Message, PartialEq, Eq, Default)]
pub struct Graft {
    /// Id of the local volume.
    #[bilrost(1)]
    pub local: VolumeId,

    /// Id of the remote volume.
    #[bilrost(2)]
    pub remote: VolumeId,

    /// Baseline used when computing local and remote changes. `None` means
    /// there is no baseline, so any head is reported as changes starting at
    /// `LSN::FIRST`.
    #[bilrost(3)]
    pub sync: Option<SyncPoint>,

    /// Optional pending commit. Nothing in this file reads it apart from the
    /// `pending_commit` accessor and the `with_pending_commit` builder.
    ///
    /// NOTE(review): presumably set while a push is in progress — confirm.
    #[bilrost(4)]
    pub pending_commit: Option<PendingCommit>,
}
67
68impl Graft {
69 pub fn new(
70 local: VolumeId,
71 remote: VolumeId,
72 sync: Option<SyncPoint>,
73 pending_commit: Option<PendingCommit>,
74 ) -> Self {
75 Self { local, remote, sync, pending_commit }
76 }
77
78 pub fn with_sync(self, sync: Option<SyncPoint>) -> Self {
79 Self { sync, ..self }
80 }
81
82 pub fn sync(&self) -> Option<&SyncPoint> {
83 self.sync.as_ref()
84 }
85
86 pub fn with_pending_commit(self, pending_commit: Option<PendingCommit>) -> Self {
87 Self { pending_commit, ..self }
88 }
89
90 pub fn pending_commit(&self) -> Option<&PendingCommit> {
91 self.pending_commit.as_ref()
92 }
93
94 pub fn local_watermark(&self) -> Option<LSN> {
95 self.sync().and_then(|s| s.local_watermark)
96 }
97
98 pub fn remote_commit(&self) -> Option<LSN> {
99 self.sync().map(|s| s.remote)
100 }
101
102 pub fn local_changes(&self, head: Option<LSN>) -> Option<RangeInclusive<LSN>> {
103 AheadStatus { head, base: self.local_watermark() }.changes()
104 }
105
106 pub fn remote_changes(&self, head: Option<LSN>) -> Option<RangeInclusive<LSN>> {
107 AheadStatus {
108 head,
109 base: self.sync().map(|s| s.remote),
110 }
111 .changes()
112 }
113
114 pub fn status(&self, latest_local: Option<LSN>, latest_remote: Option<LSN>) -> GraftStatus {
115 GraftStatus {
116 local: self.local.clone(),
117 local_status: AheadStatus {
118 head: latest_local,
119 base: self.local_watermark(),
120 },
121 remote: self.remote.clone(),
122 remote_status: AheadStatus {
123 head: latest_remote,
124 base: self.sync().map(|s| s.remote),
125 },
126 }
127 }
128}
129
/// A `head` LSN measured against a `base` LSN.
///
/// `changes` reports the LSNs after `base` up to and including `head`, and
/// the `Display` impl renders `head` followed by `+N` when it is `N` ahead of
/// `base`. Both treat a `base` without a `head` as a bug and panic.
#[derive(Debug, PartialEq, Eq)]
pub struct AheadStatus {
    /// Latest known LSN, or `None` when there is none.
    pub head: Option<LSN>,
    /// LSN that `head` is compared against, or `None` when there is no
    /// baseline.
    pub base: Option<LSN>,
}
135
136impl AheadStatus {
137 pub fn new(head: Option<LSN>, base: Option<LSN>) -> Self {
138 Self { head, base }
139 }
140
141 pub fn changes(&self) -> Option<RangeInclusive<LSN>> {
142 match (self.base, self.head) {
143 (None, None) => None,
144 (None, Some(head)) => Some(LSN::FIRST..=head),
145 (Some(base), Some(head)) => (base < head).then(|| base.next()..=head),
146
147 (Some(_), None) => unreachable!("BUG: snapshot behind sync point"),
148 }
149 }
150}
151
152impl Display for AheadStatus {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 match (self.base, self.head) {
155 (Some(base), Some(head)) => {
156 let ahead = head.since(base).expect("BUG: monotonicity violation");
157 if ahead == 0 {
158 write!(f, "{head}")
159 } else {
160 write!(f, "{head}+{ahead}")
161 }
162 }
163 (None, Some(head)) => write!(f, "{head}"),
164 (None, None) => write!(f, "_"),
165
166 (Some(_), None) => unreachable!("BUG: snapshot behind sync point"),
167 }
168 }
169}
170
/// Snapshot of both sides of a graft, as produced by `Graft::status`.
#[derive(Debug)]
pub struct GraftStatus {
    /// Id of the local volume.
    pub local: VolumeId,
    /// Latest local LSN measured against the graft's local watermark.
    pub local_status: AheadStatus,
    /// Id of the remote volume.
    pub remote: VolumeId,
    /// Latest remote LSN measured against the graft's synced remote LSN.
    pub remote_status: AheadStatus,
}
178
179impl Display for GraftStatus {
191 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192 write!(f, "{} r{}", self.local_status, self.remote_status)
193 }
194}
195
196impl GraftStatus {
197 pub fn has_diverged(&self) -> bool {
198 self.local_status.changes().is_some() && self.remote_status.changes().is_some()
199 }
200}