// reddb_server/physical/metadata_file.rs

use std::io::Write as _;

use super::*;

3impl PhysicalMetadataFile {
4    pub fn from_state(
5        options: RedDBOptions,
6        catalog: CatalogSnapshot,
7        collection_roots: BTreeMap<String, u64>,
8        indexes: Vec<PhysicalIndexState>,
9        previous: Option<&PhysicalMetadataFile>,
10    ) -> Self {
11        let now = unix_ms_now();
12        let mut manifest = SchemaManifest::now(options, catalog.total_collections);
13        if let Some(previous) = previous {
14            manifest.created_at_unix_ms = previous.manifest.created_at_unix_ms;
15        }
16        manifest.updated_at_unix_ms = now;
17
18        let sequence = previous
19            .map(|previous| previous.superblock.sequence.saturating_add(1))
20            .unwrap_or(1);
21        let mut manifest_events = previous
22            .map(|previous| previous.manifest_events.clone())
23            .unwrap_or_default();
24        manifest_events.extend(build_manifest_events(
25            previous.map(|previous| &previous.superblock.collection_roots),
26            &collection_roots,
27            sequence,
28        ));
29        trim_manifest_history(&mut manifest_events);
30
31        let superblock = SuperblockHeader {
32            format_version: REDDB_FORMAT_VERSION,
33            sequence,
34            copies: DEFAULT_SUPERBLOCK_COPIES,
35            manifest: previous
36                .map(|previous| previous.superblock.manifest.clone())
37                .unwrap_or_default(),
38            free_set: previous
39                .map(|previous| previous.superblock.free_set)
40                .unwrap_or_default(),
41            collection_roots,
42        };
43
44        let mut snapshots = previous
45            .map(|previous| previous.snapshots.clone())
46            .unwrap_or_default();
47        snapshots.push(SnapshotDescriptor {
48            snapshot_id: sequence,
49            created_at_unix_ms: now,
50            superblock_sequence: sequence,
51            collection_count: catalog.total_collections,
52            total_entities: catalog.total_entities,
53        });
54        trim_snapshot_history(&mut snapshots, manifest.options.snapshot_retention);
55
56        Self {
57            protocol_version: PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
58            generated_at_unix_ms: now,
59            last_loaded_from: previous.and_then(|previous| previous.last_loaded_from.clone()),
60            last_healed_at_unix_ms: previous.and_then(|previous| previous.last_healed_at_unix_ms),
61            manifest,
62            catalog,
63            manifest_events,
64            indexes,
65            graph_projections: previous
66                .map(|previous| previous.graph_projections.clone())
67                .unwrap_or_default(),
68            analytics_jobs: previous
69                .map(|previous| previous.analytics_jobs.clone())
70                .unwrap_or_default(),
71            tree_definitions: previous
72                .map(|previous| previous.tree_definitions.clone())
73                .unwrap_or_default(),
74            collection_ttl_defaults_ms: previous
75                .map(|previous| previous.collection_ttl_defaults_ms.clone())
76                .unwrap_or_default(),
77            collection_contracts: previous
78                .map(|previous| previous.collection_contracts.clone())
79                .unwrap_or_default(),
80            hypertables: previous
81                .map(|previous| previous.hypertables.clone())
82                .unwrap_or_default(),
83            exports: previous
84                .map(|previous| previous.exports.clone())
85                .unwrap_or_default(),
86            superblock,
87            snapshots,
88        }
89    }
90
91    pub fn metadata_path_for(data_path: &Path) -> PathBuf {
92        let file_name = data_path
93            .file_name()
94            .map(|name| name.to_string_lossy().into_owned())
95            .unwrap_or_else(|| "data.rdb".to_string());
96        let meta_file = format!("{file_name}.meta.json");
97        match data_path.parent() {
98            Some(parent) => parent.join(meta_file),
99            None => PathBuf::from(meta_file),
100        }
101    }
102
103    pub fn metadata_binary_path_for(data_path: &Path) -> PathBuf {
104        let file_name = data_path
105            .file_name()
106            .map(|name| name.to_string_lossy().into_owned())
107            .unwrap_or_else(|| "data.rdb".to_string());
108        let meta_file = format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}");
109        match data_path.parent() {
110            Some(parent) => parent.join(meta_file),
111            None => PathBuf::from(meta_file),
112        }
113    }
114
115    pub fn metadata_journal_path_for(data_path: &Path, sequence: u64) -> PathBuf {
116        let file_name = data_path
117            .file_name()
118            .map(|name| name.to_string_lossy().into_owned())
119            .unwrap_or_else(|| "data.rdb".to_string());
120        let meta_file =
121            format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}.seq-{sequence:020}");
122        match data_path.parent() {
123            Some(parent) => parent.join(meta_file),
124            None => PathBuf::from(meta_file),
125        }
126    }
127
128    pub fn export_data_path_for(data_path: &Path, name: &str) -> PathBuf {
129        let file_name = data_path
130            .file_name()
131            .map(|name| name.to_string_lossy().into_owned())
132            .unwrap_or_else(|| "data.rdb".to_string());
133        let stem = file_name.strip_suffix(".rdb").unwrap_or(&file_name);
134        let export_file = format!("{stem}.export.{}.rdb", sanitize_export_name(name));
135        match data_path.parent() {
136            Some(parent) => parent.join(export_file),
137            None => PathBuf::from(export_file),
138        }
139    }
140
141    pub fn load_for_data_path(data_path: &Path) -> io::Result<Self> {
142        Self::load_for_data_path_with_source(data_path).map(|(metadata, _)| metadata)
143    }
144
145    pub fn load_for_data_path_with_source(
146        data_path: &Path,
147    ) -> io::Result<(Self, PhysicalMetadataSource)> {
148        let binary_path = Self::metadata_binary_path_for(data_path);
149        if binary_path.exists() {
150            match Self::load_from_binary_path(&binary_path) {
151                Ok(metadata) => {
152                    return Ok((metadata, PhysicalMetadataSource::Binary));
153                }
154                Err(_) => {
155                    let mut journal_paths = Self::journal_paths_for_data_path(data_path)?;
156                    journal_paths.reverse();
157                    for journal_path in journal_paths {
158                        if let Ok(metadata) = Self::load_from_binary_path(&journal_path) {
159                            let healed =
160                                metadata.mark_recovery(PhysicalMetadataSource::BinaryJournal);
161                            let _ = healed.heal_primary_metadata_for_data_path(data_path);
162                            return Ok((healed, PhysicalMetadataSource::BinaryJournal));
163                        }
164                    }
165                }
166            }
167        }
168        Self::load_from_path(&Self::metadata_path_for(data_path)).map(|metadata| {
169            let healed = metadata.mark_recovery(PhysicalMetadataSource::Json);
170            let _ = healed.heal_primary_metadata_for_data_path(data_path);
171            (healed, PhysicalMetadataSource::Json)
172        })
173    }
174
175    pub fn save_for_data_path(&self, data_path: &Path) -> io::Result<PathBuf> {
176        let binary_path = Self::metadata_binary_path_for(data_path);
177        if binary_path.exists() && super::seqn_journal_enabled() {
178            let sequence = Self::load_from_binary_path(&binary_path)
179                .map(|metadata| metadata.superblock.sequence)
180                .unwrap_or(self.superblock.sequence);
181            let journal_path = Self::metadata_journal_path_for(data_path, sequence);
182            let _ = fs::copy(&binary_path, journal_path);
183        }
184        self.save_to_binary_path(&binary_path)?;
185        self.prune_journal_for_data_path(data_path)?;
186        if super::meta_json_sidecar_enabled() {
187            let json_path = Self::metadata_path_for(data_path);
188            self.save_to_path(&json_path)?;
189        }
190        Ok(binary_path)
191    }
192
193    pub fn load_from_path(path: &Path) -> io::Result<Self> {
194        let text = fs::read_to_string(path)?;
195        let parsed = parse_json(&text).map_err(|err| {
196            io::Error::new(
197                io::ErrorKind::InvalidData,
198                format!("invalid physical metadata JSON: {err}"),
199            )
200        })?;
201        let json = JsonValue::from(parsed);
202        Self::from_json_value(&json)
203    }
204
205    pub fn save_to_path(&self, path: &Path) -> io::Result<()> {
206        let text = self.to_json_value().to_string_pretty();
207        fs::write(path, text)
208    }
209
210    pub fn load_from_binary_path(path: &Path) -> io::Result<Self> {
211        let bytes = fs::read(path)?;
212        let json = from_slice::<JsonValue>(&bytes).map_err(|err| {
213            io::Error::new(
214                io::ErrorKind::InvalidData,
215                format!("invalid physical metadata binary: {err}"),
216            )
217        })?;
218        Self::from_json_value(&json)
219    }
220
221    pub fn save_to_binary_path(&self, path: &Path) -> io::Result<()> {
222        let bytes = to_vec(&self.to_json_value()).map_err(|err| {
223            io::Error::new(
224                io::ErrorKind::InvalidData,
225                format!("failed to encode physical metadata binary: {err}"),
226            )
227        })?;
228        fs::write(path, bytes)
229    }
230
231    pub fn journal_paths_for_data_path(data_path: &Path) -> io::Result<Vec<PathBuf>> {
232        let Some(parent) = data_path.parent() else {
233            return Ok(Vec::new());
234        };
235        let file_name = data_path
236            .file_name()
237            .map(|name| name.to_string_lossy().into_owned())
238            .unwrap_or_else(|| "data.rdb".to_string());
239        let prefix = format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}.seq-");
240
241        let mut paths = Vec::new();
242        for entry in fs::read_dir(parent)? {
243            let entry = entry?;
244            let path = entry.path();
245            let Some(name) = path.file_name().map(|name| name.to_string_lossy()) else {
246                continue;
247            };
248            if name.starts_with(&prefix) {
249                paths.push(path);
250            }
251        }
252        paths.sort();
253        Ok(paths)
254    }
255
256    pub fn prune_journal_for_data_path(&self, data_path: &Path) -> io::Result<()> {
257        let retention = super::seqn_journal_retention();
258        let mut paths = Self::journal_paths_for_data_path(data_path)?;
259        if paths.len() <= retention {
260            return Ok(());
261        }
262        let delete_count = paths.len() - retention;
263        for path in paths.drain(0..delete_count) {
264            let _ = fs::remove_file(path);
265        }
266        Ok(())
267    }
268
269    pub fn heal_primary_metadata_for_data_path(&self, data_path: &Path) -> io::Result<()> {
270        let binary_path = Self::metadata_binary_path_for(data_path);
271        self.save_to_binary_path(&binary_path)?;
272        if super::meta_json_sidecar_enabled() {
273            let json_path = Self::metadata_path_for(data_path);
274            self.save_to_path(&json_path)?;
275        }
276        Ok(())
277    }
278
279    pub fn mark_recovery(&self, source: PhysicalMetadataSource) -> Self {
280        let mut metadata = self.clone();
281        metadata.last_loaded_from = Some(source.as_str().to_string());
282        metadata.last_healed_at_unix_ms = Some(unix_ms_now());
283        metadata
284    }
285
286    pub fn to_json_value(&self) -> JsonValue {
287        let mut root = Map::new();
288        root.insert(
289            "protocol_version".to_string(),
290            JsonValue::String(self.protocol_version.clone()),
291        );
292        root.insert(
293            "generated_at_unix_ms".to_string(),
294            json_u128(self.generated_at_unix_ms),
295        );
296        root.insert(
297            "last_loaded_from".to_string(),
298            self.last_loaded_from
299                .clone()
300                .map(JsonValue::String)
301                .unwrap_or(JsonValue::Null),
302        );
303        root.insert(
304            "last_healed_at_unix_ms".to_string(),
305            self.last_healed_at_unix_ms
306                .map(json_u128)
307                .unwrap_or(JsonValue::Null),
308        );
309        root.insert("manifest".to_string(), manifest_to_json(&self.manifest));
310        root.insert("catalog".to_string(), catalog_to_json(&self.catalog));
311        root.insert(
312            "manifest_events".to_string(),
313            JsonValue::Array(
314                self.manifest_events
315                    .iter()
316                    .map(manifest_event_to_json)
317                    .collect(),
318            ),
319        );
320        root.insert(
321            "indexes".to_string(),
322            JsonValue::Array(self.indexes.iter().map(index_state_to_json).collect()),
323        );
324        root.insert(
325            "graph_projections".to_string(),
326            JsonValue::Array(
327                self.graph_projections
328                    .iter()
329                    .map(graph_projection_to_json)
330                    .collect(),
331            ),
332        );
333        root.insert(
334            "analytics_jobs".to_string(),
335            JsonValue::Array(
336                self.analytics_jobs
337                    .iter()
338                    .map(analytics_job_to_json)
339                    .collect(),
340            ),
341        );
342        root.insert(
343            "tree_definitions".to_string(),
344            JsonValue::Array(
345                self.tree_definitions
346                    .iter()
347                    .map(tree_definition_to_json)
348                    .collect(),
349            ),
350        );
351        root.insert(
352            "collection_ttl_defaults_ms".to_string(),
353            JsonValue::Object(
354                self.collection_ttl_defaults_ms
355                    .iter()
356                    .map(|(collection, ttl_ms)| (collection.clone(), json_u64(*ttl_ms)))
357                    .collect(),
358            ),
359        );
360        root.insert(
361            "collection_contracts".to_string(),
362            JsonValue::Array(
363                self.collection_contracts
364                    .iter()
365                    .map(collection_contract_to_json)
366                    .collect(),
367            ),
368        );
369        root.insert(
370            "hypertables".to_string(),
371            JsonValue::Array(self.hypertables.iter().map(hypertable_to_json).collect()),
372        );
373        root.insert(
374            "exports".to_string(),
375            JsonValue::Array(self.exports.iter().map(export_descriptor_to_json).collect()),
376        );
377        root.insert(
378            "superblock".to_string(),
379            superblock_to_json(&self.superblock),
380        );
381        root.insert(
382            "snapshots".to_string(),
383            JsonValue::Array(
384                self.snapshots
385                    .iter()
386                    .map(snapshot_descriptor_to_json)
387                    .collect(),
388            ),
389        );
390        JsonValue::Object(root)
391    }
392
393    pub fn from_json_value(value: &JsonValue) -> io::Result<Self> {
394        let object = expect_object(value, "physical metadata root")?;
395        Ok(Self {
396            protocol_version: json_string_required(object, "protocol_version")?,
397            generated_at_unix_ms: json_u128_required(object, "generated_at_unix_ms")?,
398            last_loaded_from: object
399                .get("last_loaded_from")
400                .and_then(JsonValue::as_str)
401                .map(|value| value.to_string()),
402            last_healed_at_unix_ms: object
403                .get("last_healed_at_unix_ms")
404                .map(json_u128_value)
405                .transpose()?,
406            manifest: manifest_from_json(json_required(object, "manifest")?)?,
407            catalog: catalog_from_json(json_required(object, "catalog")?)?,
408            manifest_events: object
409                .get("manifest_events")
410                .and_then(JsonValue::as_array)
411                .map(|values| {
412                    values
413                        .iter()
414                        .map(manifest_event_from_json)
415                        .collect::<io::Result<Vec<_>>>()
416                })
417                .transpose()?
418                .unwrap_or_default(),
419            indexes: object
420                .get("indexes")
421                .and_then(JsonValue::as_array)
422                .map(|values| {
423                    values
424                        .iter()
425                        .map(index_state_from_json)
426                        .collect::<io::Result<Vec<_>>>()
427                })
428                .transpose()?
429                .unwrap_or_default(),
430            graph_projections: object
431                .get("graph_projections")
432                .and_then(JsonValue::as_array)
433                .map(|values| {
434                    values
435                        .iter()
436                        .map(graph_projection_from_json)
437                        .collect::<io::Result<Vec<_>>>()
438                })
439                .transpose()?
440                .unwrap_or_default(),
441            analytics_jobs: object
442                .get("analytics_jobs")
443                .and_then(JsonValue::as_array)
444                .map(|values| {
445                    values
446                        .iter()
447                        .map(analytics_job_from_json)
448                        .collect::<io::Result<Vec<_>>>()
449                })
450                .transpose()?
451                .unwrap_or_default(),
452            tree_definitions: object
453                .get("tree_definitions")
454                .and_then(JsonValue::as_array)
455                .map(|values| {
456                    values
457                        .iter()
458                        .map(tree_definition_from_json)
459                        .collect::<io::Result<Vec<_>>>()
460                })
461                .transpose()?
462                .unwrap_or_default(),
463            collection_ttl_defaults_ms: object
464                .get("collection_ttl_defaults_ms")
465                .and_then(JsonValue::as_object)
466                .map(|values| {
467                    values
468                        .iter()
469                        .filter_map(|(collection, ttl_ms)| {
470                            json_u64_value(ttl_ms)
471                                .ok()
472                                .map(|ttl_ms| (collection.clone(), ttl_ms))
473                        })
474                        .collect()
475                })
476                .unwrap_or_default(),
477            collection_contracts: object
478                .get("collection_contracts")
479                .and_then(JsonValue::as_array)
480                .map(|values| {
481                    values
482                        .iter()
483                        .map(collection_contract_from_json)
484                        .collect::<io::Result<Vec<_>>>()
485                })
486                .transpose()?
487                .unwrap_or_default(),
488            hypertables: object
489                .get("hypertables")
490                .and_then(JsonValue::as_array)
491                .map(|values| {
492                    values
493                        .iter()
494                        .map(hypertable_from_json)
495                        .collect::<io::Result<Vec<_>>>()
496                })
497                .transpose()?
498                .unwrap_or_default(),
499            exports: object
500                .get("exports")
501                .and_then(JsonValue::as_array)
502                .map(|values| {
503                    values
504                        .iter()
505                        .map(export_descriptor_from_json)
506                        .collect::<io::Result<Vec<_>>>()
507                })
508                .transpose()?
509                .unwrap_or_default(),
510            superblock: superblock_from_json(json_required(object, "superblock")?)?,
511            snapshots: json_required(object, "snapshots")?
512                .as_array()
513                .ok_or_else(|| invalid_data("field 'snapshots' must be an array"))?
514                .iter()
515                .map(snapshot_descriptor_from_json)
516                .collect::<io::Result<Vec<_>>>()?,
517        })
518    }
519}