quill_sql/recovery/
control_file.rs

1use std::fs;
2use std::path::{Path, PathBuf};
3
4use parking_lot::Mutex;
5use rand::random;
6use serde::{Deserialize, Serialize};
7
8use crate::error::QuillSQLResult;
9use crate::recovery::Lsn;
10
/// File name of the control file inside the directory handed to
/// `ControlFileManager::load_or_init`.
const CONTROL_FILE_NAME: &str = "control.dat";

/// Format version stamped into newly created control files; a file carrying
/// any other version is rejected when it is loaded.
const CONTROL_FILE_VERSION: u32 = 2;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15// control.dat (bincode of ControlFileData)
16// struct ControlFileData {
17//   version: u32              // CONTROL_FILE_VERSION (currently 2)
18//   system_id: u128           // random system identifier
19//   wal_segment_size: u64     // current wal segment size
20//   durable_lsn: u64          // flushed & durable WAL LSN
21//   max_assigned_lsn: u64     // latest assigned LSN
22//   last_checkpoint_lsn: u64  // last checkpoint LSN
23//   last_record_start: u64    // last record's start LSN
24// }
25// Persist flow: write tmp file then rename (atomic-ish on POSIX)
26struct ControlFileData {
27    version: u32,
28    system_id: u128,
29    wal_segment_size: u64,
30    durable_lsn: Lsn,
31    max_assigned_lsn: Lsn,
32    last_checkpoint_lsn: Lsn,
33    last_record_start: Lsn,
34    checkpoint_redo_start: Lsn,
35}
36
37impl ControlFileData {
38    fn new(system_id: u128, wal_segment_size: u64) -> Self {
39        Self {
40            version: CONTROL_FILE_VERSION,
41            system_id,
42            wal_segment_size,
43            durable_lsn: 0,
44            max_assigned_lsn: 0,
45            last_checkpoint_lsn: 0,
46            last_record_start: 0,
47            checkpoint_redo_start: 0,
48        }
49    }
50}
51
52#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
53pub struct ControlFileSnapshot {
54    pub durable_lsn: Lsn,
55    pub max_assigned_lsn: Lsn,
56    pub last_checkpoint_lsn: Lsn,
57    pub last_record_start: Lsn,
58    pub checkpoint_redo_start: Lsn,
59}
60
61#[derive(Debug)]
62pub struct ControlFileManager {
63    path: PathBuf,
64    inner: Mutex<ControlFileData>,
65}
66
67#[derive(Debug, Clone, Copy)]
68pub struct WalInitState {
69    pub durable_lsn: Lsn,
70    pub max_assigned_lsn: Lsn,
71    pub last_checkpoint_lsn: Lsn,
72    pub last_record_start: Lsn,
73    pub checkpoint_redo_start: Lsn,
74}
75
76impl WalInitState {
77    pub fn next_lsn(&self) -> Lsn {
78        self.max_assigned_lsn
79    }
80}
81
82impl ControlFileManager {
83    pub fn load_or_init(
84        directory: &Path,
85        wal_segment_size: u64,
86    ) -> QuillSQLResult<(Self, WalInitState)> {
87        fs::create_dir_all(directory)?;
88        let path = directory.join(CONTROL_FILE_NAME);
89        let mut needs_persist = false;
90        let (data, newly_created) = if path.exists() {
91            let bytes = fs::read(&path)?;
92            let mut data: ControlFileData = bincode::deserialize(&bytes)?;
93            if data.version != CONTROL_FILE_VERSION {
94                return Err(crate::error::QuillSQLError::Internal(format!(
95                    "Unsupported control file version: {}",
96                    data.version
97                )));
98            }
99            if data.wal_segment_size != wal_segment_size {
100                data.wal_segment_size = wal_segment_size;
101                needs_persist = true;
102            }
103            (data, false)
104        } else {
105            let system_id = generate_system_id();
106            needs_persist = true;
107            (ControlFileData::new(system_id, wal_segment_size), true)
108        };
109
110        let snapshot = WalInitState {
111            durable_lsn: data.durable_lsn,
112            max_assigned_lsn: data.max_assigned_lsn,
113            last_checkpoint_lsn: data.last_checkpoint_lsn,
114            last_record_start: data.last_record_start,
115            checkpoint_redo_start: data.checkpoint_redo_start,
116        };
117
118        let manager = Self {
119            path,
120            inner: Mutex::new(data),
121        };
122
123        if newly_created || needs_persist {
124            manager.persist()?;
125        }
126
127        Ok((manager, snapshot))
128    }
129
130    pub fn update(
131        &self,
132        durable_lsn: Lsn,
133        max_assigned_lsn: Lsn,
134        last_checkpoint_lsn: Lsn,
135        last_record_start: Lsn,
136        checkpoint_redo_start: Lsn,
137    ) -> QuillSQLResult<()> {
138        {
139            let mut guard = self.inner.lock();
140            guard.durable_lsn = durable_lsn;
141            guard.max_assigned_lsn = max_assigned_lsn;
142            guard.last_checkpoint_lsn = last_checkpoint_lsn;
143            guard.last_record_start = last_record_start;
144            guard.checkpoint_redo_start = checkpoint_redo_start;
145        }
146        self.persist()
147    }
148
149    pub fn snapshot(&self) -> ControlFileSnapshot {
150        let guard = self.inner.lock();
151        ControlFileSnapshot {
152            durable_lsn: guard.durable_lsn,
153            max_assigned_lsn: guard.max_assigned_lsn,
154            last_checkpoint_lsn: guard.last_checkpoint_lsn,
155            last_record_start: guard.last_record_start,
156            checkpoint_redo_start: guard.checkpoint_redo_start,
157        }
158    }
159
160    fn persist(&self) -> QuillSQLResult<()> {
161        let guard = self.inner.lock();
162        let bytes = bincode::serialize(&*guard)?;
163        drop(guard);
164        let tmp_path = self.path.with_extension("tmp");
165        fs::write(&tmp_path, &bytes)?;
166        fs::rename(tmp_path, &self.path)?;
167        Ok(())
168    }
169}
170
171fn generate_system_id() -> u128 {
172    random::<u128>()
173}