// reddb_server/physical/metadata_file.rs
1use super::*;
2
3impl PhysicalMetadataFile {
4    pub fn from_state(
5        options: RedDBOptions,
6        catalog: CatalogSnapshot,
7        collection_roots: BTreeMap<String, u64>,
8        indexes: Vec<PhysicalIndexState>,
9        previous: Option<&PhysicalMetadataFile>,
10    ) -> Self {
11        let now = unix_ms_now();
12        let mut manifest = SchemaManifest::now(options, catalog.total_collections);
13        if let Some(previous) = previous {
14            manifest.created_at_unix_ms = previous.manifest.created_at_unix_ms;
15        }
16        manifest.updated_at_unix_ms = now;
17
18        let sequence = previous
19            .map(|previous| previous.superblock.sequence.saturating_add(1))
20            .unwrap_or(1);
21        let mut manifest_events = previous
22            .map(|previous| previous.manifest_events.clone())
23            .unwrap_or_default();
24        manifest_events.extend(build_manifest_events(
25            previous.map(|previous| &previous.superblock.collection_roots),
26            &collection_roots,
27            sequence,
28        ));
29        trim_manifest_history(&mut manifest_events);
30
31        let superblock = SuperblockHeader {
32            format_version: REDDB_FORMAT_VERSION,
33            sequence,
34            copies: DEFAULT_SUPERBLOCK_COPIES,
35            manifest: previous
36                .map(|previous| previous.superblock.manifest.clone())
37                .unwrap_or_default(),
38            free_set: previous
39                .map(|previous| previous.superblock.free_set)
40                .unwrap_or_default(),
41            collection_roots,
42        };
43
44        let mut snapshots = previous
45            .map(|previous| previous.snapshots.clone())
46            .unwrap_or_default();
47        snapshots.push(SnapshotDescriptor {
48            snapshot_id: sequence,
49            created_at_unix_ms: now,
50            superblock_sequence: sequence,
51            collection_count: catalog.total_collections,
52            total_entities: catalog.total_entities,
53        });
54        trim_snapshot_history(&mut snapshots, manifest.options.snapshot_retention);
55
56        Self {
57            protocol_version: PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
58            generated_at_unix_ms: now,
59            last_loaded_from: previous.and_then(|previous| previous.last_loaded_from.clone()),
60            last_healed_at_unix_ms: previous.and_then(|previous| previous.last_healed_at_unix_ms),
61            manifest,
62            catalog,
63            manifest_events,
64            indexes,
65            graph_projections: previous
66                .map(|previous| previous.graph_projections.clone())
67                .unwrap_or_default(),
68            analytics_jobs: previous
69                .map(|previous| previous.analytics_jobs.clone())
70                .unwrap_or_default(),
71            tree_definitions: previous
72                .map(|previous| previous.tree_definitions.clone())
73                .unwrap_or_default(),
74            collection_ttl_defaults_ms: previous
75                .map(|previous| previous.collection_ttl_defaults_ms.clone())
76                .unwrap_or_default(),
77            collection_contracts: previous
78                .map(|previous| previous.collection_contracts.clone())
79                .unwrap_or_default(),
80            exports: previous
81                .map(|previous| previous.exports.clone())
82                .unwrap_or_default(),
83            superblock,
84            snapshots,
85        }
86    }
87
88    pub fn metadata_path_for(data_path: &Path) -> PathBuf {
89        let file_name = data_path
90            .file_name()
91            .map(|name| name.to_string_lossy().into_owned())
92            .unwrap_or_else(|| "data.rdb".to_string());
93        let meta_file = format!("{file_name}.meta.json");
94        match data_path.parent() {
95            Some(parent) => parent.join(meta_file),
96            None => PathBuf::from(meta_file),
97        }
98    }
99
100    pub fn metadata_binary_path_for(data_path: &Path) -> PathBuf {
101        let file_name = data_path
102            .file_name()
103            .map(|name| name.to_string_lossy().into_owned())
104            .unwrap_or_else(|| "data.rdb".to_string());
105        let meta_file = format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}");
106        match data_path.parent() {
107            Some(parent) => parent.join(meta_file),
108            None => PathBuf::from(meta_file),
109        }
110    }
111
112    pub fn metadata_journal_path_for(data_path: &Path, sequence: u64) -> PathBuf {
113        let file_name = data_path
114            .file_name()
115            .map(|name| name.to_string_lossy().into_owned())
116            .unwrap_or_else(|| "data.rdb".to_string());
117        let meta_file =
118            format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}.seq-{sequence:020}");
119        match data_path.parent() {
120            Some(parent) => parent.join(meta_file),
121            None => PathBuf::from(meta_file),
122        }
123    }
124
125    pub fn export_data_path_for(data_path: &Path, name: &str) -> PathBuf {
126        let file_name = data_path
127            .file_name()
128            .map(|name| name.to_string_lossy().into_owned())
129            .unwrap_or_else(|| "data.rdb".to_string());
130        let stem = file_name.strip_suffix(".rdb").unwrap_or(&file_name);
131        let export_file = format!("{stem}.export.{}.rdb", sanitize_export_name(name));
132        match data_path.parent() {
133            Some(parent) => parent.join(export_file),
134            None => PathBuf::from(export_file),
135        }
136    }
137
138    pub fn load_for_data_path(data_path: &Path) -> io::Result<Self> {
139        Self::load_for_data_path_with_source(data_path).map(|(metadata, _)| metadata)
140    }
141
142    pub fn load_for_data_path_with_source(
143        data_path: &Path,
144    ) -> io::Result<(Self, PhysicalMetadataSource)> {
145        let binary_path = Self::metadata_binary_path_for(data_path);
146        if binary_path.exists() {
147            match Self::load_from_binary_path(&binary_path) {
148                Ok(metadata) => {
149                    return Ok((metadata, PhysicalMetadataSource::Binary));
150                }
151                Err(_) => {
152                    let mut journal_paths = Self::journal_paths_for_data_path(data_path)?;
153                    journal_paths.reverse();
154                    for journal_path in journal_paths {
155                        if let Ok(metadata) = Self::load_from_binary_path(&journal_path) {
156                            let healed =
157                                metadata.mark_recovery(PhysicalMetadataSource::BinaryJournal);
158                            let _ = healed.heal_primary_metadata_for_data_path(data_path);
159                            return Ok((healed, PhysicalMetadataSource::BinaryJournal));
160                        }
161                    }
162                }
163            }
164        }
165        Self::load_from_path(&Self::metadata_path_for(data_path)).map(|metadata| {
166            let healed = metadata.mark_recovery(PhysicalMetadataSource::Json);
167            let _ = healed.heal_primary_metadata_for_data_path(data_path);
168            (healed, PhysicalMetadataSource::Json)
169        })
170    }
171
172    pub fn save_for_data_path(&self, data_path: &Path) -> io::Result<PathBuf> {
173        let binary_path = Self::metadata_binary_path_for(data_path);
174        if binary_path.exists() && super::seqn_journal_enabled() {
175            let sequence = Self::load_from_binary_path(&binary_path)
176                .map(|metadata| metadata.superblock.sequence)
177                .unwrap_or(self.superblock.sequence);
178            let journal_path = Self::metadata_journal_path_for(data_path, sequence);
179            let _ = fs::copy(&binary_path, journal_path);
180        }
181        self.save_to_binary_path(&binary_path)?;
182        self.prune_journal_for_data_path(data_path)?;
183        if super::meta_json_sidecar_enabled() {
184            let json_path = Self::metadata_path_for(data_path);
185            self.save_to_path(&json_path)?;
186        }
187        Ok(binary_path)
188    }
189
190    pub fn load_from_path(path: &Path) -> io::Result<Self> {
191        let text = fs::read_to_string(path)?;
192        let parsed = parse_json(&text).map_err(|err| {
193            io::Error::new(
194                io::ErrorKind::InvalidData,
195                format!("invalid physical metadata JSON: {err}"),
196            )
197        })?;
198        let json = JsonValue::from(parsed);
199        Self::from_json_value(&json)
200    }
201
202    pub fn save_to_path(&self, path: &Path) -> io::Result<()> {
203        let text = self.to_json_value().to_string_pretty();
204        fs::write(path, text)
205    }
206
207    pub fn load_from_binary_path(path: &Path) -> io::Result<Self> {
208        let bytes = fs::read(path)?;
209        let json = from_slice::<JsonValue>(&bytes).map_err(|err| {
210            io::Error::new(
211                io::ErrorKind::InvalidData,
212                format!("invalid physical metadata binary: {err}"),
213            )
214        })?;
215        Self::from_json_value(&json)
216    }
217
218    pub fn save_to_binary_path(&self, path: &Path) -> io::Result<()> {
219        let bytes = to_vec(&self.to_json_value()).map_err(|err| {
220            io::Error::new(
221                io::ErrorKind::InvalidData,
222                format!("failed to encode physical metadata binary: {err}"),
223            )
224        })?;
225        fs::write(path, bytes)
226    }
227
228    pub fn journal_paths_for_data_path(data_path: &Path) -> io::Result<Vec<PathBuf>> {
229        let Some(parent) = data_path.parent() else {
230            return Ok(Vec::new());
231        };
232        let file_name = data_path
233            .file_name()
234            .map(|name| name.to_string_lossy().into_owned())
235            .unwrap_or_else(|| "data.rdb".to_string());
236        let prefix = format!("{file_name}.{PHYSICAL_METADATA_BINARY_EXTENSION}.seq-");
237
238        let mut paths = Vec::new();
239        for entry in fs::read_dir(parent)? {
240            let entry = entry?;
241            let path = entry.path();
242            let Some(name) = path.file_name().map(|name| name.to_string_lossy()) else {
243                continue;
244            };
245            if name.starts_with(&prefix) {
246                paths.push(path);
247            }
248        }
249        paths.sort();
250        Ok(paths)
251    }
252
253    pub fn prune_journal_for_data_path(&self, data_path: &Path) -> io::Result<()> {
254        let retention = super::seqn_journal_retention();
255        let mut paths = Self::journal_paths_for_data_path(data_path)?;
256        if paths.len() <= retention {
257            return Ok(());
258        }
259        let delete_count = paths.len() - retention;
260        for path in paths.drain(0..delete_count) {
261            let _ = fs::remove_file(path);
262        }
263        Ok(())
264    }
265
266    pub fn heal_primary_metadata_for_data_path(&self, data_path: &Path) -> io::Result<()> {
267        let binary_path = Self::metadata_binary_path_for(data_path);
268        self.save_to_binary_path(&binary_path)?;
269        if super::meta_json_sidecar_enabled() {
270            let json_path = Self::metadata_path_for(data_path);
271            self.save_to_path(&json_path)?;
272        }
273        Ok(())
274    }
275
276    pub fn mark_recovery(&self, source: PhysicalMetadataSource) -> Self {
277        let mut metadata = self.clone();
278        metadata.last_loaded_from = Some(source.as_str().to_string());
279        metadata.last_healed_at_unix_ms = Some(unix_ms_now());
280        metadata
281    }
282
283    pub fn to_json_value(&self) -> JsonValue {
284        let mut root = Map::new();
285        root.insert(
286            "protocol_version".to_string(),
287            JsonValue::String(self.protocol_version.clone()),
288        );
289        root.insert(
290            "generated_at_unix_ms".to_string(),
291            json_u128(self.generated_at_unix_ms),
292        );
293        root.insert(
294            "last_loaded_from".to_string(),
295            self.last_loaded_from
296                .clone()
297                .map(JsonValue::String)
298                .unwrap_or(JsonValue::Null),
299        );
300        root.insert(
301            "last_healed_at_unix_ms".to_string(),
302            self.last_healed_at_unix_ms
303                .map(json_u128)
304                .unwrap_or(JsonValue::Null),
305        );
306        root.insert("manifest".to_string(), manifest_to_json(&self.manifest));
307        root.insert("catalog".to_string(), catalog_to_json(&self.catalog));
308        root.insert(
309            "manifest_events".to_string(),
310            JsonValue::Array(
311                self.manifest_events
312                    .iter()
313                    .map(manifest_event_to_json)
314                    .collect(),
315            ),
316        );
317        root.insert(
318            "indexes".to_string(),
319            JsonValue::Array(self.indexes.iter().map(index_state_to_json).collect()),
320        );
321        root.insert(
322            "graph_projections".to_string(),
323            JsonValue::Array(
324                self.graph_projections
325                    .iter()
326                    .map(graph_projection_to_json)
327                    .collect(),
328            ),
329        );
330        root.insert(
331            "analytics_jobs".to_string(),
332            JsonValue::Array(
333                self.analytics_jobs
334                    .iter()
335                    .map(analytics_job_to_json)
336                    .collect(),
337            ),
338        );
339        root.insert(
340            "tree_definitions".to_string(),
341            JsonValue::Array(
342                self.tree_definitions
343                    .iter()
344                    .map(tree_definition_to_json)
345                    .collect(),
346            ),
347        );
348        root.insert(
349            "collection_ttl_defaults_ms".to_string(),
350            JsonValue::Object(
351                self.collection_ttl_defaults_ms
352                    .iter()
353                    .map(|(collection, ttl_ms)| (collection.clone(), json_u64(*ttl_ms)))
354                    .collect(),
355            ),
356        );
357        root.insert(
358            "collection_contracts".to_string(),
359            JsonValue::Array(
360                self.collection_contracts
361                    .iter()
362                    .map(collection_contract_to_json)
363                    .collect(),
364            ),
365        );
366        root.insert(
367            "exports".to_string(),
368            JsonValue::Array(self.exports.iter().map(export_descriptor_to_json).collect()),
369        );
370        root.insert(
371            "superblock".to_string(),
372            superblock_to_json(&self.superblock),
373        );
374        root.insert(
375            "snapshots".to_string(),
376            JsonValue::Array(
377                self.snapshots
378                    .iter()
379                    .map(snapshot_descriptor_to_json)
380                    .collect(),
381            ),
382        );
383        JsonValue::Object(root)
384    }
385
386    pub fn from_json_value(value: &JsonValue) -> io::Result<Self> {
387        let object = expect_object(value, "physical metadata root")?;
388        Ok(Self {
389            protocol_version: json_string_required(object, "protocol_version")?,
390            generated_at_unix_ms: json_u128_required(object, "generated_at_unix_ms")?,
391            last_loaded_from: object
392                .get("last_loaded_from")
393                .and_then(JsonValue::as_str)
394                .map(|value| value.to_string()),
395            last_healed_at_unix_ms: object
396                .get("last_healed_at_unix_ms")
397                .map(json_u128_value)
398                .transpose()?,
399            manifest: manifest_from_json(json_required(object, "manifest")?)?,
400            catalog: catalog_from_json(json_required(object, "catalog")?)?,
401            manifest_events: object
402                .get("manifest_events")
403                .and_then(JsonValue::as_array)
404                .map(|values| {
405                    values
406                        .iter()
407                        .map(manifest_event_from_json)
408                        .collect::<io::Result<Vec<_>>>()
409                })
410                .transpose()?
411                .unwrap_or_default(),
412            indexes: object
413                .get("indexes")
414                .and_then(JsonValue::as_array)
415                .map(|values| {
416                    values
417                        .iter()
418                        .map(index_state_from_json)
419                        .collect::<io::Result<Vec<_>>>()
420                })
421                .transpose()?
422                .unwrap_or_default(),
423            graph_projections: object
424                .get("graph_projections")
425                .and_then(JsonValue::as_array)
426                .map(|values| {
427                    values
428                        .iter()
429                        .map(graph_projection_from_json)
430                        .collect::<io::Result<Vec<_>>>()
431                })
432                .transpose()?
433                .unwrap_or_default(),
434            analytics_jobs: object
435                .get("analytics_jobs")
436                .and_then(JsonValue::as_array)
437                .map(|values| {
438                    values
439                        .iter()
440                        .map(analytics_job_from_json)
441                        .collect::<io::Result<Vec<_>>>()
442                })
443                .transpose()?
444                .unwrap_or_default(),
445            tree_definitions: object
446                .get("tree_definitions")
447                .and_then(JsonValue::as_array)
448                .map(|values| {
449                    values
450                        .iter()
451                        .map(tree_definition_from_json)
452                        .collect::<io::Result<Vec<_>>>()
453                })
454                .transpose()?
455                .unwrap_or_default(),
456            collection_ttl_defaults_ms: object
457                .get("collection_ttl_defaults_ms")
458                .and_then(JsonValue::as_object)
459                .map(|values| {
460                    values
461                        .iter()
462                        .filter_map(|(collection, ttl_ms)| {
463                            json_u64_value(ttl_ms)
464                                .ok()
465                                .map(|ttl_ms| (collection.clone(), ttl_ms))
466                        })
467                        .collect()
468                })
469                .unwrap_or_default(),
470            collection_contracts: object
471                .get("collection_contracts")
472                .and_then(JsonValue::as_array)
473                .map(|values| {
474                    values
475                        .iter()
476                        .map(collection_contract_from_json)
477                        .collect::<io::Result<Vec<_>>>()
478                })
479                .transpose()?
480                .unwrap_or_default(),
481            exports: object
482                .get("exports")
483                .and_then(JsonValue::as_array)
484                .map(|values| {
485                    values
486                        .iter()
487                        .map(export_descriptor_from_json)
488                        .collect::<io::Result<Vec<_>>>()
489                })
490                .transpose()?
491                .unwrap_or_default(),
492            superblock: superblock_from_json(json_required(object, "superblock")?)?,
493            snapshots: json_required(object, "snapshots")?
494                .as_array()
495                .ok_or_else(|| invalid_data("field 'snapshots' must be an array"))?
496                .iter()
497                .map(snapshot_descriptor_from_json)
498                .collect::<io::Result<Vec<_>>>()?,
499        })
500    }
501}