use std::collections::BTreeMap;
use std::hash::Hasher;
use std::io::{BufWriter, ErrorKind, Write};
use std::mem::size_of;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use chrono::prelude::{DateTime, Utc};
use fst::{Map, MapBuilder, Streamer};
use zerocopy::{AsBytes, FromZeroes};

use crate::error::Result;
use crate::io::buf::{IoBufMut, ZeroCopyBuf};
use crate::io::file::{BufCopy, FileExt};
use crate::io::Inspect;
use crate::segment::{checked_frame_offset, CheckedFrame};
use crate::{LIBSQL_MAGIC, LIBSQL_WAL_VERSION};

use super::compacted::{CompactedSegmentDataFooter, CompactedSegmentDataHeader};
use super::{frame_offset, page_offset, Frame, Segment, SegmentFlags, SegmentHeader};

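/// A sealed (read-only) WAL segment.
///
/// Once sealed, a segment no longer accepts writes. Pages are looked up
/// through an `fst` index mapping big-endian page numbers to the offset of the
/// most recent frame for that page. `SealedSegment` is a cheap-to-clone
/// handle: all state lives behind an `Arc`.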
#[derive(Debug)]
pub struct SealedSegment<F> {
    inner: Arc<SealedSegmentInner<F>>,
}

impl<F> Clone for SealedSegment<F> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

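/// Shared state backing a [`SealedSegment`].
///
/// `read_locks` tracks the number of outstanding read locks on this segment;
/// see `is_checkpointable`, which only allows checkpointing once the count
/// drops to zero.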
pub struct SealedSegmentInner<F> {
    pub read_locks: Arc<AtomicU64>,
    header: SegmentHeader,
    file: Arc<F>,
    index: Map<Arc<[u8]>>,
    path: PathBuf,
}

impl<F> std::fmt::Debug for SealedSegmentInner<F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SealedSegment")
            .field("read_locks", &self.read_locks)
            .field("header", &self.header)
            .field("index", &self.index)
            .field("path", &self.path)
            .finish()
    }
}

impl<F> SealedSegment<F> {
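    /// Returns an empty placeholder segment wrapping `f`, with a zeroed
    /// header, an empty index, and an empty path.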
    pub fn empty(f: F) -> Self {
        Self {
            inner: SealedSegmentInner {
                read_locks: Default::default(),
                header: SegmentHeader::new_zeroed(),
                file: Arc::new(f),
                index: Map::default().map_data(Into::into).unwrap(),
                path: PathBuf::new(),
            }
            .into(),
        }
    }
}

impl<F> Deref for SealedSegment<F> {
    type Target = SealedSegmentInner<F>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl<F> Segment for SealedSegment<F>
where
    F: FileExt,
{
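    /// Compacts this segment into `out_file`, keeping only the most recent
    /// version of each page, and returns the serialized index of the
    /// compacted segment.
    ///
    /// Layout of the compacted file:
    ///
    /// ```text
    /// [CompactedSegmentDataHeader][Frame 0]..[Frame n-1][CompactedSegmentDataFooter]
    /// ```
    ///
    /// The footer stores a CRC32 checksum covering the header and all frames.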
    async fn compact(&self, out_file: &impl FileExt, id: uuid::Uuid) -> Result<Vec<u8>> {
        let mut hasher = crc32fast::Hasher::new();

        let header = CompactedSegmentDataHeader {
            frame_count: (self.index().len() as u32).into(),
            segment_id: id.as_u128().into(),
            start_frame_no: self.header().start_frame_no,
            end_frame_no: self.header().last_commited_frame_no,
            size_after: self.header().size_after,
            version: LIBSQL_WAL_VERSION.into(),
            magic: LIBSQL_MAGIC.into(),
            page_size: self.header().page_size,
            timestamp: self.header().sealed_at_timestamp,
        };

        hasher.update(header.as_bytes());
        let (_, ret) = out_file
            .write_all_at_async(ZeroCopyBuf::new_init(header), 0)
            .await;
        ret?;

        // Stream the index: this yields the most recent frame offset for each
        // page, in page-number order, so the compacted segment contains
        // exactly one frame per page.
        let mut pages = self.index().stream();
        let mut buffer = Box::new(ZeroCopyBuf::<Frame>::new_uninit());
        let mut out_index = MapBuilder::memory();
        let mut current_offset = 0;

        while let Some((page_no_bytes, offset)) = pages.next() {
            let (mut b, ret) = self.read_frame_offset_async(offset as _, buffer).await;
            ret?;
            // Clear size_after: the compacted header's `size_after` field is
            // authoritative for the whole segment.
            b.get_mut().header_mut().set_size_after(0);
            hasher.update(b.get_ref().as_bytes());
            let dest_offset =
                size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>();
            let (mut b, ret) = out_file.write_all_at_async(b, dest_offset as u64).await;
            ret?;
            out_index
                .insert(page_no_bytes, current_offset as _)
                .unwrap();
            current_offset += 1;
            // Recycle the buffer for the next read.
            b.deinit();
            buffer = b;
        }

        let footer = CompactedSegmentDataFooter {
            checksum: hasher.finalize().into(),
        };

        let footer_offset =
            size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>();
        let (_, ret) = out_file
            .write_all_at_async(ZeroCopyBuf::new_init(footer), footer_offset as _)
            .await;
        ret?;

        Ok(out_index.into_inner().unwrap())
    }

    #[inline]
    fn start_frame_no(&self) -> u64 {
        self.header.start_frame_no.get()
    }

    #[inline]
    fn last_committed(&self) -> u64 {
        self.header.last_committed()
    }

    fn index(&self) -> &fst::Map<Arc<[u8]>> {
        &self.index
    }

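    /// Reads page `page_no` into `buf` if this segment contains it, returning
    /// whether the page was found. Returns `false` without searching the
    /// index when this segment starts past `max_frame_no`.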
    fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> std::io::Result<bool> {
        if self.header().start_frame_no.get() > max_frame_no {
            return Ok(false);
        }

        let index = self.index();
        if let Some(offset) = index.get(page_no.to_be_bytes()) {
            self.read_page_offset(offset as u32, buf)?;
            return Ok(true);
        }

        Ok(false)
    }

    fn is_checkpointable(&self) -> bool {
        let read_locks = self.read_locks.load(Ordering::Relaxed);
        tracing::debug!(read_locks);
        read_locks == 0
    }

    fn size_after(&self) -> u32 {
        self.header().size_after()
    }

    async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
    where
        B: IoBufMut + Send + 'static,
    {
        assert_eq!(buf.bytes_total(), size_of::<Frame>());
        let frame_offset = frame_offset(offset);
        let (buf, ret) = self.file.read_exact_at_async(buf, frame_offset as _).await;
        (buf, ret.map_err(Into::into))
    }

    fn destroy<IO: crate::io::Io>(&self, io: &IO) -> impl std::future::Future<Output = ()> {
        async move {
            if let Err(e) = io.remove_file_async(&self.path).await {
                tracing::error!("failed to remove segment file {:?}: {e}", self.path);
            }
        }
    }

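    /// Returns the time at which this segment was sealed.
    ///
    /// # Panics
    ///
    /// Panics if the header carries no seal timestamp, i.e. the segment was
    /// not sealed properly.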
    fn timestamp(&self) -> DateTime<Utc> {
        assert_ne!(
            self.header().sealed_at_timestamp.get(),
            0,
            "segment was not sealed properly"
        );
        DateTime::from_timestamp_millis(self.header().sealed_at_timestamp.get() as _)
            .expect("this should be a guaranteed roundtrip with DateTime::timestamp_millis")
    }
}

impl<F: FileExt> SealedSegment<F> {
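    /// Opens the sealed segment at `path`.
    ///
    /// Returns `Ok(None)` if the segment is empty (the file is removed in
    /// that case). If the segment was not sealed, e.g. after a crash, it is
    /// recovered and sealed before being returned.
    ///
    /// A minimal usage sketch, assuming `F = std::fs::File` implements this
    /// crate's `FileExt`:
    ///
    /// ```ignore
    /// let file = Arc::new(std::fs::File::open(&path)?);
    /// if let Some(segment) =
    ///     SealedSegment::open(file, path.clone(), Default::default(), Utc::now())?
    /// {
    ///     tracing::info!("opened segment sealed at {}", segment.timestamp());
    /// }
    /// ```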
    pub fn open(
        file: Arc<F>,
        path: PathBuf,
        read_locks: Arc<AtomicU64>,
        now: DateTime<Utc>,
    ) -> Result<Option<Self>> {
        let mut header: SegmentHeader = SegmentHeader::new_zeroed();
        file.read_exact_at(header.as_bytes_mut(), 0)?;

        header.check()?;

        let index_offset = header.index_offset.get();
        let index_len = header.index_size.get();

        if header.is_empty() {
            std::fs::remove_file(path)?;
            return Ok(None);
        }

        // An unsealed segment cannot have an index yet: it must be recovered
        // and sealed before it can be used.
        if !header.flags().contains(SegmentFlags::SEALED) {
            assert_eq!(header.index_offset.get(), 0, "{header:?}");
            return Self::recover(file, path, header, now).map(Some);
        }

        let mut slice = vec![0; index_len as usize];
        file.read_exact_at(&mut slice, index_offset)?;
        let index = Map::new(slice.into())?;
        Ok(Some(Self {
            inner: SealedSegmentInner {
                file,
                path,
                read_locks,
                index,
                header,
            }
            .into(),
        }))
    }

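    /// Recovers an unsealed segment and seals it.
    ///
    /// Frames are replayed in order while their rolling checksum matches; any
    /// tail of frames past the first mismatch (an interrupted write) is
    /// dropped, and only fully committed transactions are indexed. The index
    /// is then written after the last valid frame, followed by its checksum,
    /// and the header is updated and flagged as `SEALED`.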
    fn recover(
        file: Arc<F>,
        path: PathBuf,
        mut header: SegmentHeader,
        now: DateTime<Utc>,
    ) -> Result<Self> {
        assert!(!header.is_empty());
        assert_eq!(header.index_size.get(), 0);
        assert_eq!(header.index_offset.get(), 0);
        assert!(!header.flags().contains(SegmentFlags::SEALED));

        let mut current_checksum = header.salt.get();
        tracing::trace!("recovering unsealed segment at {path:?}");
        let mut index = BTreeMap::new();
        let mut frame: Box<CheckedFrame> = CheckedFrame::new_box_zeroed();
        let mut current_tx = Vec::new();
        let mut last_committed = 0;
        let mut size_after = 0;
        let mut frame_count = 0;
        let mut max_seen_frame_no = 0;
        // Replay frames until the first checksum mismatch or EOF.
        for i in 0.. {
            let offset = checked_frame_offset(i as u32);
            match file.read_exact_at(frame.as_bytes_mut(), offset) {
                Ok(_) => {
                    let new_checksum = frame.frame.checksum(current_checksum);
                    if new_checksum != frame.checksum.get() {
                        // The segment was interrupted mid-write: drop this
                        // frame and everything after it.
                        tracing::warn!(
                            "found invalid checksum in segment, dropping {} frames",
                            header.last_committed() - last_committed
                        );
                        break;
                    }
                    current_checksum = new_checksum;
                    frame_count += 1;

                    #[cfg(debug_assertions)]
                    {
                        // Unless frames were explicitly written out of order,
                        // frame numbers must be strictly increasing.
                        if !header.flags().contains(SegmentFlags::FRAME_UNORDERED) {
                            assert!(frame.frame.header().frame_no() > max_seen_frame_no);
                        }
                    }

                    max_seen_frame_no = max_seen_frame_no.max(frame.frame.header().frame_no());

                    current_tx.push(frame.frame.header().page_no());
                    if frame.frame.header().is_commit() {
                        // The transaction is complete: index all of its pages.
                        last_committed = max_seen_frame_no;
                        size_after = frame.frame.header().size_after();
                        let base_offset = (i + 1) - current_tx.len();
                        for (frame_offset, page_no) in current_tx.drain(..).enumerate() {
                            index.insert(page_no, (base_offset + frame_offset) as u32);
                        }
                    }
                }
                Err(e) if e.kind() == ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e.into()),
            }
        }

        // Write the index right after the last valid frame, hashing the bytes
        // as they are written so the index checksum can be appended.
        let index_offset = frame_count as u32;
        let index_byte_offset = checked_frame_offset(index_offset);
        let cursor = file.cursor(index_byte_offset);
        let writer = BufCopy::new(cursor);
        let writer = BufWriter::new(writer);
        let mut digest = crc32fast::Hasher::new_with_initial(current_checksum);
        let mut writer = Inspect::new(writer, |data: &[u8]| {
            digest.write(data);
        });
        let mut builder = MapBuilder::new(&mut writer)?;
        for (k, v) in index.into_iter() {
            builder.insert(k.to_be_bytes(), v as u64).unwrap();
        }
        builder.finish().unwrap();
        let writer = writer.into_inner();
        let index_size = writer.get_ref().get_ref().count();
        let index_checksum = digest.finalize();
        let (mut cursor, index_bytes) = writer
            .into_inner()
            .map_err(|e| e.into_parts().0)?
            .into_parts();
        cursor.write_all(&index_checksum.to_le_bytes())?;
        header.index_offset = index_byte_offset.into();
        header.index_size = index_size.into();
        header.last_commited_frame_no = last_committed.into();
        header.size_after = size_after.into();
        header.sealed_at_timestamp = (now.timestamp_millis() as u64).into();
        let flags = header.flags();
        header.set_flags(flags | SegmentFlags::SEALED);
        header.recompute_checksum();
        file.write_all_at(header.as_bytes(), 0)?;
        let index = Map::new(index_bytes.into()).unwrap();

        Ok(SealedSegment {
            inner: SealedSegmentInner {
                read_locks: Default::default(),
                header,
                file,
                index,
                path,
            }
            .into(),
        })
    }

    pub fn path(&self) -> &Path {
        &self.path
    }

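    /// Reads the page stored at frame `offset` into `buf`.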
    pub fn read_page_offset(&self, offset: u32, buf: &mut [u8]) -> std::io::Result<()> {
        let page_offset = page_offset(offset) as usize;
        self.file.read_exact_at(buf, page_offset as _)?;
        Ok(())
    }

    pub fn read_frame_offset(&self, offset: u32, frame: &mut Frame) -> Result<()> {
        let offset = frame_offset(offset);
        self.file.read_exact_at(frame.as_bytes_mut(), offset as _)?;
        Ok(())
    }
}

impl<F> SealedSegment<F> {
    pub fn header(&self) -> &SegmentHeader {
        &self.header
    }

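    /// Asynchronous variant of [`Self::read_page_offset`].
    ///
    /// Note: asserts that `buf` is exactly 4096 bytes, the assumed page size.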
    pub async fn read_page_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
    where
        B: IoBufMut + Send + 'static,
        F: FileExt,
    {
        assert_eq!(buf.bytes_total(), 4096);
        let page_offset = page_offset(offset) as usize;
        let (buf, ret) = self.file.read_exact_at_async(buf, page_offset as _).await;
        (buf, ret.map_err(Into::into))
    }
}