1use super::*;
2
3impl PhysicalMetadataFile {
4 pub fn from_state(
5 options: RedDBOptions,
6 catalog: CatalogSnapshot,
7 collection_roots: BTreeMap<String, u64>,
8 indexes: Vec<PhysicalIndexState>,
9 previous: Option<&PhysicalMetadataFile>,
10 ) -> Self {
11 let now = unix_ms_now();
12 let mut manifest = SchemaManifest::now(options, catalog.total_collections);
13 if let Some(previous) = previous {
14 manifest.created_at_unix_ms = previous.manifest.created_at_unix_ms;
15 }
16 manifest.updated_at_unix_ms = now;
17
18 let sequence = previous
19 .map(|previous| previous.superblock.sequence.saturating_add(1))
20 .unwrap_or(1);
21 let mut manifest_events = previous
22 .map(|previous| previous.manifest_events.clone())
23 .unwrap_or_default();
24 manifest_events.extend(build_manifest_events(
25 previous.map(|previous| &previous.superblock.collection_roots),
26 &collection_roots,
27 sequence,
28 ));
29 trim_manifest_history(&mut manifest_events);
30
31 let superblock = SuperblockHeader {
32 format_version: REDDB_FORMAT_VERSION,
33 sequence,
34 copies: DEFAULT_SUPERBLOCK_COPIES,
35 manifest: previous
36 .map(|previous| previous.superblock.manifest.clone())
37 .unwrap_or_default(),
38 free_set: previous
39 .map(|previous| previous.superblock.free_set)
40 .unwrap_or_default(),
41 collection_roots,
42 };
43
44 let mut snapshots = previous
45 .map(|previous| previous.snapshots.clone())
46 .unwrap_or_default();
47 snapshots.push(SnapshotDescriptor {
48 snapshot_id: sequence,
49 created_at_unix_ms: now,
50 superblock_sequence: sequence,
51 collection_count: catalog.total_collections,
52 total_entities: catalog.total_entities,
53 });
54 trim_snapshot_history(&mut snapshots, manifest.options.snapshot_retention);
55
56 Self {
57 protocol_version: PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
58 generated_at_unix_ms: now,
59 last_loaded_from: previous.and_then(|previous| previous.last_loaded_from.clone()),
60 last_healed_at_unix_ms: previous.and_then(|previous| previous.last_healed_at_unix_ms),
61 manifest,
62 catalog,
63 manifest_events,
64 indexes,
65 graph_projections: previous
66 .map(|previous| previous.graph_projections.clone())
67 .unwrap_or_default(),
68 analytics_jobs: previous
69 .map(|previous| previous.analytics_jobs.clone())
70 .unwrap_or_default(),
71 tree_definitions: previous
72 .map(|previous| previous.tree_definitions.clone())
73 .unwrap_or_default(),
74 collection_ttl_defaults_ms: previous
75 .map(|previous| previous.collection_ttl_defaults_ms.clone())
76 .unwrap_or_default(),
77 collection_contracts: previous
78 .map(|previous| previous.collection_contracts.clone())
79 .unwrap_or_default(),
80 exports: previous
81 .map(|previous| previous.exports.clone())
82 .unwrap_or_default(),
83 superblock,
84 snapshots,
85 }
86 }
87
/// Returns the path of the JSON metadata sidecar that belongs to `data_path`.
///
/// The sidecar is named `<file name>.meta.json` and is placed in the parent
/// of `data_path`. If `data_path` has no final file-name component,
/// `data.rdb` is used as the base name; if it has no parent, the bare
/// sidecar file name is returned. Non-UTF-8 file names are converted lossily.
pub fn metadata_path_for(data_path: &Path) -> PathBuf {
    let base_name = match data_path.file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => String::from("data.rdb"),
    };
    let sidecar = format!("{base_name}.meta.json");
    data_path
        .parent()
        .map_or_else(|| PathBuf::from(&sidecar), |dir| dir.join(&sidecar))
}
99
100 pub fn metadata_binary_path_for(data_path: &Path) -> PathBuf {
101 let file_name = data_path
102 .file_name()
103 .map(|name| name.to_string_lossy().into_owned())
104 .unwrap_or_else(|| "data.rdb".to_string());
105 let meta_file = format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}");
106 match data_path.parent() {
107 Some(parent) => parent.join(meta_file),
108 None => PathBuf::from(meta_file),
109 }
110 }
111
112 pub fn metadata_journal_path_for(data_path: &Path, sequence: u64) -> PathBuf {
113 let file_name = data_path
114 .file_name()
115 .map(|name| name.to_string_lossy().into_owned())
116 .unwrap_or_else(|| "data.rdb".to_string());
117 let meta_file =
118 format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}.seq-{sequence:020}");
119 match data_path.parent() {
120 Some(parent) => parent.join(meta_file),
121 None => PathBuf::from(meta_file),
122 }
123 }
124
125 pub fn export_data_path_for(data_path: &Path, name: &str) -> PathBuf {
126 let file_name = data_path
127 .file_name()
128 .map(|name| name.to_string_lossy().into_owned())
129 .unwrap_or_else(|| "data.rdb".to_string());
130 let stem = file_name.strip_suffix(".rdb").unwrap_or(&file_name);
131 let export_file = format!("{stem}.export.{}.rdb", sanitize_export_name(name));
132 match data_path.parent() {
133 Some(parent) => parent.join(export_file),
134 None => PathBuf::from(export_file),
135 }
136 }
137
138 pub fn load_for_data_path(data_path: &Path) -> io::Result<Self> {
139 Self::load_for_data_path_with_source(data_path).map(|(metadata, _)| metadata)
140 }
141
142 pub fn load_for_data_path_with_source(
143 data_path: &Path,
144 ) -> io::Result<(Self, PhysicalMetadataSource)> {
145 let binary_path = Self::metadata_binary_path_for(data_path);
146 if binary_path.exists() {
147 match Self::load_from_binary_path(&binary_path) {
148 Ok(metadata) => {
149 return Ok((metadata, PhysicalMetadataSource::Binary));
150 }
151 Err(_) => {
152 let mut journal_paths = Self::journal_paths_for_data_path(data_path)?;
153 journal_paths.reverse();
154 for journal_path in journal_paths {
155 if let Ok(metadata) = Self::load_from_binary_path(&journal_path) {
156 let healed =
157 metadata.mark_recovery(PhysicalMetadataSource::BinaryJournal);
158 let _ = healed.heal_primary_metadata_for_data_path(data_path);
159 return Ok((healed, PhysicalMetadataSource::BinaryJournal));
160 }
161 }
162 }
163 }
164 }
165 Self::load_from_path(&Self::metadata_path_for(data_path)).map(|metadata| {
166 let healed = metadata.mark_recovery(PhysicalMetadataSource::Json);
167 let _ = healed.heal_primary_metadata_for_data_path(data_path);
168 (healed, PhysicalMetadataSource::Json)
169 })
170 }
171
172 pub fn save_for_data_path(&self, data_path: &Path) -> io::Result<PathBuf> {
173 let binary_path = Self::metadata_binary_path_for(data_path);
174 if binary_path.exists() && super::seqn_journal_enabled() {
175 let sequence = Self::load_from_binary_path(&binary_path)
176 .map(|metadata| metadata.superblock.sequence)
177 .unwrap_or(self.superblock.sequence);
178 let journal_path = Self::metadata_journal_path_for(data_path, sequence);
179 let _ = fs::copy(&binary_path, journal_path);
180 }
181 self.save_to_binary_path(&binary_path)?;
182 self.prune_journal_for_data_path(data_path)?;
183 if super::meta_json_sidecar_enabled() {
184 let json_path = Self::metadata_path_for(data_path);
185 self.save_to_path(&json_path)?;
186 }
187 Ok(binary_path)
188 }
189
190 pub fn load_from_path(path: &Path) -> io::Result<Self> {
191 let text = fs::read_to_string(path)?;
192 let parsed = parse_json(&text).map_err(|err| {
193 io::Error::new(
194 io::ErrorKind::InvalidData,
195 format!("invalid physical metadata JSON: {err}"),
196 )
197 })?;
198 let json = JsonValue::from(parsed);
199 Self::from_json_value(&json)
200 }
201
202 pub fn save_to_path(&self, path: &Path) -> io::Result<()> {
203 let text = self.to_json_value().to_string_pretty();
204 fs::write(path, text)
205 }
206
207 pub fn load_from_binary_path(path: &Path) -> io::Result<Self> {
208 let bytes = fs::read(path)?;
209 let json = from_slice::<JsonValue>(&bytes).map_err(|err| {
210 io::Error::new(
211 io::ErrorKind::InvalidData,
212 format!("invalid physical metadata binary: {err}"),
213 )
214 })?;
215 Self::from_json_value(&json)
216 }
217
218 pub fn save_to_binary_path(&self, path: &Path) -> io::Result<()> {
219 let bytes = to_vec(&self.to_json_value()).map_err(|err| {
220 io::Error::new(
221 io::ErrorKind::InvalidData,
222 format!("failed to encode physical metadata binary: {err}"),
223 )
224 })?;
225 fs::write(path, bytes)
226 }
227
228 pub fn journal_paths_for_data_path(data_path: &Path) -> io::Result<Vec<PathBuf>> {
229 let Some(parent) = data_path.parent() else {
230 return Ok(Vec::new());
231 };
232 let file_name = data_path
233 .file_name()
234 .map(|name| name.to_string_lossy().into_owned())
235 .unwrap_or_else(|| "data.rdb".to_string());
236 let prefix = format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}.seq-");
237
238 let mut paths = Vec::new();
239 for entry in fs::read_dir(parent)? {
240 let entry = entry?;
241 let path = entry.path();
242 let Some(name) = path.file_name().map(|name| name.to_string_lossy()) else {
243 continue;
244 };
245 if name.starts_with(&prefix) {
246 paths.push(path);
247 }
248 }
249 paths.sort();
250 Ok(paths)
251 }
252
253 pub fn prune_journal_for_data_path(&self, data_path: &Path) -> io::Result<()> {
254 let retention = super::seqn_journal_retention();
255 let mut paths = Self::journal_paths_for_data_path(data_path)?;
256 if paths.len() <= retention {
257 return Ok(());
258 }
259 let delete_count = paths.len() - retention;
260 for path in paths.drain(0..delete_count) {
261 let _ = fs::remove_file(path);
262 }
263 Ok(())
264 }
265
266 pub fn heal_primary_metadata_for_data_path(&self, data_path: &Path) -> io::Result<()> {
267 let binary_path = Self::metadata_binary_path_for(data_path);
268 self.save_to_binary_path(&binary_path)?;
269 if super::meta_json_sidecar_enabled() {
270 let json_path = Self::metadata_path_for(data_path);
271 self.save_to_path(&json_path)?;
272 }
273 Ok(())
274 }
275
276 pub fn mark_recovery(&self, source: PhysicalMetadataSource) -> Self {
277 let mut metadata = self.clone();
278 metadata.last_loaded_from = Some(source.as_str().to_string());
279 metadata.last_healed_at_unix_ms = Some(unix_ms_now());
280 metadata
281 }
282
283 pub fn to_json_value(&self) -> JsonValue {
284 let mut root = Map::new();
285 root.insert(
286 "protocol_version".to_string(),
287 JsonValue::String(self.protocol_version.clone()),
288 );
289 root.insert(
290 "generated_at_unix_ms".to_string(),
291 json_u128(self.generated_at_unix_ms),
292 );
293 root.insert(
294 "last_loaded_from".to_string(),
295 self.last_loaded_from
296 .clone()
297 .map(JsonValue::String)
298 .unwrap_or(JsonValue::Null),
299 );
300 root.insert(
301 "last_healed_at_unix_ms".to_string(),
302 self.last_healed_at_unix_ms
303 .map(json_u128)
304 .unwrap_or(JsonValue::Null),
305 );
306 root.insert("manifest".to_string(), manifest_to_json(&self.manifest));
307 root.insert("catalog".to_string(), catalog_to_json(&self.catalog));
308 root.insert(
309 "manifest_events".to_string(),
310 JsonValue::Array(
311 self.manifest_events
312 .iter()
313 .map(manifest_event_to_json)
314 .collect(),
315 ),
316 );
317 root.insert(
318 "indexes".to_string(),
319 JsonValue::Array(self.indexes.iter().map(index_state_to_json).collect()),
320 );
321 root.insert(
322 "graph_projections".to_string(),
323 JsonValue::Array(
324 self.graph_projections
325 .iter()
326 .map(graph_projection_to_json)
327 .collect(),
328 ),
329 );
330 root.insert(
331 "analytics_jobs".to_string(),
332 JsonValue::Array(
333 self.analytics_jobs
334 .iter()
335 .map(analytics_job_to_json)
336 .collect(),
337 ),
338 );
339 root.insert(
340 "tree_definitions".to_string(),
341 JsonValue::Array(
342 self.tree_definitions
343 .iter()
344 .map(tree_definition_to_json)
345 .collect(),
346 ),
347 );
348 root.insert(
349 "collection_ttl_defaults_ms".to_string(),
350 JsonValue::Object(
351 self.collection_ttl_defaults_ms
352 .iter()
353 .map(|(collection, ttl_ms)| (collection.clone(), json_u64(*ttl_ms)))
354 .collect(),
355 ),
356 );
357 root.insert(
358 "collection_contracts".to_string(),
359 JsonValue::Array(
360 self.collection_contracts
361 .iter()
362 .map(collection_contract_to_json)
363 .collect(),
364 ),
365 );
366 root.insert(
367 "exports".to_string(),
368 JsonValue::Array(self.exports.iter().map(export_descriptor_to_json).collect()),
369 );
370 root.insert(
371 "superblock".to_string(),
372 superblock_to_json(&self.superblock),
373 );
374 root.insert(
375 "snapshots".to_string(),
376 JsonValue::Array(
377 self.snapshots
378 .iter()
379 .map(snapshot_descriptor_to_json)
380 .collect(),
381 ),
382 );
383 JsonValue::Object(root)
384 }
385
386 pub fn from_json_value(value: &JsonValue) -> io::Result<Self> {
387 let object = expect_object(value, "physical metadata root")?;
388 Ok(Self {
389 protocol_version: json_string_required(object, "protocol_version")?,
390 generated_at_unix_ms: json_u128_required(object, "generated_at_unix_ms")?,
391 last_loaded_from: object
392 .get("last_loaded_from")
393 .and_then(JsonValue::as_str)
394 .map(|value| value.to_string()),
395 last_healed_at_unix_ms: object
396 .get("last_healed_at_unix_ms")
397 .map(json_u128_value)
398 .transpose()?,
399 manifest: manifest_from_json(json_required(object, "manifest")?)?,
400 catalog: catalog_from_json(json_required(object, "catalog")?)?,
401 manifest_events: object
402 .get("manifest_events")
403 .and_then(JsonValue::as_array)
404 .map(|values| {
405 values
406 .iter()
407 .map(manifest_event_from_json)
408 .collect::<io::Result<Vec<_>>>()
409 })
410 .transpose()?
411 .unwrap_or_default(),
412 indexes: object
413 .get("indexes")
414 .and_then(JsonValue::as_array)
415 .map(|values| {
416 values
417 .iter()
418 .map(index_state_from_json)
419 .collect::<io::Result<Vec<_>>>()
420 })
421 .transpose()?
422 .unwrap_or_default(),
423 graph_projections: object
424 .get("graph_projections")
425 .and_then(JsonValue::as_array)
426 .map(|values| {
427 values
428 .iter()
429 .map(graph_projection_from_json)
430 .collect::<io::Result<Vec<_>>>()
431 })
432 .transpose()?
433 .unwrap_or_default(),
434 analytics_jobs: object
435 .get("analytics_jobs")
436 .and_then(JsonValue::as_array)
437 .map(|values| {
438 values
439 .iter()
440 .map(analytics_job_from_json)
441 .collect::<io::Result<Vec<_>>>()
442 })
443 .transpose()?
444 .unwrap_or_default(),
445 tree_definitions: object
446 .get("tree_definitions")
447 .and_then(JsonValue::as_array)
448 .map(|values| {
449 values
450 .iter()
451 .map(tree_definition_from_json)
452 .collect::<io::Result<Vec<_>>>()
453 })
454 .transpose()?
455 .unwrap_or_default(),
456 collection_ttl_defaults_ms: object
457 .get("collection_ttl_defaults_ms")
458 .and_then(JsonValue::as_object)
459 .map(|values| {
460 values
461 .iter()
462 .filter_map(|(collection, ttl_ms)| {
463 json_u64_value(ttl_ms)
464 .ok()
465 .map(|ttl_ms| (collection.clone(), ttl_ms))
466 })
467 .collect()
468 })
469 .unwrap_or_default(),
470 collection_contracts: object
471 .get("collection_contracts")
472 .and_then(JsonValue::as_array)
473 .map(|values| {
474 values
475 .iter()
476 .map(collection_contract_from_json)
477 .collect::<io::Result<Vec<_>>>()
478 })
479 .transpose()?
480 .unwrap_or_default(),
481 exports: object
482 .get("exports")
483 .and_then(JsonValue::as_array)
484 .map(|values| {
485 values
486 .iter()
487 .map(export_descriptor_from_json)
488 .collect::<io::Result<Vec<_>>>()
489 })
490 .transpose()?
491 .unwrap_or_default(),
492 superblock: superblock_from_json(json_required(object, "superblock")?)?,
493 snapshots: json_required(object, "snapshots")?
494 .as_array()
495 .ok_or_else(|| invalid_data("field 'snapshots' must be an array"))?
496 .iter()
497 .map(snapshot_descriptor_from_json)
498 .collect::<io::Result<Vec<_>>>()?,
499 })
500 }
501}