Skip to main content

reddb_server/physical/
metadata_file.rs

1use super::*;
2use crate::api::REDDB_FORMAT_VERSION;
3
4impl PhysicalMetadataFile {
5    pub fn from_state(
6        options: RedDBOptions,
7        catalog: CatalogSnapshot,
8        collection_roots: BTreeMap<String, u64>,
9        indexes: Vec<PhysicalIndexState>,
10        previous: Option<&PhysicalMetadataFile>,
11    ) -> Self {
12        let now = unix_ms_now();
13        let mut manifest = SchemaManifest::now(options, catalog.total_collections);
14        if let Some(previous) = previous {
15            manifest.created_at_unix_ms = previous.manifest.created_at_unix_ms;
16        }
17        manifest.updated_at_unix_ms = now;
18
19        let sequence = previous
20            .map(|previous| previous.superblock.sequence.saturating_add(1))
21            .unwrap_or(1);
22        let mut manifest_events = previous
23            .map(|previous| previous.manifest_events.clone())
24            .unwrap_or_default();
25        manifest_events.extend(build_manifest_events(
26            previous.map(|previous| &previous.superblock.collection_roots),
27            &collection_roots,
28            sequence,
29        ));
30        trim_manifest_history(&mut manifest_events);
31
32        let superblock = SuperblockHeader {
33            format_version: REDDB_FORMAT_VERSION,
34            sequence,
35            copies: DEFAULT_SUPERBLOCK_COPIES,
36            manifest: previous
37                .map(|previous| previous.superblock.manifest.clone())
38                .unwrap_or_default(),
39            free_set: previous
40                .map(|previous| previous.superblock.free_set)
41                .unwrap_or_default(),
42            collection_roots,
43        };
44
45        let mut snapshots = previous
46            .map(|previous| previous.snapshots.clone())
47            .unwrap_or_default();
48        snapshots.push(SnapshotDescriptor {
49            snapshot_id: sequence,
50            created_at_unix_ms: now,
51            superblock_sequence: sequence,
52            collection_count: catalog.total_collections,
53            total_entities: catalog.total_entities,
54        });
55        trim_snapshot_history(&mut snapshots, manifest.options.snapshot_retention);
56
57        Self {
58            protocol_version: PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
59            generated_at_unix_ms: now,
60            last_loaded_from: previous.and_then(|previous| previous.last_loaded_from.clone()),
61            last_healed_at_unix_ms: previous.and_then(|previous| previous.last_healed_at_unix_ms),
62            manifest,
63            catalog,
64            manifest_events,
65            indexes,
66            graph_projections: previous
67                .map(|previous| previous.graph_projections.clone())
68                .unwrap_or_default(),
69            analytics_jobs: previous
70                .map(|previous| previous.analytics_jobs.clone())
71                .unwrap_or_default(),
72            tree_definitions: previous
73                .map(|previous| previous.tree_definitions.clone())
74                .unwrap_or_default(),
75            collection_ttl_defaults_ms: previous
76                .map(|previous| previous.collection_ttl_defaults_ms.clone())
77                .unwrap_or_default(),
78            collection_contracts: previous
79                .map(|previous| previous.collection_contracts.clone())
80                .unwrap_or_default(),
81            hypertables: previous
82                .map(|previous| previous.hypertables.clone())
83                .unwrap_or_default(),
84            exports: previous
85                .map(|previous| previous.exports.clone())
86                .unwrap_or_default(),
87            superblock,
88            snapshots,
89        }
90    }
91
92    pub fn metadata_path_for(data_path: &Path) -> PathBuf {
93        reddb_file::layout::physical_metadata_json_path(data_path)
94    }
95
96    pub fn metadata_binary_path_for(data_path: &Path) -> PathBuf {
97        reddb_file::layout::physical_metadata_binary_path(data_path)
98    }
99
100    pub fn metadata_journal_path_for(data_path: &Path, sequence: u64) -> PathBuf {
101        reddb_file::layout::physical_metadata_journal_path(data_path, sequence)
102    }
103
104    pub fn export_data_path_for(data_path: &Path, name: &str) -> PathBuf {
105        reddb_file::layout::physical_export_data_path(data_path, name)
106    }
107
108    pub fn load_for_data_path(data_path: &Path) -> io::Result<Self> {
109        Self::load_for_data_path_with_source(data_path).map(|(metadata, _)| metadata)
110    }
111
112    pub fn load_for_data_path_with_source(
113        data_path: &Path,
114    ) -> io::Result<(Self, PhysicalMetadataSource)> {
115        let binary_path = Self::metadata_binary_path_for(data_path);
116        if binary_path.exists() {
117            match Self::load_from_binary_path(&binary_path) {
118                Ok(metadata) => {
119                    return Ok((metadata, PhysicalMetadataSource::Binary));
120                }
121                Err(_) => {
122                    let mut journal_paths = Self::journal_paths_for_data_path(data_path)?;
123                    journal_paths.reverse();
124                    for journal_path in journal_paths {
125                        if let Ok(metadata) = Self::load_from_binary_path(&journal_path) {
126                            let healed =
127                                metadata.mark_recovery(PhysicalMetadataSource::BinaryJournal);
128                            let _ = healed.heal_primary_metadata_for_data_path(data_path);
129                            return Ok((healed, PhysicalMetadataSource::BinaryJournal));
130                        }
131                    }
132                }
133            }
134        }
135        Self::load_from_path(&Self::metadata_path_for(data_path)).map(|metadata| {
136            let healed = metadata.mark_recovery(PhysicalMetadataSource::Json);
137            let _ = healed.heal_primary_metadata_for_data_path(data_path);
138            (healed, PhysicalMetadataSource::Json)
139        })
140    }
141
142    pub fn save_for_data_path(&self, data_path: &Path) -> io::Result<PathBuf> {
143        let binary_path = Self::metadata_binary_path_for(data_path);
144        if binary_path.exists() && super::seqn_journal_enabled() {
145            let sequence = Self::load_from_binary_path(&binary_path)
146                .map(|metadata| metadata.superblock.sequence)
147                .unwrap_or(self.superblock.sequence);
148            let _ = reddb_file::copy_physical_metadata_binary_to_journal(
149                data_path,
150                &binary_path,
151                sequence,
152            );
153        }
154        self.save_to_binary_path(&binary_path)?;
155        self.prune_journal_for_data_path(data_path)?;
156        if super::meta_json_sidecar_enabled() {
157            let json_path = Self::metadata_path_for(data_path);
158            self.save_to_path(&json_path)?;
159        }
160        Ok(binary_path)
161    }
162
163    pub fn load_from_path(path: &Path) -> io::Result<Self> {
164        let text = reddb_file::read_physical_metadata_document(path).map_err(|err| {
165            io::Error::new(
166                io::ErrorKind::InvalidData,
167                format!("invalid physical metadata JSON: {err}"),
168            )
169        })?;
170        Self::from_document_json(&text, "invalid physical metadata JSON")
171    }
172
173    pub fn save_to_path(&self, path: &Path) -> io::Result<()> {
174        let text = self.to_document_json(true)?;
175        reddb_file::write_physical_metadata_json_document(path, &text)
176            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
177    }
178
179    pub fn load_from_binary_path(path: &Path) -> io::Result<Self> {
180        let text = reddb_file::read_physical_metadata_document(path).map_err(|err| {
181            io::Error::new(
182                io::ErrorKind::InvalidData,
183                format!("invalid physical metadata binary: {err}"),
184            )
185        })?;
186        Self::from_document_json(&text, "invalid physical metadata binary")
187    }
188
189    pub fn save_to_binary_path(&self, path: &Path) -> io::Result<()> {
190        let text = self.to_document_json(false)?;
191        reddb_file::write_physical_metadata_binary_document(path, &text)
192            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
193    }
194
195    pub fn journal_paths_for_data_path(data_path: &Path) -> io::Result<Vec<PathBuf>> {
196        reddb_file::list_physical_metadata_journal_paths(data_path)
197            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
198    }
199
200    pub fn prune_journal_for_data_path(&self, data_path: &Path) -> io::Result<()> {
201        let retention = super::seqn_journal_retention();
202        reddb_file::prune_physical_metadata_journal_paths(data_path, retention)
203            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
204    }
205
206    pub fn heal_primary_metadata_for_data_path(&self, data_path: &Path) -> io::Result<()> {
207        let binary_path = Self::metadata_binary_path_for(data_path);
208        self.save_to_binary_path(&binary_path)?;
209        if super::meta_json_sidecar_enabled() {
210            let json_path = Self::metadata_path_for(data_path);
211            self.save_to_path(&json_path)?;
212        }
213        Ok(())
214    }
215
216    pub fn mark_recovery(&self, source: PhysicalMetadataSource) -> Self {
217        let mut metadata = self.clone();
218        metadata.last_loaded_from = Some(source.as_str().to_string());
219        metadata.last_healed_at_unix_ms = Some(unix_ms_now());
220        metadata
221    }
222
223    pub fn to_json_value(&self) -> JsonValue {
224        let json = self
225            .to_document_json(false)
226            .expect("physical metadata must encode as JSON");
227        crate::json::from_str::<JsonValue>(&json)
228            .expect("reddb-file emitted JSON the server can parse")
229    }
230
231    pub fn from_json_value(value: &JsonValue) -> io::Result<Self> {
232        Self::from_document_json(&value.to_string_compact(), "invalid physical metadata JSON")
233    }
234
235    fn to_document_json(&self, pretty: bool) -> io::Result<String> {
236        reddb_file::encode_physical_metadata_document_root_json(
237            &self.to_document_envelope(),
238            pretty,
239        )
240        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
241    }
242
243    fn from_document_json(text: &str, context: &'static str) -> io::Result<Self> {
244        let envelope =
245            reddb_file::decode_physical_metadata_document_root_json(text).map_err(|err| {
246                io::Error::new(io::ErrorKind::InvalidData, format!("{context}: {err}"))
247            })?;
248        Self::from_document_envelope(envelope)
249    }
250
251    fn to_document_envelope(&self) -> reddb_file::PhysicalMetadataDocumentEnvelope {
252        reddb_file::PhysicalMetadataDocumentEnvelope {
253            protocol_version: self.protocol_version.clone(),
254            generated_at_unix_ms: self.generated_at_unix_ms,
255            last_loaded_from: self.last_loaded_from.clone(),
256            last_healed_at_unix_ms: self.last_healed_at_unix_ms,
257            manifest_json: manifest_to_json(&self.manifest).to_string_compact(),
258            catalog_json: catalog_to_json(&self.catalog).to_string_compact(),
259            manifest_events_json: self
260                .manifest_events
261                .iter()
262                .map(|event| manifest_event_to_json(event).to_string_compact())
263                .collect(),
264            indexes_json: self
265                .indexes
266                .iter()
267                .map(|index| index_state_to_json(index).to_string_compact())
268                .collect(),
269            graph_projections_json: self
270                .graph_projections
271                .iter()
272                .map(|projection| graph_projection_to_json(projection).to_string_compact())
273                .collect(),
274            analytics_jobs_json: self
275                .analytics_jobs
276                .iter()
277                .map(|job| analytics_job_to_json(job).to_string_compact())
278                .collect(),
279            tree_definitions_json: self
280                .tree_definitions
281                .iter()
282                .map(|definition| tree_definition_to_json(definition).to_string_compact())
283                .collect(),
284            collection_ttl_defaults_ms: self.collection_ttl_defaults_ms.clone(),
285            collection_contracts_json: self
286                .collection_contracts
287                .iter()
288                .map(|contract| collection_contract_to_json(contract).to_string_compact())
289                .collect(),
290            hypertables_json: self
291                .hypertables
292                .iter()
293                .map(|hypertable| hypertable_to_json(hypertable).to_string_compact())
294                .collect(),
295            exports_json: self
296                .exports
297                .iter()
298                .map(|export| export_descriptor_to_json(export).to_string_compact())
299                .collect(),
300            superblock_json: superblock_to_json(&self.superblock).to_string_compact(),
301            snapshots_json: self
302                .snapshots
303                .iter()
304                .map(|snapshot| snapshot_descriptor_to_json(snapshot).to_string_compact())
305                .collect(),
306        }
307    }
308
309    fn from_document_envelope(
310        envelope: reddb_file::PhysicalMetadataDocumentEnvelope,
311    ) -> io::Result<Self> {
312        Ok(Self {
313            protocol_version: envelope.protocol_version,
314            generated_at_unix_ms: envelope.generated_at_unix_ms,
315            last_loaded_from: envelope.last_loaded_from,
316            last_healed_at_unix_ms: envelope.last_healed_at_unix_ms,
317            manifest: manifest_from_json(&parse_document_fragment(
318                &envelope.manifest_json,
319                "manifest",
320            )?)?,
321            catalog: catalog_from_json(&parse_document_fragment(
322                &envelope.catalog_json,
323                "catalog",
324            )?)?,
325            manifest_events: parse_document_fragments(
326                &envelope.manifest_events_json,
327                "manifest_events",
328                manifest_event_from_json,
329            )?,
330            indexes: parse_document_fragments(
331                &envelope.indexes_json,
332                "indexes",
333                index_state_from_json,
334            )?,
335            graph_projections: parse_document_fragments(
336                &envelope.graph_projections_json,
337                "graph_projections",
338                graph_projection_from_json,
339            )?,
340            analytics_jobs: parse_document_fragments(
341                &envelope.analytics_jobs_json,
342                "analytics_jobs",
343                analytics_job_from_json,
344            )?,
345            tree_definitions: parse_document_fragments(
346                &envelope.tree_definitions_json,
347                "tree_definitions",
348                tree_definition_from_json,
349            )?,
350            collection_ttl_defaults_ms: envelope.collection_ttl_defaults_ms,
351            collection_contracts: parse_document_fragments(
352                &envelope.collection_contracts_json,
353                "collection_contracts",
354                collection_contract_from_json,
355            )?,
356            hypertables: parse_document_fragments(
357                &envelope.hypertables_json,
358                "hypertables",
359                hypertable_from_json,
360            )?,
361            exports: parse_document_fragments(
362                &envelope.exports_json,
363                "exports",
364                export_descriptor_from_json,
365            )?,
366            superblock: superblock_from_json(&parse_document_fragment(
367                &envelope.superblock_json,
368                "superblock",
369            )?)?,
370            snapshots: parse_document_fragments(
371                &envelope.snapshots_json,
372                "snapshots",
373                snapshot_descriptor_from_json,
374            )?,
375        })
376    }
377}
378
379fn parse_document_fragment(json: &str, field: &'static str) -> io::Result<JsonValue> {
380    crate::json::from_str::<JsonValue>(json)
381        .map_err(|err| invalid_data(format!("invalid physical metadata field '{field}': {err}")))
382}
383
384fn parse_document_fragments<T>(
385    fragments: &[String],
386    field: &'static str,
387    decode: fn(&JsonValue) -> io::Result<T>,
388) -> io::Result<Vec<T>> {
389    fragments
390        .iter()
391        .map(|fragment| decode(&parse_document_fragment(fragment, field)?))
392        .collect()
393}