1use super::*;
2
3impl RedDB {
4 pub fn quorum_coordinator(
11 &self,
12 ) -> Option<&Arc<crate::replication::quorum::QuorumCoordinator>> {
13 self.quorum.as_ref()
14 }
15
16 pub fn wait_for_replication_quorum(
27 &self,
28 target_lsn: u64,
29 ) -> Result<(), crate::replication::QuorumError> {
30 match &self.quorum {
31 Some(q) => q.wait_for_quorum(target_lsn),
32 None => Ok(()),
33 }
34 }
35}
36
37impl RedDB {
38 pub(crate) fn native_registry_summary_from_metadata(
39 &self,
40 metadata: &PhysicalMetadataFile,
41 ) -> NativeRegistrySummary {
42 const SAMPLE_LIMIT: usize = 32;
43
44 let collection_names: Vec<_> = metadata
45 .catalog
46 .stats_by_collection
47 .keys()
48 .take(SAMPLE_LIMIT)
49 .cloned()
50 .collect();
51 let indexes: Vec<_> = metadata
52 .indexes
53 .iter()
54 .take(SAMPLE_LIMIT)
55 .map(|index| NativeRegistryIndexSummary {
56 name: index.name.clone(),
57 kind: index.kind.as_str().to_string(),
58 collection: index.collection.clone(),
59 enabled: index.enabled,
60 entries: index.entries as u64,
61 estimated_memory_bytes: index.estimated_memory_bytes,
62 last_refresh_ms: index.last_refresh_ms,
63 backend: index.backend.clone(),
64 })
65 .collect();
66 let graph_projections: Vec<_> = metadata
67 .graph_projections
68 .iter()
69 .take(SAMPLE_LIMIT)
70 .map(|projection| NativeRegistryProjectionSummary {
71 name: projection.name.clone(),
72 source: projection.source.clone(),
73 created_at_unix_ms: projection.created_at_unix_ms,
74 updated_at_unix_ms: projection.updated_at_unix_ms,
75 node_labels: projection.node_labels.clone(),
76 node_types: projection.node_types.clone(),
77 edge_labels: projection.edge_labels.clone(),
78 last_materialized_sequence: projection.last_materialized_sequence,
79 })
80 .collect();
81 let analytics_jobs = metadata
82 .analytics_jobs
83 .iter()
84 .take(SAMPLE_LIMIT)
85 .map(|job| NativeRegistryJobSummary {
86 id: job.id.clone(),
87 kind: job.kind.clone(),
88 projection: job.projection.clone(),
89 state: job.state.clone(),
90 created_at_unix_ms: job.created_at_unix_ms,
91 updated_at_unix_ms: job.updated_at_unix_ms,
92 last_run_sequence: job.last_run_sequence,
93 metadata: job.metadata.clone(),
94 })
95 .collect::<Vec<_>>();
96 let vector_artifacts = self
97 .native_vector_artifact_records()
98 .into_iter()
99 .map(|(summary, _)| summary)
100 .take(SAMPLE_LIMIT)
101 .collect::<Vec<_>>();
102 let vector_artifact_count = self.native_vector_artifact_collection_count() as u32;
103
104 NativeRegistrySummary {
105 collection_count: metadata.catalog.total_collections as u32,
106 index_count: metadata.indexes.len() as u32,
107 graph_projection_count: metadata.graph_projections.len() as u32,
108 analytics_job_count: metadata.analytics_jobs.len() as u32,
109 vector_artifact_count,
110 collections_complete: metadata.catalog.stats_by_collection.len() <= SAMPLE_LIMIT,
111 indexes_complete: metadata.indexes.len() <= SAMPLE_LIMIT,
112 graph_projections_complete: metadata.graph_projections.len() <= SAMPLE_LIMIT,
113 analytics_jobs_complete: metadata.analytics_jobs.len() <= SAMPLE_LIMIT,
114 vector_artifacts_complete: vector_artifact_count as usize <= SAMPLE_LIMIT,
115 omitted_collection_count: metadata
116 .catalog
117 .stats_by_collection
118 .len()
119 .saturating_sub(collection_names.len())
120 as u32,
121 omitted_index_count: metadata.indexes.len().saturating_sub(indexes.len()) as u32,
122 omitted_graph_projection_count: metadata
123 .graph_projections
124 .len()
125 .saturating_sub(graph_projections.len())
126 as u32,
127 omitted_analytics_job_count: metadata
128 .analytics_jobs
129 .len()
130 .saturating_sub(analytics_jobs.len())
131 as u32,
132 omitted_vector_artifact_count: vector_artifact_count
133 .saturating_sub(vector_artifacts.len() as u32),
134 collection_names,
135 indexes,
136 graph_projections,
137 analytics_jobs,
138 vector_artifacts,
139 }
140 }
141
142 fn native_vector_artifact_collection_count(&self) -> usize {
143 self.native_vector_artifact_records().len()
144 }
145
146 pub(crate) fn native_vector_artifact_records(
147 &self,
148 ) -> Vec<(NativeVectorArtifactSummary, Vec<u8>)> {
149 let mut artifacts = Vec::new();
150 for collection in self.store.list_collections() {
151 let Some(manager) = self.store.get_collection(&collection) else {
152 continue;
153 };
154 let entities = manager.query_all(|_| true);
155 let mut vectors = Vec::new();
156 let mut graph_edges = Vec::new();
157 let mut fulltext_documents = Vec::new();
158 let mut document_records = Vec::new();
159 for entity in entities {
160 match entity.data {
161 EntityData::Vector(vector) => {
162 if !vector.dense.is_empty() {
163 vectors.push((entity.id, vector.dense));
164 }
165 }
166 EntityData::Edge(edge) => {
167 if let EntityKind::GraphEdge(edge_kind) = entity.kind {
168 graph_edges.push((
169 entity.id,
170 edge_kind.from_node,
171 edge_kind.to_node,
172 edge_kind.label,
173 edge.weight,
174 ));
175 }
176 }
177 data => {
178 let text = Self::native_fulltext_text_for_entity(&data);
179 if !text.trim().is_empty() {
180 fulltext_documents.push((entity.id, text));
181 }
182 if let Some(document) =
183 Self::native_document_pathvalue_for_entity(entity.id, &data)
184 {
185 document_records.push(document);
186 }
187 }
188 }
189 }
190 if !vectors.is_empty() {
191 let dimension = vectors[0].1.len();
192 let mut hnsw = HnswIndex::with_dimension(dimension);
193 for (id, vector) in vectors
194 .into_iter()
195 .filter(|(_, vector)| vector.len() == dimension)
196 {
197 hnsw.insert_with_id(id.raw(), vector);
198 }
199 let stats = hnsw.stats();
200 let bytes = hnsw.to_bytes();
201 let summary = NativeVectorArtifactSummary {
202 collection: collection.clone(),
203 artifact_kind: "hnsw".to_string(),
204 vector_count: stats.node_count as u64,
205 dimension: stats.dimension as u32,
206 max_layer: stats.max_layer as u32,
207 serialized_bytes: bytes.len() as u64,
208 checksum: crate::storage::engine::crc32(&bytes) as u64,
209 };
210 artifacts.push((summary, bytes));
211
212 let n_lists = ((stats.node_count as f64).sqrt().ceil() as usize).max(1);
213 let mut ivf = IvfIndex::new(IvfConfig::new(dimension, n_lists));
214 let training = manager
215 .query_all(|_| true)
216 .into_iter()
217 .filter_map(|entity| match entity.data {
218 EntityData::Vector(vector) if vector.dense.len() == dimension => {
219 Some(vector.dense)
220 }
221 _ => None,
222 })
223 .collect::<Vec<_>>();
224 ivf.train(&training);
225 let items = manager
226 .query_all(|_| true)
227 .into_iter()
228 .filter_map(|entity| match entity.data {
229 EntityData::Vector(vector) if vector.dense.len() == dimension => {
230 Some((entity.id.raw(), vector.dense))
231 }
232 _ => None,
233 })
234 .collect::<Vec<_>>();
235 ivf.add_batch_with_ids(items);
236 let ivf_stats = ivf.stats();
237 let ivf_bytes = ivf.to_bytes();
238 let ivf_summary = NativeVectorArtifactSummary {
239 collection: collection.clone(),
240 artifact_kind: "ivf".to_string(),
241 vector_count: ivf_stats.total_vectors as u64,
242 dimension: ivf_stats.dimension as u32,
243 max_layer: ivf_stats.n_lists as u32,
244 serialized_bytes: ivf_bytes.len() as u64,
245 checksum: crate::storage::engine::crc32(&ivf_bytes) as u64,
246 };
247 artifacts.push((ivf_summary, ivf_bytes));
248 }
249
250 if !graph_edges.is_empty() {
251 let bytes = Self::serialize_native_graph_adjacency_artifact(&graph_edges);
252 let (edge_count, node_count, label_count) =
253 Self::inspect_native_graph_adjacency_artifact(&bytes).unwrap_or((0, 0, 0));
254 let summary = NativeVectorArtifactSummary {
255 collection: collection.clone(),
256 artifact_kind: "graph.adjacency".to_string(),
257 vector_count: edge_count,
258 dimension: node_count as u32,
259 max_layer: label_count,
260 serialized_bytes: bytes.len() as u64,
261 checksum: crate::storage::engine::crc32(&bytes) as u64,
262 };
263 artifacts.push((summary, bytes));
264 }
265
266 if !fulltext_documents.is_empty() {
267 let bytes =
268 Self::serialize_native_fulltext_artifact(&collection, &fulltext_documents);
269 let (doc_count, term_count, posting_count) =
270 Self::inspect_native_fulltext_artifact(&bytes).unwrap_or((0, 0, 0));
271 let summary = NativeVectorArtifactSummary {
272 collection: collection.clone(),
273 artifact_kind: "text.fulltext".to_string(),
274 vector_count: posting_count,
275 dimension: doc_count as u32,
276 max_layer: term_count as u32,
277 serialized_bytes: bytes.len() as u64,
278 checksum: crate::storage::engine::crc32(&bytes) as u64,
279 };
280 artifacts.push((summary, bytes));
281 }
282
283 if !document_records.is_empty() {
284 let bytes = Self::serialize_native_document_pathvalue_artifact(
285 &collection,
286 &document_records,
287 );
288 let (doc_count, path_count, value_count, unique_value_count) =
289 Self::inspect_native_document_pathvalue_artifact(&bytes)
290 .unwrap_or((0, 0, 0, 0));
291 let _ = unique_value_count;
292 let summary = NativeVectorArtifactSummary {
293 collection: collection.clone(),
294 artifact_kind: "document.pathvalue".to_string(),
295 vector_count: value_count,
296 dimension: doc_count as u32,
297 max_layer: path_count as u32,
298 serialized_bytes: bytes.len() as u64,
299 checksum: crate::storage::engine::crc32(&bytes) as u64,
300 };
301 artifacts.push((summary, bytes));
302 }
303 }
304 artifacts
305 }
306
307 fn serialize_native_graph_adjacency_artifact(
308 edges: &[(EntityId, String, String, String, f32)],
309 ) -> Vec<u8> {
310 let mut data = Vec::with_capacity(32 + edges.len() * 48);
311 data.extend_from_slice(b"RDGA");
312 data.extend_from_slice(&(edges.len() as u32).to_le_bytes());
313 for (edge_id, from_node, to_node, label, weight) in edges {
314 data.extend_from_slice(&edge_id.raw().to_le_bytes());
315 Self::push_native_artifact_string(&mut data, from_node);
316 Self::push_native_artifact_string(&mut data, to_node);
317 Self::push_native_artifact_string(&mut data, label);
318 data.extend_from_slice(&weight.to_le_bytes());
319 }
320 data
321 }
322
323 pub(crate) fn inspect_native_graph_adjacency_artifact(
324 bytes: &[u8],
325 ) -> Result<(u64, u64, u32), String> {
326 if bytes.len() < 8 || &bytes[0..4] != b"RDGA" {
327 return Err("invalid graph adjacency artifact".to_string());
328 }
329 let mut pos = 4usize;
330 let edge_count =
331 u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]])
332 as usize;
333 pos += 4;
334 let mut nodes = BTreeSet::new();
335 let mut labels = BTreeSet::new();
336 for _ in 0..edge_count {
337 if pos + 8 > bytes.len() {
338 return Err("truncated graph adjacency artifact".to_string());
339 }
340 pos += 8;
341 let from = Self::read_native_artifact_string(bytes, &mut pos)?;
342 let to = Self::read_native_artifact_string(bytes, &mut pos)?;
343 let label = Self::read_native_artifact_string(bytes, &mut pos)?;
344 if pos + 4 > bytes.len() {
345 return Err("truncated graph adjacency artifact weight".to_string());
346 }
347 pos += 4;
348 nodes.insert(from);
349 nodes.insert(to);
350 labels.insert(label);
351 }
352 Ok((edge_count as u64, nodes.len() as u64, labels.len() as u32))
353 }
354
355 fn serialize_native_fulltext_artifact(
356 collection: &str,
357 documents: &[(EntityId, String)],
358 ) -> Vec<u8> {
359 let mut postings: BTreeMap<String, Vec<(u64, u32)>> = BTreeMap::new();
360 for (entity_id, text) in documents {
361 let mut frequencies: BTreeMap<String, u32> = BTreeMap::new();
362 for token in Self::native_fulltext_tokenize(text) {
363 *frequencies.entry(token).or_insert(0) += 1;
364 }
365 for (token, count) in frequencies {
366 postings
367 .entry(token)
368 .or_default()
369 .push((entity_id.raw(), count));
370 }
371 }
372
373 let mut data = Vec::with_capacity(64 + postings.len() * 32);
374 data.extend_from_slice(b"RDFT");
375 Self::push_native_artifact_string(&mut data, collection);
376 data.extend_from_slice(&(documents.len() as u32).to_le_bytes());
377 data.extend_from_slice(&(postings.len() as u32).to_le_bytes());
378 for (term, entries) in postings {
379 Self::push_native_artifact_string(&mut data, &term);
380 data.extend_from_slice(&(entries.len() as u32).to_le_bytes());
381 for (entity_id, term_count) in entries {
382 data.extend_from_slice(&entity_id.to_le_bytes());
383 data.extend_from_slice(&term_count.to_le_bytes());
384 }
385 }
386 data
387 }
388
389 pub(crate) fn inspect_native_fulltext_artifact(
390 bytes: &[u8],
391 ) -> Result<(u64, u64, u64), String> {
392 if bytes.len() < 12 || &bytes[0..4] != b"RDFT" {
393 return Err("invalid fulltext artifact".to_string());
394 }
395 let mut pos = 4usize;
396 let _collection = Self::read_native_artifact_string(bytes, &mut pos)?;
397 if pos + 8 > bytes.len() {
398 return Err("truncated fulltext artifact".to_string());
399 }
400 let doc_count =
401 u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]]) as u64;
402 pos += 4;
403 let term_count =
404 u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]]) as u64;
405 pos += 4;
406 let mut posting_count = 0u64;
407 for _ in 0..term_count {
408 let _term = Self::read_native_artifact_string(bytes, &mut pos)?;
409 if pos + 4 > bytes.len() {
410 return Err("truncated fulltext posting count".to_string());
411 }
412 let entry_count =
413 u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]])
414 as u64;
415 pos += 4;
416 posting_count += entry_count;
417 let bytes_needed = entry_count as usize * 12;
418 if pos + bytes_needed > bytes.len() {
419 return Err("truncated fulltext postings".to_string());
420 }
421 pos += bytes_needed;
422 }
423 Ok((doc_count, term_count, posting_count))
424 }
425
426 fn serialize_native_document_pathvalue_artifact(
427 collection: &str,
428 documents: &[(EntityId, Vec<(String, String)>)],
429 ) -> Vec<u8> {
430 let total_entries: usize = documents.iter().map(|(_, entries)| entries.len()).sum();
431 let mut data = Vec::with_capacity(64 + total_entries * 48);
432 data.extend_from_slice(b"RDDP");
433 Self::push_native_artifact_string(&mut data, collection);
434 data.extend_from_slice(&(documents.len() as u32).to_le_bytes());
435 data.extend_from_slice(&(total_entries as u32).to_le_bytes());
436 for (entity_id, entries) in documents {
437 data.extend_from_slice(&entity_id.raw().to_le_bytes());
438 data.extend_from_slice(&(entries.len() as u32).to_le_bytes());
439 for (path, value) in entries {
440 Self::push_native_artifact_string(&mut data, path);
441 Self::push_native_artifact_string(&mut data, value);
442 }
443 }
444 data
445 }
446
447 pub(crate) fn inspect_native_document_pathvalue_artifact(
448 bytes: &[u8],
449 ) -> Result<(u64, u64, u64, u64), String> {
450 if bytes.len() < 12 || &bytes[0..4] != b"RDDP" {
451 return Err("invalid document path/value artifact".to_string());
452 }
453 let mut pos = 4usize;
454 let _collection = Self::read_native_artifact_string(bytes, &mut pos)?;
455 if pos + 8 > bytes.len() {
456 return Err("truncated document path/value artifact".to_string());
457 }
458 let doc_count =
459 u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]]) as u64;
460 pos += 4;
461 let total_entries =
462 u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]]) as u64;
463 pos += 4;
464 let mut paths = BTreeSet::new();
465 let mut values = BTreeSet::new();
466 let mut seen_entries = 0u64;
467 for _ in 0..doc_count {
468 if pos + 12 > bytes.len() {
469 return Err("truncated document path/value record".to_string());
470 }
471 pos += 8;
472 let entry_count =
473 u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]])
474 as usize;
475 pos += 4;
476 for _ in 0..entry_count {
477 let path = Self::read_native_artifact_string(bytes, &mut pos)?;
478 let value = Self::read_native_artifact_string(bytes, &mut pos)?;
479 paths.insert(path);
480 values.insert(value);
481 seen_entries += 1;
482 }
483 }
484 if seen_entries != total_entries {
485 return Err("document path/value artifact entry count mismatch".to_string());
486 }
487 Ok((
488 doc_count,
489 paths.len() as u64,
490 total_entries,
491 values.len() as u64,
492 ))
493 }
494
495 fn native_document_pathvalue_for_entity(
496 entity_id: EntityId,
497 data: &EntityData,
498 ) -> Option<(EntityId, Vec<(String, String)>)> {
499 let mut entries = Vec::new();
500 match data {
501 EntityData::Row(row) => {
502 if let Some(named) = &row.named {
503 for (key, value) in named {
504 Self::collect_native_document_entries_from_value(key, value, &mut entries);
505 }
506 }
507 for (idx, value) in row.columns.iter().enumerate() {
508 let path = format!("columns[{idx}]");
509 Self::collect_native_document_entries_from_value(&path, value, &mut entries);
510 }
511 }
512 EntityData::Node(node) => {
513 for (key, value) in &node.properties {
514 Self::collect_native_document_entries_from_value(key, value, &mut entries);
515 }
516 }
517 EntityData::Edge(edge) => {
518 for (key, value) in &edge.properties {
519 Self::collect_native_document_entries_from_value(key, value, &mut entries);
520 }
521 }
522 EntityData::Vector(_) => {}
523 EntityData::TimeSeries(_) => {}
524 EntityData::QueueMessage(_) => {}
525 }
526 if entries.is_empty() {
527 None
528 } else {
529 Some((entity_id, entries))
530 }
531 }
532
533 fn collect_native_document_entries_from_value(
534 path: &str,
535 value: &Value,
536 out: &mut Vec<(String, String)>,
537 ) {
538 match value {
539 Value::Json(bytes) | Value::Blob(bytes) => {
540 if let Ok(json) = crate::json::from_slice::<JsonValue>(bytes) {
541 Self::collect_native_document_entries_from_json(path, &json, out);
542 }
543 }
544 _ => {}
545 }
546 }
547
548 fn collect_native_document_entries_from_json(
549 path: &str,
550 value: &JsonValue,
551 out: &mut Vec<(String, String)>,
552 ) {
553 match value {
554 JsonValue::Object(entries) => {
555 for (key, value) in entries {
556 let next = if path.is_empty() {
557 key.clone()
558 } else {
559 format!("{path}.{key}")
560 };
561 Self::collect_native_document_entries_from_json(&next, value, out);
562 }
563 }
564 JsonValue::Array(items) => {
565 for (idx, value) in items.iter().enumerate() {
566 let next = format!("{path}[{idx}]");
567 Self::collect_native_document_entries_from_json(&next, value, out);
568 }
569 }
570 _ => {
571 if let Some(text) = Self::native_json_scalar_text(value) {
572 out.push((path.to_string(), text));
573 }
574 }
575 }
576 }
577
578 fn native_json_scalar_text(value: &JsonValue) -> Option<String> {
579 match value {
580 JsonValue::Null => None,
581 JsonValue::Bool(value) => Some(value.to_string()),
582 JsonValue::Number(value) => Some(value.to_string()),
583 JsonValue::String(value) => Some(value.clone()),
584 JsonValue::Array(_) | JsonValue::Object(_) => None,
585 }
586 }
587
588 fn native_fulltext_text_for_entity(data: &EntityData) -> String {
589 match data {
590 EntityData::Row(row) => {
591 let mut parts = Vec::new();
592 if let Some(named) = &row.named {
593 for value in named.values() {
594 if let Some(text) = Self::native_value_text(value) {
595 parts.push(text);
596 }
597 }
598 }
599 for value in &row.columns {
600 if let Some(text) = Self::native_value_text(value) {
601 parts.push(text);
602 }
603 }
604 parts.join(" ")
605 }
606 EntityData::Node(node) => node
607 .properties
608 .values()
609 .filter_map(Self::native_value_text)
610 .collect::<Vec<_>>()
611 .join(" "),
612 EntityData::Edge(edge) => edge
613 .properties
614 .values()
615 .filter_map(Self::native_value_text)
616 .collect::<Vec<_>>()
617 .join(" "),
618 EntityData::Vector(vector) => vector.content.clone().unwrap_or_default(),
619 EntityData::TimeSeries(ts) => ts.metric.clone(),
620 EntityData::QueueMessage(_) => String::new(),
621 }
622 }
623
624 fn native_value_text(value: &Value) -> Option<String> {
625 match value {
626 Value::Text(value) => Some(value.to_string()),
627 Value::Json(value) => String::from_utf8(value.clone()).ok(),
628 Value::Blob(value) => String::from_utf8(value.clone()).ok(),
629 Value::Integer(value) => Some(value.to_string()),
630 Value::UnsignedInteger(value) => Some(value.to_string()),
631 Value::Float(value) => Some(value.to_string()),
632 Value::Boolean(value) => Some(value.to_string()),
633 Value::IpAddr(value) => Some(value.to_string()),
634 Value::NodeRef(value) => Some(value.clone()),
635 Value::EdgeRef(value) => Some(value.clone()),
636 Value::RowRef(table, row_id) => Some(format!("{table}:{row_id}")),
637 Value::VectorRef(collection, vector_id) => Some(format!("{collection}:{vector_id}")),
638 Value::Timestamp(value) => Some(value.to_string()),
639 Value::Duration(value) => Some(value.to_string()),
640 Value::Uuid(_) | Value::MacAddr(_) | Value::Vector(_) | Value::Null => None,
641 Value::Color([r, g, b]) => Some(format!("#{:02X}{:02X}{:02X}", r, g, b)),
642 Value::Email(s) => Some(s.clone()),
643 Value::Url(s) => Some(s.clone()),
644 Value::Phone(n) => Some(format!("+{}", n)),
645 Value::Semver(packed) => Some(format!(
646 "{}.{}.{}",
647 packed / 1_000_000,
648 (packed / 1_000) % 1_000,
649 packed % 1_000
650 )),
651 Value::Cidr(ip, prefix) => Some(format!(
652 "{}.{}.{}.{}/{}",
653 (ip >> 24) & 0xFF,
654 (ip >> 16) & 0xFF,
655 (ip >> 8) & 0xFF,
656 ip & 0xFF,
657 prefix
658 )),
659 Value::Date(days) => Some(days.to_string()),
660 Value::Time(ms) => {
661 let total_secs = ms / 1000;
662 Some(format!(
663 "{:02}:{:02}:{:02}",
664 total_secs / 3600,
665 (total_secs / 60) % 60,
666 total_secs % 60
667 ))
668 }
669 Value::Decimal(v) => Some(Value::Decimal(*v).display_string()),
670 Value::EnumValue(i) => Some(format!("enum({})", i)),
671 Value::Array(_) => None,
672 Value::TimestampMs(ms) => Some(ms.to_string()),
673 Value::Ipv4(ip) => Some(format!(
674 "{}.{}.{}.{}",
675 (ip >> 24) & 0xFF,
676 (ip >> 16) & 0xFF,
677 (ip >> 8) & 0xFF,
678 ip & 0xFF
679 )),
680 Value::Ipv6(bytes) => Some(format!("{}", std::net::Ipv6Addr::from(*bytes))),
681 Value::Subnet(ip, mask) => {
682 let prefix = mask.leading_ones();
683 Some(format!(
684 "{}.{}.{}.{}/{}",
685 (ip >> 24) & 0xFF,
686 (ip >> 16) & 0xFF,
687 (ip >> 8) & 0xFF,
688 ip & 0xFF,
689 prefix
690 ))
691 }
692 Value::Port(p) => Some(p.to_string()),
693 Value::Latitude(micro) => Some(format!("{:.6}", *micro as f64 / 1_000_000.0)),
694 Value::Longitude(micro) => Some(format!("{:.6}", *micro as f64 / 1_000_000.0)),
695 Value::GeoPoint(lat, lon) => Some(format!(
696 "{:.6},{:.6}",
697 *lat as f64 / 1_000_000.0,
698 *lon as f64 / 1_000_000.0
699 )),
700 Value::Country2(c) => Some(String::from_utf8_lossy(c).to_string()),
701 Value::Country3(c) => Some(String::from_utf8_lossy(c).to_string()),
702 Value::Lang2(c) => Some(String::from_utf8_lossy(c).to_string()),
703 Value::Lang5(c) => Some(String::from_utf8_lossy(c).to_string()),
704 Value::Currency(c) => Some(String::from_utf8_lossy(c).to_string()),
705 Value::AssetCode(code) => Some(code.clone()),
706 Value::Money { .. } => Some(value.display_string()),
707 Value::ColorAlpha([r, g, b, a]) => {
708 Some(format!("#{:02X}{:02X}{:02X}{:02X}", r, g, b, a))
709 }
710 Value::BigInt(v) => Some(v.to_string()),
711 Value::KeyRef(col, key) => Some(format!("{}:{}", col, key)),
712 Value::DocRef(col, id) => Some(format!("{}#{}", col, id)),
713 Value::TableRef(name) => Some(name.clone()),
714 Value::PageRef(page_id) => Some(format!("page:{}", page_id)),
715 Value::Secret(_) | Value::Password(_) => None,
716 }
717 }
718
/// Splits `text` into lower-cased tokens for the fulltext artifact.
///
/// The text is lower-cased, split on every non-alphanumeric character, and
/// pieces shorter than two bytes (UTF-8 length) are dropped.
fn native_fulltext_tokenize(text: &str) -> Vec<String> {
    let lowered = text.to_lowercase();
    let mut tokens = Vec::new();
    for piece in lowered.split(|ch: char| !ch.is_alphanumeric()) {
        if piece.len() < 2 {
            continue;
        }
        tokens.push(piece.to_owned());
    }
    tokens
}
726
/// Appends `value` to `buf` as a little-endian `u32` byte length followed
/// by the UTF-8 bytes.
fn push_native_artifact_string(buf: &mut Vec<u8>, value: &str) {
    let payload = value.as_bytes();
    let length_prefix = (payload.len() as u32).to_le_bytes();
    buf.extend(length_prefix.iter().chain(payload));
}
732
/// Reads one length-prefixed UTF-8 string from `bytes` starting at `*pos`.
///
/// The encoding is a little-endian `u32` byte length followed by that many
/// bytes. On success `*pos` is advanced past the string. If the content is
/// truncated or is not valid UTF-8, `*pos` is left just past the length
/// prefix.
///
/// # Errors
///
/// Returns an error when the length prefix or the content extends past the
/// end of `bytes`, or when the content is not valid UTF-8. Offsets are
/// computed with checked arithmetic, so an out-of-range `*pos` or an
/// oversized declared length yields an error rather than an overflow.
fn read_native_artifact_string(bytes: &[u8], pos: &mut usize) -> Result<String, String> {
    let start = *pos;
    let content_start = start
        .checked_add(4)
        .filter(|end| *end <= bytes.len())
        .ok_or_else(|| "truncated native artifact string length".to_string())?;

    let mut len_raw = [0u8; 4];
    len_raw.copy_from_slice(&bytes[start..content_start]);
    let len = u32::from_le_bytes(len_raw) as usize;
    *pos = content_start;

    let content_end = content_start
        .checked_add(len)
        .filter(|end| *end <= bytes.len())
        .ok_or_else(|| "truncated native artifact string content".to_string())?;

    let value = std::str::from_utf8(&bytes[content_start..content_end])
        .map_err(|_| "invalid utf-8 in native artifact".to_string())?
        .to_string();
    *pos = content_end;
    Ok(value)
}
753
754 pub(crate) fn native_recovery_summary_from_metadata(
755 metadata: &PhysicalMetadataFile,
756 ) -> NativeRecoverySummary {
757 const SAMPLE_LIMIT: usize = 16;
758
759 let snapshots: Vec<_> = metadata
760 .snapshots
761 .iter()
762 .rev()
763 .take(SAMPLE_LIMIT)
764 .map(|snapshot| NativeSnapshotSummary {
765 snapshot_id: snapshot.snapshot_id,
766 created_at_unix_ms: snapshot.created_at_unix_ms,
767 superblock_sequence: snapshot.superblock_sequence,
768 collection_count: snapshot.collection_count as u32,
769 total_entities: snapshot.total_entities as u64,
770 })
771 .collect();
772 let exports: Vec<_> = metadata
773 .exports
774 .iter()
775 .rev()
776 .take(SAMPLE_LIMIT)
777 .map(|export| NativeExportSummary {
778 name: export.name.clone(),
779 created_at_unix_ms: export.created_at_unix_ms,
780 snapshot_id: export.snapshot_id,
781 superblock_sequence: export.superblock_sequence,
782 collection_count: export.collection_count as u32,
783 total_entities: export.total_entities as u64,
784 })
785 .collect();
786
787 NativeRecoverySummary {
788 snapshot_count: metadata.snapshots.len() as u32,
789 export_count: metadata.exports.len() as u32,
790 snapshots_complete: metadata.snapshots.len() <= SAMPLE_LIMIT,
791 exports_complete: metadata.exports.len() <= SAMPLE_LIMIT,
792 omitted_snapshot_count: metadata.snapshots.len().saturating_sub(snapshots.len()) as u32,
793 omitted_export_count: metadata.exports.len().saturating_sub(exports.len()) as u32,
794 snapshots,
795 exports,
796 }
797 }
798
799 pub(crate) fn native_catalog_summary_from_metadata(
800 metadata: &PhysicalMetadataFile,
801 ) -> NativeCatalogSummary {
802 const SAMPLE_LIMIT: usize = 32;
803
804 let collections: Vec<_> = metadata
805 .catalog
806 .stats_by_collection
807 .iter()
808 .take(SAMPLE_LIMIT)
809 .map(|(name, stats)| NativeCatalogCollectionSummary {
810 name: name.clone(),
811 entities: stats.entities as u64,
812 cross_refs: stats.cross_refs as u64,
813 segments: stats.segments as u32,
814 })
815 .collect();
816
817 NativeCatalogSummary {
818 collection_count: metadata.catalog.total_collections as u32,
819 total_entities: metadata.catalog.total_entities as u64,
820 collections_complete: metadata.catalog.stats_by_collection.len() <= SAMPLE_LIMIT,
821 omitted_collection_count: metadata
822 .catalog
823 .stats_by_collection
824 .len()
825 .saturating_sub(collections.len()) as u32,
826 collections,
827 }
828 }
829
830 pub(crate) fn native_metadata_state_summary_from_metadata(
831 metadata: &PhysicalMetadataFile,
832 ) -> NativeMetadataStateSummary {
833 NativeMetadataStateSummary {
834 protocol_version: metadata.protocol_version.clone(),
835 generated_at_unix_ms: metadata.generated_at_unix_ms,
836 last_loaded_from: metadata.last_loaded_from.clone(),
837 last_healed_at_unix_ms: metadata.last_healed_at_unix_ms,
838 }
839 }
840
841 pub(crate) fn inspect_native_header_against_metadata(
842 native: PhysicalFileHeader,
843 metadata: &PhysicalMetadataFile,
844 ) -> NativeHeaderInspection {
845 let expected = Self::native_header_from_metadata(metadata);
846 let mut mismatches = Vec::new();
847
848 if native.format_version != expected.format_version {
849 mismatches.push(NativeHeaderMismatch {
850 field: "format_version",
851 native: native.format_version.to_string(),
852 expected: expected.format_version.to_string(),
853 });
854 }
855 if native.sequence != expected.sequence {
856 mismatches.push(NativeHeaderMismatch {
857 field: "sequence",
858 native: native.sequence.to_string(),
859 expected: expected.sequence.to_string(),
860 });
861 }
862 if native.manifest_oldest_root != expected.manifest_oldest_root {
863 mismatches.push(NativeHeaderMismatch {
864 field: "manifest_oldest_root",
865 native: native.manifest_oldest_root.to_string(),
866 expected: expected.manifest_oldest_root.to_string(),
867 });
868 }
869 if native.manifest_root != expected.manifest_root {
870 mismatches.push(NativeHeaderMismatch {
871 field: "manifest_root",
872 native: native.manifest_root.to_string(),
873 expected: expected.manifest_root.to_string(),
874 });
875 }
876 if native.free_set_root != expected.free_set_root {
877 mismatches.push(NativeHeaderMismatch {
878 field: "free_set_root",
879 native: native.free_set_root.to_string(),
880 expected: expected.free_set_root.to_string(),
881 });
882 }
883 if native.collection_root_count != expected.collection_root_count {
884 mismatches.push(NativeHeaderMismatch {
885 field: "collection_root_count",
886 native: native.collection_root_count.to_string(),
887 expected: expected.collection_root_count.to_string(),
888 });
889 }
890 if native.snapshot_count != expected.snapshot_count {
891 mismatches.push(NativeHeaderMismatch {
892 field: "snapshot_count",
893 native: native.snapshot_count.to_string(),
894 expected: expected.snapshot_count.to_string(),
895 });
896 }
897 if native.index_count != expected.index_count {
898 mismatches.push(NativeHeaderMismatch {
899 field: "index_count",
900 native: native.index_count.to_string(),
901 expected: expected.index_count.to_string(),
902 });
903 }
904 if native.catalog_collection_count != expected.catalog_collection_count {
905 mismatches.push(NativeHeaderMismatch {
906 field: "catalog_collection_count",
907 native: native.catalog_collection_count.to_string(),
908 expected: expected.catalog_collection_count.to_string(),
909 });
910 }
911 if native.catalog_total_entities != expected.catalog_total_entities {
912 mismatches.push(NativeHeaderMismatch {
913 field: "catalog_total_entities",
914 native: native.catalog_total_entities.to_string(),
915 expected: expected.catalog_total_entities.to_string(),
916 });
917 }
918 if native.export_count != expected.export_count {
919 mismatches.push(NativeHeaderMismatch {
920 field: "export_count",
921 native: native.export_count.to_string(),
922 expected: expected.export_count.to_string(),
923 });
924 }
925 if native.graph_projection_count != expected.graph_projection_count {
926 mismatches.push(NativeHeaderMismatch {
927 field: "graph_projection_count",
928 native: native.graph_projection_count.to_string(),
929 expected: expected.graph_projection_count.to_string(),
930 });
931 }
932 if native.analytics_job_count != expected.analytics_job_count {
933 mismatches.push(NativeHeaderMismatch {
934 field: "analytics_job_count",
935 native: native.analytics_job_count.to_string(),
936 expected: expected.analytics_job_count.to_string(),
937 });
938 }
939 if native.manifest_event_count != expected.manifest_event_count {
940 mismatches.push(NativeHeaderMismatch {
941 field: "manifest_event_count",
942 native: native.manifest_event_count.to_string(),
943 expected: expected.manifest_event_count.to_string(),
944 });
945 }
946
947 NativeHeaderInspection {
948 native,
949 expected,
950 consistent: mismatches.is_empty(),
951 mismatches,
952 }
953 }
954
955 pub(crate) fn repair_policy_for_inspection(
956 inspection: &NativeHeaderInspection,
957 ) -> NativeHeaderRepairPolicy {
958 if inspection.consistent {
959 return NativeHeaderRepairPolicy::InSync;
960 }
961
962 if inspection.expected.sequence >= inspection.native.sequence {
963 NativeHeaderRepairPolicy::RepairNativeFromMetadata
964 } else {
965 NativeHeaderRepairPolicy::NativeAheadOfMetadata
966 }
967 }
968
969 pub(crate) fn prune_export_registry(&self, exports: &mut Vec<ExportDescriptor>) {
970 let retention = self.options.export_retention.max(1);
971 if exports.len() <= retention {
972 return;
973 }
974
975 exports.sort_by_key(|export| export.created_at_unix_ms);
976 let removed: Vec<ExportDescriptor> =
977 exports.drain(0..(exports.len() - retention)).collect();
978
979 for export in removed {
980 let _ = fs::remove_file(&export.data_path);
981 let _ = fs::remove_file(&export.metadata_path);
982 let binary_path = PhysicalMetadataFile::metadata_binary_path_for(std::path::Path::new(
983 &export.data_path,
984 ));
985 let _ = fs::remove_file(binary_path);
986 }
987 }
988
989 pub(crate) fn runtime_index_catalog(&self) -> IndexCatalog {
990 let mut catalog = IndexCatalog::register_default_vector_graph(
991 self.options.has_capability(Capability::Table),
992 self.options.has_capability(Capability::Graph),
993 );
994 if self.options.has_capability(Capability::FullText) {
995 catalog.register(RuntimeIndexConfig::new(
996 "text-fulltext",
997 IndexKind::FullText,
998 ));
999 catalog.register(RuntimeIndexConfig::new(
1000 "document-pathvalue",
1001 IndexKind::DocumentPathValue,
1002 ));
1003 }
1004 catalog.register(RuntimeIndexConfig::new(
1005 "search-hybrid",
1006 IndexKind::HybridSearch,
1007 ));
1008 catalog
1009 }
1010
1011 pub(crate) fn physical_index_state(&self) -> Vec<PhysicalIndexState> {
1012 let catalog = self.runtime_index_catalog();
1016 let snapshot = crate::catalog::snapshot_store_with_declarations(
1017 "reddb",
1018 self.store.as_ref(),
1019 Some(&catalog),
1020 None, None, );
1023 let mut metrics_by_name = std::collections::BTreeMap::new();
1024 for metric in &snapshot.indices {
1025 metrics_by_name.insert(metric.name.clone(), metric.clone());
1026 }
1027
1028 let mut states = Vec::new();
1029 for collection in snapshot.collections {
1030 for index_name in &collection.indices {
1031 let metric = metrics_by_name.get(index_name);
1032 let kind = metric
1033 .map(|metric| metric.kind)
1034 .unwrap_or_else(|| infer_collection_index_kind(collection.model, index_name));
1035 let entries = estimate_index_entries(&collection, kind);
1036 states.push(PhysicalIndexState {
1037 name: format!("{}::{}", collection.name, index_name),
1038 kind,
1039 collection: Some(collection.name.clone()),
1040 enabled: metric.map(|metric| metric.enabled).unwrap_or(true),
1041 entries,
1042 estimated_memory_bytes: estimate_index_memory(entries, kind),
1043 last_refresh_ms: metric.and_then(|metric| metric.last_refresh_ms),
1044 backend: index_backend_name(kind).to_string(),
1045 artifact_kind: None,
1046 artifact_root_page: None,
1047 artifact_checksum: None,
1048 build_state: "catalog-derived".to_string(),
1049 });
1050 }
1051 }
1052
1053 states
1054 }
1055
    /// Computes a `u64` fingerprint ("root") for every collection in the store.
    ///
    /// For each name returned by `list_collections` the hash is seeded with
    /// `fnv1a_seed()` and fed, in this exact order:
    ///
    /// 1. the collection name,
    /// 2. the manager statistics (entity/segment counts, memory bytes and the
    ///    seal/compact operation counters),
    /// 3. every entity returned by `query_all`, sorted by raw id.
    ///
    /// The result maps collection name to fingerprint. The order in which
    /// values are hashed determines the result, so any reordering below
    /// changes every fingerprint.
    pub(crate) fn physical_collection_roots(&self) -> BTreeMap<String, u64> {
        let mut roots = BTreeMap::new();

        for name in self.store.list_collections() {
            // A listed name with no manager behind it is skipped rather than
            // reported.
            let Some(manager) = self.store.get_collection(&name) else {
                continue;
            };

            let stats = manager.stats();
            let mut root = fnv1a_seed();
            fnv1a_hash_value(&mut root, &name);
            fnv1a_hash_value(&mut root, &stats.total_entities);
            fnv1a_hash_value(&mut root, &stats.growing_count);
            fnv1a_hash_value(&mut root, &stats.sealed_count);
            fnv1a_hash_value(&mut root, &stats.archived_count);
            fnv1a_hash_value(&mut root, &stats.total_memory_bytes);
            fnv1a_hash_value(&mut root, &stats.seal_ops);
            fnv1a_hash_value(&mut root, &stats.compact_ops);

            // Sort by raw id so the fingerprint does not depend on the order
            // in which `query_all` happens to return entities.
            let mut entities = manager.query_all(|_| true);
            entities.sort_by_key(|entity| entity.id.raw());

            for entity in entities {
                fnv1a_hash_value(&mut root, &entity.id.raw());
                fnv1a_hash_value(&mut root, &entity.kind);
                fnv1a_hash_value(&mut root, &entity.created_at);
                fnv1a_hash_value(&mut root, &entity.updated_at);
                fnv1a_hash_value(&mut root, &entity.data);
                fnv1a_hash_value(&mut root, &entity.sequence_id);
                // Only the *number* of embeddings and cross references is
                // hashed, not their contents.
                fnv1a_hash_value(&mut root, &entity.embeddings().len());
                fnv1a_hash_value(&mut root, &entity.cross_refs().len());
            }

            roots.insert(name, root);
        }

        roots
    }
1094
    /// Builds a `TableRef` from `table` and `row_id` by delegating to
    /// `TableRef::new`.
    ///
    /// No store lookup happens here, so this does not check that the table or
    /// the row exists.
    pub fn table_ref(&self, table: impl Into<String>, row_id: u64) -> TableRef {
        TableRef::new(table, row_id)
    }
1103
    /// Builds a `NodeRef` from `collection` and `node_id` by delegating to
    /// `NodeRef::new`.
    ///
    /// No store lookup happens here, so the referenced node is not validated.
    pub fn node_ref(&self, collection: impl Into<String>, node_id: EntityId) -> NodeRef {
        NodeRef::new(collection, node_id)
    }
1108
    /// Builds a `VectorRef` from `collection` and `vector_id` by delegating to
    /// `VectorRef::new`.
    ///
    /// No store lookup happens here, so the referenced vector is not validated.
    pub fn vector_ref(&self, collection: impl Into<String>, vector_id: EntityId) -> VectorRef {
        VectorRef::new(collection, vector_id)
    }
1113
1114 pub fn query(&self) -> QueryBuilder {
1120 QueryBuilder::new(self.store.clone())
1121 }
1122
1123 pub fn similar(&self, collection: &str, vector: &[f32], k: usize) -> Vec<SimilarResult> {
1130 if self.store.get_collection(collection).is_none() {
1131 return Vec::new();
1132 }
1133
1134 if let Some(index) = self.get_or_build_hnsw_index(collection, vector.len()) {
1136 let hnsw = index.read().unwrap_or_else(|e| e.into_inner());
1137 let results = hnsw.search(vector, k);
1138 let mapped = self.hnsw_results_to_similar(collection, &results);
1139 if !mapped.is_empty() {
1140 return mapped;
1141 }
1142 }
1143
1144 self.similar_brute_force(collection, vector, k)
1146 }
1147
1148 fn similar_brute_force(
1150 &self,
1151 collection: &str,
1152 vector: &[f32],
1153 k: usize,
1154 ) -> Vec<SimilarResult> {
1155 let manager = match self.store.get_collection(collection) {
1156 Some(m) => m,
1157 None => return Vec::new(),
1158 };
1159
1160 let entities = manager.query_all(|_| true);
1161 let mut results: Vec<SimilarResult> = entities
1162 .iter()
1163 .filter_map(|e| {
1164 let score = match &e.data {
1165 EntityData::Vector(v) => cosine_similarity(vector, &v.dense),
1166 _ => e
1167 .embeddings()
1168 .iter()
1169 .map(|emb| cosine_similarity(vector, &emb.vector))
1170 .fold(0.0f32, f32::max),
1171 };
1172 let distance = (1.0 - score).max(0.0);
1173 if score > 0.0 {
1174 Some(SimilarResult {
1175 entity_id: e.id,
1176 score,
1177 distance,
1178 entity: e.clone(),
1179 })
1180 } else {
1181 None
1182 }
1183 })
1184 .collect();
1185
1186 results.sort_by(|a, b| {
1187 b.score
1188 .partial_cmp(&a.score)
1189 .unwrap_or(std::cmp::Ordering::Equal)
1190 });
1191 results.truncate(k);
1192 results
1193 }
1194
    /// Returns the cached HNSW index for `collection`, building and caching a
    /// new one when the cache is empty or stale.
    ///
    /// Returns `None` when the collection does not exist or when it holds
    /// fewer than `MIN_VECTORS_FOR_HNSW` non-empty dense vectors of length
    /// `query_dim`; in that case nothing is cached.
    ///
    /// A cached entry is considered fresh when its recorded `entity_count`
    /// equals the collection's current `count()`.
    ///
    /// NOTE(review): the cache is keyed by collection name only and the
    /// freshness check compares entity counts only. A cached index built for
    /// a different `query_dim` would therefore be returned as-is — confirm
    /// that callers never mix dimensions within one collection.
    ///
    /// NOTE(review): in-place updates that keep the count unchanged are not
    /// detected by the freshness check; presumably writers are expected to
    /// call `invalidate_vector_index` — verify against the write paths.
    fn get_or_build_hnsw_index(
        &self,
        collection: &str,
        query_dim: usize,
    ) -> Option<Arc<RwLock<HnswIndex>>> {
        let manager = self.store.get_collection(collection)?;
        // Captured before scanning; this is the count stored with a new index.
        let live_count = manager.count();

        {
            // Fast path: shared lock only. A poisoned lock is recovered rather
            // than propagated.
            let indexes = self
                .vector_indexes
                .read()
                .unwrap_or_else(|e| e.into_inner());
            if let Some(cached) = indexes.get(collection) {
                if cached.entity_count == live_count {
                    return Some(Arc::clone(&cached.index));
                }
            }
        }

        // Slow path: scan the collection with no cache lock held.
        let entities = manager.query_all(|_| true);

        // Keep only vector entities whose dense vector is non-empty and has
        // exactly the query's dimension.
        let vectors: Vec<(u64, Vec<f32>)> = entities
            .iter()
            .filter_map(|e| match &e.data {
                EntityData::Vector(v) if !v.dense.is_empty() && v.dense.len() == query_dim => {
                    Some((e.id.raw(), v.dense.clone()))
                }
                _ => None,
            })
            .collect();

        // Below this size no index is built; `similar` then falls back to the
        // exhaustive scan.
        const MIN_VECTORS_FOR_HNSW: usize = 100;
        if vectors.len() < MIN_VECTORS_FOR_HNSW {
            return None;
        }

        let config = crate::storage::engine::HnswConfig::with_m(16)
            .with_metric(crate::storage::engine::DistanceMetric::Cosine)
            .with_ef_construction(100)
            .with_ef_search(50);
        let mut hnsw = HnswIndex::new(query_dim, config);

        // Index entries are keyed by the raw entity id.
        for (id, vec) in &vectors {
            hnsw.insert_with_id(*id, vec.clone());
        }

        let index = Arc::new(RwLock::new(hnsw));

        let mut indexes = self
            .vector_indexes
            .write()
            .unwrap_or_else(|e| e.into_inner());
        // Re-check under the write lock: if a fresh entry appeared while this
        // call was building, return that one and discard the index built here.
        if let Some(cached) = indexes.get(collection) {
            if cached.entity_count == live_count {
                return Some(Arc::clone(&cached.index));
            }
        }
        indexes.insert(
            collection.to_string(),
            CachedVectorIndex {
                index: Arc::clone(&index),
                entity_count: live_count,
            },
        );
        Some(index)
    }
1277
1278 fn hnsw_results_to_similar(
1280 &self,
1281 collection: &str,
1282 results: &[crate::storage::engine::DistanceResult],
1283 ) -> Vec<SimilarResult> {
1284 results
1285 .iter()
1286 .filter_map(|dr| {
1287 let entity_id = EntityId::new(dr.id);
1288 let entity = self.store.get(collection, entity_id)?;
1289 let score = (1.0 - dr.distance).max(0.0);
1291 if score > 0.0 {
1292 Some(SimilarResult {
1293 entity_id,
1294 score,
1295 distance: dr.distance,
1296 entity,
1297 })
1298 } else {
1299 None
1300 }
1301 })
1302 .collect()
1303 }
1304
1305 pub(crate) fn invalidate_vector_index(&self, collection: &str) {
1310 let mut indexes = self
1311 .vector_indexes
1312 .write()
1313 .unwrap_or_else(|e| e.into_inner());
1314 indexes.remove(collection);
1315 }
1316
1317 pub fn get(&self, id: EntityId) -> Option<UnifiedEntity> {
1319 self.store.get_any(id).map(|(_, e)| e)
1320 }
1321
    /// Looks up an entity by id via the store's `get_any` and returns it
    /// together with a `String` — presumably the name of the owning
    /// collection; confirm against `get_any`.
    ///
    /// Returns `None` when the store has no entity with this id.
    pub fn get_with_collection(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
        self.store.get_any(id)
    }
1326
1327 pub fn batch_get(&self, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1333 ids.iter().map(|id| self.get(*id)).collect()
1334 }
1335
1336 pub fn batch(&self) -> BatchBuilder {
1338 BatchBuilder::new(self.store.clone(), self.preprocessors.clone())
1339 }
1340
1341 pub fn add_preprocessor(&mut self, preprocessor: Box<dyn Preprocessor>) {
1347 let mut preprocessors = self
1348 .preprocessors
1349 .write()
1350 .unwrap_or_else(|poisoned| poisoned.into_inner());
1351 preprocessors.push(Arc::from(preprocessor));
1352 }
1353
1354 pub fn linked_from(&self, id: EntityId) -> Vec<LinkedEntity> {
1360 self.store
1361 .get_refs_from(id)
1362 .into_iter()
1363 .filter_map(|(target_id, ref_type, collection)| {
1364 self.store
1365 .get(&collection, target_id)
1366 .map(|entity| LinkedEntity {
1367 entity,
1368 ref_type,
1369 collection,
1370 })
1371 })
1372 .collect()
1373 }
1374
1375 pub fn linked_to(&self, id: EntityId) -> Vec<LinkedEntity> {
1377 self.store
1378 .get_refs_to(id)
1379 .into_iter()
1380 .filter_map(|(source_id, ref_type, collection)| {
1381 self.store
1382 .get(&collection, source_id)
1383 .map(|entity| LinkedEntity {
1384 entity,
1385 ref_type,
1386 collection,
1387 })
1388 })
1389 .collect()
1390 }
1391
1392 pub fn store(&self) -> Arc<UnifiedStore> {
1394 self.store.clone()
1395 }
1396
1397 pub fn ml_runtime(&self) -> &crate::storage::ml::MlRuntime {
1403 self.ml_runtime.get_or_init(|| {
1404 crate::storage::ml::MlRuntime::in_memory(std::sync::Arc::new(
1405 |_handle| Ok(String::new()),
1409 ))
1410 })
1411 }
1412
1413 pub fn semantic_cache(&self) -> &Arc<crate::storage::ml::SemanticCache> {
1417 self.semantic_cache.get_or_init(|| {
1418 Arc::new(crate::storage::ml::SemanticCache::new(
1419 crate::storage::ml::SemanticCacheConfig::default(),
1420 ))
1421 })
1422 }
1423
1424 pub fn hypertables(&self) -> &Arc<crate::storage::timeseries::HypertableRegistry> {
1427 self.hypertables
1428 .get_or_init(|| Arc::new(crate::storage::timeseries::HypertableRegistry::new()))
1429 }
1430
1431 pub fn continuous_aggregates(
1434 &self,
1435 ) -> &Arc<crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine> {
1436 self.continuous_aggregates.get_or_init(|| {
1437 Arc::new(
1438 crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine::new(),
1439 )
1440 })
1441 }
1442
1443 pub(crate) fn is_binary_dump(path: &Path) -> Result<bool, std::io::Error> {
1444 let mut file = File::open(path)?;
1445 let mut magic = [0u8; 4];
1446 let read = file.read(&mut magic)?;
1447 Ok(read == 4 && &magic == b"RDST")
1448 }
1449}