graft_core/
commit.rs

1use std::{
2    ops::{Deref, DerefMut, Range, RangeInclusive},
3    time::SystemTime,
4};
5
6use bilrost::Message;
7use smallvec::SmallVec;
8use splinter_rs::Splinter;
9
10use crate::{
11    PageCount, PageIdx, SegmentId, VolumeId, commit_hash::CommitHash, lsn::LSN, pageset::PageSet,
12    volume_ref::VolumeRef,
13};
14
15/// Commits are stored at `{prefix}/{vid}/log/{lsn}`.
16/// A commit may not include a `SegmentRef` if only the Volume's page count has
17/// changed. This happens when the Volume is extended or truncated without
18/// additional writes.
19/// Commits are immutable.
20#[derive(Debug, Clone, Message, PartialEq, Eq, Default)]
21pub struct Commit {
22    /// The Volume's ID.
23    #[bilrost(1)]
24    pub vid: VolumeId,
25
26    /// The LSN of the Commit.
27    #[bilrost(2)]
28    pub lsn: LSN,
29
30    /// The Volume's `PageCount` as of this Commit.
31    #[bilrost(3)]
32    pub page_count: PageCount,
33
34    /// An optional `CommitHash` for this Commit.
35    /// Always present on Remote Volume commits.
36    /// May be omitted on Local commits.
37    #[bilrost(4)]
38    pub commit_hash: Option<CommitHash>,
39
40    /// If this Commit contains any pages, `segment_idx` records details on the
41    /// relevant Segment.
42    #[bilrost(5)]
43    pub segment_idx: Option<SegmentIdx>,
44
45    /// If this commit is a checkpoint, this timestamp is set and records the time
46    /// the commit was made a checkpoint
47    #[bilrost(6)]
48    pub checkpointed_at: Option<SystemTime>,
49}
50
51impl Commit {
52    /// Creates a new Commit for the given snapshot info
53    pub fn new(vid: VolumeId, lsn: LSN, page_count: PageCount) -> Self {
54        Self {
55            vid,
56            lsn,
57            page_count,
58            commit_hash: None,
59            segment_idx: None,
60            checkpointed_at: None,
61        }
62    }
63
64    pub fn with_vid(self, vid: VolumeId) -> Self {
65        Self { vid, ..self }
66    }
67
68    pub fn with_lsn(self, lsn: LSN) -> Self {
69        Self { lsn, ..self }
70    }
71
72    pub fn with_commit_hash(self, commit_hash: Option<CommitHash>) -> Self {
73        Self { commit_hash, ..self }
74    }
75
76    /// Sets the segment index for this commit.
77    pub fn with_segment_idx(self, segment_idx: Option<SegmentIdx>) -> Self {
78        Self { segment_idx, ..self }
79    }
80
81    /// Sets the checkpointed timestamp for this commit.
82    pub fn with_checkpointed_at(self, checkpointed_at: Option<SystemTime>) -> Self {
83        Self { checkpointed_at, ..self }
84    }
85
86    pub fn vid(&self) -> &VolumeId {
87        &self.vid
88    }
89
90    pub fn lsn(&self) -> LSN {
91        self.lsn
92    }
93
94    pub fn vref(&self) -> VolumeRef {
95        VolumeRef::new(self.vid.clone(), self.lsn)
96    }
97
98    pub fn page_count(&self) -> PageCount {
99        self.page_count
100    }
101
102    pub fn commit_hash(&self) -> Option<&CommitHash> {
103        self.commit_hash.as_ref()
104    }
105
106    pub fn segment_idx(&self) -> Option<&SegmentIdx> {
107        self.segment_idx.as_ref()
108    }
109
110    pub fn checkpointed_at(&self) -> Option<&SystemTime> {
111        self.checkpointed_at.as_ref()
112    }
113
114    pub fn is_checkpoint(&self) -> bool {
115        self.checkpointed_at.is_some()
116    }
117}
118
119#[derive(Debug, Clone, Message, PartialEq, Eq)]
120pub struct SegmentIdx {
121    /// The Segment ID
122    #[bilrost(1)]
123    pub sid: SegmentId,
124
125    /// The set of `PageIdxs` contained by this Segment.
126    #[bilrost(2)]
127    pub pageset: PageSet,
128
129    /// An index of `SegmentFrameIdxs` contained by this Segment.
130    /// Empty on local Segments which have not been encoded and uploaded to object storage.
131    #[bilrost(3)]
132    pub frames: SmallVec<[SegmentFrameIdx; 1]>,
133}
134
135impl SegmentIdx {
136    pub fn new(sid: SegmentId, pageset: PageSet) -> Self {
137        SegmentIdx { sid, pageset, frames: SmallVec::new() }
138    }
139
140    pub fn with_frames(self, frames: SmallVec<[SegmentFrameIdx; 1]>) -> Self {
141        Self { frames, ..self }
142    }
143
144    pub fn sid(&self) -> &SegmentId {
145        &self.sid
146    }
147
148    pub fn pageset(&self) -> &PageSet {
149        &self.pageset
150    }
151
152    pub fn iter_frames(
153        &self,
154        mut filter: impl FnMut(&RangeInclusive<PageIdx>) -> bool,
155    ) -> impl Iterator<Item = SegmentRangeRef> {
156        let first_page = self.pageset.iter().next().unwrap_or(PageIdx::FIRST);
157        self.frames
158            .iter()
159            .scan((0, first_page), |(bytes_acc, pages_acc), frame| {
160                let bytes = *bytes_acc..(*bytes_acc + frame.frame_size);
161                let pages = *pages_acc..=frame.last_pageidx;
162
163                *bytes_acc += frame.frame_size;
164                *pages_acc = frame.last_pageidx.saturating_next();
165
166                Some((bytes, pages))
167            })
168            .filter(move |(_, pages)| filter(pages))
169            .map(|(bytes, pages)| {
170                let pages = pages.start().to_u32()..=pages.end().to_u32();
171                let graft = (Splinter::from(pages) & self.pageset.splinter()).into();
172                SegmentRangeRef {
173                    sid: self.sid.clone(),
174                    bytes,
175                    pageset: graft,
176                }
177            })
178    }
179
180    pub fn frame_for_pageidx(&self, pageidx: PageIdx) -> Option<SegmentRangeRef> {
181        if !self.pageset.contains(pageidx) {
182            return None;
183        }
184        self.iter_frames(|pages| pages.contains(&pageidx)).next()
185    }
186}
187
188impl Deref for SegmentIdx {
189    type Target = PageSet;
190
191    fn deref(&self) -> &Self::Target {
192        &self.pageset
193    }
194}
195
196impl DerefMut for SegmentIdx {
197    fn deref_mut(&mut self) -> &mut Self::Target {
198        &mut self.pageset
199    }
200}
201
202#[derive(Debug, Clone, Message, PartialEq, Eq, Default)]
203pub struct SegmentFrameIdx {
204    /// The length of the compressed frame in bytes.
205    #[bilrost(1)]
206    frame_size: usize,
207
208    /// The last `PageIdx` contained by this `SegmentFrame`.
209    #[bilrost(2)]
210    last_pageidx: PageIdx,
211}
212
213impl SegmentFrameIdx {
214    pub fn new(frame_size: usize, last_pageidx: PageIdx) -> Self {
215        Self { frame_size, last_pageidx }
216    }
217
218    pub fn frame_size(&self) -> usize {
219        self.frame_size
220    }
221
222    pub fn last_pageidx(&self) -> PageIdx {
223        self.last_pageidx
224    }
225}
226
227/// A `SegmentRangeRef` contains the byte range and corresponding pages for a
228/// subset of a segment. The subset must correspond to one or more entire
229/// `SegmentFrames`.
230#[derive(Clone, PartialEq, Eq)]
231pub struct SegmentRangeRef {
232    pub sid: SegmentId,
233    pub bytes: Range<usize>,
234    pub pageset: PageSet,
235}
236
237impl std::fmt::Debug for SegmentRangeRef {
238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239        f.debug_struct("SegmentRangeRef")
240            .field("sid", &self.sid)
241            .field("bytes", &self.bytes)
242            .field("pages", &self.pageset.cardinality())
243            .finish()
244    }
245}
246
247impl SegmentRangeRef {
248    /// The size of the frame in bytes
249    pub fn size(&self) -> usize {
250        self.bytes.end - self.bytes.start
251    }
252
253    /// Attempt to coalesce two frame refs together.
254    /// Returns the two frame refs unmodified if coalescing is impossible.
255    #[allow(clippy::result_large_err)]
256    pub fn coalesce(self, other: Self) -> Result<Self, (Self, Self)> {
257        if self.sid != other.sid {
258            return Err((self, other));
259        }
260
261        let (left, right) = if self.bytes.end == other.bytes.start {
262            (self, other)
263        } else if other.bytes.end == self.bytes.start {
264            (other, self)
265        } else {
266            return Err((self, other));
267        };
268
269        let left_splinter: Splinter = left.pageset.into();
270        let right_splinter: Splinter = right.pageset.into();
271        Ok(Self {
272            sid: left.sid,
273            bytes: left.bytes.start..right.bytes.end,
274            pageset: (left_splinter | right_splinter).into(),
275        })
276    }
277}
278
#[cfg(test)]
mod tests {
    use crate::pageidx;

    use super::*;

    #[test]
    fn test_frame_for_pageidx() {
        let pageset = PageSet::from_range(pageidx!(5)..=pageidx!(25));
        let mut frames = SmallVec::new();
        frames.push(SegmentFrameIdx {
            frame_size: 100,
            last_pageidx: pageidx!(10),
        });
        frames.push(SegmentFrameIdx {
            frame_size: 200,
            last_pageidx: pageidx!(20),
        });
        frames.push(SegmentFrameIdx {
            frame_size: 150,
            last_pageidx: pageidx!(25),
        });

        let sid = SegmentId::random();
        let segment_idx = SegmentIdx { sid: sid.clone(), pageset, frames };

        let tests = [
            (pageidx!(4), None),
            (
                pageidx!(5),
                Some(SegmentRangeRef {
                    sid: sid.clone(),
                    bytes: 0..100,
                    pageset: PageSet::from_range(pageidx!(5)..=pageidx!(10)),
                }),
            ),
            (
                pageidx!(10),
                Some(SegmentRangeRef {
                    sid: sid.clone(),
                    bytes: 0..100,
                    pageset: PageSet::from_range(pageidx!(5)..=pageidx!(10)),
                }),
            ),
            (
                pageidx!(11),
                Some(SegmentRangeRef {
                    sid: sid.clone(),
                    bytes: 100..300,
                    pageset: PageSet::from_range(pageidx!(11)..=pageidx!(20)),
                }),
            ),
            (
                pageidx!(20),
                Some(SegmentRangeRef {
                    sid: sid.clone(),
                    bytes: 100..300,
                    pageset: PageSet::from_range(pageidx!(11)..=pageidx!(20)),
                }),
            ),
            (
                pageidx!(25),
                Some(SegmentRangeRef {
                    sid: sid.clone(),
                    bytes: 300..450,
                    pageset: PageSet::from_range(pageidx!(21)..=pageidx!(25)),
                }),
            ),
            (pageidx!(26), None),
        ];

        for (pageidx, expected) in tests {
            assert_eq!(
                segment_idx.frame_for_pageidx(pageidx),
                expected,
                "wrong frame for pageidx {pageidx}"
            );
        }
    }

    #[test]
    fn test_frame_for_pageidx_empty_frames() {
        let segment_idx = SegmentIdx {
            sid: SegmentId::random(),
            pageset: PageSet::EMPTY,
            frames: SmallVec::new(),
        };

        let result = segment_idx.frame_for_pageidx(pageidx!(1));
        assert!(result.is_none());
    }

    #[test]
    fn test_segment_range_ref_coalesce_adjacent() {
        let sid = SegmentId::random();
        // Test coalescing two adjacent ranges (first before second)
        let frame1 = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 0..100,
            pageset: PageSet::from_range(pageidx!(5)..=pageidx!(10)),
        };
        let frame2 = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 100..200,
            pageset: PageSet::from_range(pageidx!(11)..=pageidx!(20)),
        };

        let result = frame1.clone().coalesce(frame2.clone()).unwrap();
        assert_eq!(result.bytes, 0..200);
        assert_eq!(
            result.pageset,
            PageSet::from_range(pageidx!(5)..=pageidx!(20))
        );

        // Test coalescing in reverse order (second before first)
        let result = frame2.coalesce(frame1).unwrap();
        assert_eq!(result.bytes, 0..200);
        assert_eq!(
            result.pageset,
            PageSet::from_range(pageidx!(5)..=pageidx!(20))
        );
    }

    #[test]
    fn test_segment_range_ref_coalesce_non_adjacent() {
        let sid = SegmentId::random();
        // Test that non-adjacent ranges cannot be coalesced
        let frame1 = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 0..100,
            pageset: PageSet::from_range(pageidx!(5)..=pageidx!(10)),
        };
        let frame2 = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 150..250,
            pageset: PageSet::from_range(pageidx!(20)..=pageidx!(30)),
        };

        let result = frame1.clone().coalesce(frame2.clone());
        assert!(result.is_err());
        let (f1, f2) = result.unwrap_err();
        assert_eq!(f1, frame1);
        assert_eq!(f2, frame2);
    }

    #[test]
    fn test_segment_range_ref_coalesce_diff_segment() {
        // Test that adjacent ranges from different segments don't combine
        let frame1 = SegmentRangeRef {
            sid: SegmentId::random(),
            bytes: 0..100,
            pageset: PageSet::from_range(pageidx!(5)..=pageidx!(10)),
        };
        let frame2 = SegmentRangeRef {
            sid: SegmentId::random(),
            bytes: 100..200,
            pageset: PageSet::from_range(pageidx!(11)..=pageidx!(20)),
        };

        let result = frame1.clone().coalesce(frame2.clone());
        assert!(result.is_err());
        let (f1, f2) = result.unwrap_err();
        assert_eq!(f1, frame1);
        assert_eq!(f2, frame2);
    }

    #[test]
    fn test_segment_range_ref_size() {
        let sid = SegmentId::random();
        let frame1 = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 0..100,
            pageset: PageSet::EMPTY,
        };
        let frame2 = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 100..250,
            pageset: PageSet::EMPTY,
        };
        assert_eq!(frame1.size(), 100);
        assert_eq!(frame2.size(), 150);

        // Coalescing adjacent ranges sums their sizes.
        let merged = frame1.coalesce(frame2).unwrap();
        assert_eq!(merged.size(), 250);

        let empty = SegmentRangeRef {
            sid,
            bytes: 42..42,
            pageset: PageSet::EMPTY,
        };
        assert_eq!(empty.size(), 0);
    }

    #[test]
    fn test_iter_frames_no_filter() {
        let pageset = PageSet::from_range(pageidx!(5)..=pageidx!(25));
        let mut frames = SmallVec::new();
        frames.push(SegmentFrameIdx {
            frame_size: 100,
            last_pageidx: pageidx!(10),
        });
        frames.push(SegmentFrameIdx {
            frame_size: 200,
            last_pageidx: pageidx!(20),
        });
        frames.push(SegmentFrameIdx {
            frame_size: 150,
            last_pageidx: pageidx!(25),
        });

        let segment_idx = SegmentIdx {
            sid: SegmentId::random(),
            pageset,
            frames,
        };

        // Collect all frames
        let all_frames: Vec<_> = segment_idx.iter_frames(|_| true).collect();
        assert_eq!(all_frames.len(), 3);

        assert_eq!(all_frames[0].bytes, 0..100);
        assert_eq!(
            all_frames[0].pageset,
            PageSet::from_range(pageidx!(5)..=pageidx!(10))
        );

        assert_eq!(all_frames[1].bytes, 100..300);
        assert_eq!(
            all_frames[1].pageset,
            PageSet::from_range(pageidx!(11)..=pageidx!(20))
        );

        assert_eq!(all_frames[2].bytes, 300..450);
        assert_eq!(
            all_frames[2].pageset,
            PageSet::from_range(pageidx!(21)..=pageidx!(25))
        );
    }

    #[test]
    fn test_iter_frames_sparse_pageset() {
        // Pages 5..=7 and 15..=22 are stored; pages 8..=14 are absent.
        let pageset: PageSet = (Splinter::from(5u32..=7) | Splinter::from(15u32..=22)).into();
        let frames: SmallVec<[SegmentFrameIdx; 1]> = [
            SegmentFrameIdx::new(10, pageidx!(7)),
            SegmentFrameIdx::new(20, pageidx!(16)),
            SegmentFrameIdx::new(5, pageidx!(22)),
        ]
        .into_iter()
        .collect();
        let segment_idx = SegmentIdx::new(SegmentId::random(), pageset).with_frames(frames);

        let all_frames: Vec<_> = segment_idx.iter_frames(|_| true).collect();
        assert_eq!(all_frames.len(), 3);

        assert_eq!(all_frames[0].bytes, 0..10);
        assert_eq!(all_frames[0].pageset.cardinality(), 3);

        // The second frame's page range is 8..=16, but only pages 15 and 16
        // are actually stored in the segment.
        assert_eq!(all_frames[1].bytes, 10..30);
        assert_eq!(all_frames[1].pageset.cardinality(), 2);
        assert!(all_frames[1].pageset.contains(pageidx!(15)));
        assert!(all_frames[1].pageset.contains(pageidx!(16)));
        assert!(!all_frames[1].pageset.contains(pageidx!(10)));

        assert_eq!(all_frames[2].bytes, 30..35);
        assert_eq!(all_frames[2].pageset.cardinality(), 6);

        // Pages in the gap belong to no frame; stored pages resolve normally.
        assert!(segment_idx.frame_for_pageidx(pageidx!(10)).is_none());
        let frame = segment_idx.frame_for_pageidx(pageidx!(15)).unwrap();
        assert_eq!(frame.bytes, 10..30);
    }

    #[test]
    fn test_iter_frames_with_filter() {
        let pageset = PageSet::from_range(pageidx!(5)..=pageidx!(25));
        let mut frames = SmallVec::new();
        frames.push(SegmentFrameIdx {
            frame_size: 100,
            last_pageidx: pageidx!(10),
        });
        frames.push(SegmentFrameIdx {
            frame_size: 200,
            last_pageidx: pageidx!(20),
        });
        frames.push(SegmentFrameIdx {
            frame_size: 150,
            last_pageidx: pageidx!(25),
        });

        let segment_idx = SegmentIdx {
            sid: SegmentId::random(),
            pageset,
            frames,
        };

        // Filter for frames containing page 15
        let filtered_frames: Vec<_> = segment_idx
            .iter_frames(|pages| pages.contains(&pageidx!(15)))
            .collect();
        assert_eq!(filtered_frames.len(), 1);
        assert_eq!(filtered_frames[0].bytes, 100..300);
        assert_eq!(
            filtered_frames[0].pageset,
            PageSet::from_range(pageidx!(11)..=pageidx!(20))
        );
    }

    #[test]
    fn test_iter_frames_empty() {
        let segment_idx = SegmentIdx {
            sid: SegmentId::random(),
            pageset: PageSet::EMPTY,
            frames: SmallVec::new(),
        };

        let frames: Vec<_> = segment_idx.iter_frames(|_| true).collect();
        assert_eq!(frames.len(), 0);
    }
}