1#![allow(non_camel_case_types)]
5
6use culprit::{Culprit, ResultExt};
7use fjall::{KvPair, Slice};
8use graft_core::{PageCount, VolumeId, lsn::LSN, zerocopy_ext::TryFromBytesExt};
9use serde::{Deserialize, Serialize};
10use std::{
11 fmt::{Debug, Display},
12 iter::FusedIterator,
13};
14use tryiter::TryIteratorExt;
15use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned};
16
17use super::{StorageErr, snapshot::Snapshot};
18
19#[derive(
20 Debug, KnownLayout, Immutable, TryFromBytes, IntoBytes, Unaligned, Clone, Copy, PartialEq, Eq,
21)]
22#[repr(u8)]
23pub enum VolumeStateTag {
24 Config = 1,
25 Status = 2,
26 Snapshot = 3,
27 Watermarks = 4,
28}
29
30#[derive(
31 Debug, KnownLayout, Immutable, TryFromBytes, IntoBytes, Unaligned, Clone, PartialEq, Eq,
32)]
33#[repr(C)]
34pub struct VolumeStateKey {
35 vid: VolumeId,
36 tag: VolumeStateTag,
37}
38
39impl From<VolumeStateKey> for Slice {
40 fn from(key: VolumeStateKey) -> Slice {
41 key.as_bytes().into()
42 }
43}
44
45impl AsRef<[u8]> for VolumeStateKey {
46 fn as_ref(&self) -> &[u8] {
47 self.as_bytes()
48 }
49}
50
51impl VolumeStateKey {
52 #[inline]
53 pub fn new(vid: VolumeId, tag: VolumeStateTag) -> Self {
54 Self { vid, tag }
55 }
56
57 pub(crate) fn ref_from_bytes(bytes: &[u8]) -> Result<&Self, Culprit<StorageErr>> {
58 Self::try_ref_from_unaligned_bytes(bytes).or_ctx(StorageErr::CorruptKey)
59 }
60
61 #[inline]
62 pub fn vid(&self) -> &VolumeId {
63 &self.vid
64 }
65
66 #[inline]
67 pub fn tag(&self) -> VolumeStateTag {
68 self.tag
69 }
70
71 #[inline]
72 pub fn with_tag(self, tag: VolumeStateTag) -> Self {
73 Self { tag, ..self }
74 }
75}
76
77#[derive(
78 Default,
79 Debug,
80 KnownLayout,
81 Immutable,
82 TryFromBytes,
83 IntoBytes,
84 Unaligned,
85 Clone,
86 Copy,
87 PartialEq,
88 Eq,
89 Serialize,
90)]
91#[repr(u8)]
92pub enum SyncDirection {
93 #[default]
94 Disabled = 0,
95 Push = 1,
96 Pull = 2,
97 Both = 3,
98}
99
100impl SyncDirection {
101 pub fn matches(self, other: SyncDirection) -> bool {
102 match (self, other) {
103 (SyncDirection::Disabled, SyncDirection::Disabled) => true,
104 (SyncDirection::Disabled, _) | (_, SyncDirection::Disabled) => false,
105 (SyncDirection::Both, _) | (_, SyncDirection::Both) => true,
106 (a, b) => a == b,
107 }
108 }
109}
110
111#[derive(
112 KnownLayout, Immutable, TryFromBytes, IntoBytes, Clone, PartialEq, Eq, Debug, Default, Serialize,
113)]
114#[repr(C)]
115pub struct VolumeConfig {
116 sync: SyncDirection,
117}
118
119impl VolumeConfig {
120 pub const DEFAULT: Self = Self { sync: SyncDirection::Disabled };
121
122 pub fn new(sync: SyncDirection) -> Self {
123 Self { sync }
124 }
125
126 pub(crate) fn from_bytes(bytes: &[u8]) -> Result<Self, Culprit<StorageErr>> {
127 Self::try_read_from_bytes(bytes)
128 .or_ctx(|e| StorageErr::CorruptVolumeState(VolumeStateTag::Config, e.into()))
129 }
130
131 pub fn sync(&self) -> SyncDirection {
132 self.sync
133 }
134
135 pub fn with_sync(self, sync: SyncDirection) -> Self {
136 Self { sync }
137 }
138}
139
140impl AsRef<[u8]> for VolumeConfig {
141 fn as_ref(&self) -> &[u8] {
142 self.as_bytes()
143 }
144}
145
146impl From<VolumeConfig> for Slice {
147 fn from(config: VolumeConfig) -> Slice {
148 config.as_bytes().into()
149 }
150}
151
152#[derive(
153 KnownLayout,
154 Immutable,
155 TryFromBytes,
156 IntoBytes,
157 Clone,
158 Copy,
159 PartialEq,
160 Eq,
161 Debug,
162 Serialize,
163 Default,
164)]
165#[repr(u8)]
166pub enum VolumeStatus {
167 #[default]
168 Ok = 0,
169
170 RejectedCommit = 1,
172
173 Conflict = 2,
175
176 InterruptedPush = 3,
178}
179
180impl VolumeStatus {
181 pub(crate) fn from_bytes(bytes: &[u8]) -> Result<Self, Culprit<StorageErr>> {
182 Self::try_read_from_bytes(bytes)
183 .or_ctx(|e| StorageErr::CorruptVolumeState(VolumeStateTag::Status, e.into()))
184 }
185}
186
187impl AsRef<[u8]> for VolumeStatus {
188 fn as_ref(&self) -> &[u8] {
189 self.as_bytes()
190 }
191}
192
193impl From<VolumeStatus> for Slice {
194 fn from(status: VolumeStatus) -> Slice {
195 status.as_bytes().into()
196 }
197}
198
199impl Display for VolumeStatus {
200 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201 match self {
202 VolumeStatus::Ok => write!(f, "ok"),
203 VolumeStatus::RejectedCommit => write!(f, "rejected commit"),
204 VolumeStatus::Conflict => write!(f, "conflict"),
205 VolumeStatus::InterruptedPush => write!(f, "interrupted push"),
206 }
207 }
208}
209
210#[derive(
211 KnownLayout,
212 Immutable,
213 TryFromBytes,
214 IntoBytes,
215 Clone,
216 Copy,
217 PartialEq,
218 Eq,
219 Serialize,
220 Deserialize,
221)]
222#[repr(u8)]
223pub enum Watermark {
224 Unmapped {
225 _padding: [u8; 15],
226 },
227 Mapped {
228 _padding: [u8; 3],
229 pages: PageCount,
230 lsn: LSN,
231 },
232}
233
234impl Watermark {
235 const UNMAPPED: Watermark = Watermark::Unmapped { _padding: [0; 15] };
236
237 pub fn new(lsn: LSN, pages: PageCount) -> Self {
238 Watermark::Mapped { _padding: [0; 3], pages, lsn }
239 }
240
241 #[inline]
242 pub fn pages(&self) -> Option<PageCount> {
243 match self {
244 Watermark::Mapped { pages, .. } => Some(*pages),
245 Watermark::Unmapped { .. } => None,
246 }
247 }
248
249 #[inline]
250 pub fn lsn(&self) -> Option<LSN> {
251 match self {
252 Watermark::Mapped { lsn, .. } => Some(*lsn),
253 Watermark::Unmapped { .. } => None,
254 }
255 }
256
257 #[inline]
258 pub fn splat(&self) -> Option<(LSN, PageCount)> {
259 match self {
260 Watermark::Mapped { lsn, pages, .. } => Some((*lsn, *pages)),
261 Watermark::Unmapped { .. } => None,
262 }
263 }
264}
265
266impl Debug for Watermark {
267 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268 match self {
269 Watermark::Unmapped { .. } => write!(f, "Watermark::Unmapped"),
270 Watermark::Mapped { pages, lsn, .. } => {
271 write!(f, "Watermark::Mapped(lsn: {lsn}, pages: {pages})")
272 }
273 }
274 }
275}
276
277impl Default for Watermark {
278 fn default() -> Self {
279 Self::UNMAPPED
280 }
281}
282
283#[derive(
284 Debug,
285 KnownLayout,
286 Immutable,
287 TryFromBytes,
288 IntoBytes,
289 Clone,
290 PartialEq,
291 Eq,
292 Serialize,
293 Deserialize,
294 Default,
295)]
296#[repr(C)]
297pub struct Watermarks {
298 pending_sync: Watermark,
300
301 checkpoint: Watermark,
303}
304
305impl Watermarks {
306 pub const DEFAULT: Self = Self {
307 pending_sync: Watermark::UNMAPPED,
308 checkpoint: Watermark::UNMAPPED,
309 };
310
311 pub(crate) fn from_bytes(bytes: &[u8]) -> Result<Self, Culprit<StorageErr>> {
312 Self::try_read_from_bytes(bytes)
313 .or_ctx(|e| StorageErr::CorruptVolumeState(VolumeStateTag::Watermarks, e.into()))
314 }
315
316 #[inline]
317 pub fn pending_sync(&self) -> Watermark {
318 self.pending_sync
319 }
320
321 #[inline]
322 pub fn with_pending_sync(self, pending_sync: Watermark) -> Self {
323 Self { pending_sync, ..self }
324 }
325
326 #[inline]
327 pub fn checkpoint(&self) -> Watermark {
328 self.checkpoint
329 }
330
331 #[inline]
332 pub fn with_checkpoint(self, checkpoint: Watermark) -> Self {
333 Self { checkpoint, ..self }
334 }
335}
336
337impl From<Watermarks> for Slice {
338 fn from(watermarks: Watermarks) -> Slice {
339 watermarks.as_bytes().into()
340 }
341}
342
343#[derive(Debug, Clone, Serialize)]
344pub struct VolumeState {
345 vid: VolumeId,
346 config: Option<VolumeConfig>,
347 status: Option<VolumeStatus>,
348 snapshot: Option<Snapshot>,
349 watermarks: Option<Watermarks>,
350}
351
352impl VolumeState {
353 pub(crate) fn new(vid: VolumeId) -> Self {
354 Self {
355 vid,
356 config: None,
357 status: None,
358 snapshot: None,
359 watermarks: None,
360 }
361 }
362
363 #[inline]
364 pub fn vid(&self) -> &VolumeId {
365 &self.vid
366 }
367
368 #[inline]
369 pub fn config(&self) -> &VolumeConfig {
370 precept::expect_always_or_unreachable!(
371 self.config.is_some(),
372 "volume config should always be present",
373 { "state": self }
374 );
375 debug_assert!(
376 self.config.is_some(),
377 "volume config should always be present; got {self:?}"
378 );
379 self.config.as_ref().unwrap_or(&VolumeConfig::DEFAULT)
380 }
381
382 #[inline]
383 pub fn status(&self) -> VolumeStatus {
384 self.status.unwrap_or(VolumeStatus::Ok)
385 }
386
387 #[inline]
388 pub fn snapshot(&self) -> Option<&Snapshot> {
389 self.snapshot.as_ref()
390 }
391
392 #[inline]
393 pub fn watermarks(&self) -> &Watermarks {
394 self.watermarks.as_ref().unwrap_or(&Watermarks::DEFAULT)
395 }
396
397 pub fn is_syncing(&self) -> bool {
398 if let Some(pending_sync) = self.watermarks().pending_sync().lsn() {
399 let last_sync = self.snapshot().and_then(|s| s.remote_local());
400 debug_assert!(
401 last_sync <= Some(pending_sync),
402 "invariant violation: last_sync should never be larger than pending_sync"
403 );
404 last_sync < Some(pending_sync)
405 } else {
406 false
407 }
408 }
409
410 pub fn has_pending_commits(&self) -> bool {
411 let last_sync = self.snapshot().and_then(|s| s.remote_local());
412 let local = self.snapshot().map(|s| s.local());
413 debug_assert!(
414 last_sync <= local,
415 "invariant violation: last_sync should never be larger than local"
416 );
417 last_sync < local
418 }
419
420 pub(crate) fn accumulate(
421 &mut self,
422 tag: VolumeStateTag,
423 value: Slice,
424 ) -> Result<(), Culprit<StorageErr>> {
425 match tag {
426 VolumeStateTag::Config => {
427 self.config = Some(VolumeConfig::from_bytes(&value)?);
428 }
429 VolumeStateTag::Status => {
430 self.status = Some(VolumeStatus::from_bytes(&value)?);
431 }
432 VolumeStateTag::Snapshot => {
433 self.snapshot = Some(Snapshot::try_from_bytes(&value)?);
434 }
435 VolumeStateTag::Watermarks => {
436 self.watermarks = Some(Watermarks::from_bytes(&value)?);
437 }
438 }
439 Ok(())
440 }
441}
442
443pub struct VolumeQueryIter<I> {
444 current: Option<VolumeState>,
445 inner: I,
446}
447
448impl<I> VolumeQueryIter<I> {
449 pub fn new(inner: I) -> Self {
450 Self { current: None, inner }
451 }
452}
453
454impl<I> VolumeQueryIter<I>
455where
456 I: Iterator<Item = lsm_tree::Result<KvPair>>,
457{
458 fn next_inner(&mut self) -> Result<Option<VolumeState>, Culprit<StorageErr>> {
459 while let Some((key, value)) = self.inner.try_next().or_into_ctx()? {
461 let key = VolumeStateKey::ref_from_bytes(&key)?;
462
463 let current = self
464 .current
465 .get_or_insert_with(|| VolumeState::new(key.vid.clone()));
466
467 if current.vid != key.vid {
468 assert!(
469 key.vid > current.vid,
470 "iterator must return volume ids in ascending order"
471 );
472
473 let mut next_state = VolumeState::new(key.vid.clone());
476 next_state.accumulate(key.tag, value)?;
477 let state = self.current.replace(next_state);
478 return Ok(state);
479 } else {
480 current.accumulate(key.tag, value)?;
483 }
484 }
485
486 Ok(self.current.take())
489 }
490}
491
492impl<I> Iterator for VolumeQueryIter<I>
493where
494 I: Iterator<Item = lsm_tree::Result<KvPair>>,
495{
496 type Item = Result<VolumeState, Culprit<StorageErr>>;
497
498 fn next(&mut self) -> Option<Self::Item> {
499 self.next_inner().transpose()
500 }
501}
502
503impl<I> FusedIterator for VolumeQueryIter<I> where I: Iterator<Item = lsm_tree::Result<KvPair>> {}