Skip to main content

reddb_server/physical/
metadata_file.rs

1use super::*;
2
3impl PhysicalMetadataFile {
4    pub fn from_state(
5        options: RedDBOptions,
6        catalog: CatalogSnapshot,
7        collection_roots: BTreeMap<String, u64>,
8        indexes: Vec<PhysicalIndexState>,
9        previous: Option<&PhysicalMetadataFile>,
10    ) -> Self {
11        let now = unix_ms_now();
12        let mut manifest = SchemaManifest::now(options, catalog.total_collections);
13        if let Some(previous) = previous {
14            manifest.created_at_unix_ms = previous.manifest.created_at_unix_ms;
15        }
16        manifest.updated_at_unix_ms = now;
17
18        let sequence = previous
19            .map(|previous| previous.superblock.sequence.saturating_add(1))
20            .unwrap_or(1);
21        let mut manifest_events = previous
22            .map(|previous| previous.manifest_events.clone())
23            .unwrap_or_default();
24        manifest_events.extend(build_manifest_events(
25            previous.map(|previous| &previous.superblock.collection_roots),
26            &collection_roots,
27            sequence,
28        ));
29        trim_manifest_history(&mut manifest_events);
30
31        let superblock = SuperblockHeader {
32            format_version: REDDB_FORMAT_VERSION,
33            sequence,
34            copies: DEFAULT_SUPERBLOCK_COPIES,
35            manifest: previous
36                .map(|previous| previous.superblock.manifest.clone())
37                .unwrap_or_default(),
38            free_set: previous
39                .map(|previous| previous.superblock.free_set)
40                .unwrap_or_default(),
41            collection_roots,
42        };
43
44        let mut snapshots = previous
45            .map(|previous| previous.snapshots.clone())
46            .unwrap_or_default();
47        snapshots.push(SnapshotDescriptor {
48            snapshot_id: sequence,
49            created_at_unix_ms: now,
50            superblock_sequence: sequence,
51            collection_count: catalog.total_collections,
52            total_entities: catalog.total_entities,
53        });
54        trim_snapshot_history(&mut snapshots, manifest.options.snapshot_retention);
55
56        Self {
57            protocol_version: PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
58            generated_at_unix_ms: now,
59            last_loaded_from: previous.and_then(|previous| previous.last_loaded_from.clone()),
60            last_healed_at_unix_ms: previous.and_then(|previous| previous.last_healed_at_unix_ms),
61            manifest,
62            catalog,
63            manifest_events,
64            indexes,
65            graph_projections: previous
66                .map(|previous| previous.graph_projections.clone())
67                .unwrap_or_default(),
68            analytics_jobs: previous
69                .map(|previous| previous.analytics_jobs.clone())
70                .unwrap_or_default(),
71            tree_definitions: previous
72                .map(|previous| previous.tree_definitions.clone())
73                .unwrap_or_default(),
74            collection_ttl_defaults_ms: previous
75                .map(|previous| previous.collection_ttl_defaults_ms.clone())
76                .unwrap_or_default(),
77            collection_contracts: previous
78                .map(|previous| previous.collection_contracts.clone())
79                .unwrap_or_default(),
80            exports: previous
81                .map(|previous| previous.exports.clone())
82                .unwrap_or_default(),
83            superblock,
84            snapshots,
85        }
86    }
87
88    pub fn metadata_path_for(data_path: &Path) -> PathBuf {
89        let file_name = data_path
90            .file_name()
91            .map(|name| name.to_string_lossy().into_owned())
92            .unwrap_or_else(|| "data.rdb".to_string());
93        let meta_file = format!("{file_name}.meta.json");
94        match data_path.parent() {
95            Some(parent) => parent.join(meta_file),
96            None => PathBuf::from(meta_file),
97        }
98    }
99
100    pub fn metadata_binary_path_for(data_path: &Path) -> PathBuf {
101        let file_name = data_path
102            .file_name()
103            .map(|name| name.to_string_lossy().into_owned())
104            .unwrap_or_else(|| "data.rdb".to_string());
105        let meta_file = format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}");
106        match data_path.parent() {
107            Some(parent) => parent.join(meta_file),
108            None => PathBuf::from(meta_file),
109        }
110    }
111
112    pub fn metadata_journal_path_for(data_path: &Path, sequence: u64) -> PathBuf {
113        let file_name = data_path
114            .file_name()
115            .map(|name| name.to_string_lossy().into_owned())
116            .unwrap_or_else(|| "data.rdb".to_string());
117        let meta_file =
118            format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}.seq-{sequence:020}");
119        match data_path.parent() {
120            Some(parent) => parent.join(meta_file),
121            None => PathBuf::from(meta_file),
122        }
123    }
124
125    pub fn export_data_path_for(data_path: &Path, name: &str) -> PathBuf {
126        let file_name = data_path
127            .file_name()
128            .map(|name| name.to_string_lossy().into_owned())
129            .unwrap_or_else(|| "data.rdb".to_string());
130        let stem = file_name.strip_suffix(".rdb").unwrap_or(&file_name);
131        let export_file = format!("{stem}.export.{}.rdb", sanitize_export_name(name));
132        match data_path.parent() {
133            Some(parent) => parent.join(export_file),
134            None => PathBuf::from(export_file),
135        }
136    }
137
138    pub fn load_for_data_path(data_path: &Path) -> io::Result<Self> {
139        Self::load_for_data_path_with_source(data_path).map(|(metadata, _)| metadata)
140    }
141
142    pub fn load_for_data_path_with_source(
143        data_path: &Path,
144    ) -> io::Result<(Self, PhysicalMetadataSource)> {
145        let binary_path = Self::metadata_binary_path_for(data_path);
146        if binary_path.exists() {
147            match Self::load_from_binary_path(&binary_path) {
148                Ok(metadata) => {
149                    return Ok((metadata, PhysicalMetadataSource::Binary));
150                }
151                Err(_) => {
152                    let mut journal_paths = Self::journal_paths_for_data_path(data_path)?;
153                    journal_paths.reverse();
154                    for journal_path in journal_paths {
155                        if let Ok(metadata) = Self::load_from_binary_path(&journal_path) {
156                            let healed =
157                                metadata.mark_recovery(PhysicalMetadataSource::BinaryJournal);
158                            let _ = healed.heal_primary_metadata_for_data_path(data_path);
159                            return Ok((healed, PhysicalMetadataSource::BinaryJournal));
160                        }
161                    }
162                }
163            }
164        }
165        Self::load_from_path(&Self::metadata_path_for(data_path)).map(|metadata| {
166            let healed = metadata.mark_recovery(PhysicalMetadataSource::Json);
167            let _ = healed.heal_primary_metadata_for_data_path(data_path);
168            (healed, PhysicalMetadataSource::Json)
169        })
170    }
171
172    pub fn save_for_data_path(&self, data_path: &Path) -> io::Result<PathBuf> {
173        let binary_path = Self::metadata_binary_path_for(data_path);
174        if binary_path.exists() {
175            let sequence = Self::load_from_binary_path(&binary_path)
176                .map(|metadata| metadata.superblock.sequence)
177                .unwrap_or(self.superblock.sequence);
178            let journal_path = Self::metadata_journal_path_for(data_path, sequence);
179            let _ = fs::copy(&binary_path, journal_path);
180        }
181        self.save_to_binary_path(&binary_path)?;
182        self.prune_journal_for_data_path(data_path)?;
183        let json_path = Self::metadata_path_for(data_path);
184        self.save_to_path(&json_path)?;
185        Ok(binary_path)
186    }
187
188    pub fn load_from_path(path: &Path) -> io::Result<Self> {
189        let text = fs::read_to_string(path)?;
190        let parsed = parse_json(&text).map_err(|err| {
191            io::Error::new(
192                io::ErrorKind::InvalidData,
193                format!("invalid physical metadata JSON: {err}"),
194            )
195        })?;
196        let json = JsonValue::from(parsed);
197        Self::from_json_value(&json)
198    }
199
200    pub fn save_to_path(&self, path: &Path) -> io::Result<()> {
201        let text = self.to_json_value().to_string_pretty();
202        fs::write(path, text)
203    }
204
205    pub fn load_from_binary_path(path: &Path) -> io::Result<Self> {
206        let bytes = fs::read(path)?;
207        let json = from_slice::<JsonValue>(&bytes).map_err(|err| {
208            io::Error::new(
209                io::ErrorKind::InvalidData,
210                format!("invalid physical metadata binary: {err}"),
211            )
212        })?;
213        Self::from_json_value(&json)
214    }
215
216    pub fn save_to_binary_path(&self, path: &Path) -> io::Result<()> {
217        let bytes = to_vec(&self.to_json_value()).map_err(|err| {
218            io::Error::new(
219                io::ErrorKind::InvalidData,
220                format!("failed to encode physical metadata binary: {err}"),
221            )
222        })?;
223        fs::write(path, bytes)
224    }
225
226    pub fn journal_paths_for_data_path(data_path: &Path) -> io::Result<Vec<PathBuf>> {
227        let Some(parent) = data_path.parent() else {
228            return Ok(Vec::new());
229        };
230        let file_name = data_path
231            .file_name()
232            .map(|name| name.to_string_lossy().into_owned())
233            .unwrap_or_else(|| "data.rdb".to_string());
234        let prefix = format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}.seq-");
235
236        let mut paths = Vec::new();
237        for entry in fs::read_dir(parent)? {
238            let entry = entry?;
239            let path = entry.path();
240            let Some(name) = path.file_name().map(|name| name.to_string_lossy()) else {
241                continue;
242            };
243            if name.starts_with(&prefix) {
244                paths.push(path);
245            }
246        }
247        paths.sort();
248        Ok(paths)
249    }
250
251    pub fn prune_journal_for_data_path(&self, data_path: &Path) -> io::Result<()> {
252        let mut paths = Self::journal_paths_for_data_path(data_path)?;
253        if paths.len() <= DEFAULT_METADATA_JOURNAL_RETENTION {
254            return Ok(());
255        }
256        let delete_count = paths.len() - DEFAULT_METADATA_JOURNAL_RETENTION;
257        for path in paths.drain(0..delete_count) {
258            let _ = fs::remove_file(path);
259        }
260        Ok(())
261    }
262
263    pub fn heal_primary_metadata_for_data_path(&self, data_path: &Path) -> io::Result<()> {
264        let binary_path = Self::metadata_binary_path_for(data_path);
265        self.save_to_binary_path(&binary_path)?;
266        let json_path = Self::metadata_path_for(data_path);
267        self.save_to_path(&json_path)?;
268        Ok(())
269    }
270
271    pub fn mark_recovery(&self, source: PhysicalMetadataSource) -> Self {
272        let mut metadata = self.clone();
273        metadata.last_loaded_from = Some(source.as_str().to_string());
274        metadata.last_healed_at_unix_ms = Some(unix_ms_now());
275        metadata
276    }
277
278    pub fn to_json_value(&self) -> JsonValue {
279        let mut root = Map::new();
280        root.insert(
281            "protocol_version".to_string(),
282            JsonValue::String(self.protocol_version.clone()),
283        );
284        root.insert(
285            "generated_at_unix_ms".to_string(),
286            json_u128(self.generated_at_unix_ms),
287        );
288        root.insert(
289            "last_loaded_from".to_string(),
290            self.last_loaded_from
291                .clone()
292                .map(JsonValue::String)
293                .unwrap_or(JsonValue::Null),
294        );
295        root.insert(
296            "last_healed_at_unix_ms".to_string(),
297            self.last_healed_at_unix_ms
298                .map(json_u128)
299                .unwrap_or(JsonValue::Null),
300        );
301        root.insert("manifest".to_string(), manifest_to_json(&self.manifest));
302        root.insert("catalog".to_string(), catalog_to_json(&self.catalog));
303        root.insert(
304            "manifest_events".to_string(),
305            JsonValue::Array(
306                self.manifest_events
307                    .iter()
308                    .map(manifest_event_to_json)
309                    .collect(),
310            ),
311        );
312        root.insert(
313            "indexes".to_string(),
314            JsonValue::Array(self.indexes.iter().map(index_state_to_json).collect()),
315        );
316        root.insert(
317            "graph_projections".to_string(),
318            JsonValue::Array(
319                self.graph_projections
320                    .iter()
321                    .map(graph_projection_to_json)
322                    .collect(),
323            ),
324        );
325        root.insert(
326            "analytics_jobs".to_string(),
327            JsonValue::Array(
328                self.analytics_jobs
329                    .iter()
330                    .map(analytics_job_to_json)
331                    .collect(),
332            ),
333        );
334        root.insert(
335            "tree_definitions".to_string(),
336            JsonValue::Array(
337                self.tree_definitions
338                    .iter()
339                    .map(tree_definition_to_json)
340                    .collect(),
341            ),
342        );
343        root.insert(
344            "collection_ttl_defaults_ms".to_string(),
345            JsonValue::Object(
346                self.collection_ttl_defaults_ms
347                    .iter()
348                    .map(|(collection, ttl_ms)| (collection.clone(), json_u64(*ttl_ms)))
349                    .collect(),
350            ),
351        );
352        root.insert(
353            "collection_contracts".to_string(),
354            JsonValue::Array(
355                self.collection_contracts
356                    .iter()
357                    .map(collection_contract_to_json)
358                    .collect(),
359            ),
360        );
361        root.insert(
362            "exports".to_string(),
363            JsonValue::Array(self.exports.iter().map(export_descriptor_to_json).collect()),
364        );
365        root.insert(
366            "superblock".to_string(),
367            superblock_to_json(&self.superblock),
368        );
369        root.insert(
370            "snapshots".to_string(),
371            JsonValue::Array(
372                self.snapshots
373                    .iter()
374                    .map(snapshot_descriptor_to_json)
375                    .collect(),
376            ),
377        );
378        JsonValue::Object(root)
379    }
380
381    pub fn from_json_value(value: &JsonValue) -> io::Result<Self> {
382        let object = expect_object(value, "physical metadata root")?;
383        Ok(Self {
384            protocol_version: json_string_required(object, "protocol_version")?,
385            generated_at_unix_ms: json_u128_required(object, "generated_at_unix_ms")?,
386            last_loaded_from: object
387                .get("last_loaded_from")
388                .and_then(JsonValue::as_str)
389                .map(|value| value.to_string()),
390            last_healed_at_unix_ms: object
391                .get("last_healed_at_unix_ms")
392                .map(json_u128_value)
393                .transpose()?,
394            manifest: manifest_from_json(json_required(object, "manifest")?)?,
395            catalog: catalog_from_json(json_required(object, "catalog")?)?,
396            manifest_events: object
397                .get("manifest_events")
398                .and_then(JsonValue::as_array)
399                .map(|values| {
400                    values
401                        .iter()
402                        .map(manifest_event_from_json)
403                        .collect::<io::Result<Vec<_>>>()
404                })
405                .transpose()?
406                .unwrap_or_default(),
407            indexes: object
408                .get("indexes")
409                .and_then(JsonValue::as_array)
410                .map(|values| {
411                    values
412                        .iter()
413                        .map(index_state_from_json)
414                        .collect::<io::Result<Vec<_>>>()
415                })
416                .transpose()?
417                .unwrap_or_default(),
418            graph_projections: object
419                .get("graph_projections")
420                .and_then(JsonValue::as_array)
421                .map(|values| {
422                    values
423                        .iter()
424                        .map(graph_projection_from_json)
425                        .collect::<io::Result<Vec<_>>>()
426                })
427                .transpose()?
428                .unwrap_or_default(),
429            analytics_jobs: object
430                .get("analytics_jobs")
431                .and_then(JsonValue::as_array)
432                .map(|values| {
433                    values
434                        .iter()
435                        .map(analytics_job_from_json)
436                        .collect::<io::Result<Vec<_>>>()
437                })
438                .transpose()?
439                .unwrap_or_default(),
440            tree_definitions: object
441                .get("tree_definitions")
442                .and_then(JsonValue::as_array)
443                .map(|values| {
444                    values
445                        .iter()
446                        .map(tree_definition_from_json)
447                        .collect::<io::Result<Vec<_>>>()
448                })
449                .transpose()?
450                .unwrap_or_default(),
451            collection_ttl_defaults_ms: object
452                .get("collection_ttl_defaults_ms")
453                .and_then(JsonValue::as_object)
454                .map(|values| {
455                    values
456                        .iter()
457                        .filter_map(|(collection, ttl_ms)| {
458                            json_u64_value(ttl_ms)
459                                .ok()
460                                .map(|ttl_ms| (collection.clone(), ttl_ms))
461                        })
462                        .collect()
463                })
464                .unwrap_or_default(),
465            collection_contracts: object
466                .get("collection_contracts")
467                .and_then(JsonValue::as_array)
468                .map(|values| {
469                    values
470                        .iter()
471                        .map(collection_contract_from_json)
472                        .collect::<io::Result<Vec<_>>>()
473                })
474                .transpose()?
475                .unwrap_or_default(),
476            exports: object
477                .get("exports")
478                .and_then(JsonValue::as_array)
479                .map(|values| {
480                    values
481                        .iter()
482                        .map(export_descriptor_from_json)
483                        .collect::<io::Result<Vec<_>>>()
484                })
485                .transpose()?
486                .unwrap_or_default(),
487            superblock: superblock_from_json(json_required(object, "superblock")?)?,
488            snapshots: json_required(object, "snapshots")?
489                .as_array()
490                .ok_or_else(|| invalid_data("field 'snapshots' must be an array"))?
491                .iter()
492                .map(snapshot_descriptor_from_json)
493                .collect::<io::Result<Vec<_>>>()?,
494        })
495    }
496}