1use super::*;
2
3impl RedDB {
4 pub fn quorum_coordinator(
11 &self,
12 ) -> Option<&Arc<crate::replication::quorum::QuorumCoordinator>> {
13 self.quorum.as_ref()
14 }
15
16 pub fn wait_for_replication_quorum(
27 &self,
28 target_lsn: u64,
29 ) -> Result<(), crate::replication::QuorumError> {
30 match &self.quorum {
31 Some(q) => q.wait_for_quorum(target_lsn),
32 None => Ok(()),
33 }
34 }
35}
36
37impl RedDB {
38 pub(crate) fn native_registry_summary_from_metadata(
39 &self,
40 metadata: &PhysicalMetadataFile,
41 ) -> NativeRegistrySummary {
42 const SAMPLE_LIMIT: usize = 32;
43
44 let collection_names: Vec<_> = metadata
45 .catalog
46 .stats_by_collection
47 .keys()
48 .take(SAMPLE_LIMIT)
49 .cloned()
50 .collect();
51 let indexes: Vec<_> = metadata
52 .indexes
53 .iter()
54 .take(SAMPLE_LIMIT)
55 .map(|index| NativeRegistryIndexSummary {
56 name: index.name.clone(),
57 kind: index.kind.as_str().to_string(),
58 collection: index.collection.clone(),
59 enabled: index.enabled,
60 entries: index.entries as u64,
61 estimated_memory_bytes: index.estimated_memory_bytes,
62 last_refresh_ms: index.last_refresh_ms,
63 backend: index.backend.clone(),
64 })
65 .collect();
66 let graph_projections: Vec<_> = metadata
67 .graph_projections
68 .iter()
69 .take(SAMPLE_LIMIT)
70 .map(|projection| NativeRegistryProjectionSummary {
71 name: projection.name.clone(),
72 source: projection.source.clone(),
73 created_at_unix_ms: projection.created_at_unix_ms,
74 updated_at_unix_ms: projection.updated_at_unix_ms,
75 node_labels: projection.node_labels.clone(),
76 node_types: projection.node_types.clone(),
77 edge_labels: projection.edge_labels.clone(),
78 last_materialized_sequence: projection.last_materialized_sequence,
79 })
80 .collect();
81 let analytics_jobs = metadata
82 .analytics_jobs
83 .iter()
84 .take(SAMPLE_LIMIT)
85 .map(|job| NativeRegistryJobSummary {
86 id: job.id.clone(),
87 kind: job.kind.clone(),
88 projection: job.projection.clone(),
89 state: job.state.clone(),
90 created_at_unix_ms: job.created_at_unix_ms,
91 updated_at_unix_ms: job.updated_at_unix_ms,
92 last_run_sequence: job.last_run_sequence,
93 metadata: job.metadata.clone(),
94 })
95 .collect::<Vec<_>>();
96 let vector_artifacts = self
97 .native_vector_artifact_records()
98 .into_iter()
99 .map(|(summary, _)| summary)
100 .take(SAMPLE_LIMIT)
101 .collect::<Vec<_>>();
102 let vector_artifact_count = self.native_vector_artifact_collection_count() as u32;
103
104 NativeRegistrySummary {
105 collection_count: metadata.catalog.total_collections as u32,
106 index_count: metadata.indexes.len() as u32,
107 graph_projection_count: metadata.graph_projections.len() as u32,
108 analytics_job_count: metadata.analytics_jobs.len() as u32,
109 vector_artifact_count,
110 collections_complete: metadata.catalog.stats_by_collection.len() <= SAMPLE_LIMIT,
111 indexes_complete: metadata.indexes.len() <= SAMPLE_LIMIT,
112 graph_projections_complete: metadata.graph_projections.len() <= SAMPLE_LIMIT,
113 analytics_jobs_complete: metadata.analytics_jobs.len() <= SAMPLE_LIMIT,
114 vector_artifacts_complete: vector_artifact_count as usize <= SAMPLE_LIMIT,
115 omitted_collection_count: metadata
116 .catalog
117 .stats_by_collection
118 .len()
119 .saturating_sub(collection_names.len())
120 as u32,
121 omitted_index_count: metadata.indexes.len().saturating_sub(indexes.len()) as u32,
122 omitted_graph_projection_count: metadata
123 .graph_projections
124 .len()
125 .saturating_sub(graph_projections.len())
126 as u32,
127 omitted_analytics_job_count: metadata
128 .analytics_jobs
129 .len()
130 .saturating_sub(analytics_jobs.len())
131 as u32,
132 omitted_vector_artifact_count: vector_artifact_count
133 .saturating_sub(vector_artifacts.len() as u32),
134 collection_names,
135 indexes,
136 graph_projections,
137 analytics_jobs,
138 vector_artifacts,
139 }
140 }
141
142 fn native_vector_artifact_collection_count(&self) -> usize {
143 self.native_vector_artifact_records().len()
144 }
145
146 pub(crate) fn native_vector_artifact_records(
147 &self,
148 ) -> Vec<(NativeVectorArtifactSummary, Vec<u8>)> {
149 let mut artifacts = Vec::new();
150 for collection in self.store.list_collections() {
151 let Some(manager) = self.store.get_collection(&collection) else {
152 continue;
153 };
154 let entities = manager.query_all(|_| true);
155 let mut vectors = Vec::new();
156 let mut graph_edges = Vec::new();
157 let mut fulltext_documents = Vec::new();
158 let mut document_records = Vec::new();
159 for entity in entities {
160 match entity.data {
161 EntityData::Vector(vector) => {
162 if !vector.dense.is_empty() {
163 vectors.push((entity.id, vector.dense));
164 }
165 }
166 EntityData::Edge(edge) => {
167 if let EntityKind::GraphEdge(edge_kind) = entity.kind {
168 graph_edges.push((
169 entity.id,
170 edge_kind.from_node,
171 edge_kind.to_node,
172 edge_kind.label,
173 edge.weight,
174 ));
175 }
176 }
177 data => {
178 let text = Self::native_fulltext_text_for_entity(&data);
179 if !text.trim().is_empty() {
180 fulltext_documents.push((entity.id, text));
181 }
182 if let Some(document) =
183 Self::native_document_pathvalue_for_entity(entity.id, &data)
184 {
185 document_records.push(document);
186 }
187 }
188 }
189 }
190 if !vectors.is_empty() {
191 let dimension = vectors[0].1.len();
192 let mut hnsw = HnswIndex::with_dimension(dimension);
193 for (id, vector) in vectors
194 .into_iter()
195 .filter(|(_, vector)| vector.len() == dimension)
196 {
197 hnsw.insert_with_id(id.raw(), vector);
198 }
199 let stats = hnsw.stats();
200 let bytes = hnsw.to_bytes();
201 let summary = NativeVectorArtifactSummary {
202 collection: collection.clone(),
203 artifact_kind: "hnsw".to_string(),
204 vector_count: stats.node_count as u64,
205 dimension: stats.dimension as u32,
206 max_layer: stats.max_layer as u32,
207 serialized_bytes: bytes.len() as u64,
208 checksum: crate::storage::engine::crc32(&bytes) as u64,
209 };
210 artifacts.push((summary, bytes));
211
212 let n_lists = ((stats.node_count as f64).sqrt().ceil() as usize).max(1);
213 let mut ivf = IvfIndex::new(IvfConfig::new(dimension, n_lists));
214 let training = manager
215 .query_all(|_| true)
216 .into_iter()
217 .filter_map(|entity| match entity.data {
218 EntityData::Vector(vector) if vector.dense.len() == dimension => {
219 Some(vector.dense)
220 }
221 _ => None,
222 })
223 .collect::<Vec<_>>();
224 ivf.train(&training);
225 let items = manager
226 .query_all(|_| true)
227 .into_iter()
228 .filter_map(|entity| match entity.data {
229 EntityData::Vector(vector) if vector.dense.len() == dimension => {
230 Some((entity.id.raw(), vector.dense))
231 }
232 _ => None,
233 })
234 .collect::<Vec<_>>();
235 ivf.add_batch_with_ids(items);
236 let ivf_stats = ivf.stats();
237 let ivf_bytes = ivf.to_bytes();
238 let ivf_summary = NativeVectorArtifactSummary {
239 collection: collection.clone(),
240 artifact_kind: "ivf".to_string(),
241 vector_count: ivf_stats.total_vectors as u64,
242 dimension: ivf_stats.dimension as u32,
243 max_layer: ivf_stats.n_lists as u32,
244 serialized_bytes: ivf_bytes.len() as u64,
245 checksum: crate::storage::engine::crc32(&ivf_bytes) as u64,
246 };
247 artifacts.push((ivf_summary, ivf_bytes));
248 }
249
250 if !graph_edges.is_empty() {
251 let bytes = Self::serialize_native_graph_adjacency_artifact(&graph_edges);
252 let (edge_count, node_count, label_count) =
253 Self::inspect_native_graph_adjacency_artifact(&bytes).unwrap_or((0, 0, 0));
254 let summary = NativeVectorArtifactSummary {
255 collection: collection.clone(),
256 artifact_kind: "graph.adjacency".to_string(),
257 vector_count: edge_count,
258 dimension: node_count as u32,
259 max_layer: label_count,
260 serialized_bytes: bytes.len() as u64,
261 checksum: crate::storage::engine::crc32(&bytes) as u64,
262 };
263 artifacts.push((summary, bytes));
264 }
265
266 if !fulltext_documents.is_empty() {
267 let bytes =
268 Self::serialize_native_fulltext_artifact(&collection, &fulltext_documents);
269 let (doc_count, term_count, posting_count) =
270 Self::inspect_native_fulltext_artifact(&bytes).unwrap_or((0, 0, 0));
271 let summary = NativeVectorArtifactSummary {
272 collection: collection.clone(),
273 artifact_kind: "text.fulltext".to_string(),
274 vector_count: posting_count,
275 dimension: doc_count as u32,
276 max_layer: term_count as u32,
277 serialized_bytes: bytes.len() as u64,
278 checksum: crate::storage::engine::crc32(&bytes) as u64,
279 };
280 artifacts.push((summary, bytes));
281 }
282
283 if !document_records.is_empty() {
284 let bytes = Self::serialize_native_document_pathvalue_artifact(
285 &collection,
286 &document_records,
287 );
288 let (doc_count, path_count, value_count, unique_value_count) =
289 Self::inspect_native_document_pathvalue_artifact(&bytes)
290 .unwrap_or((0, 0, 0, 0));
291 let _ = unique_value_count;
292 let summary = NativeVectorArtifactSummary {
293 collection: collection.clone(),
294 artifact_kind: "document.pathvalue".to_string(),
295 vector_count: value_count,
296 dimension: doc_count as u32,
297 max_layer: path_count as u32,
298 serialized_bytes: bytes.len() as u64,
299 checksum: crate::storage::engine::crc32(&bytes) as u64,
300 };
301 artifacts.push((summary, bytes));
302 }
303 }
304 artifacts
305 }
306
307 fn serialize_native_graph_adjacency_artifact(
308 edges: &[(EntityId, String, String, String, f32)],
309 ) -> Vec<u8> {
310 let edges: Vec<reddb_file::NativeGraphEdge> = edges
311 .iter()
312 .map(
313 |(edge_id, from_node, to_node, label, weight)| reddb_file::NativeGraphEdge {
314 edge_id: edge_id.raw(),
315 from_node: from_node.clone(),
316 to_node: to_node.clone(),
317 label: label.clone(),
318 weight: *weight,
319 },
320 )
321 .collect();
322 reddb_file::encode_native_graph_adjacency_frame(&reddb_file::NativeGraphAdjacencyFrame {
323 edges,
324 })
325 }
326
327 pub(crate) fn inspect_native_graph_adjacency_artifact(
328 bytes: &[u8],
329 ) -> Result<(u64, u64, u32), String> {
330 let frame =
331 reddb_file::decode_native_graph_adjacency_frame(bytes).map_err(|e| e.to_string())?;
332 let edge_count = frame.edges.len() as u64;
333 let mut nodes = BTreeSet::new();
334 let mut labels = BTreeSet::new();
335 for edge in frame.edges {
336 nodes.insert(edge.from_node);
337 nodes.insert(edge.to_node);
338 labels.insert(edge.label);
339 }
340 Ok((edge_count, nodes.len() as u64, labels.len() as u32))
341 }
342
343 fn serialize_native_fulltext_artifact(
344 collection: &str,
345 documents: &[(EntityId, String)],
346 ) -> Vec<u8> {
347 let mut postings: BTreeMap<String, Vec<(u64, u32)>> = BTreeMap::new();
348 for (entity_id, text) in documents {
349 let mut frequencies: BTreeMap<String, u32> = BTreeMap::new();
350 for token in Self::native_fulltext_tokenize(text) {
351 *frequencies.entry(token).or_insert(0) += 1;
352 }
353 for (token, count) in frequencies {
354 postings
355 .entry(token)
356 .or_default()
357 .push((entity_id.raw(), count));
358 }
359 }
360
361 let terms = postings
362 .into_iter()
363 .map(|(term, postings)| reddb_file::NativeFulltextTerm {
364 term,
365 postings: postings
366 .into_iter()
367 .map(
368 |(entity_id, term_count)| reddb_file::NativeFulltextPosting {
369 entity_id,
370 term_count,
371 },
372 )
373 .collect(),
374 })
375 .collect();
376 reddb_file::encode_native_fulltext_frame(&reddb_file::NativeFulltextFrame {
377 collection: collection.to_string(),
378 doc_count: documents.len() as u32,
379 terms,
380 })
381 }
382
383 pub(crate) fn inspect_native_fulltext_artifact(
384 bytes: &[u8],
385 ) -> Result<(u64, u64, u64), String> {
386 let frame = reddb_file::decode_native_fulltext_frame(bytes).map_err(|e| e.to_string())?;
387 let posting_count: u64 = frame
388 .terms
389 .iter()
390 .map(|term| term.postings.len() as u64)
391 .sum();
392 Ok((
393 frame.doc_count as u64,
394 frame.terms.len() as u64,
395 posting_count,
396 ))
397 }
398
399 fn serialize_native_document_pathvalue_artifact(
400 collection: &str,
401 documents: &[(EntityId, Vec<(String, String)>)],
402 ) -> Vec<u8> {
403 let docs: Vec<reddb_file::NativeDocPathValue> = documents
404 .iter()
405 .map(|(entity_id, entries)| reddb_file::NativeDocPathValue {
406 entity_id: entity_id.raw(),
407 entries: entries
408 .iter()
409 .map(|(path, value)| reddb_file::NativeDocPathValueEntry {
410 path: path.clone(),
411 value: value.clone(),
412 })
413 .collect(),
414 })
415 .collect();
416 reddb_file::encode_native_doc_pathvalue_frame(&reddb_file::NativeDocPathValueFrame {
417 collection: collection.to_string(),
418 docs,
419 })
420 }
421
422 pub(crate) fn inspect_native_document_pathvalue_artifact(
423 bytes: &[u8],
424 ) -> Result<(u64, u64, u64, u64), String> {
425 let frame =
426 reddb_file::decode_native_doc_pathvalue_frame(bytes).map_err(|e| e.to_string())?;
427 let doc_count = frame.docs.len() as u64;
428 let mut paths = BTreeSet::new();
429 let mut values = BTreeSet::new();
430 let mut total_entries = 0u64;
431 for doc in frame.docs {
432 for entry in doc.entries {
433 paths.insert(entry.path);
434 values.insert(entry.value);
435 total_entries += 1;
436 }
437 }
438 Ok((
439 doc_count,
440 paths.len() as u64,
441 total_entries,
442 values.len() as u64,
443 ))
444 }
445
446 fn native_document_pathvalue_for_entity(
447 entity_id: EntityId,
448 data: &EntityData,
449 ) -> Option<(EntityId, Vec<(String, String)>)> {
450 let mut entries = Vec::new();
451 match data {
452 EntityData::Row(row) => {
453 if let Some(named) = &row.named {
454 for (key, value) in named {
455 Self::collect_native_document_entries_from_value(key, value, &mut entries);
456 }
457 }
458 for (idx, value) in row.columns.iter().enumerate() {
459 let path = format!("columns[{idx}]");
460 Self::collect_native_document_entries_from_value(&path, value, &mut entries);
461 }
462 }
463 EntityData::Node(node) => {
464 for (key, value) in &node.properties {
465 Self::collect_native_document_entries_from_value(key, value, &mut entries);
466 }
467 }
468 EntityData::Edge(edge) => {
469 for (key, value) in &edge.properties {
470 Self::collect_native_document_entries_from_value(key, value, &mut entries);
471 }
472 }
473 EntityData::Vector(_) => {}
474 EntityData::TimeSeries(_) => {}
475 EntityData::QueueMessage(_) => {}
476 }
477 if entries.is_empty() {
478 None
479 } else {
480 Some((entity_id, entries))
481 }
482 }
483
484 fn collect_native_document_entries_from_value(
485 path: &str,
486 value: &Value,
487 out: &mut Vec<(String, String)>,
488 ) {
489 match value {
490 Value::Json(bytes) | Value::Blob(bytes) => {
491 if let Ok(json) = crate::json::from_slice::<JsonValue>(bytes) {
492 Self::collect_native_document_entries_from_json(path, &json, out);
493 }
494 }
495 _ => {}
496 }
497 }
498
499 fn collect_native_document_entries_from_json(
500 path: &str,
501 value: &JsonValue,
502 out: &mut Vec<(String, String)>,
503 ) {
504 match value {
505 JsonValue::Object(entries) => {
506 for (key, value) in entries {
507 let next = if path.is_empty() {
508 key.clone()
509 } else {
510 format!("{path}.{key}")
511 };
512 Self::collect_native_document_entries_from_json(&next, value, out);
513 }
514 }
515 JsonValue::Array(items) => {
516 for (idx, value) in items.iter().enumerate() {
517 let next = format!("{path}[{idx}]");
518 Self::collect_native_document_entries_from_json(&next, value, out);
519 }
520 }
521 _ => {
522 if let Some(text) = Self::native_json_scalar_text(value) {
523 out.push((path.to_string(), text));
524 }
525 }
526 }
527 }
528
529 fn native_json_scalar_text(value: &JsonValue) -> Option<String> {
530 match value {
531 JsonValue::Null => None,
532 JsonValue::Bool(value) => Some(value.to_string()),
533 JsonValue::Number(value) => Some(value.to_string()),
534 JsonValue::String(value) => Some(value.clone()),
535 JsonValue::Array(_) | JsonValue::Object(_) => None,
536 }
537 }
538
539 fn native_fulltext_text_for_entity(data: &EntityData) -> String {
540 match data {
541 EntityData::Row(row) => {
542 let mut parts = Vec::new();
543 if let Some(named) = &row.named {
544 for value in named.values() {
545 if let Some(text) = Self::native_value_text(value) {
546 parts.push(text);
547 }
548 }
549 }
550 for value in &row.columns {
551 if let Some(text) = Self::native_value_text(value) {
552 parts.push(text);
553 }
554 }
555 parts.join(" ")
556 }
557 EntityData::Node(node) => node
558 .properties
559 .values()
560 .filter_map(Self::native_value_text)
561 .collect::<Vec<_>>()
562 .join(" "),
563 EntityData::Edge(edge) => edge
564 .properties
565 .values()
566 .filter_map(Self::native_value_text)
567 .collect::<Vec<_>>()
568 .join(" "),
569 EntityData::Vector(vector) => vector.content.clone().unwrap_or_default(),
570 EntityData::TimeSeries(ts) => ts.metric.clone(),
571 EntityData::QueueMessage(_) => String::new(),
572 }
573 }
574
575 fn native_value_text(value: &Value) -> Option<String> {
576 match value {
577 Value::Text(value) => Some(value.to_string()),
578 Value::Json(value) => String::from_utf8(value.clone()).ok(),
579 Value::Blob(value) => String::from_utf8(value.clone()).ok(),
580 Value::Integer(value) => Some(value.to_string()),
581 Value::UnsignedInteger(value) => Some(value.to_string()),
582 Value::Float(value) => Some(value.to_string()),
583 Value::Boolean(value) => Some(value.to_string()),
584 Value::IpAddr(value) => Some(value.to_string()),
585 Value::NodeRef(value) => Some(value.clone()),
586 Value::EdgeRef(value) => Some(value.clone()),
587 Value::RowRef(table, row_id) => Some(format!("{table}:{row_id}")),
588 Value::VectorRef(collection, vector_id) => Some(format!("{collection}:{vector_id}")),
589 Value::Timestamp(value) => Some(value.to_string()),
590 Value::Duration(value) => Some(value.to_string()),
591 Value::Uuid(_) | Value::MacAddr(_) | Value::Vector(_) | Value::Null => None,
592 Value::Color([r, g, b]) => Some(format!("#{:02X}{:02X}{:02X}", r, g, b)),
593 Value::Email(s) => Some(s.clone()),
594 Value::Url(s) => Some(s.clone()),
595 Value::Phone(n) => Some(format!("+{}", n)),
596 Value::Semver(packed) => Some(format!(
597 "{}.{}.{}",
598 packed / 1_000_000,
599 (packed / 1_000) % 1_000,
600 packed % 1_000
601 )),
602 Value::Cidr(ip, prefix) => Some(format!(
603 "{}.{}.{}.{}/{}",
604 (ip >> 24) & 0xFF,
605 (ip >> 16) & 0xFF,
606 (ip >> 8) & 0xFF,
607 ip & 0xFF,
608 prefix
609 )),
610 Value::Date(days) => Some(days.to_string()),
611 Value::Time(ms) => {
612 let total_secs = ms / 1000;
613 Some(format!(
614 "{:02}:{:02}:{:02}",
615 total_secs / 3600,
616 (total_secs / 60) % 60,
617 total_secs % 60
618 ))
619 }
620 Value::Decimal(v) => Some(Value::Decimal(*v).display_string()),
621 Value::EnumValue(i) => Some(format!("enum({})", i)),
622 Value::Array(_) => None,
623 Value::TimestampMs(ms) => Some(ms.to_string()),
624 Value::Ipv4(ip) => Some(format!(
625 "{}.{}.{}.{}",
626 (ip >> 24) & 0xFF,
627 (ip >> 16) & 0xFF,
628 (ip >> 8) & 0xFF,
629 ip & 0xFF
630 )),
631 Value::Ipv6(bytes) => Some(format!("{}", std::net::Ipv6Addr::from(*bytes))),
632 Value::Subnet(ip, mask) => {
633 let prefix = mask.leading_ones();
634 Some(format!(
635 "{}.{}.{}.{}/{}",
636 (ip >> 24) & 0xFF,
637 (ip >> 16) & 0xFF,
638 (ip >> 8) & 0xFF,
639 ip & 0xFF,
640 prefix
641 ))
642 }
643 Value::Port(p) => Some(p.to_string()),
644 Value::Latitude(micro) => Some(format!("{:.6}", *micro as f64 / 1_000_000.0)),
645 Value::Longitude(micro) => Some(format!("{:.6}", *micro as f64 / 1_000_000.0)),
646 Value::GeoPoint(lat, lon) => Some(format!(
647 "{:.6},{:.6}",
648 *lat as f64 / 1_000_000.0,
649 *lon as f64 / 1_000_000.0
650 )),
651 Value::Country2(c) => Some(String::from_utf8_lossy(c).to_string()),
652 Value::Country3(c) => Some(String::from_utf8_lossy(c).to_string()),
653 Value::Lang2(c) => Some(String::from_utf8_lossy(c).to_string()),
654 Value::Lang5(c) => Some(String::from_utf8_lossy(c).to_string()),
655 Value::Currency(c) => Some(String::from_utf8_lossy(c).to_string()),
656 Value::AssetCode(code) => Some(code.clone()),
657 Value::Money { .. } => Some(value.display_string()),
658 Value::ColorAlpha([r, g, b, a]) => {
659 Some(format!("#{:02X}{:02X}{:02X}{:02X}", r, g, b, a))
660 }
661 Value::BigInt(v) => Some(v.to_string()),
662 Value::KeyRef(col, key) => Some(format!("{}:{}", col, key)),
663 Value::DocRef(col, id) => Some(format!("{}#{}", col, id)),
664 Value::TableRef(name) => Some(name.clone()),
665 Value::PageRef(page_id) => Some(format!("page:{}", page_id)),
666 Value::Secret(_) | Value::Password(_) => None,
667 }
668 }
669
/// Splits `text` into lower-cased tokens.
///
/// The text is lower-cased, split on every non-alphanumeric character, and
/// pieces shorter than two bytes are discarded.
fn native_fulltext_tokenize(text: &str) -> Vec<String> {
    let lowered = text.to_lowercase();
    let mut tokens = Vec::new();
    for piece in lowered.split(|ch: char| !ch.is_alphanumeric()) {
        if piece.len() >= 2 {
            tokens.push(piece.to_owned());
        }
    }
    tokens
}
677
678 pub(crate) fn native_recovery_summary_from_metadata(
679 metadata: &PhysicalMetadataFile,
680 ) -> NativeRecoverySummary {
681 const SAMPLE_LIMIT: usize = 16;
682
683 let snapshots: Vec<_> = metadata
684 .snapshots
685 .iter()
686 .rev()
687 .take(SAMPLE_LIMIT)
688 .map(|snapshot| NativeSnapshotSummary {
689 snapshot_id: snapshot.snapshot_id,
690 created_at_unix_ms: snapshot.created_at_unix_ms,
691 superblock_sequence: snapshot.superblock_sequence,
692 collection_count: snapshot.collection_count as u32,
693 total_entities: snapshot.total_entities as u64,
694 })
695 .collect();
696 let exports: Vec<_> = metadata
697 .exports
698 .iter()
699 .rev()
700 .take(SAMPLE_LIMIT)
701 .map(|export| NativeExportSummary {
702 name: export.name.clone(),
703 created_at_unix_ms: export.created_at_unix_ms,
704 snapshot_id: export.snapshot_id,
705 superblock_sequence: export.superblock_sequence,
706 collection_count: export.collection_count as u32,
707 total_entities: export.total_entities as u64,
708 })
709 .collect();
710
711 NativeRecoverySummary {
712 snapshot_count: metadata.snapshots.len() as u32,
713 export_count: metadata.exports.len() as u32,
714 snapshots_complete: metadata.snapshots.len() <= SAMPLE_LIMIT,
715 exports_complete: metadata.exports.len() <= SAMPLE_LIMIT,
716 omitted_snapshot_count: metadata.snapshots.len().saturating_sub(snapshots.len()) as u32,
717 omitted_export_count: metadata.exports.len().saturating_sub(exports.len()) as u32,
718 snapshots,
719 exports,
720 }
721 }
722
723 pub(crate) fn native_catalog_summary_from_metadata(
724 metadata: &PhysicalMetadataFile,
725 ) -> NativeCatalogSummary {
726 const SAMPLE_LIMIT: usize = 32;
727
728 let collections: Vec<_> = metadata
729 .catalog
730 .stats_by_collection
731 .iter()
732 .take(SAMPLE_LIMIT)
733 .map(|(name, stats)| NativeCatalogCollectionSummary {
734 name: name.clone(),
735 entities: stats.entities as u64,
736 cross_refs: stats.cross_refs as u64,
737 segments: stats.segments as u32,
738 })
739 .collect();
740
741 NativeCatalogSummary {
742 collection_count: metadata.catalog.total_collections as u32,
743 total_entities: metadata.catalog.total_entities as u64,
744 collections_complete: metadata.catalog.stats_by_collection.len() <= SAMPLE_LIMIT,
745 omitted_collection_count: metadata
746 .catalog
747 .stats_by_collection
748 .len()
749 .saturating_sub(collections.len()) as u32,
750 collections,
751 }
752 }
753
754 pub(crate) fn native_metadata_state_summary_from_metadata(
755 metadata: &PhysicalMetadataFile,
756 ) -> NativeMetadataStateSummary {
757 NativeMetadataStateSummary {
758 protocol_version: metadata.protocol_version.clone(),
759 generated_at_unix_ms: metadata.generated_at_unix_ms,
760 last_loaded_from: metadata.last_loaded_from.clone(),
761 last_healed_at_unix_ms: metadata.last_healed_at_unix_ms,
762 }
763 }
764
765 pub(crate) fn inspect_native_header_against_metadata(
766 native: PhysicalFileHeader,
767 metadata: &PhysicalMetadataFile,
768 ) -> NativeHeaderInspection {
769 let expected = Self::native_header_from_metadata(metadata);
770 let mut mismatches = Vec::new();
771
772 if native.format_version != expected.format_version {
773 mismatches.push(NativeHeaderMismatch {
774 field: "format_version",
775 native: native.format_version.to_string(),
776 expected: expected.format_version.to_string(),
777 });
778 }
779 if native.sequence != expected.sequence {
780 mismatches.push(NativeHeaderMismatch {
781 field: "sequence",
782 native: native.sequence.to_string(),
783 expected: expected.sequence.to_string(),
784 });
785 }
786 if native.manifest_oldest_root != expected.manifest_oldest_root {
787 mismatches.push(NativeHeaderMismatch {
788 field: "manifest_oldest_root",
789 native: native.manifest_oldest_root.to_string(),
790 expected: expected.manifest_oldest_root.to_string(),
791 });
792 }
793 if native.manifest_root != expected.manifest_root {
794 mismatches.push(NativeHeaderMismatch {
795 field: "manifest_root",
796 native: native.manifest_root.to_string(),
797 expected: expected.manifest_root.to_string(),
798 });
799 }
800 if native.free_set_root != expected.free_set_root {
801 mismatches.push(NativeHeaderMismatch {
802 field: "free_set_root",
803 native: native.free_set_root.to_string(),
804 expected: expected.free_set_root.to_string(),
805 });
806 }
807 if native.collection_root_count != expected.collection_root_count {
808 mismatches.push(NativeHeaderMismatch {
809 field: "collection_root_count",
810 native: native.collection_root_count.to_string(),
811 expected: expected.collection_root_count.to_string(),
812 });
813 }
814 if native.snapshot_count != expected.snapshot_count {
815 mismatches.push(NativeHeaderMismatch {
816 field: "snapshot_count",
817 native: native.snapshot_count.to_string(),
818 expected: expected.snapshot_count.to_string(),
819 });
820 }
821 if native.index_count != expected.index_count {
822 mismatches.push(NativeHeaderMismatch {
823 field: "index_count",
824 native: native.index_count.to_string(),
825 expected: expected.index_count.to_string(),
826 });
827 }
828 if native.catalog_collection_count != expected.catalog_collection_count {
829 mismatches.push(NativeHeaderMismatch {
830 field: "catalog_collection_count",
831 native: native.catalog_collection_count.to_string(),
832 expected: expected.catalog_collection_count.to_string(),
833 });
834 }
835 if native.catalog_total_entities != expected.catalog_total_entities {
836 mismatches.push(NativeHeaderMismatch {
837 field: "catalog_total_entities",
838 native: native.catalog_total_entities.to_string(),
839 expected: expected.catalog_total_entities.to_string(),
840 });
841 }
842 if native.export_count != expected.export_count {
843 mismatches.push(NativeHeaderMismatch {
844 field: "export_count",
845 native: native.export_count.to_string(),
846 expected: expected.export_count.to_string(),
847 });
848 }
849 if native.graph_projection_count != expected.graph_projection_count {
850 mismatches.push(NativeHeaderMismatch {
851 field: "graph_projection_count",
852 native: native.graph_projection_count.to_string(),
853 expected: expected.graph_projection_count.to_string(),
854 });
855 }
856 if native.analytics_job_count != expected.analytics_job_count {
857 mismatches.push(NativeHeaderMismatch {
858 field: "analytics_job_count",
859 native: native.analytics_job_count.to_string(),
860 expected: expected.analytics_job_count.to_string(),
861 });
862 }
863 if native.manifest_event_count != expected.manifest_event_count {
864 mismatches.push(NativeHeaderMismatch {
865 field: "manifest_event_count",
866 native: native.manifest_event_count.to_string(),
867 expected: expected.manifest_event_count.to_string(),
868 });
869 }
870
871 NativeHeaderInspection {
872 native,
873 expected,
874 consistent: mismatches.is_empty(),
875 mismatches,
876 }
877 }
878
879 pub(crate) fn repair_policy_for_inspection(
880 inspection: &NativeHeaderInspection,
881 ) -> NativeHeaderRepairPolicy {
882 if inspection.consistent {
883 return NativeHeaderRepairPolicy::InSync;
884 }
885
886 if inspection.expected.sequence >= inspection.native.sequence {
887 NativeHeaderRepairPolicy::RepairNativeFromMetadata
888 } else {
889 NativeHeaderRepairPolicy::NativeAheadOfMetadata
890 }
891 }
892
893 pub(crate) fn prune_export_registry(&self, exports: &mut Vec<ExportDescriptor>) {
894 let retention = self.options.export_retention.max(1);
895 if exports.len() <= retention {
896 return;
897 }
898
899 exports.sort_by_key(|export| export.created_at_unix_ms);
900 let removed: Vec<ExportDescriptor> =
901 exports.drain(0..(exports.len() - retention)).collect();
902
903 for export in removed {
904 let _ = fs::remove_file(&export.data_path);
905 let _ = fs::remove_file(&export.metadata_path);
906 let binary_path = PhysicalMetadataFile::metadata_binary_path_for(std::path::Path::new(
907 &export.data_path,
908 ));
909 let _ = fs::remove_file(binary_path);
910 }
911 }
912
913 pub(crate) fn runtime_index_catalog(&self) -> IndexCatalog {
914 let mut catalog = IndexCatalog::register_default_vector_graph(
915 self.options.has_capability(Capability::Table),
916 self.options.has_capability(Capability::Graph),
917 );
918 if self.options.has_capability(Capability::FullText) {
919 catalog.register(RuntimeIndexConfig::new(
920 "text-fulltext",
921 IndexKind::FullText,
922 ));
923 catalog.register(RuntimeIndexConfig::new(
924 "document-pathvalue",
925 IndexKind::DocumentPathValue,
926 ));
927 }
928 catalog.register(RuntimeIndexConfig::new(
929 "search-hybrid",
930 IndexKind::HybridSearch,
931 ));
932 catalog
933 }
934
935 pub(crate) fn physical_index_state(&self) -> Vec<PhysicalIndexState> {
936 let catalog = self.runtime_index_catalog();
940 let snapshot = crate::catalog::snapshot_store_with_declarations(
941 "reddb",
942 self.store.as_ref(),
943 Some(&catalog),
944 None, None, );
947 let mut metrics_by_name = std::collections::BTreeMap::new();
948 for metric in &snapshot.indices {
949 metrics_by_name.insert(metric.name.clone(), metric.clone());
950 }
951
952 let mut states = Vec::new();
953 for collection in snapshot.collections {
954 for index_name in &collection.indices {
955 let metric = metrics_by_name.get(index_name);
956 let kind = metric
957 .map(|metric| metric.kind)
958 .unwrap_or_else(|| infer_collection_index_kind(collection.model, index_name));
959 let entries = estimate_index_entries(&collection, kind);
960 states.push(PhysicalIndexState {
961 name: format!("{}::{}", collection.name, index_name),
962 kind,
963 collection: Some(collection.name.clone()),
964 enabled: metric.map(|metric| metric.enabled).unwrap_or(true),
965 entries,
966 estimated_memory_bytes: estimate_index_memory(entries, kind),
967 last_refresh_ms: metric.and_then(|metric| metric.last_refresh_ms),
968 backend: index_backend_name(kind).to_string(),
969 artifact_kind: None,
970 artifact_root_page: None,
971 artifact_checksum: None,
972 build_state: "catalog-derived".to_string(),
973 });
974 }
975 }
976
977 states
978 }
979
980 pub(crate) fn physical_collection_roots(&self) -> BTreeMap<String, u64> {
981 let mut roots = BTreeMap::new();
982
983 for name in self.store.list_collections() {
984 let Some(manager) = self.store.get_collection(&name) else {
985 continue;
986 };
987
988 let stats = manager.stats();
989 let mut root = fnv1a_seed();
990 fnv1a_hash_value(&mut root, &name);
991 fnv1a_hash_value(&mut root, &stats.total_entities);
992 fnv1a_hash_value(&mut root, &stats.growing_count);
993 fnv1a_hash_value(&mut root, &stats.sealed_count);
994 fnv1a_hash_value(&mut root, &stats.archived_count);
995 fnv1a_hash_value(&mut root, &stats.total_memory_bytes);
996 fnv1a_hash_value(&mut root, &stats.seal_ops);
997 fnv1a_hash_value(&mut root, &stats.compact_ops);
998
999 let mut entities = manager.query_all(|_| true);
1000 entities.sort_by_key(|entity| entity.id.raw());
1001
1002 for entity in entities {
1003 fnv1a_hash_value(&mut root, &entity.id.raw());
1004 fnv1a_hash_value(&mut root, &entity.kind);
1005 fnv1a_hash_value(&mut root, &entity.created_at);
1006 fnv1a_hash_value(&mut root, &entity.updated_at);
1007 fnv1a_hash_value(&mut root, &entity.data);
1008 fnv1a_hash_value(&mut root, &entity.sequence_id);
1009 fnv1a_hash_value(&mut root, &entity.embeddings().len());
1010 fnv1a_hash_value(&mut root, &entity.cross_refs().len());
1011 }
1012
1013 roots.insert(name, root);
1014 }
1015
1016 roots
1017 }
1018
1019 pub fn table_ref(&self, table: impl Into<String>, row_id: u64) -> TableRef {
1025 TableRef::new(table, row_id)
1026 }
1027
1028 pub fn node_ref(&self, collection: impl Into<String>, node_id: EntityId) -> NodeRef {
1030 NodeRef::new(collection, node_id)
1031 }
1032
1033 pub fn vector_ref(&self, collection: impl Into<String>, vector_id: EntityId) -> VectorRef {
1035 VectorRef::new(collection, vector_id)
1036 }
1037
1038 pub fn query(&self) -> QueryBuilder {
1044 QueryBuilder::new(self.store.clone())
1045 }
1046
1047 pub fn similar(&self, collection: &str, vector: &[f32], k: usize) -> Vec<SimilarResult> {
1054 if self.store.get_collection(collection).is_none() {
1055 return Vec::new();
1056 }
1057
1058 if let Some(index) = self.get_or_build_hnsw_index(collection, vector.len()) {
1060 let hnsw = index.read().unwrap_or_else(|e| e.into_inner());
1061 let results = hnsw.search(vector, k);
1062 let mapped = self.hnsw_results_to_similar(collection, &results);
1063 if !mapped.is_empty() {
1064 return mapped;
1065 }
1066 }
1067
1068 self.similar_brute_force(collection, vector, k)
1070 }
1071
1072 fn similar_brute_force(
1074 &self,
1075 collection: &str,
1076 vector: &[f32],
1077 k: usize,
1078 ) -> Vec<SimilarResult> {
1079 let manager = match self.store.get_collection(collection) {
1080 Some(m) => m,
1081 None => return Vec::new(),
1082 };
1083
1084 let entities = manager.query_all(|_| true);
1085 let mut results: Vec<SimilarResult> = entities
1086 .iter()
1087 .filter_map(|e| {
1088 let score = match &e.data {
1089 EntityData::Vector(v) => cosine_similarity(vector, &v.dense),
1090 _ => e
1091 .embeddings()
1092 .iter()
1093 .map(|emb| cosine_similarity(vector, &emb.vector))
1094 .fold(0.0f32, f32::max),
1095 };
1096 let distance = (1.0 - score).max(0.0);
1097 if score > 0.0 {
1098 Some(SimilarResult {
1099 entity_id: e.id,
1100 score,
1101 distance,
1102 entity: e.clone(),
1103 })
1104 } else {
1105 None
1106 }
1107 })
1108 .collect();
1109
1110 results.sort_by(|a, b| {
1111 b.score
1112 .partial_cmp(&a.score)
1113 .unwrap_or(std::cmp::Ordering::Equal)
1114 });
1115 results.truncate(k);
1116 results
1117 }
1118
1119 fn get_or_build_hnsw_index(
1129 &self,
1130 collection: &str,
1131 query_dim: usize,
1132 ) -> Option<Arc<RwLock<HnswIndex>>> {
1133 let manager = self.store.get_collection(collection)?;
1134 let live_count = manager.count();
1135
1136 {
1138 let indexes = self
1139 .vector_indexes
1140 .read()
1141 .unwrap_or_else(|e| e.into_inner());
1142 if let Some(cached) = indexes.get(collection) {
1143 if cached.entity_count == live_count {
1144 return Some(Arc::clone(&cached.index));
1145 }
1146 }
1147 }
1148
1149 let entities = manager.query_all(|_| true);
1151
1152 let vectors: Vec<(u64, Vec<f32>)> = entities
1153 .iter()
1154 .filter_map(|e| match &e.data {
1155 EntityData::Vector(v) if !v.dense.is_empty() && v.dense.len() == query_dim => {
1156 Some((e.id.raw(), v.dense.clone()))
1157 }
1158 _ => None,
1159 })
1160 .collect();
1161
1162 const MIN_VECTORS_FOR_HNSW: usize = 100;
1164 if vectors.len() < MIN_VECTORS_FOR_HNSW {
1165 return None;
1166 }
1167
1168 let config = crate::storage::engine::HnswConfig::with_m(16)
1170 .with_metric(crate::storage::engine::DistanceMetric::Cosine)
1171 .with_ef_construction(100)
1172 .with_ef_search(50);
1173 let mut hnsw = HnswIndex::new(query_dim, config);
1174
1175 for (id, vec) in &vectors {
1176 hnsw.insert_with_id(*id, vec.clone());
1177 }
1178
1179 let index = Arc::new(RwLock::new(hnsw));
1180
1181 let mut indexes = self
1183 .vector_indexes
1184 .write()
1185 .unwrap_or_else(|e| e.into_inner());
1186 if let Some(cached) = indexes.get(collection) {
1188 if cached.entity_count == live_count {
1189 return Some(Arc::clone(&cached.index));
1190 }
1191 }
1192 indexes.insert(
1193 collection.to_string(),
1194 CachedVectorIndex {
1195 index: Arc::clone(&index),
1196 entity_count: live_count,
1197 },
1198 );
1199 Some(index)
1200 }
1201
1202 fn hnsw_results_to_similar(
1204 &self,
1205 collection: &str,
1206 results: &[crate::storage::engine::DistanceResult],
1207 ) -> Vec<SimilarResult> {
1208 results
1209 .iter()
1210 .filter_map(|dr| {
1211 let entity_id = EntityId::new(dr.id);
1212 let entity = self.store.get(collection, entity_id)?;
1213 let score = (1.0 - dr.distance).max(0.0);
1215 if score > 0.0 {
1216 Some(SimilarResult {
1217 entity_id,
1218 score,
1219 distance: dr.distance,
1220 entity,
1221 })
1222 } else {
1223 None
1224 }
1225 })
1226 .collect()
1227 }
1228
1229 pub(crate) fn invalidate_vector_index(&self, collection: &str) {
1234 let mut indexes = self
1235 .vector_indexes
1236 .write()
1237 .unwrap_or_else(|e| e.into_inner());
1238 indexes.remove(collection);
1239 }
1240
1241 pub fn get(&self, id: EntityId) -> Option<UnifiedEntity> {
1243 self.store.get_any(id).map(|(_, e)| e)
1244 }
1245
1246 pub fn get_with_collection(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1248 self.store.get_any(id)
1249 }
1250
1251 pub fn batch_get(&self, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1257 ids.iter().map(|id| self.get(*id)).collect()
1258 }
1259
1260 pub fn batch(&self) -> BatchBuilder {
1262 BatchBuilder::new(self.store.clone(), self.preprocessors.clone())
1263 }
1264
1265 pub fn add_preprocessor(&mut self, preprocessor: Box<dyn Preprocessor>) {
1271 let mut preprocessors = self
1272 .preprocessors
1273 .write()
1274 .unwrap_or_else(|poisoned| poisoned.into_inner());
1275 preprocessors.push(Arc::from(preprocessor));
1276 }
1277
1278 pub fn linked_from(&self, id: EntityId) -> Vec<LinkedEntity> {
1284 self.store
1285 .get_refs_from(id)
1286 .into_iter()
1287 .filter_map(|(target_id, ref_type, collection)| {
1288 self.store
1289 .get(&collection, target_id)
1290 .map(|entity| LinkedEntity {
1291 entity,
1292 ref_type,
1293 collection,
1294 })
1295 })
1296 .collect()
1297 }
1298
1299 pub fn linked_to(&self, id: EntityId) -> Vec<LinkedEntity> {
1301 self.store
1302 .get_refs_to(id)
1303 .into_iter()
1304 .filter_map(|(source_id, ref_type, collection)| {
1305 self.store
1306 .get(&collection, source_id)
1307 .map(|entity| LinkedEntity {
1308 entity,
1309 ref_type,
1310 collection,
1311 })
1312 })
1313 .collect()
1314 }
1315
1316 pub fn store(&self) -> Arc<UnifiedStore> {
1318 self.store.clone()
1319 }
1320
1321 pub(crate) fn turbo_collections(
1327 &self,
1328 ) -> &Arc<
1329 parking_lot::Mutex<
1330 std::collections::HashMap<
1331 String,
1332 Arc<crate::runtime::vector_turbo_kind::TurboCollectionState>,
1333 >,
1334 >,
1335 > {
1336 self.turbo_collections
1337 .get_or_init(|| Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new())))
1338 }
1339
1340 pub(crate) fn turbo_state(
1350 &self,
1351 collection: &str,
1352 ) -> Option<Arc<crate::runtime::vector_turbo_kind::TurboCollectionState>> {
1353 if !crate::runtime::vector_turbo_kind::is_turbo(&self.store, collection) {
1354 return None;
1355 }
1356 let map = self.turbo_collections();
1357 {
1358 let guard = map.lock();
1359 if let Some(state) = guard.get(collection) {
1360 return Some(Arc::clone(state));
1361 }
1362 }
1363 let contract = self.collection_contract(collection)?;
1364 let dim = contract.vector_dimension?;
1365 let metric = contract
1366 .vector_metric
1367 .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine);
1368 let state = Arc::new(
1369 crate::runtime::vector_turbo_kind::TurboCollectionState::new(
1370 dim,
1371 metric,
1372 self.store.pager(),
1373 ),
1374 );
1375 if let Some((_, paths)) = self.options.resolve_tiered_layout() {
1380 state.set_snapshot_path(paths.turbo_snapshot_path(collection));
1381 }
1382 let mut guard = map.lock();
1383 let inserted_now = !guard.contains_key(collection);
1384 let entry = guard
1385 .entry(collection.to_string())
1386 .or_insert_with(|| Arc::clone(&state));
1387 let handle = Arc::clone(entry);
1388 drop(guard);
1389 if inserted_now {
1396 let join = crate::runtime::vector_turbo_kind::spawn_background_rebuild(
1397 Arc::clone(&self.store),
1398 collection.to_string(),
1399 Arc::clone(&handle),
1400 );
1401 self.turbo_rebuild_workers.lock().push(join);
1402 }
1403 Some(handle)
1404 }
1405
1406 pub fn ml_runtime(&self) -> &crate::storage::ml::MlRuntime {
1412 self.ml_runtime.get_or_init(|| {
1413 crate::storage::ml::MlRuntime::in_memory(std::sync::Arc::new(
1414 |_handle| Ok(String::new()),
1418 ))
1419 })
1420 }
1421
1422 pub fn semantic_cache(&self) -> &Arc<crate::storage::ml::SemanticCache> {
1426 self.semantic_cache.get_or_init(|| {
1427 Arc::new(crate::storage::ml::SemanticCache::new(
1428 crate::storage::ml::SemanticCacheConfig::default(),
1429 ))
1430 })
1431 }
1432
1433 pub fn hypertables(&self) -> &Arc<crate::storage::timeseries::HypertableRegistry> {
1436 self.hypertables
1437 .get_or_init(|| Arc::new(crate::storage::timeseries::HypertableRegistry::new()))
1438 }
1439
1440 pub fn continuous_aggregates(
1443 &self,
1444 ) -> &Arc<crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine> {
1445 self.continuous_aggregates.get_or_init(|| {
1446 Arc::new(
1447 crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine::new(),
1448 )
1449 })
1450 }
1451
1452 pub(crate) fn is_binary_dump(path: &Path) -> Result<bool, std::io::Error> {
1453 let mut file = File::open(path)?;
1454 let mut magic = [0u8; 4];
1455 let read = file.read(&mut magic)?;
1456 Ok(read == 4 && reddb_file::native_store_magic_matches(&magic))
1457 }
1458}