1use std::fs::OpenOptions;
24use std::io::Write;
25use std::path::Path;
26
27use serde::{Deserialize, Serialize};
28
29use crate::error::{CoreError, Result};
30use crate::ids::{CollectionId, Lsn};
31use crate::page::{PageCodec, PageType};
32use crate::paged::{fsync_dir, read_paged, write_paged};
33
/// Manifest format version written by this build.
///
/// `Manifest::empty` and the v1 upgrade conversion both stamp this value into
/// `Manifest::format_version`; the reader additionally accepts legacy
/// version `1` bodies and rejects everything else.
pub const MANIFEST_FORMAT_VERSION: u16 = 2;
38
/// Name of the pointer file whose contents name the live manifest file.
const CURRENT_FILE: &str = "CURRENT";
/// Scratch file that is filled in and then renamed over [`CURRENT_FILE`].
const CURRENT_TMP: &str = "CURRENT.tmp";
41
/// Returns the on-disk file name for the manifest with the given `version`.
///
/// The version is rendered in decimal and zero-padded to a minimum width of
/// ten digits; larger values simply use more digits.
fn manifest_file_name(version: u64) -> String {
    format!("manifest-{:010}", version)
}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49pub struct SegmentRef {
50 pub id: u64,
52 pub row_count: u64,
54 pub lsn_low: Lsn,
56 pub lsn_high: Lsn,
58}
59
60#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub struct IndexSnapshotRef {
63 pub id: u64,
66 pub lsn: Lsn,
70}
71
72#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74pub struct CollectionEntry {
75 pub id: CollectionId,
77 pub name: String,
79 pub descriptor: Vec<u8>,
81 pub segments: Vec<SegmentRef>,
83 pub index_snapshot: Option<IndexSnapshotRef>,
87}
88
89#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
92pub struct Manifest {
93 pub format_version: u16,
95 pub version: u64,
97 pub last_checkpointed_lsn: Lsn,
99 pub next_collection_id: u64,
101 pub next_segment_id: u64,
103 pub collections: Vec<CollectionEntry>,
105}
106
107impl Default for Manifest {
108 fn default() -> Self {
109 Self::empty()
110 }
111}
112
113impl Manifest {
114 #[must_use]
116 pub fn empty() -> Self {
117 Self {
118 format_version: MANIFEST_FORMAT_VERSION,
119 version: 0,
120 last_checkpointed_lsn: Lsn::ZERO,
121 next_collection_id: 0,
122 next_segment_id: 0,
123 collections: Vec::new(),
124 }
125 }
126
127 #[must_use]
129 pub fn collection(&self, id: CollectionId) -> Option<&CollectionEntry> {
130 self.collections.iter().find(|c| c.id == id)
131 }
132
133 #[must_use]
135 pub fn collection_by_name(&self, name: &str) -> Option<&CollectionEntry> {
136 self.collections.iter().find(|c| c.name == name)
137 }
138}
139
140pub fn write_manifest(dir: &Path, manifest: &Manifest, codec: &dyn PageCodec) -> Result<()> {
143 let body = postcard::to_allocvec(manifest)?;
144
145 let file_name = manifest_file_name(manifest.version);
147 let manifest_path = dir.join(&file_name);
148 write_paged(
149 &manifest_path,
150 codec,
151 PageType::Manifest,
152 manifest.version,
153 &body,
154 )?;
155 fsync_dir(dir)?;
157
158 let tmp_path = dir.join(CURRENT_TMP);
160 {
161 let mut f = OpenOptions::new()
162 .create(true)
163 .truncate(true)
164 .write(true)
165 .open(&tmp_path)
166 .map_err(|e| CoreError::io(&tmp_path, e))?;
167 f.write_all(file_name.as_bytes())
168 .map_err(|e| CoreError::io(&tmp_path, e))?;
169 f.write_all(b"\n")
170 .map_err(|e| CoreError::io(&tmp_path, e))?;
171 f.sync_data().map_err(|e| CoreError::io(&tmp_path, e))?;
172 }
173 let current_path = dir.join(CURRENT_FILE);
175 std::fs::rename(&tmp_path, ¤t_path).map_err(|e| CoreError::io(¤t_path, e))?;
176 fsync_dir(dir)?;
177 Ok(())
178}
179
180pub fn read_current(dir: &Path, codec: &dyn PageCodec) -> Result<Option<Manifest>> {
183 let current_path = dir.join(CURRENT_FILE);
184 let name = match std::fs::read_to_string(¤t_path) {
185 Ok(s) => s.trim().to_owned(),
186 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
187 Err(e) => return Err(CoreError::io(¤t_path, e)),
188 };
189 if name.is_empty() {
190 return Err(CoreError::MalformedPage("CURRENT is empty".to_owned()));
191 }
192 let manifest = read_manifest_file(&dir.join(&name), codec)?;
193 Ok(Some(manifest))
194}
195
196#[derive(Deserialize)]
200struct VersionPeek {
201 format_version: u16,
202}
203
204#[derive(Deserialize)]
208#[cfg_attr(test, derive(Serialize))]
209struct CollectionEntryV1 {
210 id: CollectionId,
211 name: String,
212 descriptor: Vec<u8>,
213 segments: Vec<SegmentRef>,
214}
215
216#[derive(Deserialize)]
217#[cfg_attr(test, derive(Serialize))]
218struct ManifestV1Body {
219 version: u64,
220 last_checkpointed_lsn: Lsn,
221 next_collection_id: u64,
222 next_segment_id: u64,
223 collections: Vec<CollectionEntryV1>,
224}
225
226impl From<ManifestV1Body> for Manifest {
227 fn from(m: ManifestV1Body) -> Self {
228 Self {
229 format_version: MANIFEST_FORMAT_VERSION,
230 version: m.version,
231 last_checkpointed_lsn: m.last_checkpointed_lsn,
232 next_collection_id: m.next_collection_id,
233 next_segment_id: m.next_segment_id,
234 collections: m
235 .collections
236 .into_iter()
237 .map(|c| CollectionEntry {
238 id: c.id,
239 name: c.name,
240 descriptor: c.descriptor,
241 segments: c.segments,
242 index_snapshot: None,
243 })
244 .collect(),
245 }
246 }
247}
248
249fn read_manifest_file(path: &Path, codec: &dyn PageCodec) -> Result<Manifest> {
250 let body = read_paged(path, codec, PageType::Manifest)?;
251 let (peek, rest) = postcard::take_from_bytes::<VersionPeek>(&body)?;
254 match peek.format_version {
255 1 => Ok(postcard::from_bytes::<ManifestV1Body>(rest)?.into()),
256 v if v == MANIFEST_FORMAT_VERSION => Ok(postcard::from_bytes::<Manifest>(&body)?),
257 other => Err(CoreError::UnsupportedVersion {
258 found: other,
259 supported: MANIFEST_FORMAT_VERSION,
260 }),
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use crate::page::{PAGE_BODY_CAP, PAGE_SIZE, PlainCodec};

    /// Builds a manifest at `version` with `n_collections` collections, each
    /// carrying a `desc_len`-byte descriptor and exactly one segment.
    fn sample(version: u64, n_collections: usize, desc_len: usize) -> Manifest {
        let mut collections = Vec::with_capacity(n_collections);
        for c in 0..n_collections {
            let n = c as u64;
            let segment = SegmentRef {
                id: n,
                row_count: 10 * n,
                lsn_low: Lsn(n),
                lsn_high: Lsn(n + 5),
            };
            collections.push(CollectionEntry {
                id: CollectionId(n),
                name: format!("col-{c}"),
                descriptor: vec![(c % 256) as u8; desc_len],
                segments: vec![segment],
                index_snapshot: None,
            });
        }
        let count = n_collections as u64;
        Manifest {
            format_version: MANIFEST_FORMAT_VERSION,
            version,
            last_checkpointed_lsn: Lsn(version),
            next_collection_id: count,
            next_segment_id: count,
            collections,
        }
    }

    #[test]
    fn fresh_dir_has_no_manifest() {
        let tmp = tempfile::tempdir().unwrap();
        let loaded = read_current(tmp.path(), &PlainCodec).unwrap();
        assert_eq!(loaded, None);
    }

    #[test]
    fn write_then_read_roundtrips() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let written = sample(1, 3, 16);
        write_manifest(root, &written, &PlainCodec).unwrap();
        let loaded = read_current(root, &PlainCodec).unwrap();
        assert_eq!(loaded, Some(written));
    }

    #[test]
    fn newer_version_supersedes() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let older = sample(1, 1, 8);
        write_manifest(root, &older, &PlainCodec).unwrap();
        let newer = sample(2, 2, 8);
        write_manifest(root, &newer, &PlainCodec).unwrap();
        // The pointer must now resolve to the second manifest.
        let loaded = read_current(root, &PlainCodec).unwrap();
        assert_eq!(loaded, Some(newer));
    }

    #[test]
    fn multi_page_manifest_roundtrips() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        // Two descriptors of a full page body each force the file past two pages.
        let big = sample(7, 2, PAGE_BODY_CAP);
        write_manifest(root, &big, &PlainCodec).unwrap();
        let on_disk = std::fs::read(root.join(manifest_file_name(7))).unwrap();
        assert!(on_disk.len() > PAGE_SIZE * 2);
        let loaded = read_current(root, &PlainCodec).unwrap();
        assert_eq!(loaded, Some(big));
    }

    #[test]
    fn temp_pointer_is_renamed_away() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        write_manifest(root, &sample(1, 1, 8), &PlainCodec).unwrap();
        let scratch = root.join(CURRENT_TMP);
        let pointer = root.join(CURRENT_FILE);
        assert!(!scratch.exists());
        assert!(pointer.exists());
    }

    #[test]
    fn orphan_manifest_without_current_swap_is_ignored() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let published = sample(1, 1, 8);
        write_manifest(root, &published, &PlainCodec).unwrap();
        // A newer-looking manifest file that CURRENT never pointed at.
        let orphan = root.join(manifest_file_name(2));
        std::fs::write(orphan, b"garbage").unwrap();
        let loaded = read_current(root, &PlainCodec).unwrap();
        assert_eq!(loaded, Some(published));
    }

    #[test]
    fn stale_current_tmp_does_not_affect_read() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let published = sample(1, 1, 8);
        write_manifest(root, &published, &PlainCodec).unwrap();
        // A leftover scratch pointer naming a manifest that does not exist.
        let scratch = root.join(CURRENT_TMP);
        std::fs::write(scratch, b"manifest-9999999999\n").unwrap();
        let loaded = read_current(root, &PlainCodec).unwrap();
        assert_eq!(loaded, Some(published));
    }

    #[test]
    fn corrupt_manifest_page_is_detected() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        write_manifest(root, &sample(1, 1, 64), &PlainCodec).unwrap();
        let target = root.join(manifest_file_name(1));
        let mut raw = std::fs::read(&target).unwrap();
        // Invert one byte of the stored file.
        raw[64] ^= 0xFF;
        std::fs::write(&target, &raw).unwrap();
        let outcome = read_current(root, &PlainCodec);
        assert!(matches!(outcome, Err(CoreError::PageCorrupt { .. })));
    }

    #[test]
    fn accessors_find_collections() {
        let manifest = sample(1, 3, 8);

        let by_id = manifest.collection(CollectionId(1));
        assert_eq!(by_id.map(|c| c.name.as_str()), Some("col-1"));

        let by_name = manifest.collection_by_name("col-2");
        assert_eq!(by_name.map(|c| c.id), Some(CollectionId(2)));

        assert!(manifest.collection(CollectionId(99)).is_none());
        assert!(manifest.collection_by_name("nope").is_none());
    }

    #[test]
    fn v2_manifest_round_trips_an_index_snapshot() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let mut written = sample(4, 2, 8);
        let snapshot = IndexSnapshotRef {
            id: 4,
            lsn: Lsn(99),
        };
        written.collections[0].index_snapshot = Some(snapshot);
        write_manifest(root, &written, &PlainCodec).unwrap();
        let loaded = read_current(root, &PlainCodec).unwrap();
        assert_eq!(loaded, Some(written));
    }

    #[test]
    fn v1_manifest_without_index_snapshot_opens_and_upgrades() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        let legacy_entry = CollectionEntryV1 {
            id: CollectionId(0),
            name: "legacy".to_owned(),
            descriptor: vec![1, 2, 3],
            segments: vec![SegmentRef {
                id: 0,
                row_count: 7,
                lsn_low: Lsn(1),
                lsn_high: Lsn(9),
            }],
        };
        let legacy = ManifestV1Body {
            version: 3,
            last_checkpointed_lsn: Lsn(42),
            next_collection_id: 2,
            next_segment_id: 5,
            collections: vec![legacy_entry],
        };

        // A v1 file is the format version (1) followed by the v1 body.
        let mut encoded = postcard::to_allocvec(&1u16).unwrap();
        let tail = postcard::to_allocvec(&legacy).unwrap();
        encoded.extend_from_slice(&tail);

        let file_name = manifest_file_name(3);
        write_paged(
            &root.join(&file_name),
            &PlainCodec,
            PageType::Manifest,
            3,
            &encoded,
        )
        .unwrap();
        // Publish the pointer by hand; `write_manifest` only emits the current format.
        std::fs::write(root.join(CURRENT_FILE), format!("{}\n", file_name)).unwrap();

        let upgraded = read_current(root, &PlainCodec).unwrap().unwrap();
        assert_eq!(upgraded.format_version, MANIFEST_FORMAT_VERSION);
        assert_eq!(upgraded.version, 3);
        assert_eq!(upgraded.last_checkpointed_lsn, Lsn(42));
        assert_eq!(upgraded.collections.len(), 1);
        assert_eq!(upgraded.collections[0].name, "legacy");
        assert_eq!(upgraded.collections[0].index_snapshot, None);
    }

    #[test]
    fn future_manifest_version_is_rejected() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let mut from_the_future = sample(1, 1, 8);
        from_the_future.format_version = 999;
        write_manifest(root, &from_the_future, &PlainCodec).unwrap();
        let outcome = read_current(root, &PlainCodec);
        assert!(matches!(
            outcome,
            Err(CoreError::UnsupportedVersion { found: 999, .. })
        ));
    }
}