libsql_wal/segment/sealed.rs

use std::collections::BTreeMap;
use std::hash::Hasher;
use std::io::{BufWriter, ErrorKind, Write};
use std::mem::size_of;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use chrono::prelude::{DateTime, Utc};
use fst::{Map, MapBuilder, Streamer};
use zerocopy::{AsBytes, FromZeroes};

use crate::error::Result;
use crate::io::buf::{IoBufMut, ZeroCopyBuf};
use crate::io::file::{BufCopy, FileExt};
use crate::io::Inspect;
use crate::segment::{checked_frame_offset, CheckedFrame};
use crate::{LIBSQL_MAGIC, LIBSQL_WAL_VERSION};

use super::compacted::{CompactedSegmentDataFooter, CompactedSegmentDataHeader};
use super::{frame_offset, page_offset, Frame, Segment, SegmentFlags, SegmentHeader};
/// An immutable, sealed WAL segment.
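///
/// A minimal read sketch, assuming a sealed segment file already on disk;
/// `file`, `path`, and the page number are illustrative, not taken from this
/// crate's tests:
///
/// ```ignore
/// let segment = SealedSegment::open(file, path, Default::default(), Utc::now())?
///     .expect("segment was non-empty");
/// let mut page = vec![0u8; 4096];
/// if segment.read_page(42, segment.last_committed(), &mut page)? {
///     // `page` now holds this segment's latest version of page 42
/// }
/// ```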
#[derive(Debug)]
pub struct SealedSegment<F> {
    inner: Arc<SealedSegmentInner<F>>,
}

impl<F> Clone for SealedSegment<F> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

pub struct SealedSegmentInner<F> {
    pub read_locks: Arc<AtomicU64>,
    header: SegmentHeader,
    file: Arc<F>,
    index: Map<Arc<[u8]>>,
    path: PathBuf,
}

impl<F> std::fmt::Debug for SealedSegmentInner<F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SealedSegment")
            .field("read_locks", &self.read_locks)
            .field("header", &self.header)
            .field("index", &self.index)
            .field("path", &self.path)
            .finish()
    }
}

impl<F> SealedSegment<F> {
    pub fn empty(f: F) -> Self {
        Self {
            inner: SealedSegmentInner {
                read_locks: Default::default(),
                header: SegmentHeader::new_zeroed(),
                file: Arc::new(f),
                index: Map::default().map_data(Into::into).unwrap(),
                path: PathBuf::new(),
            }
            .into(),
        }
    }
}

impl<F> Deref for SealedSegment<F> {
    type Target = SealedSegmentInner<F>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl<F> Segment for SealedSegment<F>
where
    F: FileExt,
{
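    /// Layout of the compacted segment written below, derived from the offset
    /// arithmetic in this function:
    ///
    /// ```text
    /// [ CompactedSegmentDataHeader ][ Frame 0 ] ... [ Frame n-1 ][ CompactedSegmentDataFooter ]
    /// ```
    ///
    /// The returned `Vec<u8>` is the serialized fst index mapping big-endian
    /// page numbers to frame offsets within the compacted segment.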
    async fn compact(&self, out_file: &impl FileExt, id: uuid::Uuid) -> Result<Vec<u8>> {
        let mut hasher = crc32fast::Hasher::new();

        let header = CompactedSegmentDataHeader {
            frame_count: (self.index().len() as u32).into(),
            segment_id: id.as_u128().into(),
            start_frame_no: self.header().start_frame_no,
            end_frame_no: self.header().last_commited_frame_no,
            size_after: self.header().size_after,
            version: LIBSQL_WAL_VERSION.into(),
            magic: LIBSQL_MAGIC.into(),
            page_size: self.header().page_size,
            timestamp: self.header().sealed_at_timestamp,
        };

        hasher.update(header.as_bytes());
        let (_, ret) = out_file
            .write_all_at_async(ZeroCopyBuf::new_init(header), 0)
            .await;
        ret?;

        let mut pages = self.index().stream();
        let mut buffer = Box::new(ZeroCopyBuf::<Frame>::new_uninit());
        let mut out_index = fst::MapBuilder::memory();
        let mut current_offset = 0;

        while let Some((page_no_bytes, offset)) = pages.next() {
            let (mut b, ret) = self.read_frame_offset_async(offset as _, buffer).await;
            ret?;
            // Transaction boundaries within a segment are completely erased. It is the
            // consumer's responsibility to place the transaction boundary so that all
            // frames from the segment are applied within the same transaction.
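            // For example, a consumer restoring from this compacted segment would
            // open a single write transaction, apply every frame, and commit only
            // after the last one; committing mid-segment could expose a partially
            // applied database image.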
            b.get_mut().header_mut().set_size_after(0);
            hasher.update(b.get_ref().as_bytes());
            let dest_offset =
                size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>();
            let (mut b, ret) = out_file.write_all_at_async(b, dest_offset as u64).await;
            ret?;
            out_index
                .insert(page_no_bytes, current_offset as _)
                .unwrap();
            current_offset += 1;
            b.deinit();
            buffer = b;
        }

        let footer = CompactedSegmentDataFooter {
            checksum: hasher.finalize().into(),
        };

        let footer_offset =
            size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>();
        let (_, ret) = out_file
            .write_all_at_async(ZeroCopyBuf::new_init(footer), footer_offset as _)
            .await;
        ret?;

        Ok(out_index.into_inner().unwrap())
    }

    #[inline]
    fn start_frame_no(&self) -> u64 {
        self.header().start_frame_no.get()
    }

    #[inline]
    fn last_committed(&self) -> u64 {
        self.header().last_committed()
    }

    fn index(&self) -> &fst::Map<Arc<[u8]>> {
        &self.index
    }

    fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> std::io::Result<bool> {
        if self.header().start_frame_no.get() > max_frame_no {
            return Ok(false);
        }

        let index = self.index();
        if let Some(offset) = index.get(page_no.to_be_bytes()) {
            self.read_page_offset(offset as u32, buf)?;

            return Ok(true);
        }

        Ok(false)
    }

    fn is_checkpointable(&self) -> bool {
        let read_locks = self.read_locks.load(Ordering::Relaxed);
        tracing::debug!(read_locks);
        read_locks == 0
    }

    fn size_after(&self) -> u32 {
        self.header().size_after()
    }

    async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
    where
        B: IoBufMut + Send + 'static,
    {
        assert_eq!(buf.bytes_total(), size_of::<Frame>());
        let frame_offset = frame_offset(offset);
        let (buf, ret) = self.file.read_exact_at_async(buf, frame_offset as _).await;
        (buf, ret.map_err(Into::into))
    }

    fn destroy<IO: crate::io::Io>(&self, io: &IO) -> impl std::future::Future<Output = ()> {
        async move {
            if let Err(e) = io.remove_file_async(&self.path).await {
                tracing::error!("failed to remove segment file {:?}: {e}", self.path);
            }
        }
    }

    fn timestamp(&self) -> DateTime<Utc> {
        assert_ne!(
            self.header().sealed_at_timestamp.get(),
            0,
            "segment was not sealed properly"
        );
        DateTime::from_timestamp_millis(self.header().sealed_at_timestamp.get() as _)
            .expect("this should be a guaranteed roundtrip with DateTime::timestamp_millis")
    }
}

impl<F: FileExt> SealedSegment<F> {
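    /// Opens the sealed segment at `path`, handling three cases (as coded
    /// below):
    /// - the segment is empty: the file is removed and `None` is returned;
    /// - the segment is non-empty but was never sealed (e.g. after a crash):
    ///   the index is rebuilt from the frames and the segment is sealed;
    /// - the segment is sealed: the index is read back from `index_offset`.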
    pub fn open(
        file: Arc<F>,
        path: PathBuf,
        read_locks: Arc<AtomicU64>,
        now: DateTime<Utc>,
    ) -> Result<Option<Self>> {
        let mut header: SegmentHeader = SegmentHeader::new_zeroed();
        file.read_exact_at(header.as_bytes_mut(), 0)?;

        header.check()?;

        let index_offset = header.index_offset.get();
        let index_len = header.index_size.get();

        if header.is_empty() {
            std::fs::remove_file(path)?;
            return Ok(None);
        }

        // This happens after a crash: the segment is not empty, but it was never
        // sealed. We need to rebuild the index and seal the segment.
        if !header.flags().contains(SegmentFlags::SEALED) {
            assert_eq!(header.index_offset.get(), 0, "{header:?}");
            return Self::recover(file, path, header, now).map(Some);
        }

        let mut slice = vec![0; index_len as usize];
        file.read_exact_at(&mut slice, index_offset)?;
        let index = Map::new(slice.into())?;
        Ok(Some(Self {
            inner: SealedSegmentInner {
                file,
                path,
                read_locks,
                index,
                header,
            }
            .into(),
        }))
    }
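
    /// Recovers a non-empty, unsealed segment (typically after a crash): walks
    /// the frames validating the checksum chain, rebuilds the page index from
    /// committed transactions, writes the index after the frames, and seals
    /// the header.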
    fn recover(
        file: Arc<F>,
        path: PathBuf,
        mut header: SegmentHeader,
        now: DateTime<Utc>,
    ) -> Result<Self> {
        assert!(!header.is_empty());
        assert_eq!(header.index_size.get(), 0);
        assert_eq!(header.index_offset.get(), 0);
        assert!(!header.flags().contains(SegmentFlags::SEALED));

        let mut current_checksum = header.salt.get();
        tracing::trace!("recovering unsealed segment at {path:?}");
        let mut index = BTreeMap::new();
        let mut frame: Box<CheckedFrame> = CheckedFrame::new_box_zeroed();
        let mut current_tx = Vec::new();
        let mut last_committed = 0;
        let mut size_after = 0;
        let mut frame_count = 0;
        // When the segment is ordered, the biggest frame_no is the last committed
        // frame. This is not the case for an unordered segment (during recovery or
        // on a replica), so we track the biggest frame_no seen so far and set
        // last_committed to that value on a commit frame.
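        // (E.g., an unordered segment holding frame_nos 5, 3, then 4, where 4
        // is the commit frame, yields last_committed = 5, not 4.)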
        let mut max_seen_frame_no = 0;
        for i in 0.. {
            let offset = checked_frame_offset(i as u32);
            match file.read_exact_at(frame.as_bytes_mut(), offset) {
                Ok(_) => {
                    let new_checksum = frame.frame.checksum(current_checksum);
                    // This is the first checksum that doesn't match the checksum
                    // chain; drop the current transaction and any frames after it.
                    if new_checksum != frame.checksum.get() {
                        tracing::warn!(
                            "found invalid checksum in segment, dropping {} frames",
                            header.last_committed() - last_committed
                        );
                        break;
                    }
                    current_checksum = new_checksum;
                    frame_count += 1;

                    // This must always hold for an ordered segment.
                    #[cfg(debug_assertions)]
                    {
                        if !header.flags().contains(SegmentFlags::FRAME_UNORDERED) {
                            assert!(frame.frame.header().frame_no() > max_seen_frame_no);
                        }
                    }

                    max_seen_frame_no = max_seen_frame_no.max(frame.frame.header().frame_no());

                    current_tx.push(frame.frame.header().page_no());
                    if frame.frame.header().is_commit() {
                        last_committed = max_seen_frame_no;
                        size_after = frame.frame.header().size_after();
                        let base_offset = (i + 1) - current_tx.len();
                        for (frame_offset, page_no) in current_tx.drain(..).enumerate() {
                            index.insert(page_no, (base_offset + frame_offset) as u32);
                        }
                    }
                }
                Err(e) if e.kind() == ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e.into()),
            }
        }
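
        // Tail layout written below: the fst index is appended right after the
        // frames, followed by a little-endian u32 checksum of the index bytes;
        // `header.index_offset` / `header.index_size` record the index region.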
        let index_offset = frame_count as u32;
        let index_byte_offset = checked_frame_offset(index_offset);
        let cursor = file.cursor(index_byte_offset);
        let writer = BufCopy::new(cursor);
        let writer = BufWriter::new(writer);
        let mut digest = crc32fast::Hasher::new_with_initial(current_checksum);
        let mut writer = Inspect::new(writer, |data: &[u8]| {
            digest.write(data);
        });
        let mut builder = MapBuilder::new(&mut writer)?;
        for (k, v) in index.into_iter() {
            builder.insert(k.to_be_bytes(), v as u64).unwrap();
        }
        builder.finish().unwrap();
        let writer = writer.into_inner();
        let index_size = writer.get_ref().get_ref().count();
        let index_checksum = digest.finalize();
        let (mut cursor, index_bytes) = writer
            .into_inner()
            .map_err(|e| e.into_parts().0)?
            .into_parts();
        cursor.write_all(&index_checksum.to_le_bytes())?;
        header.index_offset = index_byte_offset.into();
        header.index_size = index_size.into();
        header.last_commited_frame_no = last_committed.into();
        header.size_after = size_after.into();
        header.sealed_at_timestamp = (now.timestamp_millis() as u64).into();
        let flags = header.flags();
        header.set_flags(flags | SegmentFlags::SEALED);
        header.recompute_checksum();
        file.write_all_at(header.as_bytes(), 0)?;
        let index = Map::new(index_bytes.into()).unwrap();

        Ok(SealedSegment {
            inner: SealedSegmentInner {
                read_locks: Default::default(),
                header,
                file,
                index,
                path,
            }
            .into(),
        })
    }

    pub fn path(&self) -> &Path {
        &self.path
    }

    pub fn read_page_offset(&self, offset: u32, buf: &mut [u8]) -> std::io::Result<()> {
        let page_offset = page_offset(offset) as usize;
        self.file.read_exact_at(buf, page_offset as _)?;

        Ok(())
    }

    pub fn read_frame_offset(&self, offset: u32, frame: &mut Frame) -> Result<()> {
        let offset = frame_offset(offset);
        self.file.read_exact_at(frame.as_bytes_mut(), offset as _)?;
        Ok(())
    }
}

impl<F> SealedSegment<F> {
    pub fn header(&self) -> &SegmentHeader {
        &self.header
    }

    pub async fn read_page_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
    where
        B: IoBufMut + Send + 'static,
        F: FileExt,
    {
        assert_eq!(buf.bytes_total(), 4096);
        let page_offset = page_offset(offset) as usize;
        let (buf, ret) = self.file.read_exact_at_async(buf, page_offset as _).await;
        (buf, ret.map_err(Into::into))
    }
}