libsql_wal/segment/
mod.rs1#![allow(dead_code)]
10use std::future::Future;
11use std::hash::Hasher as _;
12use std::io;
13use std::mem::offset_of;
14use std::mem::size_of;
15use std::num::NonZeroU64;
16use std::sync::Arc;
17
18use chrono::DateTime;
19use chrono::Utc;
20use zerocopy::byteorder::little_endian::{U128, U16, U32, U64};
21use zerocopy::AsBytes;
22
23use crate::error::{Error, Result};
24use crate::io::buf::IoBufMut;
25use crate::io::FileExt;
26use crate::io::Io;
27use crate::LIBSQL_MAGIC;
28use crate::LIBSQL_PAGE_SIZE;
29
30pub(crate) mod compacted;
31pub mod current;
32pub mod list;
33pub mod sealed;
34
35bitflags::bitflags! {
36 pub struct SegmentFlags: u32 {
37 const FRAME_UNORDERED = 1 << 0;
41 const SEALED = 1 << 1;
43 }
44}
45
46#[repr(C)]
47#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes, Clone, Copy)]
48pub struct SegmentHeader {
49 pub magic: U64,
51 pub version: U16,
53 pub start_frame_no: U64,
54 pub last_commited_frame_no: U64,
55 pub frame_count: U64,
57 pub size_after: U32,
59 pub index_offset: U64,
63 pub index_size: U64,
64 pub flags: U32,
65 pub salt: U32,
67 pub page_size: U16,
70 pub log_id: U128,
71 pub sealed_at_timestamp: U64,
73
74 pub header_cheksum: U32,
76}
77
78impl SegmentHeader {
79 fn checksum(&self) -> u32 {
80 let field_bytes: &[u8] = &self.as_bytes()[..offset_of!(SegmentHeader, header_cheksum)];
81 let checksum = crc32fast::hash(field_bytes);
82 checksum
83 }
84
85 fn check(&self) -> Result<()> {
86 if self.page_size.get() != LIBSQL_PAGE_SIZE {
87 return Err(Error::InvalidPageSize);
88 }
89
90 if self.magic.get() != LIBSQL_MAGIC {
91 return Err(Error::InvalidHeaderChecksum);
92 }
93
94 if self.version.get() != 1 {
95 return Err(Error::InvalidHeaderVersion);
96 }
97
98 let computed = self.checksum();
99 if computed == self.header_cheksum.get() {
100 return Ok(());
101 } else {
102 return Err(Error::InvalidHeaderChecksum);
103 }
104 }
105
106 pub fn flags(&self) -> SegmentFlags {
107 SegmentFlags::from_bits(self.flags.get()).unwrap()
108 }
109
110 fn set_flags(&mut self, flags: SegmentFlags) {
111 self.flags = flags.bits().into();
112 }
113
114 fn recompute_checksum(&mut self) {
115 let checksum = self.checksum();
116 self.header_cheksum = checksum.into();
117 }
118
119 pub fn last_commited_frame_no(&self) -> u64 {
120 self.last_commited_frame_no.get()
121 }
122
123 pub fn size_after(&self) -> u32 {
125 self.size_after.get()
126 }
127
128 fn is_empty(&self) -> bool {
129 self.frame_count() == 0
130 }
131
132 pub fn frame_count(&self) -> usize {
133 self.frame_count.get() as usize
134 }
135
136 pub fn last_committed(&self) -> u64 {
137 if self.is_empty() {
141 self.start_frame_no.get() - 1
142 } else {
143 self.last_commited_frame_no.get()
144 }
145 }
146
147 pub(crate) fn next_frame_no(&self) -> NonZeroU64 {
148 if self.is_empty() {
149 assert!(self.start_frame_no.get() > 0);
150 NonZeroU64::new(self.start_frame_no.get()).unwrap()
151 } else {
152 NonZeroU64::new(self.last_commited_frame_no.get() + 1).unwrap()
153 }
154 }
155}
156
157pub trait Segment: Send + Sync + 'static {
158 fn compact(
159 &self,
160 out_file: &impl FileExt,
161 id: uuid::Uuid,
162 ) -> impl Future<Output = Result<Vec<u8>>> + Send;
163 fn start_frame_no(&self) -> u64;
164 fn last_committed(&self) -> u64;
165 fn index(&self) -> &fst::Map<Arc<[u8]>>;
166 fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool>;
167 fn is_checkpointable(&self) -> bool;
170 fn size_after(&self) -> u32;
172 async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
173 where
174 B: IoBufMut + Send + 'static;
175 fn timestamp(&self) -> DateTime<Utc>;
176
177 fn destroy<IO: Io>(&self, io: &IO) -> impl Future<Output = ()>;
178}
179
180impl<T: Segment> Segment for Arc<T> {
181 fn compact(
182 &self,
183 out_file: &impl FileExt,
184 id: uuid::Uuid,
185 ) -> impl Future<Output = Result<Vec<u8>>> + Send {
186 self.as_ref().compact(out_file, id)
187 }
188
189 fn start_frame_no(&self) -> u64 {
190 self.as_ref().start_frame_no()
191 }
192
193 fn last_committed(&self) -> u64 {
194 self.as_ref().last_committed()
195 }
196
197 fn index(&self) -> &fst::Map<Arc<[u8]>> {
198 self.as_ref().index()
199 }
200
201 fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool> {
202 self.as_ref().read_page(page_no, max_frame_no, buf)
203 }
204
205 fn is_checkpointable(&self) -> bool {
206 self.as_ref().is_checkpointable()
207 }
208
209 fn size_after(&self) -> u32 {
210 self.as_ref().size_after()
211 }
212
213 async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
214 where
215 B: IoBufMut + Send + 'static,
216 {
217 self.as_ref().read_frame_offset_async(offset, buf).await
218 }
219
220 fn destroy<IO: Io>(&self, io: &IO) -> impl Future<Output = ()> {
221 self.as_ref().destroy(io)
222 }
223
224 fn timestamp(&self) -> DateTime<Utc> {
225 self.as_ref().timestamp()
226 }
227}
228
229#[repr(C)]
230#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)]
231pub struct FrameHeader {
232 pub page_no: U32,
233 pub size_after: U32,
234 pub frame_no: U64,
235}
236
237impl FrameHeader {
238 pub fn page_no(&self) -> u32 {
239 self.page_no.get()
240 }
241
242 pub fn size_after(&self) -> u32 {
243 self.size_after.get()
244 }
245
246 pub fn frame_no(&self) -> u64 {
247 self.frame_no.get()
248 }
249
250 pub fn set_frame_no(&mut self, frame_no: u64) {
251 self.frame_no = frame_no.into();
252 }
253
254 pub fn set_page_no(&mut self, page_no: u32) {
255 self.page_no = page_no.into();
256 }
257
258 pub fn set_size_after(&mut self, size_after: u32) {
259 self.size_after = size_after.into();
260 }
261
262 pub fn is_commit(&self) -> bool {
263 self.size_after() != 0
264 }
265}
266
267#[repr(C)]
271#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)]
272pub struct CheckedFrame {
273 checksum: U32,
274 frame: Frame,
276}
277
278impl CheckedFrame {
279 pub(crate) const fn offset_of_frame() -> usize {
280 offset_of!(Self, frame)
281 }
282}
283
284#[repr(C)]
285#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)]
286pub struct Frame {
287 header: FrameHeader,
288 data: [u8; LIBSQL_PAGE_SIZE as usize],
289}
290
291impl Frame {
292 pub(crate) fn checksum(&self, previous_checksum: u32) -> u32 {
293 let mut digest = crc32fast::Hasher::new_with_initial(previous_checksum);
294 digest.write(self.as_bytes());
295 digest.finalize()
296 }
297
298 pub fn data(&self) -> &[u8] {
299 &self.data
300 }
301
302 pub fn header(&self) -> &FrameHeader {
303 &self.header
304 }
305
306 pub fn header_mut(&mut self) -> &mut FrameHeader {
307 &mut self.header
308 }
309
310 pub(crate) fn size_after(&self) -> Option<u32> {
311 let size_after = self.header().size_after.get();
312 (size_after != 0).then_some(size_after)
313 }
314
315 pub fn data_mut(&mut self) -> &mut [u8] {
316 &mut self.data
317 }
318}
319
320#[inline]
322fn checked_frame_offset(offset: u32) -> u64 {
323 (size_of::<SegmentHeader>() + (offset as usize) * size_of::<CheckedFrame>()) as u64
324}
325#[inline]
327fn frame_offset(offset: u32) -> u64 {
328 checked_frame_offset(offset) + CheckedFrame::offset_of_frame() as u64
329}
330
331#[inline]
333fn page_offset(offset: u32) -> u64 {
334 frame_offset(offset) + size_of::<FrameHeader>() as u64
335}
336
#[cfg(test)]
mod test {
    use super::*;

    /// The first checked frame, frame and page must start right after the
    /// segment header, the frame checksum and the frame header respectively.
    #[test]
    fn offsets() {
        let segment_header_len = size_of::<SegmentHeader>();
        let first_frame_start = segment_header_len + CheckedFrame::offset_of_frame();
        let first_page_start = first_frame_start + size_of::<FrameHeader>();

        assert_eq!(checked_frame_offset(0) as usize, segment_header_len);
        assert_eq!(frame_offset(0) as usize, first_frame_start);
        assert_eq!(page_offset(0) as usize, first_page_start);
    }
}