libsql_wal/segment/
mod.rs

//! Libsql-wal is organized as a linked list of segments. Frames are appended to the head segment,
//! and eventually, the head segment is swapped for a new empty one. The previous head segment is
//! sealed and becomes immutable. The head segment is represented by the `CurrentSegment` type, and
//! the sealed segments by the `SealedSegment` type.
//!
//! When a reader starts a transaction, it records the head segment's current frame_no. This is the
//! maximum frame_no that this reader is allowed to read. The reader also keeps a reference to the
//! head segment at the moment it was created.
9#![allow(dead_code)]
10use std::future::Future;
11use std::hash::Hasher as _;
12use std::io;
13use std::mem::offset_of;
14use std::mem::size_of;
15use std::num::NonZeroU64;
16use std::sync::Arc;
17
18use chrono::DateTime;
19use chrono::Utc;
20use zerocopy::byteorder::little_endian::{U128, U16, U32, U64};
21use zerocopy::AsBytes;
22
23use crate::error::{Error, Result};
24use crate::io::buf::IoBufMut;
25use crate::io::FileExt;
26use crate::io::Io;
27use crate::LIBSQL_MAGIC;
28use crate::LIBSQL_PAGE_SIZE;
29
30pub(crate) mod compacted;
31pub mod current;
32pub mod list;
33pub mod sealed;
34
bitflags::bitflags! {
    /// Flags stored in the `flags` field of a segment header.
    pub struct SegmentFlags: u32 {
        /// When set, frames in the segment are not guaranteed to be ordered by ascending
        /// frame_no. A segment created by a primary is always ordered, but a replica may insert
        /// frames in any order, as long as commit boundaries are preserved.
        const FRAME_UNORDERED = 1 << 0;
        /// The segment is sealed.
        ///
        /// NOTE(review): the original sentence here was left unfinished ("If this flag is set,
        /// then"). Presumably a sealed segment must not be written to anymore -- confirm and
        /// complete.
        const SEALED          = 1 << 1;
    }
}
45
/// Header stored at the very start of a segment; frames begin right after it, at byte offset
/// `size_of::<SegmentHeader>()`.
///
/// All integer fields are little-endian `zerocopy` types, and the struct is `repr(C)` so that
/// it can be viewed as raw bytes.
#[repr(C)]
#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes, Clone, Copy)]
pub struct SegmentHeader {
    /// Set to LIBSQL_MAGIC
    pub magic: U64,
    /// header version (`check` only accepts version 1)
    pub version: U16,
    /// frame_no of the first frame of this segment. While the segment is empty, this is the
    /// frame_no the next frame will receive.
    pub start_frame_no: U64,
    /// frame_no of the last committed frame of this segment. Only consulted when the segment is
    /// not empty (`frame_count != 0`).
    pub last_commited_frame_no: U64,
    /// number of frames in the segment
    pub frame_count: U64,
    /// size of the database in pages, after applying the segment.
    pub size_after: U32,
    /// byte offset of the index. If 0, then the index wasn't written, and must be recovered.
    /// If non-0, the segment is sealed, and must not be written to anymore
    /// the index is followed by its checksum
    pub index_offset: U64,
    /// size of the index. NOTE(review): presumably in bytes -- confirm.
    pub index_size: U64,
    /// raw bits of the segment's `SegmentFlags`
    pub flags: U32,
    /// salt for the segment checksum
    pub salt: U32,
    /// right now we only support 4096, but if we decided to support other sizes,
    /// we could do it without changing the header
    pub page_size: U16,
    /// NOTE(review): presumably the identifier of the log this segment belongs to -- confirm.
    pub log_id: U128,
    /// ms, from unix epoch
    pub sealed_at_timestamp: U64,

    /// checksum of the header fields, excluding the checksum itself. This field must be the last:
    /// the checksum covers every byte of the header that precedes it.
    pub header_cheksum: U32,
}
77
78impl SegmentHeader {
79    fn checksum(&self) -> u32 {
80        let field_bytes: &[u8] = &self.as_bytes()[..offset_of!(SegmentHeader, header_cheksum)];
81        let checksum = crc32fast::hash(field_bytes);
82        checksum
83    }
84
85    fn check(&self) -> Result<()> {
86        if self.page_size.get() != LIBSQL_PAGE_SIZE {
87            return Err(Error::InvalidPageSize);
88        }
89
90        if self.magic.get() != LIBSQL_MAGIC {
91            return Err(Error::InvalidHeaderChecksum);
92        }
93
94        if self.version.get() != 1 {
95            return Err(Error::InvalidHeaderVersion);
96        }
97
98        let computed = self.checksum();
99        if computed == self.header_cheksum.get() {
100            return Ok(());
101        } else {
102            return Err(Error::InvalidHeaderChecksum);
103        }
104    }
105
106    pub fn flags(&self) -> SegmentFlags {
107        SegmentFlags::from_bits(self.flags.get()).unwrap()
108    }
109
110    fn set_flags(&mut self, flags: SegmentFlags) {
111        self.flags = flags.bits().into();
112    }
113
114    fn recompute_checksum(&mut self) {
115        let checksum = self.checksum();
116        self.header_cheksum = checksum.into();
117    }
118
119    pub fn last_commited_frame_no(&self) -> u64 {
120        self.last_commited_frame_no.get()
121    }
122
123    /// size fo the db after applying this segment
124    pub fn size_after(&self) -> u32 {
125        self.size_after.get()
126    }
127
128    fn is_empty(&self) -> bool {
129        self.frame_count() == 0
130    }
131
132    pub fn frame_count(&self) -> usize {
133        self.frame_count.get() as usize
134    }
135
136    pub fn last_committed(&self) -> u64 {
137        // either the current segment is empty, and the start frame_no is the last frame_no commited on
138        // the previous segment (start_frame_no - 1), or it's the last committed frame_no from this
139        // segment.
140        if self.is_empty() {
141            self.start_frame_no.get() - 1
142        } else {
143            self.last_commited_frame_no.get()
144        }
145    }
146
147    pub(crate) fn next_frame_no(&self) -> NonZeroU64 {
148        if self.is_empty() {
149            assert!(self.start_frame_no.get() > 0);
150            NonZeroU64::new(self.start_frame_no.get()).unwrap()
151        } else {
152            NonZeroU64::new(self.last_commited_frame_no.get() + 1).unwrap()
153        }
154    }
155}
156
/// Common interface of the segments that make up the WAL.
///
/// NOTE(review): the contracts below are described from the signatures only; the implementors
/// are not visible from this file, so hedged statements should be confirmed against them.
pub trait Segment: Send + Sync + 'static {
    /// Compacts the segment into `out_file`.
    ///
    /// NOTE(review): the role of `id` and the meaning of the returned bytes are defined by the
    /// implementors -- document them here once confirmed.
    fn compact(
        &self,
        out_file: &impl FileExt,
        id: uuid::Uuid,
    ) -> impl Future<Output = Result<Vec<u8>>> + Send;
    /// frame_no at which this segment starts.
    fn start_frame_no(&self) -> u64;
    /// Last committed frame_no of this segment.
    fn last_committed(&self) -> u64;
    /// The segment's index.
    ///
    /// NOTE(review): presumably maps page numbers to frame offsets in the segment -- confirm.
    fn index(&self) -> &fst::Map<Arc<[u8]>>;
    /// Reads page `page_no` into `buf`.
    ///
    /// NOTE(review): presumably only frames with a frame_no <= `max_frame_no` are considered,
    /// and the returned bool tells whether the page was found in this segment -- confirm.
    fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool>;
    /// Whether this segment can be checkpointed.
    ///
    /// NOTE(review): the previous doc comment here described a reader count ("returns the number
    /// of readers currently holding a reference to this log. The read count must monotonically
    /// decrease."), which does not match the `bool` return type. Presumably this returns true
    /// once no reader holds a reference to the segment -- confirm.
    fn is_checkpointable(&self) -> bool;
    /// The size of the database after applying this segment.
    fn size_after(&self) -> u32;
    /// Reads the frame at `offset` into `buf`, handing the buffer back along with the result.
    ///
    /// NOTE(review): `offset` is presumably a frame index within the segment rather than a byte
    /// offset -- confirm.
    async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
    where
        B: IoBufMut + Send + 'static;
    /// Timestamp associated with the segment.
    ///
    /// NOTE(review): presumably the time at which the segment was sealed -- confirm.
    fn timestamp(&self) -> DateTime<Utc>;

    /// Destroys the segment, using `io`.
    ///
    /// NOTE(review): presumably removes the segment's backing storage -- confirm.
    fn destroy<IO: Io>(&self, io: &IO) -> impl Future<Output = ()>;
}
179
180impl<T: Segment> Segment for Arc<T> {
181    fn compact(
182        &self,
183        out_file: &impl FileExt,
184        id: uuid::Uuid,
185    ) -> impl Future<Output = Result<Vec<u8>>> + Send {
186        self.as_ref().compact(out_file, id)
187    }
188
189    fn start_frame_no(&self) -> u64 {
190        self.as_ref().start_frame_no()
191    }
192
193    fn last_committed(&self) -> u64 {
194        self.as_ref().last_committed()
195    }
196
197    fn index(&self) -> &fst::Map<Arc<[u8]>> {
198        self.as_ref().index()
199    }
200
201    fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool> {
202        self.as_ref().read_page(page_no, max_frame_no, buf)
203    }
204
205    fn is_checkpointable(&self) -> bool {
206        self.as_ref().is_checkpointable()
207    }
208
209    fn size_after(&self) -> u32 {
210        self.as_ref().size_after()
211    }
212
213    async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
214    where
215        B: IoBufMut + Send + 'static,
216    {
217        self.as_ref().read_frame_offset_async(offset, buf).await
218    }
219
220    fn destroy<IO: Io>(&self, io: &IO) -> impl Future<Output = ()> {
221        self.as_ref().destroy(io)
222    }
223
224    fn timestamp(&self) -> DateTime<Utc> {
225        self.as_ref().timestamp()
226    }
227}
228
/// Header that precedes the page content in a `Frame`.
#[repr(C)]
#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)]
pub struct FrameHeader {
    /// page number of the page carried by the frame
    pub page_no: U32,
    /// non-zero only for a commit frame (see `is_commit`).
    /// NOTE(review): presumably the size of the database in pages after the commit -- confirm.
    pub size_after: U32,
    /// frame_no of the frame
    pub frame_no: U64,
}
236
237impl FrameHeader {
238    pub fn page_no(&self) -> u32 {
239        self.page_no.get()
240    }
241
242    pub fn size_after(&self) -> u32 {
243        self.size_after.get()
244    }
245
246    pub fn frame_no(&self) -> u64 {
247        self.frame_no.get()
248    }
249
250    pub fn set_frame_no(&mut self, frame_no: u64) {
251        self.frame_no = frame_no.into();
252    }
253
254    pub fn set_page_no(&mut self, page_no: u32) {
255        self.page_no = page_no.into();
256    }
257
258    pub fn set_size_after(&mut self, size_after: u32) {
259        self.size_after = size_after.into();
260    }
261
262    pub fn is_commit(&self) -> bool {
263        self.size_after() != 0
264    }
265}
266
/// A frame with a running checksum prepended.
/// `checksum` is computed by taking the checksum of the previous frame and crc32'ing it with frame
/// data (header and page content). The first frame is hashed with the segment header salt.
#[repr(C)]
#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)]
pub struct CheckedFrame {
    /// running checksum, covering this frame and, transitively, the ones before it
    checksum: U32,
    // frame should always be the last field
    frame: Frame,
}
277
278impl CheckedFrame {
279    pub(crate) const fn offset_of_frame() -> usize {
280        offset_of!(Self, frame)
281    }
282}
283
/// A frame: a `FrameHeader` immediately followed by the content of one page.
#[repr(C)]
#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)]
pub struct Frame {
    /// metadata of the frame (page number, size after, frame_no)
    header: FrameHeader,
    /// page content, exactly `LIBSQL_PAGE_SIZE` bytes
    data: [u8; LIBSQL_PAGE_SIZE as usize],
}
290
291impl Frame {
292    pub(crate) fn checksum(&self, previous_checksum: u32) -> u32 {
293        let mut digest = crc32fast::Hasher::new_with_initial(previous_checksum);
294        digest.write(self.as_bytes());
295        digest.finalize()
296    }
297
298    pub fn data(&self) -> &[u8] {
299        &self.data
300    }
301
302    pub fn header(&self) -> &FrameHeader {
303        &self.header
304    }
305
306    pub fn header_mut(&mut self) -> &mut FrameHeader {
307        &mut self.header
308    }
309
310    pub(crate) fn size_after(&self) -> Option<u32> {
311        let size_after = self.header().size_after.get();
312        (size_after != 0).then_some(size_after)
313    }
314
315    pub fn data_mut(&mut self) -> &mut [u8] {
316        &mut self.data
317    }
318}
319
320/// offset of the CheckedFrame in a current of sealed segment
321#[inline]
322fn checked_frame_offset(offset: u32) -> u64 {
323    (size_of::<SegmentHeader>() + (offset as usize) * size_of::<CheckedFrame>()) as u64
324}
325/// offset of a Frame in a current or sealed segment.
326#[inline]
327fn frame_offset(offset: u32) -> u64 {
328    checked_frame_offset(offset) + CheckedFrame::offset_of_frame() as u64
329}
330
331/// offset of a frame's page in a current or sealed segment.
332#[inline]
333fn page_offset(offset: u32) -> u64 {
334    frame_offset(offset) + size_of::<FrameHeader>() as u64
335}
336
#[cfg(test)]
mod test {
    use super::*;

    /// The first frame starts right after the segment header.
    #[test]
    fn offsets() {
        let header_len = size_of::<SegmentHeader>();
        let frame_start = CheckedFrame::offset_of_frame();

        assert_eq!(checked_frame_offset(0) as usize, header_len);
        assert_eq!(frame_offset(0) as usize, header_len + frame_start);
        assert_eq!(
            page_offset(0) as usize,
            header_len + frame_start + size_of::<FrameHeader>()
        );
    }

    /// Offset 0 alone cannot catch a wrong stride: also check that consecutive frames are
    /// exactly one `CheckedFrame` apart, and that the inner offsets hold for non-zero indexes.
    #[test]
    fn offsets_advance_by_one_checked_frame() {
        let stride = size_of::<CheckedFrame>() as u64;

        for slot in [0u32, 1, 2, 1000] {
            assert_eq!(
                checked_frame_offset(slot + 1) - checked_frame_offset(slot),
                stride
            );
            assert_eq!(
                frame_offset(slot) - checked_frame_offset(slot),
                CheckedFrame::offset_of_frame() as u64
            );
            assert_eq!(
                page_offset(slot) - frame_offset(slot),
                size_of::<FrameHeader>() as u64
            );
        }
    }
}
353}