1use super::*;
2
3impl RedDB {
4 pub fn quorum_coordinator(
11 &self,
12 ) -> Option<&Arc<crate::replication::quorum::QuorumCoordinator>> {
13 self.quorum.as_ref()
14 }
15
16 pub fn wait_for_replication_quorum(
27 &self,
28 target_lsn: u64,
29 ) -> Result<(), crate::replication::QuorumError> {
30 match &self.quorum {
31 Some(q) => q.wait_for_quorum(target_lsn),
32 None => Ok(()),
33 }
34 }
35}
36
37impl RedDB {
38 pub(crate) fn native_registry_summary_from_metadata(
39 &self,
40 metadata: &PhysicalMetadataFile,
41 ) -> NativeRegistrySummary {
42 const SAMPLE_LIMIT: usize = 32;
43
44 let collection_names: Vec<_> = metadata
45 .catalog
46 .stats_by_collection
47 .keys()
48 .take(SAMPLE_LIMIT)
49 .cloned()
50 .collect();
51 let indexes: Vec<_> = metadata
52 .indexes
53 .iter()
54 .take(SAMPLE_LIMIT)
55 .map(|index| NativeRegistryIndexSummary {
56 name: index.name.clone(),
57 kind: index.kind.as_str().to_string(),
58 collection: index.collection.clone(),
59 enabled: index.enabled,
60 entries: index.entries as u64,
61 estimated_memory_bytes: index.estimated_memory_bytes,
62 last_refresh_ms: index.last_refresh_ms,
63 backend: index.backend.clone(),
64 })
65 .collect();
66 let graph_projections: Vec<_> = metadata
67 .graph_projections
68 .iter()
69 .take(SAMPLE_LIMIT)
70 .map(|projection| NativeRegistryProjectionSummary {
71 name: projection.name.clone(),
72 source: projection.source.clone(),
73 created_at_unix_ms: projection.created_at_unix_ms,
74 updated_at_unix_ms: projection.updated_at_unix_ms,
75 node_labels: projection.node_labels.clone(),
76 node_types: projection.node_types.clone(),
77 edge_labels: projection.edge_labels.clone(),
78 last_materialized_sequence: projection.last_materialized_sequence,
79 })
80 .collect();
81 let analytics_jobs = metadata
82 .analytics_jobs
83 .iter()
84 .take(SAMPLE_LIMIT)
85 .map(|job| NativeRegistryJobSummary {
86 id: job.id.clone(),
87 kind: job.kind.clone(),
88 projection: job.projection.clone(),
89 state: job.state.clone(),
90 created_at_unix_ms: job.created_at_unix_ms,
91 updated_at_unix_ms: job.updated_at_unix_ms,
92 last_run_sequence: job.last_run_sequence,
93 metadata: job.metadata.clone(),
94 })
95 .collect::<Vec<_>>();
96 let vector_artifacts = self
97 .native_vector_artifact_records()
98 .into_iter()
99 .map(|(summary, _)| summary)
100 .take(SAMPLE_LIMIT)
101 .collect::<Vec<_>>();
102 let vector_artifact_count = self.native_vector_artifact_collection_count() as u32;
103
104 NativeRegistrySummary {
105 collection_count: metadata.catalog.total_collections as u32,
106 index_count: metadata.indexes.len() as u32,
107 graph_projection_count: metadata.graph_projections.len() as u32,
108 analytics_job_count: metadata.analytics_jobs.len() as u32,
109 vector_artifact_count,
110 collections_complete: metadata.catalog.stats_by_collection.len() <= SAMPLE_LIMIT,
111 indexes_complete: metadata.indexes.len() <= SAMPLE_LIMIT,
112 graph_projections_complete: metadata.graph_projections.len() <= SAMPLE_LIMIT,
113 analytics_jobs_complete: metadata.analytics_jobs.len() <= SAMPLE_LIMIT,
114 vector_artifacts_complete: vector_artifact_count as usize <= SAMPLE_LIMIT,
115 omitted_collection_count: metadata
116 .catalog
117 .stats_by_collection
118 .len()
119 .saturating_sub(collection_names.len())
120 as u32,
121 omitted_index_count: metadata.indexes.len().saturating_sub(indexes.len()) as u32,
122 omitted_graph_projection_count: metadata
123 .graph_projections
124 .len()
125 .saturating_sub(graph_projections.len())
126 as u32,
127 omitted_analytics_job_count: metadata
128 .analytics_jobs
129 .len()
130 .saturating_sub(analytics_jobs.len())
131 as u32,
132 omitted_vector_artifact_count: vector_artifact_count
133 .saturating_sub(vector_artifacts.len() as u32),
134 collection_names,
135 indexes,
136 graph_projections,
137 analytics_jobs,
138 vector_artifacts,
139 }
140 }
141
142 fn native_vector_artifact_collection_count(&self) -> usize {
143 self.native_vector_artifact_records().len()
144 }
145
146 pub(crate) fn native_vector_artifact_records(
147 &self,
148 ) -> Vec<(NativeVectorArtifactSummary, Vec<u8>)> {
149 let mut artifacts = Vec::new();
150 for collection in self.store.list_collections() {
151 let Some(manager) = self.store.get_collection(&collection) else {
152 continue;
153 };
154 let entities = manager.query_all(|_| true);
155 let mut vectors = Vec::new();
156 let mut graph_edges = Vec::new();
157 let mut fulltext_documents = Vec::new();
158 let mut document_records = Vec::new();
159 for entity in entities {
160 match entity.data {
161 EntityData::Vector(vector) => {
162 if !vector.dense.is_empty() {
163 vectors.push((entity.id, vector.dense));
164 }
165 }
166 EntityData::Edge(edge) => {
167 if let EntityKind::GraphEdge(edge_kind) = entity.kind {
168 graph_edges.push((
169 entity.id,
170 edge_kind.from_node,
171 edge_kind.to_node,
172 edge_kind.label,
173 edge.weight,
174 ));
175 }
176 }
177 data => {
178 let text = Self::native_fulltext_text_for_entity(&data);
179 if !text.trim().is_empty() {
180 fulltext_documents.push((entity.id, text));
181 }
182 if let Some(document) =
183 Self::native_document_pathvalue_for_entity(entity.id, &data)
184 {
185 document_records.push(document);
186 }
187 }
188 }
189 }
190 if !vectors.is_empty() {
191 let dimension = vectors[0].1.len();
192 let mut hnsw = HnswIndex::with_dimension(dimension);
193 for (id, vector) in vectors
194 .into_iter()
195 .filter(|(_, vector)| vector.len() == dimension)
196 {
197 hnsw.insert_with_id(id.raw(), vector);
198 }
199 let stats = hnsw.stats();
200 let bytes = hnsw.to_bytes();
201 let summary = NativeVectorArtifactSummary {
202 collection: collection.clone(),
203 artifact_kind: "hnsw".to_string(),
204 vector_count: stats.node_count as u64,
205 dimension: stats.dimension as u32,
206 max_layer: stats.max_layer as u32,
207 serialized_bytes: bytes.len() as u64,
208 checksum: crate::storage::engine::crc32(&bytes) as u64,
209 };
210 artifacts.push((summary, bytes));
211
212 let n_lists = ((stats.node_count as f64).sqrt().ceil() as usize).max(1);
213 let mut ivf = IvfIndex::new(IvfConfig::new(dimension, n_lists));
214 let training = manager
215 .query_all(|_| true)
216 .into_iter()
217 .filter_map(|entity| match entity.data {
218 EntityData::Vector(vector) if vector.dense.len() == dimension => {
219 Some(vector.dense)
220 }
221 _ => None,
222 })
223 .collect::<Vec<_>>();
224 ivf.train(&training);
225 let items = manager
226 .query_all(|_| true)
227 .into_iter()
228 .filter_map(|entity| match entity.data {
229 EntityData::Vector(vector) if vector.dense.len() == dimension => {
230 Some((entity.id.raw(), vector.dense))
231 }
232 _ => None,
233 })
234 .collect::<Vec<_>>();
235 ivf.add_batch_with_ids(items);
236 let ivf_stats = ivf.stats();
237 let ivf_bytes = ivf.to_bytes();
238 let ivf_summary = NativeVectorArtifactSummary {
239 collection: collection.clone(),
240 artifact_kind: "ivf".to_string(),
241 vector_count: ivf_stats.total_vectors as u64,
242 dimension: ivf_stats.dimension as u32,
243 max_layer: ivf_stats.n_lists as u32,
244 serialized_bytes: ivf_bytes.len() as u64,
245 checksum: crate::storage::engine::crc32(&ivf_bytes) as u64,
246 };
247 artifacts.push((ivf_summary, ivf_bytes));
248 }
249
250 if !graph_edges.is_empty() {
251 let bytes = Self::serialize_native_graph_adjacency_artifact(&graph_edges);
252 let (edge_count, node_count, label_count) =
253 Self::inspect_native_graph_adjacency_artifact(&bytes).unwrap_or((0, 0, 0));
254 let summary = NativeVectorArtifactSummary {
255 collection: collection.clone(),
256 artifact_kind: "graph.adjacency".to_string(),
257 vector_count: edge_count,
258 dimension: node_count as u32,
259 max_layer: label_count,
260 serialized_bytes: bytes.len() as u64,
261 checksum: crate::storage::engine::crc32(&bytes) as u64,
262 };
263 artifacts.push((summary, bytes));
264 }
265
266 if !fulltext_documents.is_empty() {
267 let bytes =
268 Self::serialize_native_fulltext_artifact(&collection, &fulltext_documents);
269 let (doc_count, term_count, posting_count) =
270 Self::inspect_native_fulltext_artifact(&bytes).unwrap_or((0, 0, 0));
271 let summary = NativeVectorArtifactSummary {
272 collection: collection.clone(),
273 artifact_kind: "text.fulltext".to_string(),
274 vector_count: posting_count,
275 dimension: doc_count as u32,
276 max_layer: term_count as u32,
277 serialized_bytes: bytes.len() as u64,
278 checksum: crate::storage::engine::crc32(&bytes) as u64,
279 };
280 artifacts.push((summary, bytes));
281 }
282
283 if !document_records.is_empty() {
284 let bytes = Self::serialize_native_document_pathvalue_artifact(
285 &collection,
286 &document_records,
287 );
288 let (doc_count, path_count, value_count, unique_value_count) =
289 Self::inspect_native_document_pathvalue_artifact(&bytes)
290 .unwrap_or((0, 0, 0, 0));
291 let _ = unique_value_count;
292 let summary = NativeVectorArtifactSummary {
293 collection: collection.clone(),
294 artifact_kind: "document.pathvalue".to_string(),
295 vector_count: value_count,
296 dimension: doc_count as u32,
297 max_layer: path_count as u32,
298 serialized_bytes: bytes.len() as u64,
299 checksum: crate::storage::engine::crc32(&bytes) as u64,
300 };
301 artifacts.push((summary, bytes));
302 }
303 }
304 artifacts
305 }
306
307 fn serialize_native_graph_adjacency_artifact(
308 edges: &[(EntityId, String, String, String, f32)],
309 ) -> Vec<u8> {
310 let edges: Vec<reddb_file::GraphAdjacencyEdge> = edges
311 .iter()
312 .map(
313 |(edge_id, from_node, to_node, label, weight)| reddb_file::GraphAdjacencyEdge {
314 edge_id: edge_id.raw(),
315 from_node: from_node.clone(),
316 to_node: to_node.clone(),
317 label: label.clone(),
318 weight: *weight,
319 },
320 )
321 .collect();
322 reddb_file::encode_graph_adjacency(&edges)
323 }
324
325 pub(crate) fn inspect_native_graph_adjacency_artifact(
326 bytes: &[u8],
327 ) -> Result<(u64, u64, u32), String> {
328 let edges = reddb_file::decode_graph_adjacency(bytes).map_err(|e| e.to_string())?;
329 let edge_count = edges.len() as u64;
330 let mut nodes = BTreeSet::new();
331 let mut labels = BTreeSet::new();
332 for edge in edges {
333 nodes.insert(edge.from_node);
334 nodes.insert(edge.to_node);
335 labels.insert(edge.label);
336 }
337 Ok((edge_count, nodes.len() as u64, labels.len() as u32))
338 }
339
340 fn serialize_native_fulltext_artifact(
341 collection: &str,
342 documents: &[(EntityId, String)],
343 ) -> Vec<u8> {
344 let mut postings: BTreeMap<String, Vec<(u64, u32)>> = BTreeMap::new();
345 for (entity_id, text) in documents {
346 let mut frequencies: BTreeMap<String, u32> = BTreeMap::new();
347 for token in Self::native_fulltext_tokenize(text) {
348 *frequencies.entry(token).or_insert(0) += 1;
349 }
350 for (token, count) in frequencies {
351 postings
352 .entry(token)
353 .or_default()
354 .push((entity_id.raw(), count));
355 }
356 }
357
358 reddb_file::encode_fulltext_index(collection, documents.len(), &postings)
359 }
360
361 pub(crate) fn inspect_native_fulltext_artifact(
362 bytes: &[u8],
363 ) -> Result<(u64, u64, u64), String> {
364 let index = reddb_file::decode_fulltext_index(bytes).map_err(|e| e.to_string())?;
365 let posting_count: u64 = index.postings.values().map(|e| e.len() as u64).sum();
366 Ok((
367 index.doc_count as u64,
368 index.postings.len() as u64,
369 posting_count,
370 ))
371 }
372
373 fn serialize_native_document_pathvalue_artifact(
374 collection: &str,
375 documents: &[(EntityId, Vec<(String, String)>)],
376 ) -> Vec<u8> {
377 let documents: Vec<reddb_file::DocPathValueRecord> = documents
378 .iter()
379 .map(|(entity_id, entries)| reddb_file::DocPathValueRecord {
380 entity_id: entity_id.raw(),
381 entries: entries.clone(),
382 })
383 .collect();
384 reddb_file::encode_document_pathvalue(collection, &documents)
385 }
386
387 pub(crate) fn inspect_native_document_pathvalue_artifact(
388 bytes: &[u8],
389 ) -> Result<(u64, u64, u64, u64), String> {
390 let index = reddb_file::decode_document_pathvalue(bytes).map_err(|e| e.to_string())?;
391 let doc_count = index.documents.len() as u64;
392 let mut paths = BTreeSet::new();
393 let mut values = BTreeSet::new();
394 let mut total_entries = 0u64;
395 for doc in index.documents {
396 for (path, value) in doc.entries {
397 paths.insert(path);
398 values.insert(value);
399 total_entries += 1;
400 }
401 }
402 Ok((
403 doc_count,
404 paths.len() as u64,
405 total_entries,
406 values.len() as u64,
407 ))
408 }
409
410 fn native_document_pathvalue_for_entity(
411 entity_id: EntityId,
412 data: &EntityData,
413 ) -> Option<(EntityId, Vec<(String, String)>)> {
414 let mut entries = Vec::new();
415 match data {
416 EntityData::Row(row) => {
417 if let Some(named) = &row.named {
418 for (key, value) in named {
419 Self::collect_native_document_entries_from_value(key, value, &mut entries);
420 }
421 }
422 for (idx, value) in row.columns.iter().enumerate() {
423 let path = format!("columns[{idx}]");
424 Self::collect_native_document_entries_from_value(&path, value, &mut entries);
425 }
426 }
427 EntityData::Node(node) => {
428 for (key, value) in &node.properties {
429 Self::collect_native_document_entries_from_value(key, value, &mut entries);
430 }
431 }
432 EntityData::Edge(edge) => {
433 for (key, value) in &edge.properties {
434 Self::collect_native_document_entries_from_value(key, value, &mut entries);
435 }
436 }
437 EntityData::Vector(_) => {}
438 EntityData::TimeSeries(_) => {}
439 EntityData::QueueMessage(_) => {}
440 }
441 if entries.is_empty() {
442 None
443 } else {
444 Some((entity_id, entries))
445 }
446 }
447
448 fn collect_native_document_entries_from_value(
449 path: &str,
450 value: &Value,
451 out: &mut Vec<(String, String)>,
452 ) {
453 match value {
454 Value::Json(bytes) | Value::Blob(bytes) => {
455 if let Ok(json) = crate::json::from_slice::<JsonValue>(bytes) {
456 Self::collect_native_document_entries_from_json(path, &json, out);
457 }
458 }
459 _ => {}
460 }
461 }
462
463 fn collect_native_document_entries_from_json(
464 path: &str,
465 value: &JsonValue,
466 out: &mut Vec<(String, String)>,
467 ) {
468 match value {
469 JsonValue::Object(entries) => {
470 for (key, value) in entries {
471 let next = if path.is_empty() {
472 key.clone()
473 } else {
474 format!("{path}.{key}")
475 };
476 Self::collect_native_document_entries_from_json(&next, value, out);
477 }
478 }
479 JsonValue::Array(items) => {
480 for (idx, value) in items.iter().enumerate() {
481 let next = format!("{path}[{idx}]");
482 Self::collect_native_document_entries_from_json(&next, value, out);
483 }
484 }
485 _ => {
486 if let Some(text) = Self::native_json_scalar_text(value) {
487 out.push((path.to_string(), text));
488 }
489 }
490 }
491 }
492
493 fn native_json_scalar_text(value: &JsonValue) -> Option<String> {
494 match value {
495 JsonValue::Null => None,
496 JsonValue::Bool(value) => Some(value.to_string()),
497 JsonValue::Number(value) => Some(value.to_string()),
498 JsonValue::String(value) => Some(value.clone()),
499 JsonValue::Array(_) | JsonValue::Object(_) => None,
500 }
501 }
502
503 fn native_fulltext_text_for_entity(data: &EntityData) -> String {
504 match data {
505 EntityData::Row(row) => {
506 let mut parts = Vec::new();
507 if let Some(named) = &row.named {
508 for value in named.values() {
509 if let Some(text) = Self::native_value_text(value) {
510 parts.push(text);
511 }
512 }
513 }
514 for value in &row.columns {
515 if let Some(text) = Self::native_value_text(value) {
516 parts.push(text);
517 }
518 }
519 parts.join(" ")
520 }
521 EntityData::Node(node) => node
522 .properties
523 .values()
524 .filter_map(Self::native_value_text)
525 .collect::<Vec<_>>()
526 .join(" "),
527 EntityData::Edge(edge) => edge
528 .properties
529 .values()
530 .filter_map(Self::native_value_text)
531 .collect::<Vec<_>>()
532 .join(" "),
533 EntityData::Vector(vector) => vector.content.clone().unwrap_or_default(),
534 EntityData::TimeSeries(ts) => ts.metric.clone(),
535 EntityData::QueueMessage(_) => String::new(),
536 }
537 }
538
/// Renders one `Value` as plain text for the full-text index.
///
/// Returns `None` for values that are not indexed: `Null`, `Uuid`, `MacAddr`,
/// `Vector`, `Array`, and `Secret`/`Password` (presumably to keep credentials
/// out of the text index). `Json` and `Blob` payloads are included only when
/// they are valid UTF-8.
fn native_value_text(value: &Value) -> Option<String> {
    match value {
        Value::Text(value) => Some(value.to_string()),
        Value::Json(value) => String::from_utf8(value.clone()).ok(),
        Value::Blob(value) => String::from_utf8(value.clone()).ok(),
        Value::Integer(value) => Some(value.to_string()),
        Value::UnsignedInteger(value) => Some(value.to_string()),
        Value::Float(value) => Some(value.to_string()),
        Value::Boolean(value) => Some(value.to_string()),
        Value::IpAddr(value) => Some(value.to_string()),
        Value::NodeRef(value) => Some(value.clone()),
        Value::EdgeRef(value) => Some(value.clone()),
        Value::RowRef(table, row_id) => Some(format!("{table}:{row_id}")),
        Value::VectorRef(collection, vector_id) => Some(format!("{collection}:{vector_id}")),
        Value::Timestamp(value) => Some(value.to_string()),
        Value::Duration(value) => Some(value.to_string()),
        Value::Uuid(_) | Value::MacAddr(_) | Value::Vector(_) | Value::Null => None,
        // Uppercase `#RRGGBB` hex.
        Value::Color([r, g, b]) => Some(format!("#{:02X}{:02X}{:02X}", r, g, b)),
        Value::Email(s) => Some(s.clone()),
        Value::Url(s) => Some(s.clone()),
        // Rendered with a leading `+`.
        Value::Phone(n) => Some(format!("+{}", n)),
        // Decoded as major * 1_000_000 + minor * 1_000 + patch.
        Value::Semver(packed) => Some(format!(
            "{}.{}.{}",
            packed / 1_000_000,
            (packed / 1_000) % 1_000,
            packed % 1_000
        )),
        // IPv4 address packed into an integer, most significant octet first,
        // rendered as dotted-quad followed by `/prefix`.
        Value::Cidr(ip, prefix) => Some(format!(
            "{}.{}.{}.{}/{}",
            (ip >> 24) & 0xFF,
            (ip >> 16) & 0xFF,
            (ip >> 8) & 0xFF,
            ip & 0xFF,
            prefix
        )),
        // NOTE(review): emitted as the raw stored number (presumably days since
        // an epoch — confirm), not as a calendar date.
        Value::Date(days) => Some(days.to_string()),
        // Assumes `ms` counts milliseconds since midnight (TODO confirm).
        // Rendered as HH:MM:SS; hours are not wrapped at 24.
        Value::Time(ms) => {
            let total_secs = ms / 1000;
            Some(format!(
                "{:02}:{:02}:{:02}",
                total_secs / 3600,
                (total_secs / 60) % 60,
                total_secs % 60
            ))
        }
        Value::Decimal(v) => Some(Value::Decimal(*v).display_string()),
        Value::EnumValue(i) => Some(format!("enum({})", i)),
        Value::Array(_) => None,
        // Raw millisecond count, not a formatted timestamp.
        Value::TimestampMs(ms) => Some(ms.to_string()),
        // Same dotted-quad unpacking as `Cidr`, without a prefix.
        Value::Ipv4(ip) => Some(format!(
            "{}.{}.{}.{}",
            (ip >> 24) & 0xFF,
            (ip >> 16) & 0xFF,
            (ip >> 8) & 0xFF,
            ip & 0xFF
        )),
        Value::Ipv6(bytes) => Some(format!("{}", std::net::Ipv6Addr::from(*bytes))),
        // The prefix length is the number of leading one bits in the mask.
        Value::Subnet(ip, mask) => {
            let prefix = mask.leading_ones();
            Some(format!(
                "{}.{}.{}.{}/{}",
                (ip >> 24) & 0xFF,
                (ip >> 16) & 0xFF,
                (ip >> 8) & 0xFF,
                ip & 0xFF,
                prefix
            ))
        }
        Value::Port(p) => Some(p.to_string()),
        // Stored scaled by 1_000_000 (per the division below); printed with
        // six decimal places.
        Value::Latitude(micro) => Some(format!("{:.6}", *micro as f64 / 1_000_000.0)),
        Value::Longitude(micro) => Some(format!("{:.6}", *micro as f64 / 1_000_000.0)),
        Value::GeoPoint(lat, lon) => Some(format!(
            "{:.6},{:.6}",
            *lat as f64 / 1_000_000.0,
            *lon as f64 / 1_000_000.0
        )),
        // Fixed-width code bytes. Invalid UTF-8 is replaced, not rejected.
        Value::Country2(c) => Some(String::from_utf8_lossy(c).to_string()),
        Value::Country3(c) => Some(String::from_utf8_lossy(c).to_string()),
        Value::Lang2(c) => Some(String::from_utf8_lossy(c).to_string()),
        Value::Lang5(c) => Some(String::from_utf8_lossy(c).to_string()),
        Value::Currency(c) => Some(String::from_utf8_lossy(c).to_string()),
        Value::AssetCode(code) => Some(code.clone()),
        Value::Money { .. } => Some(value.display_string()),
        // Uppercase `#RRGGBBAA` hex.
        Value::ColorAlpha([r, g, b, a]) => {
            Some(format!("#{:02X}{:02X}{:02X}{:02X}", r, g, b, a))
        }
        Value::BigInt(v) => Some(v.to_string()),
        Value::KeyRef(col, key) => Some(format!("{}:{}", col, key)),
        Value::DocRef(col, id) => Some(format!("{}#{}", col, id)),
        Value::TableRef(name) => Some(name.clone()),
        Value::PageRef(page_id) => Some(format!("page:{}", page_id)),
        // Never indexed.
        Value::Secret(_) | Value::Password(_) => None,
    }
}
633
/// Splits `text` into lowercase full-text index terms.
///
/// The text is lowercased and split on every non-alphanumeric character.
/// Terms shorter than two characters are dropped.
///
/// The minimum length is counted in `char`s, not bytes. A byte-length check
/// would keep single non-ASCII letters such as `"é"` (2 bytes) or `"中"`
/// (3 bytes) while dropping single ASCII letters, which is inconsistent.
fn native_fulltext_tokenize(text: &str) -> Vec<String> {
    text.to_lowercase()
        .split(|c: char| !c.is_alphanumeric())
        // "has a second char" == at least two chars, without counting them all.
        .filter(|term| term.chars().nth(1).is_some())
        .map(str::to_owned)
        .collect()
}
641
642 pub(crate) fn native_recovery_summary_from_metadata(
643 metadata: &PhysicalMetadataFile,
644 ) -> NativeRecoverySummary {
645 const SAMPLE_LIMIT: usize = 16;
646
647 let snapshots: Vec<_> = metadata
648 .snapshots
649 .iter()
650 .rev()
651 .take(SAMPLE_LIMIT)
652 .map(|snapshot| NativeSnapshotSummary {
653 snapshot_id: snapshot.snapshot_id,
654 created_at_unix_ms: snapshot.created_at_unix_ms,
655 superblock_sequence: snapshot.superblock_sequence,
656 collection_count: snapshot.collection_count as u32,
657 total_entities: snapshot.total_entities as u64,
658 })
659 .collect();
660 let exports: Vec<_> = metadata
661 .exports
662 .iter()
663 .rev()
664 .take(SAMPLE_LIMIT)
665 .map(|export| NativeExportSummary {
666 name: export.name.clone(),
667 created_at_unix_ms: export.created_at_unix_ms,
668 snapshot_id: export.snapshot_id,
669 superblock_sequence: export.superblock_sequence,
670 collection_count: export.collection_count as u32,
671 total_entities: export.total_entities as u64,
672 })
673 .collect();
674
675 NativeRecoverySummary {
676 snapshot_count: metadata.snapshots.len() as u32,
677 export_count: metadata.exports.len() as u32,
678 snapshots_complete: metadata.snapshots.len() <= SAMPLE_LIMIT,
679 exports_complete: metadata.exports.len() <= SAMPLE_LIMIT,
680 omitted_snapshot_count: metadata.snapshots.len().saturating_sub(snapshots.len()) as u32,
681 omitted_export_count: metadata.exports.len().saturating_sub(exports.len()) as u32,
682 snapshots,
683 exports,
684 }
685 }
686
687 pub(crate) fn native_catalog_summary_from_metadata(
688 metadata: &PhysicalMetadataFile,
689 ) -> NativeCatalogSummary {
690 const SAMPLE_LIMIT: usize = 32;
691
692 let collections: Vec<_> = metadata
693 .catalog
694 .stats_by_collection
695 .iter()
696 .take(SAMPLE_LIMIT)
697 .map(|(name, stats)| NativeCatalogCollectionSummary {
698 name: name.clone(),
699 entities: stats.entities as u64,
700 cross_refs: stats.cross_refs as u64,
701 segments: stats.segments as u32,
702 })
703 .collect();
704
705 NativeCatalogSummary {
706 collection_count: metadata.catalog.total_collections as u32,
707 total_entities: metadata.catalog.total_entities as u64,
708 collections_complete: metadata.catalog.stats_by_collection.len() <= SAMPLE_LIMIT,
709 omitted_collection_count: metadata
710 .catalog
711 .stats_by_collection
712 .len()
713 .saturating_sub(collections.len()) as u32,
714 collections,
715 }
716 }
717
718 pub(crate) fn native_metadata_state_summary_from_metadata(
719 metadata: &PhysicalMetadataFile,
720 ) -> NativeMetadataStateSummary {
721 NativeMetadataStateSummary {
722 protocol_version: metadata.protocol_version.clone(),
723 generated_at_unix_ms: metadata.generated_at_unix_ms,
724 last_loaded_from: metadata.last_loaded_from.clone(),
725 last_healed_at_unix_ms: metadata.last_healed_at_unix_ms,
726 }
727 }
728
729 pub(crate) fn inspect_native_header_against_metadata(
730 native: PhysicalFileHeader,
731 metadata: &PhysicalMetadataFile,
732 ) -> NativeHeaderInspection {
733 let expected = Self::native_header_from_metadata(metadata);
734 let mut mismatches = Vec::new();
735
736 if native.format_version != expected.format_version {
737 mismatches.push(NativeHeaderMismatch {
738 field: "format_version",
739 native: native.format_version.to_string(),
740 expected: expected.format_version.to_string(),
741 });
742 }
743 if native.sequence != expected.sequence {
744 mismatches.push(NativeHeaderMismatch {
745 field: "sequence",
746 native: native.sequence.to_string(),
747 expected: expected.sequence.to_string(),
748 });
749 }
750 if native.manifest_oldest_root != expected.manifest_oldest_root {
751 mismatches.push(NativeHeaderMismatch {
752 field: "manifest_oldest_root",
753 native: native.manifest_oldest_root.to_string(),
754 expected: expected.manifest_oldest_root.to_string(),
755 });
756 }
757 if native.manifest_root != expected.manifest_root {
758 mismatches.push(NativeHeaderMismatch {
759 field: "manifest_root",
760 native: native.manifest_root.to_string(),
761 expected: expected.manifest_root.to_string(),
762 });
763 }
764 if native.free_set_root != expected.free_set_root {
765 mismatches.push(NativeHeaderMismatch {
766 field: "free_set_root",
767 native: native.free_set_root.to_string(),
768 expected: expected.free_set_root.to_string(),
769 });
770 }
771 if native.collection_root_count != expected.collection_root_count {
772 mismatches.push(NativeHeaderMismatch {
773 field: "collection_root_count",
774 native: native.collection_root_count.to_string(),
775 expected: expected.collection_root_count.to_string(),
776 });
777 }
778 if native.snapshot_count != expected.snapshot_count {
779 mismatches.push(NativeHeaderMismatch {
780 field: "snapshot_count",
781 native: native.snapshot_count.to_string(),
782 expected: expected.snapshot_count.to_string(),
783 });
784 }
785 if native.index_count != expected.index_count {
786 mismatches.push(NativeHeaderMismatch {
787 field: "index_count",
788 native: native.index_count.to_string(),
789 expected: expected.index_count.to_string(),
790 });
791 }
792 if native.catalog_collection_count != expected.catalog_collection_count {
793 mismatches.push(NativeHeaderMismatch {
794 field: "catalog_collection_count",
795 native: native.catalog_collection_count.to_string(),
796 expected: expected.catalog_collection_count.to_string(),
797 });
798 }
799 if native.catalog_total_entities != expected.catalog_total_entities {
800 mismatches.push(NativeHeaderMismatch {
801 field: "catalog_total_entities",
802 native: native.catalog_total_entities.to_string(),
803 expected: expected.catalog_total_entities.to_string(),
804 });
805 }
806 if native.export_count != expected.export_count {
807 mismatches.push(NativeHeaderMismatch {
808 field: "export_count",
809 native: native.export_count.to_string(),
810 expected: expected.export_count.to_string(),
811 });
812 }
813 if native.graph_projection_count != expected.graph_projection_count {
814 mismatches.push(NativeHeaderMismatch {
815 field: "graph_projection_count",
816 native: native.graph_projection_count.to_string(),
817 expected: expected.graph_projection_count.to_string(),
818 });
819 }
820 if native.analytics_job_count != expected.analytics_job_count {
821 mismatches.push(NativeHeaderMismatch {
822 field: "analytics_job_count",
823 native: native.analytics_job_count.to_string(),
824 expected: expected.analytics_job_count.to_string(),
825 });
826 }
827 if native.manifest_event_count != expected.manifest_event_count {
828 mismatches.push(NativeHeaderMismatch {
829 field: "manifest_event_count",
830 native: native.manifest_event_count.to_string(),
831 expected: expected.manifest_event_count.to_string(),
832 });
833 }
834
835 NativeHeaderInspection {
836 native,
837 expected,
838 consistent: mismatches.is_empty(),
839 mismatches,
840 }
841 }
842
843 pub(crate) fn repair_policy_for_inspection(
844 inspection: &NativeHeaderInspection,
845 ) -> NativeHeaderRepairPolicy {
846 if inspection.consistent {
847 return NativeHeaderRepairPolicy::InSync;
848 }
849
850 if inspection.expected.sequence >= inspection.native.sequence {
851 NativeHeaderRepairPolicy::RepairNativeFromMetadata
852 } else {
853 NativeHeaderRepairPolicy::NativeAheadOfMetadata
854 }
855 }
856
857 pub(crate) fn prune_export_registry(&self, exports: &mut Vec<ExportDescriptor>) {
858 let retention = self.options.export_retention.max(1);
859 if exports.len() <= retention {
860 return;
861 }
862
863 exports.sort_by_key(|export| export.created_at_unix_ms);
864 let removed: Vec<ExportDescriptor> =
865 exports.drain(0..(exports.len() - retention)).collect();
866
867 for export in removed {
868 let _ = fs::remove_file(&export.data_path);
869 let _ = fs::remove_file(&export.metadata_path);
870 let binary_path = PhysicalMetadataFile::metadata_binary_path_for(std::path::Path::new(
871 &export.data_path,
872 ));
873 let _ = fs::remove_file(binary_path);
874 }
875 }
876
877 pub(crate) fn runtime_index_catalog(&self) -> IndexCatalog {
878 let mut catalog = IndexCatalog::register_default_vector_graph(
879 self.options.has_capability(Capability::Table),
880 self.options.has_capability(Capability::Graph),
881 );
882 if self.options.has_capability(Capability::FullText) {
883 catalog.register(RuntimeIndexConfig::new(
884 "text-fulltext",
885 IndexKind::FullText,
886 ));
887 catalog.register(RuntimeIndexConfig::new(
888 "document-pathvalue",
889 IndexKind::DocumentPathValue,
890 ));
891 }
892 catalog.register(RuntimeIndexConfig::new(
893 "search-hybrid",
894 IndexKind::HybridSearch,
895 ));
896 catalog
897 }
898
899 pub(crate) fn physical_index_state(&self) -> Vec<PhysicalIndexState> {
900 let catalog = self.runtime_index_catalog();
904 let snapshot = crate::catalog::snapshot_store_with_declarations(
905 "reddb",
906 self.store.as_ref(),
907 Some(&catalog),
908 None, None, );
911 let mut metrics_by_name = std::collections::BTreeMap::new();
912 for metric in &snapshot.indices {
913 metrics_by_name.insert(metric.name.clone(), metric.clone());
914 }
915
916 let mut states = Vec::new();
917 for collection in snapshot.collections {
918 for index_name in &collection.indices {
919 let metric = metrics_by_name.get(index_name);
920 let kind = metric
921 .map(|metric| metric.kind)
922 .unwrap_or_else(|| infer_collection_index_kind(collection.model, index_name));
923 let entries = estimate_index_entries(&collection, kind);
924 states.push(PhysicalIndexState {
925 name: format!("{}::{}", collection.name, index_name),
926 kind,
927 collection: Some(collection.name.clone()),
928 enabled: metric.map(|metric| metric.enabled).unwrap_or(true),
929 entries,
930 estimated_memory_bytes: estimate_index_memory(entries, kind),
931 last_refresh_ms: metric.and_then(|metric| metric.last_refresh_ms),
932 backend: index_backend_name(kind).to_string(),
933 artifact_kind: None,
934 artifact_root_page: None,
935 artifact_checksum: None,
936 build_state: "catalog-derived".to_string(),
937 });
938 }
939 }
940
941 states
942 }
943
944 pub(crate) fn physical_collection_roots(&self) -> BTreeMap<String, u64> {
945 let mut roots = BTreeMap::new();
946
947 for name in self.store.list_collections() {
948 let Some(manager) = self.store.get_collection(&name) else {
949 continue;
950 };
951
952 let stats = manager.stats();
953 let mut root = fnv1a_seed();
954 fnv1a_hash_value(&mut root, &name);
955 fnv1a_hash_value(&mut root, &stats.total_entities);
956 fnv1a_hash_value(&mut root, &stats.growing_count);
957 fnv1a_hash_value(&mut root, &stats.sealed_count);
958 fnv1a_hash_value(&mut root, &stats.archived_count);
959 fnv1a_hash_value(&mut root, &stats.total_memory_bytes);
960 fnv1a_hash_value(&mut root, &stats.seal_ops);
961 fnv1a_hash_value(&mut root, &stats.compact_ops);
962
963 let mut entities = manager.query_all(|_| true);
964 entities.sort_by_key(|entity| entity.id.raw());
965
966 for entity in entities {
967 fnv1a_hash_value(&mut root, &entity.id.raw());
968 fnv1a_hash_value(&mut root, &entity.kind);
969 fnv1a_hash_value(&mut root, &entity.created_at);
970 fnv1a_hash_value(&mut root, &entity.updated_at);
971 fnv1a_hash_value(&mut root, &entity.data);
972 fnv1a_hash_value(&mut root, &entity.sequence_id);
973 fnv1a_hash_value(&mut root, &entity.embeddings().len());
974 fnv1a_hash_value(&mut root, &entity.cross_refs().len());
975 }
976
977 roots.insert(name, root);
978 }
979
980 roots
981 }
982
983 pub fn table_ref(&self, table: impl Into<String>, row_id: u64) -> TableRef {
989 TableRef::new(table, row_id)
990 }
991
992 pub fn node_ref(&self, collection: impl Into<String>, node_id: EntityId) -> NodeRef {
994 NodeRef::new(collection, node_id)
995 }
996
997 pub fn vector_ref(&self, collection: impl Into<String>, vector_id: EntityId) -> VectorRef {
999 VectorRef::new(collection, vector_id)
1000 }
1001
1002 pub fn query(&self) -> QueryBuilder {
1008 QueryBuilder::new(self.store.clone())
1009 }
1010
1011 pub fn similar(&self, collection: &str, vector: &[f32], k: usize) -> Vec<SimilarResult> {
1018 if self.store.get_collection(collection).is_none() {
1019 return Vec::new();
1020 }
1021
1022 if let Some(index) = self.get_or_build_hnsw_index(collection, vector.len()) {
1024 let hnsw = index.read().unwrap_or_else(|e| e.into_inner());
1025 let results = hnsw.search(vector, k);
1026 let mapped = self.hnsw_results_to_similar(collection, &results);
1027 if !mapped.is_empty() {
1028 return mapped;
1029 }
1030 }
1031
1032 self.similar_brute_force(collection, vector, k)
1034 }
1035
1036 fn similar_brute_force(
1038 &self,
1039 collection: &str,
1040 vector: &[f32],
1041 k: usize,
1042 ) -> Vec<SimilarResult> {
1043 let manager = match self.store.get_collection(collection) {
1044 Some(m) => m,
1045 None => return Vec::new(),
1046 };
1047
1048 let entities = manager.query_all(|_| true);
1049 let mut results: Vec<SimilarResult> = entities
1050 .iter()
1051 .filter_map(|e| {
1052 let score = match &e.data {
1053 EntityData::Vector(v) => cosine_similarity(vector, &v.dense),
1054 _ => e
1055 .embeddings()
1056 .iter()
1057 .map(|emb| cosine_similarity(vector, &emb.vector))
1058 .fold(0.0f32, f32::max),
1059 };
1060 let distance = (1.0 - score).max(0.0);
1061 if score > 0.0 {
1062 Some(SimilarResult {
1063 entity_id: e.id,
1064 score,
1065 distance,
1066 entity: e.clone(),
1067 })
1068 } else {
1069 None
1070 }
1071 })
1072 .collect();
1073
1074 results.sort_by(|a, b| {
1075 b.score
1076 .partial_cmp(&a.score)
1077 .unwrap_or(std::cmp::Ordering::Equal)
1078 });
1079 results.truncate(k);
1080 results
1081 }
1082
1083 fn get_or_build_hnsw_index(
1093 &self,
1094 collection: &str,
1095 query_dim: usize,
1096 ) -> Option<Arc<RwLock<HnswIndex>>> {
1097 let manager = self.store.get_collection(collection)?;
1098 let live_count = manager.count();
1099
1100 {
1102 let indexes = self
1103 .vector_indexes
1104 .read()
1105 .unwrap_or_else(|e| e.into_inner());
1106 if let Some(cached) = indexes.get(collection) {
1107 if cached.entity_count == live_count {
1108 return Some(Arc::clone(&cached.index));
1109 }
1110 }
1111 }
1112
1113 let entities = manager.query_all(|_| true);
1115
1116 let vectors: Vec<(u64, Vec<f32>)> = entities
1117 .iter()
1118 .filter_map(|e| match &e.data {
1119 EntityData::Vector(v) if !v.dense.is_empty() && v.dense.len() == query_dim => {
1120 Some((e.id.raw(), v.dense.clone()))
1121 }
1122 _ => None,
1123 })
1124 .collect();
1125
1126 const MIN_VECTORS_FOR_HNSW: usize = 100;
1128 if vectors.len() < MIN_VECTORS_FOR_HNSW {
1129 return None;
1130 }
1131
1132 let config = crate::storage::engine::HnswConfig::with_m(16)
1134 .with_metric(crate::storage::engine::DistanceMetric::Cosine)
1135 .with_ef_construction(100)
1136 .with_ef_search(50);
1137 let mut hnsw = HnswIndex::new(query_dim, config);
1138
1139 for (id, vec) in &vectors {
1140 hnsw.insert_with_id(*id, vec.clone());
1141 }
1142
1143 let index = Arc::new(RwLock::new(hnsw));
1144
1145 let mut indexes = self
1147 .vector_indexes
1148 .write()
1149 .unwrap_or_else(|e| e.into_inner());
1150 if let Some(cached) = indexes.get(collection) {
1152 if cached.entity_count == live_count {
1153 return Some(Arc::clone(&cached.index));
1154 }
1155 }
1156 indexes.insert(
1157 collection.to_string(),
1158 CachedVectorIndex {
1159 index: Arc::clone(&index),
1160 entity_count: live_count,
1161 },
1162 );
1163 Some(index)
1164 }
1165
1166 fn hnsw_results_to_similar(
1168 &self,
1169 collection: &str,
1170 results: &[crate::storage::engine::DistanceResult],
1171 ) -> Vec<SimilarResult> {
1172 results
1173 .iter()
1174 .filter_map(|dr| {
1175 let entity_id = EntityId::new(dr.id);
1176 let entity = self.store.get(collection, entity_id)?;
1177 let score = (1.0 - dr.distance).max(0.0);
1179 if score > 0.0 {
1180 Some(SimilarResult {
1181 entity_id,
1182 score,
1183 distance: dr.distance,
1184 entity,
1185 })
1186 } else {
1187 None
1188 }
1189 })
1190 .collect()
1191 }
1192
1193 pub(crate) fn invalidate_vector_index(&self, collection: &str) {
1198 let mut indexes = self
1199 .vector_indexes
1200 .write()
1201 .unwrap_or_else(|e| e.into_inner());
1202 indexes.remove(collection);
1203 }
1204
1205 pub fn get(&self, id: EntityId) -> Option<UnifiedEntity> {
1207 self.store.get_any(id).map(|(_, e)| e)
1208 }
1209
1210 pub fn get_with_collection(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1212 self.store.get_any(id)
1213 }
1214
1215 pub fn batch_get(&self, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1221 ids.iter().map(|id| self.get(*id)).collect()
1222 }
1223
1224 pub fn batch(&self) -> BatchBuilder {
1226 BatchBuilder::new(self.store.clone(), self.preprocessors.clone())
1227 }
1228
1229 pub fn add_preprocessor(&mut self, preprocessor: Box<dyn Preprocessor>) {
1235 let mut preprocessors = self
1236 .preprocessors
1237 .write()
1238 .unwrap_or_else(|poisoned| poisoned.into_inner());
1239 preprocessors.push(Arc::from(preprocessor));
1240 }
1241
1242 pub fn linked_from(&self, id: EntityId) -> Vec<LinkedEntity> {
1248 self.store
1249 .get_refs_from(id)
1250 .into_iter()
1251 .filter_map(|(target_id, ref_type, collection)| {
1252 self.store
1253 .get(&collection, target_id)
1254 .map(|entity| LinkedEntity {
1255 entity,
1256 ref_type,
1257 collection,
1258 })
1259 })
1260 .collect()
1261 }
1262
1263 pub fn linked_to(&self, id: EntityId) -> Vec<LinkedEntity> {
1265 self.store
1266 .get_refs_to(id)
1267 .into_iter()
1268 .filter_map(|(source_id, ref_type, collection)| {
1269 self.store
1270 .get(&collection, source_id)
1271 .map(|entity| LinkedEntity {
1272 entity,
1273 ref_type,
1274 collection,
1275 })
1276 })
1277 .collect()
1278 }
1279
1280 pub fn store(&self) -> Arc<UnifiedStore> {
1282 self.store.clone()
1283 }
1284
1285 pub(crate) fn turbo_collections(
1291 &self,
1292 ) -> &Arc<
1293 parking_lot::Mutex<
1294 std::collections::HashMap<
1295 String,
1296 Arc<crate::runtime::vector_turbo_kind::TurboCollectionState>,
1297 >,
1298 >,
1299 > {
1300 self.turbo_collections
1301 .get_or_init(|| Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new())))
1302 }
1303
1304 pub(crate) fn turbo_state(
1314 &self,
1315 collection: &str,
1316 ) -> Option<Arc<crate::runtime::vector_turbo_kind::TurboCollectionState>> {
1317 if !crate::runtime::vector_turbo_kind::is_turbo(&self.store, collection) {
1318 return None;
1319 }
1320 let map = self.turbo_collections();
1321 {
1322 let guard = map.lock();
1323 if let Some(state) = guard.get(collection) {
1324 return Some(Arc::clone(state));
1325 }
1326 }
1327 let contract = self.collection_contract(collection)?;
1328 let dim = contract.vector_dimension?;
1329 let metric = contract
1330 .vector_metric
1331 .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine);
1332 let state = Arc::new(
1333 crate::runtime::vector_turbo_kind::TurboCollectionState::new(
1334 dim,
1335 metric,
1336 self.store.pager(),
1337 ),
1338 );
1339 if let Some((_, paths)) = self.options.resolve_tiered_layout() {
1344 state.set_snapshot_path(paths.turbo_snapshot_path(collection));
1345 }
1346 let mut guard = map.lock();
1347 let inserted_now = !guard.contains_key(collection);
1348 let entry = guard
1349 .entry(collection.to_string())
1350 .or_insert_with(|| Arc::clone(&state));
1351 let handle = Arc::clone(entry);
1352 drop(guard);
1353 if inserted_now {
1360 let join = crate::runtime::vector_turbo_kind::spawn_background_rebuild(
1361 Arc::clone(&self.store),
1362 collection.to_string(),
1363 Arc::clone(&handle),
1364 );
1365 self.turbo_rebuild_workers.lock().push(join);
1366 }
1367 Some(handle)
1368 }
1369
1370 pub fn ml_runtime(&self) -> &crate::storage::ml::MlRuntime {
1376 self.ml_runtime.get_or_init(|| {
1377 crate::storage::ml::MlRuntime::in_memory(std::sync::Arc::new(
1378 |_handle| Ok(String::new()),
1382 ))
1383 })
1384 }
1385
1386 pub fn semantic_cache(&self) -> &Arc<crate::storage::ml::SemanticCache> {
1390 self.semantic_cache.get_or_init(|| {
1391 Arc::new(crate::storage::ml::SemanticCache::new(
1392 crate::storage::ml::SemanticCacheConfig::default(),
1393 ))
1394 })
1395 }
1396
1397 pub fn hypertables(&self) -> &Arc<crate::storage::timeseries::HypertableRegistry> {
1400 self.hypertables
1401 .get_or_init(|| Arc::new(crate::storage::timeseries::HypertableRegistry::new()))
1402 }
1403
1404 pub fn continuous_aggregates(
1407 &self,
1408 ) -> &Arc<crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine> {
1409 self.continuous_aggregates.get_or_init(|| {
1410 Arc::new(
1411 crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine::new(),
1412 )
1413 })
1414 }
1415
1416 pub(crate) fn is_binary_dump(path: &Path) -> Result<bool, std::io::Error> {
1417 let mut file = File::open(path)?;
1418 let mut magic = [0u8; 4];
1419 let read = file.read(&mut magic)?;
1420 Ok(read == 4 && reddb_file::native_store_magic_matches(&magic))
1421 }
1422}