1use super::*;
2use crate::api::REDDB_FORMAT_VERSION;
3
4impl PhysicalMetadataFile {
5 pub fn from_state(
6 options: RedDBOptions,
7 catalog: CatalogSnapshot,
8 collection_roots: BTreeMap<String, u64>,
9 indexes: Vec<PhysicalIndexState>,
10 previous: Option<&PhysicalMetadataFile>,
11 ) -> Self {
12 let now = unix_ms_now();
13 let mut manifest = SchemaManifest::now(options, catalog.total_collections);
14 if let Some(previous) = previous {
15 manifest.created_at_unix_ms = previous.manifest.created_at_unix_ms;
16 }
17 manifest.updated_at_unix_ms = now;
18
19 let sequence = previous
20 .map(|previous| previous.superblock.sequence.saturating_add(1))
21 .unwrap_or(1);
22 let mut manifest_events = previous
23 .map(|previous| previous.manifest_events.clone())
24 .unwrap_or_default();
25 manifest_events.extend(build_manifest_events(
26 previous.map(|previous| &previous.superblock.collection_roots),
27 &collection_roots,
28 sequence,
29 ));
30 trim_manifest_history(&mut manifest_events);
31
32 let superblock = SuperblockHeader {
33 format_version: REDDB_FORMAT_VERSION,
34 sequence,
35 copies: DEFAULT_SUPERBLOCK_COPIES,
36 manifest: previous
37 .map(|previous| previous.superblock.manifest.clone())
38 .unwrap_or_default(),
39 free_set: previous
40 .map(|previous| previous.superblock.free_set)
41 .unwrap_or_default(),
42 collection_roots,
43 };
44
45 let mut snapshots = previous
46 .map(|previous| previous.snapshots.clone())
47 .unwrap_or_default();
48 snapshots.push(SnapshotDescriptor {
49 snapshot_id: sequence,
50 created_at_unix_ms: now,
51 superblock_sequence: sequence,
52 collection_count: catalog.total_collections,
53 total_entities: catalog.total_entities,
54 });
55 trim_snapshot_history(&mut snapshots, manifest.options.snapshot_retention);
56
57 Self {
58 protocol_version: PHYSICAL_METADATA_PROTOCOL_VERSION.to_string(),
59 generated_at_unix_ms: now,
60 last_loaded_from: previous.and_then(|previous| previous.last_loaded_from.clone()),
61 last_healed_at_unix_ms: previous.and_then(|previous| previous.last_healed_at_unix_ms),
62 manifest,
63 catalog,
64 manifest_events,
65 indexes,
66 graph_projections: previous
67 .map(|previous| previous.graph_projections.clone())
68 .unwrap_or_default(),
69 analytics_jobs: previous
70 .map(|previous| previous.analytics_jobs.clone())
71 .unwrap_or_default(),
72 tree_definitions: previous
73 .map(|previous| previous.tree_definitions.clone())
74 .unwrap_or_default(),
75 collection_ttl_defaults_ms: previous
76 .map(|previous| previous.collection_ttl_defaults_ms.clone())
77 .unwrap_or_default(),
78 collection_contracts: previous
79 .map(|previous| previous.collection_contracts.clone())
80 .unwrap_or_default(),
81 hypertables: previous
82 .map(|previous| previous.hypertables.clone())
83 .unwrap_or_default(),
84 exports: previous
85 .map(|previous| previous.exports.clone())
86 .unwrap_or_default(),
87 superblock,
88 snapshots,
89 }
90 }
91
92 pub fn metadata_path_for(data_path: &Path) -> PathBuf {
93 reddb_file::layout::physical_metadata_json_path(data_path)
94 }
95
96 pub fn metadata_binary_path_for(data_path: &Path) -> PathBuf {
97 reddb_file::layout::physical_metadata_binary_path(data_path)
98 }
99
100 pub fn metadata_journal_path_for(data_path: &Path, sequence: u64) -> PathBuf {
101 reddb_file::layout::physical_metadata_journal_path(data_path, sequence)
102 }
103
104 pub fn export_data_path_for(data_path: &Path, name: &str) -> PathBuf {
105 reddb_file::layout::physical_export_data_path(data_path, name)
106 }
107
108 pub fn load_for_data_path(data_path: &Path) -> io::Result<Self> {
109 Self::load_for_data_path_with_source(data_path).map(|(metadata, _)| metadata)
110 }
111
112 pub fn load_for_data_path_with_source(
113 data_path: &Path,
114 ) -> io::Result<(Self, PhysicalMetadataSource)> {
115 let binary_path = Self::metadata_binary_path_for(data_path);
116 if binary_path.exists() {
117 match Self::load_from_binary_path(&binary_path) {
118 Ok(metadata) => {
119 return Ok((metadata, PhysicalMetadataSource::Binary));
120 }
121 Err(_) => {
122 let mut journal_paths = Self::journal_paths_for_data_path(data_path)?;
123 journal_paths.reverse();
124 for journal_path in journal_paths {
125 if let Ok(metadata) = Self::load_from_binary_path(&journal_path) {
126 let healed =
127 metadata.mark_recovery(PhysicalMetadataSource::BinaryJournal);
128 let _ = healed.heal_primary_metadata_for_data_path(data_path);
129 return Ok((healed, PhysicalMetadataSource::BinaryJournal));
130 }
131 }
132 }
133 }
134 }
135 Self::load_from_path(&Self::metadata_path_for(data_path)).map(|metadata| {
136 let healed = metadata.mark_recovery(PhysicalMetadataSource::Json);
137 let _ = healed.heal_primary_metadata_for_data_path(data_path);
138 (healed, PhysicalMetadataSource::Json)
139 })
140 }
141
142 pub fn save_for_data_path(&self, data_path: &Path) -> io::Result<PathBuf> {
143 let binary_path = Self::metadata_binary_path_for(data_path);
144 if binary_path.exists() && super::seqn_journal_enabled() {
145 let sequence = Self::load_from_binary_path(&binary_path)
146 .map(|metadata| metadata.superblock.sequence)
147 .unwrap_or(self.superblock.sequence);
148 let _ = reddb_file::copy_physical_metadata_binary_to_journal(
149 data_path,
150 &binary_path,
151 sequence,
152 );
153 }
154 self.save_to_binary_path(&binary_path)?;
155 self.prune_journal_for_data_path(data_path)?;
156 if super::meta_json_sidecar_enabled() {
157 let json_path = Self::metadata_path_for(data_path);
158 self.save_to_path(&json_path)?;
159 }
160 Ok(binary_path)
161 }
162
163 pub fn load_from_path(path: &Path) -> io::Result<Self> {
164 let text = reddb_file::read_physical_metadata_document(path).map_err(|err| {
165 io::Error::new(
166 io::ErrorKind::InvalidData,
167 format!("invalid physical metadata JSON: {err}"),
168 )
169 })?;
170 Self::from_document_json(&text, "invalid physical metadata JSON")
171 }
172
173 pub fn save_to_path(&self, path: &Path) -> io::Result<()> {
174 let text = self.to_document_json(true)?;
175 reddb_file::write_physical_metadata_json_document(path, &text)
176 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
177 }
178
179 pub fn load_from_binary_path(path: &Path) -> io::Result<Self> {
180 let text = reddb_file::read_physical_metadata_document(path).map_err(|err| {
181 io::Error::new(
182 io::ErrorKind::InvalidData,
183 format!("invalid physical metadata binary: {err}"),
184 )
185 })?;
186 Self::from_document_json(&text, "invalid physical metadata binary")
187 }
188
189 pub fn save_to_binary_path(&self, path: &Path) -> io::Result<()> {
190 let text = self.to_document_json(false)?;
191 reddb_file::write_physical_metadata_binary_document(path, &text)
192 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
193 }
194
195 pub fn journal_paths_for_data_path(data_path: &Path) -> io::Result<Vec<PathBuf>> {
196 reddb_file::list_physical_metadata_journal_paths(data_path)
197 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
198 }
199
200 pub fn prune_journal_for_data_path(&self, data_path: &Path) -> io::Result<()> {
201 let retention = super::seqn_journal_retention();
202 reddb_file::prune_physical_metadata_journal_paths(data_path, retention)
203 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
204 }
205
206 pub fn heal_primary_metadata_for_data_path(&self, data_path: &Path) -> io::Result<()> {
207 let binary_path = Self::metadata_binary_path_for(data_path);
208 self.save_to_binary_path(&binary_path)?;
209 if super::meta_json_sidecar_enabled() {
210 let json_path = Self::metadata_path_for(data_path);
211 self.save_to_path(&json_path)?;
212 }
213 Ok(())
214 }
215
216 pub fn mark_recovery(&self, source: PhysicalMetadataSource) -> Self {
217 let mut metadata = self.clone();
218 metadata.last_loaded_from = Some(source.as_str().to_string());
219 metadata.last_healed_at_unix_ms = Some(unix_ms_now());
220 metadata
221 }
222
223 pub fn to_json_value(&self) -> JsonValue {
224 let json = self
225 .to_document_json(false)
226 .expect("physical metadata must encode as JSON");
227 crate::json::from_str::<JsonValue>(&json)
228 .expect("reddb-file emitted JSON the server can parse")
229 }
230
231 pub fn from_json_value(value: &JsonValue) -> io::Result<Self> {
232 Self::from_document_json(&value.to_string_compact(), "invalid physical metadata JSON")
233 }
234
235 fn to_document_json(&self, pretty: bool) -> io::Result<String> {
236 reddb_file::encode_physical_metadata_document_root_json(
237 &self.to_document_envelope(),
238 pretty,
239 )
240 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
241 }
242
243 fn from_document_json(text: &str, context: &'static str) -> io::Result<Self> {
244 let envelope =
245 reddb_file::decode_physical_metadata_document_root_json(text).map_err(|err| {
246 io::Error::new(io::ErrorKind::InvalidData, format!("{context}: {err}"))
247 })?;
248 Self::from_document_envelope(envelope)
249 }
250
251 fn to_document_envelope(&self) -> reddb_file::PhysicalMetadataDocumentEnvelope {
252 reddb_file::PhysicalMetadataDocumentEnvelope {
253 protocol_version: self.protocol_version.clone(),
254 generated_at_unix_ms: self.generated_at_unix_ms,
255 last_loaded_from: self.last_loaded_from.clone(),
256 last_healed_at_unix_ms: self.last_healed_at_unix_ms,
257 manifest_json: manifest_to_json(&self.manifest).to_string_compact(),
258 catalog_json: catalog_to_json(&self.catalog).to_string_compact(),
259 manifest_events_json: self
260 .manifest_events
261 .iter()
262 .map(|event| manifest_event_to_json(event).to_string_compact())
263 .collect(),
264 indexes_json: self
265 .indexes
266 .iter()
267 .map(|index| index_state_to_json(index).to_string_compact())
268 .collect(),
269 graph_projections_json: self
270 .graph_projections
271 .iter()
272 .map(|projection| graph_projection_to_json(projection).to_string_compact())
273 .collect(),
274 analytics_jobs_json: self
275 .analytics_jobs
276 .iter()
277 .map(|job| analytics_job_to_json(job).to_string_compact())
278 .collect(),
279 tree_definitions_json: self
280 .tree_definitions
281 .iter()
282 .map(|definition| tree_definition_to_json(definition).to_string_compact())
283 .collect(),
284 collection_ttl_defaults_ms: self.collection_ttl_defaults_ms.clone(),
285 collection_contracts_json: self
286 .collection_contracts
287 .iter()
288 .map(|contract| collection_contract_to_json(contract).to_string_compact())
289 .collect(),
290 hypertables_json: self
291 .hypertables
292 .iter()
293 .map(|hypertable| hypertable_to_json(hypertable).to_string_compact())
294 .collect(),
295 exports_json: self
296 .exports
297 .iter()
298 .map(|export| export_descriptor_to_json(export).to_string_compact())
299 .collect(),
300 superblock_json: superblock_to_json(&self.superblock).to_string_compact(),
301 snapshots_json: self
302 .snapshots
303 .iter()
304 .map(|snapshot| snapshot_descriptor_to_json(snapshot).to_string_compact())
305 .collect(),
306 }
307 }
308
309 fn from_document_envelope(
310 envelope: reddb_file::PhysicalMetadataDocumentEnvelope,
311 ) -> io::Result<Self> {
312 Ok(Self {
313 protocol_version: envelope.protocol_version,
314 generated_at_unix_ms: envelope.generated_at_unix_ms,
315 last_loaded_from: envelope.last_loaded_from,
316 last_healed_at_unix_ms: envelope.last_healed_at_unix_ms,
317 manifest: manifest_from_json(&parse_document_fragment(
318 &envelope.manifest_json,
319 "manifest",
320 )?)?,
321 catalog: catalog_from_json(&parse_document_fragment(
322 &envelope.catalog_json,
323 "catalog",
324 )?)?,
325 manifest_events: parse_document_fragments(
326 &envelope.manifest_events_json,
327 "manifest_events",
328 manifest_event_from_json,
329 )?,
330 indexes: parse_document_fragments(
331 &envelope.indexes_json,
332 "indexes",
333 index_state_from_json,
334 )?,
335 graph_projections: parse_document_fragments(
336 &envelope.graph_projections_json,
337 "graph_projections",
338 graph_projection_from_json,
339 )?,
340 analytics_jobs: parse_document_fragments(
341 &envelope.analytics_jobs_json,
342 "analytics_jobs",
343 analytics_job_from_json,
344 )?,
345 tree_definitions: parse_document_fragments(
346 &envelope.tree_definitions_json,
347 "tree_definitions",
348 tree_definition_from_json,
349 )?,
350 collection_ttl_defaults_ms: envelope.collection_ttl_defaults_ms,
351 collection_contracts: parse_document_fragments(
352 &envelope.collection_contracts_json,
353 "collection_contracts",
354 collection_contract_from_json,
355 )?,
356 hypertables: parse_document_fragments(
357 &envelope.hypertables_json,
358 "hypertables",
359 hypertable_from_json,
360 )?,
361 exports: parse_document_fragments(
362 &envelope.exports_json,
363 "exports",
364 export_descriptor_from_json,
365 )?,
366 superblock: superblock_from_json(&parse_document_fragment(
367 &envelope.superblock_json,
368 "superblock",
369 )?)?,
370 snapshots: parse_document_fragments(
371 &envelope.snapshots_json,
372 "snapshots",
373 snapshot_descriptor_from_json,
374 )?,
375 })
376 }
377}
378
379fn parse_document_fragment(json: &str, field: &'static str) -> io::Result<JsonValue> {
380 crate::json::from_str::<JsonValue>(json)
381 .map_err(|err| invalid_data(format!("invalid physical metadata field '{field}': {err}")))
382}
383
384fn parse_document_fragments<T>(
385 fragments: &[String],
386 field: &'static str,
387 decode: fn(&JsonValue) -> io::Result<T>,
388) -> io::Result<Vec<T>> {
389 fragments
390 .iter()
391 .map(|fragment| decode(&parse_document_fragment(fragment, field)?))
392 .collect()
393}