1use std::{
2 ops::{Deref, DerefMut, Range, RangeInclusive},
3 time::SystemTime,
4};
5
6use bilrost::Message;
7use smallvec::SmallVec;
8use splinter_rs::Splinter;
9
10use crate::{
11 PageCount, PageIdx, SegmentId, VolumeId, commit_hash::CommitHash, lsn::LSN, pageset::PageSet,
12 volume_ref::VolumeRef,
13};
14
15#[derive(Debug, Clone, Message, PartialEq, Eq, Default)]
21pub struct Commit {
22 #[bilrost(1)]
24 pub vid: VolumeId,
25
26 #[bilrost(2)]
28 pub lsn: LSN,
29
30 #[bilrost(3)]
32 pub page_count: PageCount,
33
34 #[bilrost(4)]
38 pub commit_hash: Option<CommitHash>,
39
40 #[bilrost(5)]
43 pub segment_idx: Option<SegmentIdx>,
44
45 #[bilrost(6)]
48 pub checkpointed_at: Option<SystemTime>,
49}
50
51impl Commit {
52 pub fn new(vid: VolumeId, lsn: LSN, page_count: PageCount) -> Self {
54 Self {
55 vid,
56 lsn,
57 page_count,
58 commit_hash: None,
59 segment_idx: None,
60 checkpointed_at: None,
61 }
62 }
63
64 pub fn with_vid(self, vid: VolumeId) -> Self {
65 Self { vid, ..self }
66 }
67
68 pub fn with_lsn(self, lsn: LSN) -> Self {
69 Self { lsn, ..self }
70 }
71
72 pub fn with_commit_hash(self, commit_hash: Option<CommitHash>) -> Self {
73 Self { commit_hash, ..self }
74 }
75
76 pub fn with_segment_idx(self, segment_idx: Option<SegmentIdx>) -> Self {
78 Self { segment_idx, ..self }
79 }
80
81 pub fn with_checkpointed_at(self, checkpointed_at: Option<SystemTime>) -> Self {
83 Self { checkpointed_at, ..self }
84 }
85
86 pub fn vid(&self) -> &VolumeId {
87 &self.vid
88 }
89
90 pub fn lsn(&self) -> LSN {
91 self.lsn
92 }
93
94 pub fn vref(&self) -> VolumeRef {
95 VolumeRef::new(self.vid.clone(), self.lsn)
96 }
97
98 pub fn page_count(&self) -> PageCount {
99 self.page_count
100 }
101
102 pub fn commit_hash(&self) -> Option<&CommitHash> {
103 self.commit_hash.as_ref()
104 }
105
106 pub fn segment_idx(&self) -> Option<&SegmentIdx> {
107 self.segment_idx.as_ref()
108 }
109
110 pub fn checkpointed_at(&self) -> Option<&SystemTime> {
111 self.checkpointed_at.as_ref()
112 }
113
114 pub fn is_checkpoint(&self) -> bool {
115 self.checkpointed_at.is_some()
116 }
117}
118
119#[derive(Debug, Clone, Message, PartialEq, Eq)]
120pub struct SegmentIdx {
121 #[bilrost(1)]
123 pub sid: SegmentId,
124
125 #[bilrost(2)]
127 pub pageset: PageSet,
128
129 #[bilrost(3)]
132 pub frames: SmallVec<[SegmentFrameIdx; 1]>,
133}
134
135impl SegmentIdx {
136 pub fn new(sid: SegmentId, pageset: PageSet) -> Self {
137 SegmentIdx { sid, pageset, frames: SmallVec::new() }
138 }
139
140 pub fn with_frames(self, frames: SmallVec<[SegmentFrameIdx; 1]>) -> Self {
141 Self { frames, ..self }
142 }
143
144 pub fn sid(&self) -> &SegmentId {
145 &self.sid
146 }
147
148 pub fn pageset(&self) -> &PageSet {
149 &self.pageset
150 }
151
152 pub fn iter_frames(
153 &self,
154 mut filter: impl FnMut(&RangeInclusive<PageIdx>) -> bool,
155 ) -> impl Iterator<Item = SegmentRangeRef> {
156 let first_page = self.pageset.iter().next().unwrap_or(PageIdx::FIRST);
157 self.frames
158 .iter()
159 .scan((0, first_page), |(bytes_acc, pages_acc), frame| {
160 let bytes = *bytes_acc..(*bytes_acc + frame.frame_size);
161 let pages = *pages_acc..=frame.last_pageidx;
162
163 *bytes_acc += frame.frame_size;
164 *pages_acc = frame.last_pageidx.saturating_next();
165
166 Some((bytes, pages))
167 })
168 .filter(move |(_, pages)| filter(pages))
169 .map(|(bytes, pages)| {
170 let pages = pages.start().to_u32()..=pages.end().to_u32();
171 let graft = (Splinter::from(pages) & self.pageset.splinter()).into();
172 SegmentRangeRef {
173 sid: self.sid.clone(),
174 bytes,
175 pageset: graft,
176 }
177 })
178 }
179
180 pub fn frame_for_pageidx(&self, pageidx: PageIdx) -> Option<SegmentRangeRef> {
181 if !self.pageset.contains(pageidx) {
182 return None;
183 }
184 self.iter_frames(|pages| pages.contains(&pageidx)).next()
185 }
186}
187
188impl Deref for SegmentIdx {
189 type Target = PageSet;
190
191 fn deref(&self) -> &Self::Target {
192 &self.pageset
193 }
194}
195
196impl DerefMut for SegmentIdx {
197 fn deref_mut(&mut self) -> &mut Self::Target {
198 &mut self.pageset
199 }
200}
201
202#[derive(Debug, Clone, Message, PartialEq, Eq, Default)]
203pub struct SegmentFrameIdx {
204 #[bilrost(1)]
206 frame_size: usize,
207
208 #[bilrost(2)]
210 last_pageidx: PageIdx,
211}
212
213impl SegmentFrameIdx {
214 pub fn new(frame_size: usize, last_pageidx: PageIdx) -> Self {
215 Self { frame_size, last_pageidx }
216 }
217
218 pub fn frame_size(&self) -> usize {
219 self.frame_size
220 }
221
222 pub fn last_pageidx(&self) -> PageIdx {
223 self.last_pageidx
224 }
225}
226
227#[derive(Clone, PartialEq, Eq)]
231pub struct SegmentRangeRef {
232 pub sid: SegmentId,
233 pub bytes: Range<usize>,
234 pub pageset: PageSet,
235}
236
237impl std::fmt::Debug for SegmentRangeRef {
238 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 f.debug_struct("SegmentRangeRef")
240 .field("sid", &self.sid)
241 .field("bytes", &self.bytes)
242 .field("pages", &self.pageset.cardinality())
243 .finish()
244 }
245}
246
247impl SegmentRangeRef {
248 pub fn size(&self) -> usize {
250 self.bytes.end - self.bytes.start
251 }
252
253 #[allow(clippy::result_large_err)]
256 pub fn coalesce(self, other: Self) -> Result<Self, (Self, Self)> {
257 if self.sid != other.sid {
258 return Err((self, other));
259 }
260
261 let (left, right) = if self.bytes.end == other.bytes.start {
262 (self, other)
263 } else if other.bytes.end == self.bytes.start {
264 (other, self)
265 } else {
266 return Err((self, other));
267 };
268
269 let left_splinter: Splinter = left.pageset.into();
270 let right_splinter: Splinter = right.pageset.into();
271 Ok(Self {
272 sid: left.sid,
273 bytes: left.bytes.start..right.bytes.end,
274 pageset: (left_splinter | right_splinter).into(),
275 })
276 }
277}
278
#[cfg(test)]
mod tests {
    use crate::pageidx;

    use super::*;

    /// Builds the fixture shared by several tests: segment `sid` holding pages
    /// 5..=25, split into three frames of 100, 200 and 150 bytes that end at
    /// pages 10, 20 and 25 respectively.
    fn three_frame_segment(sid: SegmentId) -> SegmentIdx {
        let frames = [
            SegmentFrameIdx::new(100, pageidx!(10)),
            SegmentFrameIdx::new(200, pageidx!(20)),
            SegmentFrameIdx::new(150, pageidx!(25)),
        ]
        .into_iter()
        .collect();

        SegmentIdx {
            sid,
            pageset: PageSet::from_range(pageidx!(5)..=pageidx!(25)),
            frames,
        }
    }

    /// Builds an index with no pages and no frames.
    fn empty_segment() -> SegmentIdx {
        SegmentIdx {
            sid: SegmentId::random(),
            pageset: PageSet::EMPTY,
            frames: SmallVec::new(),
        }
    }

    /// Shorthand for a [`SegmentRangeRef`] over `bytes` of segment `sid`
    /// containing the contiguous run of `pages`.
    fn range_ref(
        sid: &SegmentId,
        bytes: Range<usize>,
        pages: RangeInclusive<PageIdx>,
    ) -> SegmentRangeRef {
        SegmentRangeRef {
            sid: sid.clone(),
            bytes,
            pageset: PageSet::from_range(pages),
        }
    }

    #[test]
    fn test_frame_for_pageidx() {
        let sid = SegmentId::random();
        let segment_idx = three_frame_segment(sid.clone());

        let first = range_ref(&sid, 0..100, pageidx!(5)..=pageidx!(10));
        let second = range_ref(&sid, 100..300, pageidx!(11)..=pageidx!(20));
        let third = range_ref(&sid, 300..450, pageidx!(21)..=pageidx!(25));

        // Probe just outside the pageset and at both edges of the frames.
        let cases = [
            (pageidx!(4), None),
            (pageidx!(5), Some(first.clone())),
            (pageidx!(10), Some(first)),
            (pageidx!(11), Some(second.clone())),
            (pageidx!(20), Some(second)),
            (pageidx!(25), Some(third)),
            (pageidx!(26), None),
        ];

        for (pageidx, expected) in cases {
            assert_eq!(
                segment_idx.frame_for_pageidx(pageidx),
                expected,
                "wrong frame for pageidx {pageidx}"
            );
        }
    }

    #[test]
    fn test_frame_for_pageidx_empty_frames() {
        let segment_idx = empty_segment();

        assert!(segment_idx.frame_for_pageidx(pageidx!(1)).is_none());
    }

    #[test]
    fn test_segment_range_ref_coalesce_adjacent() {
        let sid = SegmentId::random();
        let low = range_ref(&sid, 0..100, pageidx!(5)..=pageidx!(10));
        let high = range_ref(&sid, 100..200, pageidx!(11)..=pageidx!(20));
        let merged_pages = PageSet::from_range(pageidx!(5)..=pageidx!(20));

        // Coalescing must succeed regardless of the argument order.
        for (a, b) in [(low.clone(), high.clone()), (high, low)] {
            let merged = a.coalesce(b).unwrap();
            assert_eq!(merged.bytes, 0..200);
            assert_eq!(merged.pageset, merged_pages);
        }
    }

    #[test]
    fn test_segment_range_ref_coalesce_non_adjacent() {
        let sid = SegmentId::random();
        let frame1 = range_ref(&sid, 0..100, pageidx!(5)..=pageidx!(10));
        let frame2 = range_ref(&sid, 150..250, pageidx!(20)..=pageidx!(30));

        let outcome = frame1.clone().coalesce(frame2.clone());
        assert!(outcome.is_err());

        // On failure both inputs come back unchanged and in order.
        let (f1, f2) = outcome.unwrap_err();
        assert_eq!(f1, frame1);
        assert_eq!(f2, frame2);
    }

    #[test]
    fn test_segment_range_ref_coalesce_diff_segment() {
        let frame1 = range_ref(&SegmentId::random(), 0..100, pageidx!(5)..=pageidx!(10));
        let frame2 = range_ref(
            &SegmentId::random(),
            100..200,
            pageidx!(11)..=pageidx!(20),
        );

        let outcome = frame1.clone().coalesce(frame2.clone());
        assert!(outcome.is_err());

        // On failure both inputs come back unchanged and in order.
        let (f1, f2) = outcome.unwrap_err();
        assert_eq!(f1, frame1);
        assert_eq!(f2, frame2);
    }

    #[test]
    fn test_iter_frames_no_filter() {
        let segment_idx = three_frame_segment(SegmentId::random());

        let all_frames: Vec<_> = segment_idx.iter_frames(|_| true).collect();
        assert_eq!(all_frames.len(), 3);

        let expected = [
            (0..100, pageidx!(5)..=pageidx!(10)),
            (100..300, pageidx!(11)..=pageidx!(20)),
            (300..450, pageidx!(21)..=pageidx!(25)),
        ];
        for (frame, (bytes, pages)) in all_frames.iter().zip(expected) {
            assert_eq!(frame.bytes, bytes);
            assert_eq!(frame.pageset, PageSet::from_range(pages));
        }
    }

    #[test]
    fn test_iter_frames_with_filter() {
        let segment_idx = three_frame_segment(SegmentId::random());

        // Only the middle frame (pages 11..=20) contains page 15.
        let filtered_frames: Vec<_> = segment_idx
            .iter_frames(|pages| pages.contains(&pageidx!(15)))
            .collect();

        assert_eq!(filtered_frames.len(), 1);
        let only = &filtered_frames[0];
        assert_eq!(only.bytes, 100..300);
        assert_eq!(
            only.pageset,
            PageSet::from_range(pageidx!(11)..=pageidx!(20))
        );
    }

    #[test]
    fn test_iter_frames_empty() {
        let segment_idx = empty_segment();

        let frames: Vec<_> = segment_idx.iter_frames(|_| true).collect();
        assert_eq!(frames.len(), 0);
    }
}