1use super::*;
2
3impl PhysicalMetadataFile {
4 pub fn from_state(
5 options: RedDBOptions,
6 catalog: CatalogSnapshot,
7 collection_roots: BTreeMap<String, u64>,
8 indexes: Vec<PhysicalIndexState>,
9 previous: Option<&PhysicalMetadataFile>,
10 ) -> Self {
11 let now = unix_ms_now();
12 let mut manifest = SchemaManifest::now(options, catalog.total_collections);
13 if let Some(previous) = previous {
14 manifest.created_at_unix_ms = previous.manifest.created_at_unix_ms;
15 }
16 manifest.updated_at_unix_ms = now;
17
18 let sequence = previous
19 .map(|previous| previous.superblock.sequence.saturating_add(1))
20 .unwrap_or(1);
21 let mut manifest_events = previous
22 .map(|previous| previous.manifest_events.clone())
23 .unwrap_or_default();
24 manifest_events.extend(build_manifest_events(
25 previous.map(|previous| &previous.superblock.collection_roots),
26 &collection_roots,
27 sequence,
28 ));
29 trim_manifest_history(&mut manifest_events);
30
31 let superblock = SuperblockHeader {
32 format_version: REDDB_FORMAT_VERSION,
33 sequence,
34 copies: DEFAULT_SUPERBLOCK_COPIES,
35 manifest: previous
36 .map(|previous| previous.superblock.manifest.clone())
37 .unwrap_or_default(),
38 free_set: previous
39 .map(|previous| previous.superblock.free_set)
40 .unwrap_or_default(),
41 collection_roots,
42 };
43
44 let mut snapshots = previous
45 .map(|previous| previous.snapshots.clone())
46 .unwrap_or_default();
47 snapshots.push(SnapshotDescriptor {
48 snapshot_id: sequence,
49 created_at_unix_ms: now,
50 superblock_sequence: sequence,
51 collection_count: catalog.total_collections,
52 total_entities: catalog.total_entities,
53 });
54 trim_snapshot_history(&mut snapshots, manifest.options.snapshot_retention);
55
56 Self {
57 protocol_version: PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
58 generated_at_unix_ms: now,
59 last_loaded_from: previous.and_then(|previous| previous.last_loaded_from.clone()),
60 last_healed_at_unix_ms: previous.and_then(|previous| previous.last_healed_at_unix_ms),
61 manifest,
62 catalog,
63 manifest_events,
64 indexes,
65 graph_projections: previous
66 .map(|previous| previous.graph_projections.clone())
67 .unwrap_or_default(),
68 analytics_jobs: previous
69 .map(|previous| previous.analytics_jobs.clone())
70 .unwrap_or_default(),
71 tree_definitions: previous
72 .map(|previous| previous.tree_definitions.clone())
73 .unwrap_or_default(),
74 collection_ttl_defaults_ms: previous
75 .map(|previous| previous.collection_ttl_defaults_ms.clone())
76 .unwrap_or_default(),
77 collection_contracts: previous
78 .map(|previous| previous.collection_contracts.clone())
79 .unwrap_or_default(),
80 hypertables: previous
81 .map(|previous| previous.hypertables.clone())
82 .unwrap_or_default(),
83 exports: previous
84 .map(|previous| previous.exports.clone())
85 .unwrap_or_default(),
86 superblock,
87 snapshots,
88 }
89 }
90
/// Returns the path of the JSON metadata sidecar for `data_path`.
///
/// The sidecar lives next to the data file and is named
/// `<file name>.meta.json`. When `data_path` has no final file-name
/// component, `data.rdb` is used as the base name.
pub fn metadata_path_for(data_path: &Path) -> PathBuf {
    let base_name = match data_path.file_name() {
        Some(os_name) => os_name.to_string_lossy().into_owned(),
        None => "data.rdb".to_string(),
    };
    let sidecar_name = format!("{}.meta.json", base_name);
    if let Some(directory) = data_path.parent() {
        directory.join(sidecar_name)
    } else {
        PathBuf::from(sidecar_name)
    }
}
102
103 pub fn metadata_binary_path_for(data_path: &Path) -> PathBuf {
104 let file_name = data_path
105 .file_name()
106 .map(|name| name.to_string_lossy().into_owned())
107 .unwrap_or_else(|| "data.rdb".to_string());
108 let meta_file = format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}");
109 match data_path.parent() {
110 Some(parent) => parent.join(meta_file),
111 None => PathBuf::from(meta_file),
112 }
113 }
114
115 pub fn metadata_journal_path_for(data_path: &Path, sequence: u64) -> PathBuf {
116 let file_name = data_path
117 .file_name()
118 .map(|name| name.to_string_lossy().into_owned())
119 .unwrap_or_else(|| "data.rdb".to_string());
120 let meta_file =
121 format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}.seq-{sequence:020}");
122 match data_path.parent() {
123 Some(parent) => parent.join(meta_file),
124 None => PathBuf::from(meta_file),
125 }
126 }
127
128 pub fn export_data_path_for(data_path: &Path, name: &str) -> PathBuf {
129 let file_name = data_path
130 .file_name()
131 .map(|name| name.to_string_lossy().into_owned())
132 .unwrap_or_else(|| "data.rdb".to_string());
133 let stem = file_name.strip_suffix(".rdb").unwrap_or(&file_name);
134 let export_file = format!("{stem}.export.{}.rdb", sanitize_export_name(name));
135 match data_path.parent() {
136 Some(parent) => parent.join(export_file),
137 None => PathBuf::from(export_file),
138 }
139 }
140
141 pub fn load_for_data_path(data_path: &Path) -> io::Result<Self> {
142 Self::load_for_data_path_with_source(data_path).map(|(metadata, _)| metadata)
143 }
144
145 pub fn load_for_data_path_with_source(
146 data_path: &Path,
147 ) -> io::Result<(Self, PhysicalMetadataSource)> {
148 let binary_path = Self::metadata_binary_path_for(data_path);
149 if binary_path.exists() {
150 match Self::load_from_binary_path(&binary_path) {
151 Ok(metadata) => {
152 return Ok((metadata, PhysicalMetadataSource::Binary));
153 }
154 Err(_) => {
155 let mut journal_paths = Self::journal_paths_for_data_path(data_path)?;
156 journal_paths.reverse();
157 for journal_path in journal_paths {
158 if let Ok(metadata) = Self::load_from_binary_path(&journal_path) {
159 let healed =
160 metadata.mark_recovery(PhysicalMetadataSource::BinaryJournal);
161 let _ = healed.heal_primary_metadata_for_data_path(data_path);
162 return Ok((healed, PhysicalMetadataSource::BinaryJournal));
163 }
164 }
165 }
166 }
167 }
168 Self::load_from_path(&Self::metadata_path_for(data_path)).map(|metadata| {
169 let healed = metadata.mark_recovery(PhysicalMetadataSource::Json);
170 let _ = healed.heal_primary_metadata_for_data_path(data_path);
171 (healed, PhysicalMetadataSource::Json)
172 })
173 }
174
175 pub fn save_for_data_path(&self, data_path: &Path) -> io::Result<PathBuf> {
176 let binary_path = Self::metadata_binary_path_for(data_path);
177 if binary_path.exists() && super::seqn_journal_enabled() {
178 let sequence = Self::load_from_binary_path(&binary_path)
179 .map(|metadata| metadata.superblock.sequence)
180 .unwrap_or(self.superblock.sequence);
181 let journal_path = Self::metadata_journal_path_for(data_path, sequence);
182 let _ = fs::copy(&binary_path, journal_path);
183 }
184 self.save_to_binary_path(&binary_path)?;
185 self.prune_journal_for_data_path(data_path)?;
186 if super::meta_json_sidecar_enabled() {
187 let json_path = Self::metadata_path_for(data_path);
188 self.save_to_path(&json_path)?;
189 }
190 Ok(binary_path)
191 }
192
193 pub fn load_from_path(path: &Path) -> io::Result<Self> {
194 let text = fs::read_to_string(path)?;
195 let parsed = parse_json(&text).map_err(|err| {
196 io::Error::new(
197 io::ErrorKind::InvalidData,
198 format!("invalid physical metadata JSON: {err}"),
199 )
200 })?;
201 let json = JsonValue::from(parsed);
202 Self::from_json_value(&json)
203 }
204
205 pub fn save_to_path(&self, path: &Path) -> io::Result<()> {
206 let text = self.to_json_value().to_string_pretty();
207 fs::write(path, text)
208 }
209
210 pub fn load_from_binary_path(path: &Path) -> io::Result<Self> {
211 let bytes = fs::read(path)?;
212 let json = from_slice::<JsonValue>(&bytes).map_err(|err| {
213 io::Error::new(
214 io::ErrorKind::InvalidData,
215 format!("invalid physical metadata binary: {err}"),
216 )
217 })?;
218 Self::from_json_value(&json)
219 }
220
221 pub fn save_to_binary_path(&self, path: &Path) -> io::Result<()> {
222 let bytes = to_vec(&self.to_json_value()).map_err(|err| {
223 io::Error::new(
224 io::ErrorKind::InvalidData,
225 format!("failed to encode physical metadata binary: {err}"),
226 )
227 })?;
228 fs::write(path, bytes)
229 }
230
231 pub fn journal_paths_for_data_path(data_path: &Path) -> io::Result<Vec<PathBuf>> {
232 let Some(parent) = data_path.parent() else {
233 return Ok(Vec::new());
234 };
235 let file_name = data_path
236 .file_name()
237 .map(|name| name.to_string_lossy().into_owned())
238 .unwrap_or_else(|| "data.rdb".to_string());
239 let prefix = format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}.seq-");
240
241 let mut paths = Vec::new();
242 for entry in fs::read_dir(parent)? {
243 let entry = entry?;
244 let path = entry.path();
245 let Some(name) = path.file_name().map(|name| name.to_string_lossy()) else {
246 continue;
247 };
248 if name.starts_with(&prefix) {
249 paths.push(path);
250 }
251 }
252 paths.sort();
253 Ok(paths)
254 }
255
256 pub fn prune_journal_for_data_path(&self, data_path: &Path) -> io::Result<()> {
257 let retention = super::seqn_journal_retention();
258 let mut paths = Self::journal_paths_for_data_path(data_path)?;
259 if paths.len() <= retention {
260 return Ok(());
261 }
262 let delete_count = paths.len() - retention;
263 for path in paths.drain(0..delete_count) {
264 let _ = fs::remove_file(path);
265 }
266 Ok(())
267 }
268
269 pub fn heal_primary_metadata_for_data_path(&self, data_path: &Path) -> io::Result<()> {
270 let binary_path = Self::metadata_binary_path_for(data_path);
271 self.save_to_binary_path(&binary_path)?;
272 if super::meta_json_sidecar_enabled() {
273 let json_path = Self::metadata_path_for(data_path);
274 self.save_to_path(&json_path)?;
275 }
276 Ok(())
277 }
278
279 pub fn mark_recovery(&self, source: PhysicalMetadataSource) -> Self {
280 let mut metadata = self.clone();
281 metadata.last_loaded_from = Some(source.as_str().to_string());
282 metadata.last_healed_at_unix_ms = Some(unix_ms_now());
283 metadata
284 }
285
286 pub fn to_json_value(&self) -> JsonValue {
287 let mut root = Map::new();
288 root.insert(
289 "protocol_version".to_string(),
290 JsonValue::String(self.protocol_version.clone()),
291 );
292 root.insert(
293 "generated_at_unix_ms".to_string(),
294 json_u128(self.generated_at_unix_ms),
295 );
296 root.insert(
297 "last_loaded_from".to_string(),
298 self.last_loaded_from
299 .clone()
300 .map(JsonValue::String)
301 .unwrap_or(JsonValue::Null),
302 );
303 root.insert(
304 "last_healed_at_unix_ms".to_string(),
305 self.last_healed_at_unix_ms
306 .map(json_u128)
307 .unwrap_or(JsonValue::Null),
308 );
309 root.insert("manifest".to_string(), manifest_to_json(&self.manifest));
310 root.insert("catalog".to_string(), catalog_to_json(&self.catalog));
311 root.insert(
312 "manifest_events".to_string(),
313 JsonValue::Array(
314 self.manifest_events
315 .iter()
316 .map(manifest_event_to_json)
317 .collect(),
318 ),
319 );
320 root.insert(
321 "indexes".to_string(),
322 JsonValue::Array(self.indexes.iter().map(index_state_to_json).collect()),
323 );
324 root.insert(
325 "graph_projections".to_string(),
326 JsonValue::Array(
327 self.graph_projections
328 .iter()
329 .map(graph_projection_to_json)
330 .collect(),
331 ),
332 );
333 root.insert(
334 "analytics_jobs".to_string(),
335 JsonValue::Array(
336 self.analytics_jobs
337 .iter()
338 .map(analytics_job_to_json)
339 .collect(),
340 ),
341 );
342 root.insert(
343 "tree_definitions".to_string(),
344 JsonValue::Array(
345 self.tree_definitions
346 .iter()
347 .map(tree_definition_to_json)
348 .collect(),
349 ),
350 );
351 root.insert(
352 "collection_ttl_defaults_ms".to_string(),
353 JsonValue::Object(
354 self.collection_ttl_defaults_ms
355 .iter()
356 .map(|(collection, ttl_ms)| (collection.clone(), json_u64(*ttl_ms)))
357 .collect(),
358 ),
359 );
360 root.insert(
361 "collection_contracts".to_string(),
362 JsonValue::Array(
363 self.collection_contracts
364 .iter()
365 .map(collection_contract_to_json)
366 .collect(),
367 ),
368 );
369 root.insert(
370 "hypertables".to_string(),
371 JsonValue::Array(self.hypertables.iter().map(hypertable_to_json).collect()),
372 );
373 root.insert(
374 "exports".to_string(),
375 JsonValue::Array(self.exports.iter().map(export_descriptor_to_json).collect()),
376 );
377 root.insert(
378 "superblock".to_string(),
379 superblock_to_json(&self.superblock),
380 );
381 root.insert(
382 "snapshots".to_string(),
383 JsonValue::Array(
384 self.snapshots
385 .iter()
386 .map(snapshot_descriptor_to_json)
387 .collect(),
388 ),
389 );
390 JsonValue::Object(root)
391 }
392
393 pub fn from_json_value(value: &JsonValue) -> io::Result<Self> {
394 let object = expect_object(value, "physical metadata root")?;
395 Ok(Self {
396 protocol_version: json_string_required(object, "protocol_version")?,
397 generated_at_unix_ms: json_u128_required(object, "generated_at_unix_ms")?,
398 last_loaded_from: object
399 .get("last_loaded_from")
400 .and_then(JsonValue::as_str)
401 .map(|value| value.to_string()),
402 last_healed_at_unix_ms: object
403 .get("last_healed_at_unix_ms")
404 .map(json_u128_value)
405 .transpose()?,
406 manifest: manifest_from_json(json_required(object, "manifest")?)?,
407 catalog: catalog_from_json(json_required(object, "catalog")?)?,
408 manifest_events: object
409 .get("manifest_events")
410 .and_then(JsonValue::as_array)
411 .map(|values| {
412 values
413 .iter()
414 .map(manifest_event_from_json)
415 .collect::<io::Result<Vec<_>>>()
416 })
417 .transpose()?
418 .unwrap_or_default(),
419 indexes: object
420 .get("indexes")
421 .and_then(JsonValue::as_array)
422 .map(|values| {
423 values
424 .iter()
425 .map(index_state_from_json)
426 .collect::<io::Result<Vec<_>>>()
427 })
428 .transpose()?
429 .unwrap_or_default(),
430 graph_projections: object
431 .get("graph_projections")
432 .and_then(JsonValue::as_array)
433 .map(|values| {
434 values
435 .iter()
436 .map(graph_projection_from_json)
437 .collect::<io::Result<Vec<_>>>()
438 })
439 .transpose()?
440 .unwrap_or_default(),
441 analytics_jobs: object
442 .get("analytics_jobs")
443 .and_then(JsonValue::as_array)
444 .map(|values| {
445 values
446 .iter()
447 .map(analytics_job_from_json)
448 .collect::<io::Result<Vec<_>>>()
449 })
450 .transpose()?
451 .unwrap_or_default(),
452 tree_definitions: object
453 .get("tree_definitions")
454 .and_then(JsonValue::as_array)
455 .map(|values| {
456 values
457 .iter()
458 .map(tree_definition_from_json)
459 .collect::<io::Result<Vec<_>>>()
460 })
461 .transpose()?
462 .unwrap_or_default(),
463 collection_ttl_defaults_ms: object
464 .get("collection_ttl_defaults_ms")
465 .and_then(JsonValue::as_object)
466 .map(|values| {
467 values
468 .iter()
469 .filter_map(|(collection, ttl_ms)| {
470 json_u64_value(ttl_ms)
471 .ok()
472 .map(|ttl_ms| (collection.clone(), ttl_ms))
473 })
474 .collect()
475 })
476 .unwrap_or_default(),
477 collection_contracts: object
478 .get("collection_contracts")
479 .and_then(JsonValue::as_array)
480 .map(|values| {
481 values
482 .iter()
483 .map(collection_contract_from_json)
484 .collect::<io::Result<Vec<_>>>()
485 })
486 .transpose()?
487 .unwrap_or_default(),
488 hypertables: object
489 .get("hypertables")
490 .and_then(JsonValue::as_array)
491 .map(|values| {
492 values
493 .iter()
494 .map(hypertable_from_json)
495 .collect::<io::Result<Vec<_>>>()
496 })
497 .transpose()?
498 .unwrap_or_default(),
499 exports: object
500 .get("exports")
501 .and_then(JsonValue::as_array)
502 .map(|values| {
503 values
504 .iter()
505 .map(export_descriptor_from_json)
506 .collect::<io::Result<Vec<_>>>()
507 })
508 .transpose()?
509 .unwrap_or_default(),
510 superblock: superblock_from_json(json_required(object, "superblock")?)?,
511 snapshots: json_required(object, "snapshots")?
512 .as_array()
513 .ok_or_else(|| invalid_data("field 'snapshots' must be an array"))?
514 .iter()
515 .map(snapshot_descriptor_from_json)
516 .collect::<io::Result<Vec<_>>>()?,
517 })
518 }
519}