// quill_sql/recovery/control_file.rs
use std::fs;
2use std::path::{Path, PathBuf};
3
4use parking_lot::Mutex;
5use rand::random;
6use serde::{Deserialize, Serialize};
7
8use crate::error::QuillSQLResult;
9use crate::recovery::Lsn;
10
/// File name of the control file inside the directory handed to
/// `ControlFileManager::load_or_init`.
const CONTROL_FILE_NAME: &str = "control.dat";

/// On-disk format version; `ControlFileManager::load_or_init` rejects a file
/// that carries any other value.
const CONTROL_FILE_VERSION: u32 = 2;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15struct ControlFileData {
27 version: u32,
28 system_id: u128,
29 wal_segment_size: u64,
30 durable_lsn: Lsn,
31 max_assigned_lsn: Lsn,
32 last_checkpoint_lsn: Lsn,
33 last_record_start: Lsn,
34 checkpoint_redo_start: Lsn,
35}
36
37impl ControlFileData {
38 fn new(system_id: u128, wal_segment_size: u64) -> Self {
39 Self {
40 version: CONTROL_FILE_VERSION,
41 system_id,
42 wal_segment_size,
43 durable_lsn: 0,
44 max_assigned_lsn: 0,
45 last_checkpoint_lsn: 0,
46 last_record_start: 0,
47 checkpoint_redo_start: 0,
48 }
49 }
50}
51
52#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
53pub struct ControlFileSnapshot {
54 pub durable_lsn: Lsn,
55 pub max_assigned_lsn: Lsn,
56 pub last_checkpoint_lsn: Lsn,
57 pub last_record_start: Lsn,
58 pub checkpoint_redo_start: Lsn,
59}
60
61#[derive(Debug)]
62pub struct ControlFileManager {
63 path: PathBuf,
64 inner: Mutex<ControlFileData>,
65}
66
67#[derive(Debug, Clone, Copy)]
68pub struct WalInitState {
69 pub durable_lsn: Lsn,
70 pub max_assigned_lsn: Lsn,
71 pub last_checkpoint_lsn: Lsn,
72 pub last_record_start: Lsn,
73 pub checkpoint_redo_start: Lsn,
74}
75
76impl WalInitState {
77 pub fn next_lsn(&self) -> Lsn {
78 self.max_assigned_lsn
79 }
80}
81
82impl ControlFileManager {
83 pub fn load_or_init(
84 directory: &Path,
85 wal_segment_size: u64,
86 ) -> QuillSQLResult<(Self, WalInitState)> {
87 fs::create_dir_all(directory)?;
88 let path = directory.join(CONTROL_FILE_NAME);
89 let mut needs_persist = false;
90 let (data, newly_created) = if path.exists() {
91 let bytes = fs::read(&path)?;
92 let mut data: ControlFileData = bincode::deserialize(&bytes)?;
93 if data.version != CONTROL_FILE_VERSION {
94 return Err(crate::error::QuillSQLError::Internal(format!(
95 "Unsupported control file version: {}",
96 data.version
97 )));
98 }
99 if data.wal_segment_size != wal_segment_size {
100 data.wal_segment_size = wal_segment_size;
101 needs_persist = true;
102 }
103 (data, false)
104 } else {
105 let system_id = generate_system_id();
106 needs_persist = true;
107 (ControlFileData::new(system_id, wal_segment_size), true)
108 };
109
110 let snapshot = WalInitState {
111 durable_lsn: data.durable_lsn,
112 max_assigned_lsn: data.max_assigned_lsn,
113 last_checkpoint_lsn: data.last_checkpoint_lsn,
114 last_record_start: data.last_record_start,
115 checkpoint_redo_start: data.checkpoint_redo_start,
116 };
117
118 let manager = Self {
119 path,
120 inner: Mutex::new(data),
121 };
122
123 if newly_created || needs_persist {
124 manager.persist()?;
125 }
126
127 Ok((manager, snapshot))
128 }
129
130 pub fn update(
131 &self,
132 durable_lsn: Lsn,
133 max_assigned_lsn: Lsn,
134 last_checkpoint_lsn: Lsn,
135 last_record_start: Lsn,
136 checkpoint_redo_start: Lsn,
137 ) -> QuillSQLResult<()> {
138 {
139 let mut guard = self.inner.lock();
140 guard.durable_lsn = durable_lsn;
141 guard.max_assigned_lsn = max_assigned_lsn;
142 guard.last_checkpoint_lsn = last_checkpoint_lsn;
143 guard.last_record_start = last_record_start;
144 guard.checkpoint_redo_start = checkpoint_redo_start;
145 }
146 self.persist()
147 }
148
149 pub fn snapshot(&self) -> ControlFileSnapshot {
150 let guard = self.inner.lock();
151 ControlFileSnapshot {
152 durable_lsn: guard.durable_lsn,
153 max_assigned_lsn: guard.max_assigned_lsn,
154 last_checkpoint_lsn: guard.last_checkpoint_lsn,
155 last_record_start: guard.last_record_start,
156 checkpoint_redo_start: guard.checkpoint_redo_start,
157 }
158 }
159
160 fn persist(&self) -> QuillSQLResult<()> {
161 let guard = self.inner.lock();
162 let bytes = bincode::serialize(&*guard)?;
163 drop(guard);
164 let tmp_path = self.path.with_extension("tmp");
165 fs::write(&tmp_path, &bytes)?;
166 fs::rename(tmp_path, &self.path)?;
167 Ok(())
168 }
169}
170
171fn generate_system_id() -> u128 {
172 random::<u128>()
173}