1use bytes::{Bytes, BytesMut, BufMut};
11use parking_lot::Mutex;
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, BTreeMap};
14use std::fs::{File, OpenOptions};
15use std::io::{Read, Write, Seek, SeekFrom};
16use std::path::Path;
17use std::sync::Arc;
18use crate::{
19 Error, Result, Lsn, SeqNo,
20 layout::Region,
21 extent::Extent,
22 types::checksum,
23 sst_block::SstBlockHandle,
24 index::TableSchema,
25};
26
/// Sequence number attached to every manifest record.
///
/// `Manifest` hands these out in increasing order starting at 1. The value 0
/// is never assigned: recovery treats a zero sequence number as the end of
/// the log.
pub type ManifestSeq = u64;
29
/// A single entry in the manifest log.
///
/// Records are serialized with `bincode` when the manifest is flushed and are
/// replayed through `Manifest::apply_record` to rebuild a `ManifestState`.
///
/// NOTE(review): the persisted payload is the `bincode` encoding of this enum,
/// so reordering variants or fields presumably changes the on-disk format —
/// confirm before editing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ManifestRecord {
    /// Registers an SST: its metadata is stored under `sst_id` and the id is
    /// appended to the assignment list of `stripe`.
    AddSst {
        /// Identifier of the SST; key into `ManifestState::ssts`.
        sst_id: u64,
        /// Stripe whose assignment list receives `sst_id`.
        stripe: u8,
        /// Extent recorded for the SST.
        extent: Extent,
        /// Block handle recorded for the SST.
        handle: SstBlockHandle,
        /// First key recorded for the SST.
        first_key: Bytes,
        /// Last key recorded for the SST.
        last_key: Bytes,
    },

    /// Unregisters an SST and removes its id from the assignment list of the
    /// stripe stored in its metadata. Unknown ids are ignored.
    RemoveSst {
        /// Identifier of the SST to remove.
        sst_id: u64,
    },

    /// Replaces the checkpoint position held in `ManifestState`.
    Checkpoint {
        /// New value for `ManifestState::checkpoint_lsn`.
        lsn: Lsn,
        /// New value for `ManifestState::checkpoint_seq`.
        seq: SeqNo,
    },

    /// Appends `sst_id` to the assignment list of `stripe` without touching
    /// the SST metadata map.
    AssignStripe {
        /// Stripe whose assignment list receives `sst_id`.
        stripe: u8,
        /// Identifier appended to the list.
        sst_id: u64,
    },

    /// Replaces the table schema held in `ManifestState`.
    UpdateSchema {
        /// The schema that becomes current.
        schema: TableSchema,
    },
}
65
/// Metadata kept in memory for one registered SST.
///
/// The fields mirror those of `ManifestRecord::AddSst`, from which instances
/// are built when a record is applied.
#[derive(Debug, Clone)]
pub struct SstMetadata {
    /// Identifier of the SST; also the key under which it is stored.
    pub sst_id: u64,
    /// Stripe the SST was added to.
    pub stripe: u8,
    /// Extent recorded for the SST.
    pub extent: Extent,
    /// Block handle recorded for the SST.
    pub handle: SstBlockHandle,
    /// First key recorded for the SST.
    pub first_key: Bytes,
    /// Last key recorded for the SST.
    pub last_key: Bytes,
}
76
/// In-memory view of the manifest, built by applying `ManifestRecord`s in
/// sequence order.
#[derive(Debug, Clone)]
pub struct ManifestState {
    /// Metadata of every currently registered SST, keyed by `sst_id`.
    pub ssts: HashMap<u64, SstMetadata>,
    /// `lsn` of the most recently applied `Checkpoint` record (0 by default).
    pub checkpoint_lsn: Lsn,
    /// `seq` of the most recently applied `Checkpoint` record (0 by default).
    pub checkpoint_seq: SeqNo,
    /// SST ids listed under each stripe, in the order they were recorded.
    /// Entries come from both `AddSst` and `AssignStripe` records.
    pub stripe_assignments: BTreeMap<u8, Vec<u64>>,
    /// Schema from the most recently applied `UpdateSchema` record, or
    /// `TableSchema::new()` if none was applied.
    pub schema: TableSchema,
}
91
92impl Default for ManifestState {
93 fn default() -> Self {
94 Self {
95 ssts: HashMap::new(),
96 checkpoint_lsn: 0,
97 checkpoint_seq: 0,
98 stripe_assignments: BTreeMap::new(),
99 schema: TableSchema::new(),
100 }
101 }
102}
103
/// Handle to a manifest log stored in a fixed region of a file.
///
/// All mutable state lives in a `ManifestInner` held behind a
/// `parking_lot::Mutex` inside an `Arc`.
pub struct Manifest {
    /// Shared, lock-protected manifest state and backing file.
    inner: Arc<Mutex<ManifestInner>>,
}
108
/// Lock-protected internals of a `Manifest`.
struct ManifestInner {
    /// Backing file that contains the manifest region.
    file: File,
    /// Location (`offset`) and capacity (`size`) of the manifest within `file`.
    region: Region,
    /// State produced by every record applied so far, including records that
    /// are still pending.
    state: ManifestState,
    /// Sequence number the next appended record will receive.
    next_seq: ManifestSeq,
    /// Byte offset, relative to `region.offset`, at which the next flush writes.
    write_offset: u64,
    /// Records that have been appended (and applied to `state`) but not yet
    /// written to `file`.
    pending: Vec<(ManifestSeq, ManifestRecord)>,
}
117
118impl Manifest {
119 pub fn create(path: impl AsRef<Path>, region: Region) -> Result<Self> {
121 let mut file = OpenOptions::new()
122 .read(true)
123 .write(true)
124 .create(true)
125 .open(path)?;
126
127 file.seek(SeekFrom::Start(region.offset))?;
129 let zeros = vec![0u8; region.size as usize];
130 file.write_all(&zeros)?;
131 file.sync_all()?;
132
133 Ok(Self {
134 inner: Arc::new(Mutex::new(ManifestInner {
135 file,
136 region,
137 state: ManifestState::default(),
138 next_seq: 1,
139 write_offset: 0,
140 pending: Vec::new(),
141 })),
142 })
143 }
144
145 pub fn open(path: impl AsRef<Path>, region: Region) -> Result<Self> {
147 let mut file = OpenOptions::new()
148 .read(true)
149 .write(true)
150 .open(path)?;
151
152 let records = Self::recover(&mut file, ®ion)?;
154
155 let mut state = ManifestState::default();
157 let mut max_seq = 0;
158
159 for (seq, record) in records {
160 max_seq = max_seq.max(seq);
161 Self::apply_record(&mut state, record);
162 }
163
164 Ok(Self {
165 inner: Arc::new(Mutex::new(ManifestInner {
166 file,
167 region,
168 state,
169 next_seq: max_seq + 1,
170 write_offset: 0, pending: Vec::new(),
172 })),
173 })
174 }
175
176 pub fn append(&self, record: ManifestRecord) -> Result<ManifestSeq> {
178 let mut inner = self.inner.lock();
179
180 let seq = inner.next_seq;
181 inner.next_seq += 1;
182
183 inner.pending.push((seq, record.clone()));
184
185 Self::apply_record(&mut inner.state, record);
187
188 Ok(seq)
189 }
190
191 pub fn flush(&self) -> Result<()> {
193 let mut inner = self.inner.lock();
194
195 if inner.pending.is_empty() {
196 return Ok(());
197 }
198
199 let mut buf = BytesMut::new();
201
202 for (seq, record) in &inner.pending {
203 let data = bincode::serialize(record)
204 .map_err(|e| Error::Internal(format!("Serialize error: {}", e)))?;
205
206 buf.put_u64_le(*seq);
208 buf.put_u32_le(data.len() as u32);
209 buf.put_slice(&data);
210
211 let crc = checksum::compute(&data);
212 buf.put_u32_le(crc);
213 }
214
215 let total_size = buf.len() as u64;
216
217 if inner.write_offset + total_size > inner.region.size {
219 inner.write_offset = 0;
220 }
221
222 let file_offset = inner.region.offset + inner.write_offset;
224 inner.file.seek(SeekFrom::Start(file_offset))?;
225 inner.file.write_all(&buf)?;
226 inner.file.sync_all()?;
227
228 inner.write_offset += total_size;
229 inner.pending.clear();
230
231 Ok(())
232 }
233
234 pub fn state(&self) -> ManifestState {
236 let inner = self.inner.lock();
237 inner.state.clone()
238 }
239
240 pub fn get_sst(&self, sst_id: u64) -> Option<SstMetadata> {
242 let inner = self.inner.lock();
243 inner.state.ssts.get(&sst_id).cloned()
244 }
245
246 pub fn update_schema(&self, schema: TableSchema) -> Result<ManifestSeq> {
248 self.append(ManifestRecord::UpdateSchema { schema })
249 }
250
251 pub fn get_schema(&self) -> TableSchema {
253 let inner = self.inner.lock();
254 inner.state.schema.clone()
255 }
256
257 pub fn compact(&self) -> Result<()> {
259 let mut inner = self.inner.lock();
260
261 let ssts: Vec<_> = inner.state.ssts.values().cloned().collect();
263 let mut records = Vec::new();
264
265 for sst in ssts {
266 records.push((
267 inner.next_seq,
268 ManifestRecord::AddSst {
269 sst_id: sst.sst_id,
270 stripe: sst.stripe,
271 extent: sst.extent,
272 handle: sst.handle.clone(),
273 first_key: sst.first_key.clone(),
274 last_key: sst.last_key.clone(),
275 },
276 ));
277 inner.next_seq += 1;
278 }
279
280 records.push((
282 inner.next_seq,
283 ManifestRecord::Checkpoint {
284 lsn: inner.state.checkpoint_lsn,
285 seq: inner.state.checkpoint_seq,
286 },
287 ));
288 inner.next_seq += 1;
289
290 if !inner.state.schema.local_indexes.is_empty() || !inner.state.schema.global_indexes.is_empty() {
292 records.push((
293 inner.next_seq,
294 ManifestRecord::UpdateSchema {
295 schema: inner.state.schema.clone(),
296 },
297 ));
298 inner.next_seq += 1;
299 }
300
301 inner.pending = records;
303 inner.write_offset = 0; drop(inner);
306 self.flush()?;
307
308 Ok(())
309 }
310
311 fn apply_record(state: &mut ManifestState, record: ManifestRecord) {
314 match record {
315 ManifestRecord::AddSst {
316 sst_id,
317 stripe,
318 extent,
319 handle,
320 first_key,
321 last_key,
322 } => {
323 state.ssts.insert(
324 sst_id,
325 SstMetadata {
326 sst_id,
327 stripe,
328 extent,
329 handle,
330 first_key,
331 last_key,
332 },
333 );
334
335 state
336 .stripe_assignments
337 .entry(stripe)
338 .or_insert_with(Vec::new)
339 .push(sst_id);
340 }
341
342 ManifestRecord::RemoveSst { sst_id } => {
343 if let Some(meta) = state.ssts.remove(&sst_id) {
344 if let Some(ssts) = state.stripe_assignments.get_mut(&meta.stripe) {
345 ssts.retain(|&id| id != sst_id);
346 }
347 }
348 }
349
350 ManifestRecord::Checkpoint { lsn, seq } => {
351 state.checkpoint_lsn = lsn;
352 state.checkpoint_seq = seq;
353 }
354
355 ManifestRecord::AssignStripe { stripe, sst_id } => {
356 state
357 .stripe_assignments
358 .entry(stripe)
359 .or_insert_with(Vec::new)
360 .push(sst_id);
361 }
362
363 ManifestRecord::UpdateSchema { schema } => {
364 state.schema = schema;
365 }
366 }
367 }
368
369 fn recover(file: &mut File, region: &Region) -> Result<Vec<(ManifestSeq, ManifestRecord)>> {
370 let mut records = Vec::new();
371
372 file.seek(SeekFrom::Start(region.offset))?;
374 let mut ring_data = vec![0u8; region.size as usize];
375
376 let bytes_read = file.read(&mut ring_data)?;
377 if bytes_read == 0 {
378 return Ok(records);
379 }
380
381 let mut offset = 0usize;
382
383 while offset + 16 < ring_data.len() {
385 let seq = u64::from_le_bytes([
387 ring_data[offset],
388 ring_data[offset + 1],
389 ring_data[offset + 2],
390 ring_data[offset + 3],
391 ring_data[offset + 4],
392 ring_data[offset + 5],
393 ring_data[offset + 6],
394 ring_data[offset + 7],
395 ]);
396
397 if seq == 0 {
399 break;
400 }
401
402 let len = u32::from_le_bytes([
403 ring_data[offset + 8],
404 ring_data[offset + 9],
405 ring_data[offset + 10],
406 ring_data[offset + 11],
407 ]) as usize;
408
409 if offset + 12 + len + 4 > ring_data.len() {
410 break;
411 }
412
413 let data_start = offset + 12;
415 let data_end = data_start + len;
416 let data = &ring_data[data_start..data_end];
417
418 let crc_offset = data_end;
419 let expected_crc = u32::from_le_bytes([
420 ring_data[crc_offset],
421 ring_data[crc_offset + 1],
422 ring_data[crc_offset + 2],
423 ring_data[crc_offset + 3],
424 ]);
425
426 if checksum::verify(data, expected_crc) {
428 match bincode::deserialize::<ManifestRecord>(data) {
429 Ok(record) => {
430 records.push((seq, record));
431 offset = crc_offset + 4;
432 }
433 Err(_) => break,
434 }
435 } else {
436 break;
437 }
438 }
439
440 records.sort_by_key(|(seq, _)| *seq);
442
443 Ok(records)
444 }
445}
446
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Region used by every test: 64 KiB at the start of the file.
    fn test_region() -> Region {
        Region::new(0, 64 * 1024)
    }

    /// Builds an `AddSst` record for stripe 0 with a single-block handle.
    fn add_sst(sst_id: u64, first_key: Bytes, last_key: Bytes) -> ManifestRecord {
        let extent = Extent::new(sst_id, 0, 4096);
        let handle = SstBlockHandle {
            extent,
            num_data_blocks: 1,
            index_offset: 0,
            bloom_offset: 0,
            compressed: false,
        };

        ManifestRecord::AddSst {
            sst_id,
            stripe: 0,
            extent,
            handle,
            first_key,
            last_key,
        }
    }

    /// Builds the `AddSst` record used by the multi-SST tests.
    fn numbered_sst(i: u64) -> ManifestRecord {
        add_sst(
            i,
            Bytes::from(format!("key{}", i)),
            Bytes::from(format!("key{}", i + 100)),
        )
    }

    // An appended and flushed SST is visible in the state.
    #[test]
    fn test_manifest_create_and_append() {
        let tmp = NamedTempFile::new().unwrap();
        let manifest = Manifest::create(tmp.path(), test_region()).unwrap();

        manifest
            .append(add_sst(1, Bytes::from("a"), Bytes::from("z")))
            .unwrap();
        manifest.flush().unwrap();

        let state = manifest.state();
        assert_eq!(state.ssts.len(), 1);
        assert!(state.ssts.contains_key(&1));
    }

    // Flushed records are replayed when the manifest is reopened.
    #[test]
    fn test_manifest_recovery() {
        let tmp = NamedTempFile::new().unwrap();
        let region = test_region();

        {
            // Scope the writer so it is dropped before the file is reopened.
            let manifest = Manifest::create(tmp.path(), region).unwrap();
            for i in 1..=5 {
                manifest.append(numbered_sst(i)).unwrap();
            }
            manifest.flush().unwrap();
        }

        let reopened = Manifest::open(tmp.path(), region).unwrap();
        let state = reopened.state();
        assert_eq!(state.ssts.len(), 5);
    }

    // Removing an SST that was added leaves the state empty.
    #[test]
    fn test_manifest_remove_sst() {
        let tmp = NamedTempFile::new().unwrap();
        let manifest = Manifest::create(tmp.path(), test_region()).unwrap();

        manifest
            .append(add_sst(1, Bytes::from("a"), Bytes::from("z")))
            .unwrap();
        manifest
            .append(ManifestRecord::RemoveSst { sst_id: 1 })
            .unwrap();
        manifest.flush().unwrap();

        let state = manifest.state();
        assert_eq!(state.ssts.len(), 0);
    }

    // A checkpoint record updates both checkpoint fields.
    #[test]
    fn test_manifest_checkpoint() {
        let tmp = NamedTempFile::new().unwrap();
        let manifest = Manifest::create(tmp.path(), test_region()).unwrap();

        manifest
            .append(ManifestRecord::Checkpoint { lsn: 100, seq: 50 })
            .unwrap();
        manifest.flush().unwrap();

        let state = manifest.state();
        assert_eq!(state.checkpoint_lsn, 100);
        assert_eq!(state.checkpoint_seq, 50);
    }

    // Compaction keeps exactly the SSTs that are still registered.
    #[test]
    fn test_manifest_compact() {
        let tmp = NamedTempFile::new().unwrap();
        let manifest = Manifest::create(tmp.path(), test_region()).unwrap();

        for i in 1..=10 {
            manifest.append(numbered_sst(i)).unwrap();
        }
        for sst_id in 1..=5 {
            manifest.append(ManifestRecord::RemoveSst { sst_id }).unwrap();
        }
        manifest.flush().unwrap();

        manifest.compact().unwrap();

        let state = manifest.state();
        assert_eq!(state.ssts.len(), 5);
    }
}