kstone_core/
manifest.rs

//! Manifest system for metadata management (Phase 1.5+)
//!
//! Tracks:
//! - SST metadata (extent, stripe, key range)
//! - Checkpoint LSN and SeqNo
//! - Stripe assignments
//!
//! Uses a ring buffer format with copy-on-write updates.
9
10use bytes::{Bytes, BytesMut, BufMut};
11use parking_lot::Mutex;
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, BTreeMap};
14use std::fs::{File, OpenOptions};
15use std::io::{Read, Write, Seek, SeekFrom};
16use std::path::Path;
17use std::sync::Arc;
18use crate::{
19    Error, Result, Lsn, SeqNo,
20    layout::Region,
21    extent::Extent,
22    types::checksum,
23    sst_block::SstBlockHandle,
24    index::TableSchema,
25};
26
/// Manifest sequence number.
///
/// Every persisted record is tagged with one. Numbering starts at 1 for a
/// freshly created manifest; recovery treats a stored value of 0 as
/// unwritten space.
pub type ManifestSeq = u64;
29
/// Manifest record types.
///
/// These are the units appended to the manifest log and replayed on open to
/// rebuild [`ManifestState`].
///
/// NOTE(review): records are persisted with `bincode`, so reordering variants
/// or fields presumably changes the on-disk encoding — verify before editing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ManifestRecord {
    /// Register a new SST.
    ///
    /// Replaying this inserts the SST into the active set and appends its id
    /// to the assignment list of `stripe`.
    AddSst {
        /// Identifier the SST is keyed by in the manifest state.
        sst_id: u64,
        /// Stripe the SST belongs to.
        stripe: u8,
        /// Extent recorded for the SST.
        extent: Extent,
        /// Block handle recorded for the SST.
        handle: SstBlockHandle,
        /// Lower bound of the SST's key range.
        first_key: Bytes,
        /// Upper bound of the SST's key range.
        last_key: Bytes,
    },

    /// Remove an SST (for compaction).
    ///
    /// Replaying this drops the SST from the active set and from the
    /// assignment list of the stripe it was registered under.
    RemoveSst {
        /// Identifier of the SST to drop.
        sst_id: u64,
    },

    /// Update checkpoint.
    Checkpoint {
        /// New checkpoint LSN.
        lsn: Lsn,
        /// New checkpoint sequence number.
        seq: SeqNo,
    },

    /// Stripe assignment (for Phase 1.6).
    ///
    /// Replaying this appends `sst_id` to the assignment list of `stripe`
    /// without touching the active SST set.
    AssignStripe {
        /// Stripe receiving the assignment.
        stripe: u8,
        /// SST being assigned.
        sst_id: u64,
    },

    /// Update table schema (Phase 3.1+).
    ///
    /// Replaying this replaces the stored schema wholesale.
    UpdateSchema {
        /// The complete new schema.
        schema: TableSchema,
    },
}
65
/// SST metadata.
///
/// In-memory description of one active SST; the fields mirror those of
/// [`ManifestRecord::AddSst`], from which it is built during replay.
#[derive(Debug, Clone)]
pub struct SstMetadata {
    /// Identifier the SST is keyed by in [`ManifestState::ssts`].
    pub sst_id: u64,
    /// Stripe the SST was registered under.
    pub stripe: u8,
    /// Extent recorded for the SST.
    pub extent: Extent,
    /// Block handle recorded for the SST.
    pub handle: SstBlockHandle,
    /// Lower bound of the SST's key range.
    pub first_key: Bytes,
    /// Upper bound of the SST's key range.
    pub last_key: Bytes,
}
76
/// Manifest state.
///
/// The result of replaying every manifest record in sequence order. Callers
/// receive clones of this value, never a reference into the live manifest.
#[derive(Debug, Clone)]
pub struct ManifestState {
    /// Active SSTs, keyed by `sst_id`.
    pub ssts: HashMap<u64, SstMetadata>,
    /// Checkpoint LSN (0 until a checkpoint record is applied).
    pub checkpoint_lsn: Lsn,
    /// Checkpoint SeqNo (0 until a checkpoint record is applied).
    pub checkpoint_seq: SeqNo,
    /// Stripe assignments: stripe -> sst_ids.
    ///
    /// Extended by both `AddSst` and `AssignStripe` records and pruned by
    /// `RemoveSst`; a stripe's list may remain present but empty.
    pub stripe_assignments: BTreeMap<u8, Vec<u64>>,
    /// Table schema with index definitions (Phase 3.1+)
    pub schema: TableSchema,
}
91
92impl Default for ManifestState {
93    fn default() -> Self {
94        Self {
95            ssts: HashMap::new(),
96            checkpoint_lsn: 0,
97            checkpoint_seq: 0,
98            stripe_assignments: BTreeMap::new(),
99            schema: TableSchema::new(),
100        }
101    }
102}
103
/// Manifest ring buffer.
///
/// Public handle over the manifest. All mutable state lives in a
/// `ManifestInner` guarded by a mutex, so every method takes `&self` and
/// locks internally.
pub struct Manifest {
    /// Shared, mutex-protected manifest internals.
    inner: Arc<Mutex<ManifestInner>>,
}
108
/// Mutable internals of a [`Manifest`], always accessed under its mutex.
struct ManifestInner {
    /// Backing file, opened read/write.
    file: File,
    /// Location of the ring inside `file`: `offset` is the absolute file
    /// offset of its first byte and `size` its length in bytes.
    region: Region,
    /// State obtained by applying every record appended so far (including
    /// records that are still pending).
    state: ManifestState,
    /// Sequence number the next appended record will receive.
    next_seq: ManifestSeq,
    /// Offset, relative to the start of the region, at which the next flush
    /// writes.
    write_offset: u64,
    /// Records appended but not yet written to disk, in append order.
    pending: Vec<(ManifestSeq, ManifestRecord)>,
}
117
118impl Manifest {
119    /// Create a new manifest
120    pub fn create(path: impl AsRef<Path>, region: Region) -> Result<Self> {
121        let mut file = OpenOptions::new()
122            .read(true)
123            .write(true)
124            .create(true)
125            .open(path)?;
126
127        // Initialize ring buffer with zeros
128        file.seek(SeekFrom::Start(region.offset))?;
129        let zeros = vec![0u8; region.size as usize];
130        file.write_all(&zeros)?;
131        file.sync_all()?;
132
133        Ok(Self {
134            inner: Arc::new(Mutex::new(ManifestInner {
135                file,
136                region,
137                state: ManifestState::default(),
138                next_seq: 1,
139                write_offset: 0,
140                pending: Vec::new(),
141            })),
142        })
143    }
144
145    /// Open existing manifest and recover state
146    pub fn open(path: impl AsRef<Path>, region: Region) -> Result<Self> {
147        let mut file = OpenOptions::new()
148            .read(true)
149            .write(true)
150            .open(path)?;
151
152        // Recover all records
153        let records = Self::recover(&mut file, &region)?;
154
155        // Replay to build state
156        let mut state = ManifestState::default();
157        let mut max_seq = 0;
158
159        for (seq, record) in records {
160            max_seq = max_seq.max(seq);
161            Self::apply_record(&mut state, record);
162        }
163
164        Ok(Self {
165            inner: Arc::new(Mutex::new(ManifestInner {
166                file,
167                region,
168                state,
169                next_seq: max_seq + 1,
170                write_offset: 0, // Reset to beginning
171                pending: Vec::new(),
172            })),
173        })
174    }
175
176    /// Append a manifest record
177    pub fn append(&self, record: ManifestRecord) -> Result<ManifestSeq> {
178        let mut inner = self.inner.lock();
179
180        let seq = inner.next_seq;
181        inner.next_seq += 1;
182
183        inner.pending.push((seq, record.clone()));
184
185        // Apply to in-memory state
186        Self::apply_record(&mut inner.state, record);
187
188        Ok(seq)
189    }
190
191    /// Flush pending records to disk
192    pub fn flush(&self) -> Result<()> {
193        let mut inner = self.inner.lock();
194
195        if inner.pending.is_empty() {
196            return Ok(());
197        }
198
199        // Serialize all pending records
200        let mut buf = BytesMut::new();
201
202        for (seq, record) in &inner.pending {
203            let data = bincode::serialize(record)
204                .map_err(|e| Error::Internal(format!("Serialize error: {}", e)))?;
205
206            // Record: [seq(8) | len(4) | data | crc32c(4)]
207            buf.put_u64_le(*seq);
208            buf.put_u32_le(data.len() as u32);
209            buf.put_slice(&data);
210
211            let crc = checksum::compute(&data);
212            buf.put_u32_le(crc);
213        }
214
215        let total_size = buf.len() as u64;
216
217        // Check if we need to wrap around
218        if inner.write_offset + total_size > inner.region.size {
219            inner.write_offset = 0;
220        }
221
222        // Write to file
223        let file_offset = inner.region.offset + inner.write_offset;
224        inner.file.seek(SeekFrom::Start(file_offset))?;
225        inner.file.write_all(&buf)?;
226        inner.file.sync_all()?;
227
228        inner.write_offset += total_size;
229        inner.pending.clear();
230
231        Ok(())
232    }
233
234    /// Get current manifest state
235    pub fn state(&self) -> ManifestState {
236        let inner = self.inner.lock();
237        inner.state.clone()
238    }
239
240    /// Get SST metadata
241    pub fn get_sst(&self, sst_id: u64) -> Option<SstMetadata> {
242        let inner = self.inner.lock();
243        inner.state.ssts.get(&sst_id).cloned()
244    }
245
246    /// Update table schema (Phase 3.1+)
247    pub fn update_schema(&self, schema: TableSchema) -> Result<ManifestSeq> {
248        self.append(ManifestRecord::UpdateSchema { schema })
249    }
250
251    /// Get current table schema (Phase 3.1+)
252    pub fn get_schema(&self) -> TableSchema {
253        let inner = self.inner.lock();
254        inner.state.schema.clone()
255    }
256
257    /// Compact manifest by rewriting only active records
258    pub fn compact(&self) -> Result<()> {
259        let mut inner = self.inner.lock();
260
261        // Collect all active SSTs first to avoid borrow checker issues
262        let ssts: Vec<_> = inner.state.ssts.values().cloned().collect();
263        let mut records = Vec::new();
264
265        for sst in ssts {
266            records.push((
267                inner.next_seq,
268                ManifestRecord::AddSst {
269                    sst_id: sst.sst_id,
270                    stripe: sst.stripe,
271                    extent: sst.extent,
272                    handle: sst.handle.clone(),
273                    first_key: sst.first_key.clone(),
274                    last_key: sst.last_key.clone(),
275                },
276            ));
277            inner.next_seq += 1;
278        }
279
280        // Add checkpoint record
281        records.push((
282            inner.next_seq,
283            ManifestRecord::Checkpoint {
284                lsn: inner.state.checkpoint_lsn,
285                seq: inner.state.checkpoint_seq,
286            },
287        ));
288        inner.next_seq += 1;
289
290        // Add schema record (Phase 3.1+)
291        if !inner.state.schema.local_indexes.is_empty() || !inner.state.schema.global_indexes.is_empty() {
292            records.push((
293                inner.next_seq,
294                ManifestRecord::UpdateSchema {
295                    schema: inner.state.schema.clone(),
296                },
297            ));
298            inner.next_seq += 1;
299        }
300
301        // Write compacted records
302        inner.pending = records;
303        inner.write_offset = 0; // Start fresh
304
305        drop(inner);
306        self.flush()?;
307
308        Ok(())
309    }
310
311    // Internal helpers
312
313    fn apply_record(state: &mut ManifestState, record: ManifestRecord) {
314        match record {
315            ManifestRecord::AddSst {
316                sst_id,
317                stripe,
318                extent,
319                handle,
320                first_key,
321                last_key,
322            } => {
323                state.ssts.insert(
324                    sst_id,
325                    SstMetadata {
326                        sst_id,
327                        stripe,
328                        extent,
329                        handle,
330                        first_key,
331                        last_key,
332                    },
333                );
334
335                state
336                    .stripe_assignments
337                    .entry(stripe)
338                    .or_insert_with(Vec::new)
339                    .push(sst_id);
340            }
341
342            ManifestRecord::RemoveSst { sst_id } => {
343                if let Some(meta) = state.ssts.remove(&sst_id) {
344                    if let Some(ssts) = state.stripe_assignments.get_mut(&meta.stripe) {
345                        ssts.retain(|&id| id != sst_id);
346                    }
347                }
348            }
349
350            ManifestRecord::Checkpoint { lsn, seq } => {
351                state.checkpoint_lsn = lsn;
352                state.checkpoint_seq = seq;
353            }
354
355            ManifestRecord::AssignStripe { stripe, sst_id } => {
356                state
357                    .stripe_assignments
358                    .entry(stripe)
359                    .or_insert_with(Vec::new)
360                    .push(sst_id);
361            }
362
363            ManifestRecord::UpdateSchema { schema } => {
364                state.schema = schema;
365            }
366        }
367    }
368
369    fn recover(file: &mut File, region: &Region) -> Result<Vec<(ManifestSeq, ManifestRecord)>> {
370        let mut records = Vec::new();
371
372        // Read entire ring buffer
373        file.seek(SeekFrom::Start(region.offset))?;
374        let mut ring_data = vec![0u8; region.size as usize];
375
376        let bytes_read = file.read(&mut ring_data)?;
377        if bytes_read == 0 {
378            return Ok(records);
379        }
380
381        let mut offset = 0usize;
382
383        // Scan for valid records
384        while offset + 16 < ring_data.len() {
385            // Record header: seq(8) + len(4)
386            let seq = u64::from_le_bytes([
387                ring_data[offset],
388                ring_data[offset + 1],
389                ring_data[offset + 2],
390                ring_data[offset + 3],
391                ring_data[offset + 4],
392                ring_data[offset + 5],
393                ring_data[offset + 6],
394                ring_data[offset + 7],
395            ]);
396
397            // Seq 0 indicates empty space
398            if seq == 0 {
399                break;
400            }
401
402            let len = u32::from_le_bytes([
403                ring_data[offset + 8],
404                ring_data[offset + 9],
405                ring_data[offset + 10],
406                ring_data[offset + 11],
407            ]) as usize;
408
409            if offset + 12 + len + 4 > ring_data.len() {
410                break;
411            }
412
413            // Extract data and CRC
414            let data_start = offset + 12;
415            let data_end = data_start + len;
416            let data = &ring_data[data_start..data_end];
417
418            let crc_offset = data_end;
419            let expected_crc = u32::from_le_bytes([
420                ring_data[crc_offset],
421                ring_data[crc_offset + 1],
422                ring_data[crc_offset + 2],
423                ring_data[crc_offset + 3],
424            ]);
425
426            // Verify checksum
427            if checksum::verify(data, expected_crc) {
428                match bincode::deserialize::<ManifestRecord>(data) {
429                    Ok(record) => {
430                        records.push((seq, record));
431                        offset = crc_offset + 4;
432                    }
433                    Err(_) => break,
434                }
435            } else {
436                break;
437            }
438        }
439
440        // Sort by seq
441        records.sort_by_key(|(seq, _)| *seq);
442
443        Ok(records)
444    }
445}
446
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Region used by every test: 64 KiB starting at file offset 0.
    fn test_region() -> Region {
        Region::new(0, 64 * 1024)
    }

    /// Build an `AddSst` record on stripe 0 with a single-block handle whose
    /// extent is derived from `sst_id`.
    fn add_sst(sst_id: u64, first_key: Bytes, last_key: Bytes) -> ManifestRecord {
        let extent = Extent::new(sst_id, 0, 4096);
        let handle = SstBlockHandle {
            extent,
            num_data_blocks: 1,
            index_offset: 0,
            bloom_offset: 0,
            compressed: false,
        };

        ManifestRecord::AddSst {
            sst_id,
            stripe: 0,
            extent,
            handle,
            first_key,
            last_key,
        }
    }

    /// `AddSst` record with the `key{i}` .. `key{i + 100}` range used by the
    /// bulk tests.
    fn add_numbered_sst(i: u64) -> ManifestRecord {
        add_sst(
            i,
            Bytes::from(format!("key{}", i)),
            Bytes::from(format!("key{}", i + 100)),
        )
    }

    #[test]
    fn test_manifest_create_and_append() {
        let tmp = NamedTempFile::new().unwrap();
        let manifest = Manifest::create(tmp.path(), test_region()).unwrap();

        manifest
            .append(add_sst(1, Bytes::from("a"), Bytes::from("z")))
            .unwrap();
        manifest.flush().unwrap();

        let state = manifest.state();
        assert_eq!(state.ssts.len(), 1);
        assert!(state.ssts.contains_key(&1));
    }

    #[test]
    fn test_manifest_recovery() {
        let tmp = NamedTempFile::new().unwrap();

        // Write some records, then drop the manifest
        {
            let manifest = Manifest::create(tmp.path(), test_region()).unwrap();

            for i in 1..=5 {
                manifest.append(add_numbered_sst(i)).unwrap();
            }

            manifest.flush().unwrap();
        }

        // Reopen and verify
        let manifest = Manifest::open(tmp.path(), test_region()).unwrap();
        let state = manifest.state();
        assert_eq!(state.ssts.len(), 5);
    }

    #[test]
    fn test_manifest_remove_sst() {
        let tmp = NamedTempFile::new().unwrap();
        let manifest = Manifest::create(tmp.path(), test_region()).unwrap();

        manifest
            .append(add_sst(1, Bytes::from("a"), Bytes::from("z")))
            .unwrap();
        manifest.append(ManifestRecord::RemoveSst { sst_id: 1 }).unwrap();
        manifest.flush().unwrap();

        let state = manifest.state();
        assert_eq!(state.ssts.len(), 0);
    }

    #[test]
    fn test_manifest_checkpoint() {
        let tmp = NamedTempFile::new().unwrap();
        let manifest = Manifest::create(tmp.path(), test_region()).unwrap();

        manifest
            .append(ManifestRecord::Checkpoint { lsn: 100, seq: 50 })
            .unwrap();
        manifest.flush().unwrap();

        let state = manifest.state();
        assert_eq!(state.checkpoint_lsn, 100);
        assert_eq!(state.checkpoint_seq, 50);
    }

    #[test]
    fn test_manifest_compact() {
        let tmp = NamedTempFile::new().unwrap();
        let manifest = Manifest::create(tmp.path(), test_region()).unwrap();

        // Add ten SSTs ...
        for i in 1..=10 {
            manifest.append(add_numbered_sst(i)).unwrap();
        }

        // ... and remove the first half
        for i in 1..=5 {
            manifest.append(ManifestRecord::RemoveSst { sst_id: i }).unwrap();
        }

        manifest.flush().unwrap();

        // Compact
        manifest.compact().unwrap();

        // Should still have 5 SSTs
        let state = manifest.state();
        assert_eq!(state.ssts.len(), 5);
    }
}