Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_access.rs

1use super::*;
2
3impl RedDB {
4    /// Access the quorum coordinator (Phase 2.6 PG parity).
5    ///
6    /// `None` when this instance is not a primary. Write paths call
7    /// `quorum.wait_for_quorum(lsn)` after the primary WAL commit to
8    /// block until the configured replica quorum has acked; `Async`
9    /// mode (default) returns instantly so no behavioural change.
10    pub fn quorum_coordinator(
11        &self,
12    ) -> Option<&Arc<crate::replication::quorum::QuorumCoordinator>> {
13        self.quorum.as_ref()
14    }
15
16    /// Wait for the configured replica quorum on a given LSN.
17    ///
18    /// Convenience wrapper that returns `Ok(())` when:
19    /// * there is no quorum coordinator (non-primary instance), OR
20    /// * the coordinator is in `Async` mode (historical behaviour).
21    ///
22    /// Callers in the write path (INSERT/UPDATE/DELETE dispatch) invoke
23    /// this after logging the commit record. Returning `Err` signals
24    /// that replication did NOT reach quorum — the primary still has
25    /// the record but the client ack should be deferred or rejected.
26    pub fn wait_for_replication_quorum(
27        &self,
28        target_lsn: u64,
29    ) -> Result<(), crate::replication::QuorumError> {
30        match &self.quorum {
31            Some(q) => q.wait_for_quorum(target_lsn),
32            None => Ok(()),
33        }
34    }
35}
36
37impl RedDB {
38    pub(crate) fn native_registry_summary_from_metadata(
39        &self,
40        metadata: &PhysicalMetadataFile,
41    ) -> NativeRegistrySummary {
42        const SAMPLE_LIMIT: usize = 32;
43
44        let collection_names: Vec<_> = metadata
45            .catalog
46            .stats_by_collection
47            .keys()
48            .take(SAMPLE_LIMIT)
49            .cloned()
50            .collect();
51        let indexes: Vec<_> = metadata
52            .indexes
53            .iter()
54            .take(SAMPLE_LIMIT)
55            .map(|index| NativeRegistryIndexSummary {
56                name: index.name.clone(),
57                kind: index.kind.as_str().to_string(),
58                collection: index.collection.clone(),
59                enabled: index.enabled,
60                entries: index.entries as u64,
61                estimated_memory_bytes: index.estimated_memory_bytes,
62                last_refresh_ms: index.last_refresh_ms,
63                backend: index.backend.clone(),
64            })
65            .collect();
66        let graph_projections: Vec<_> = metadata
67            .graph_projections
68            .iter()
69            .take(SAMPLE_LIMIT)
70            .map(|projection| NativeRegistryProjectionSummary {
71                name: projection.name.clone(),
72                source: projection.source.clone(),
73                created_at_unix_ms: projection.created_at_unix_ms,
74                updated_at_unix_ms: projection.updated_at_unix_ms,
75                node_labels: projection.node_labels.clone(),
76                node_types: projection.node_types.clone(),
77                edge_labels: projection.edge_labels.clone(),
78                last_materialized_sequence: projection.last_materialized_sequence,
79            })
80            .collect();
81        let analytics_jobs = metadata
82            .analytics_jobs
83            .iter()
84            .take(SAMPLE_LIMIT)
85            .map(|job| NativeRegistryJobSummary {
86                id: job.id.clone(),
87                kind: job.kind.clone(),
88                projection: job.projection.clone(),
89                state: job.state.clone(),
90                created_at_unix_ms: job.created_at_unix_ms,
91                updated_at_unix_ms: job.updated_at_unix_ms,
92                last_run_sequence: job.last_run_sequence,
93                metadata: job.metadata.clone(),
94            })
95            .collect::<Vec<_>>();
96        let vector_artifacts = self
97            .native_vector_artifact_records()
98            .into_iter()
99            .map(|(summary, _)| summary)
100            .take(SAMPLE_LIMIT)
101            .collect::<Vec<_>>();
102        let vector_artifact_count = self.native_vector_artifact_collection_count() as u32;
103
104        NativeRegistrySummary {
105            collection_count: metadata.catalog.total_collections as u32,
106            index_count: metadata.indexes.len() as u32,
107            graph_projection_count: metadata.graph_projections.len() as u32,
108            analytics_job_count: metadata.analytics_jobs.len() as u32,
109            vector_artifact_count,
110            collections_complete: metadata.catalog.stats_by_collection.len() <= SAMPLE_LIMIT,
111            indexes_complete: metadata.indexes.len() <= SAMPLE_LIMIT,
112            graph_projections_complete: metadata.graph_projections.len() <= SAMPLE_LIMIT,
113            analytics_jobs_complete: metadata.analytics_jobs.len() <= SAMPLE_LIMIT,
114            vector_artifacts_complete: vector_artifact_count as usize <= SAMPLE_LIMIT,
115            omitted_collection_count: metadata
116                .catalog
117                .stats_by_collection
118                .len()
119                .saturating_sub(collection_names.len())
120                as u32,
121            omitted_index_count: metadata.indexes.len().saturating_sub(indexes.len()) as u32,
122            omitted_graph_projection_count: metadata
123                .graph_projections
124                .len()
125                .saturating_sub(graph_projections.len())
126                as u32,
127            omitted_analytics_job_count: metadata
128                .analytics_jobs
129                .len()
130                .saturating_sub(analytics_jobs.len())
131                as u32,
132            omitted_vector_artifact_count: vector_artifact_count
133                .saturating_sub(vector_artifacts.len() as u32),
134            collection_names,
135            indexes,
136            graph_projections,
137            analytics_jobs,
138            vector_artifacts,
139        }
140    }
141
142    fn native_vector_artifact_collection_count(&self) -> usize {
143        self.native_vector_artifact_records().len()
144    }
145
146    pub(crate) fn native_vector_artifact_records(
147        &self,
148    ) -> Vec<(NativeVectorArtifactSummary, Vec<u8>)> {
149        let mut artifacts = Vec::new();
150        for collection in self.store.list_collections() {
151            let Some(manager) = self.store.get_collection(&collection) else {
152                continue;
153            };
154            let entities = manager.query_all(|_| true);
155            let mut vectors = Vec::new();
156            let mut graph_edges = Vec::new();
157            let mut fulltext_documents = Vec::new();
158            let mut document_records = Vec::new();
159            for entity in entities {
160                match entity.data {
161                    EntityData::Vector(vector) => {
162                        if !vector.dense.is_empty() {
163                            vectors.push((entity.id, vector.dense));
164                        }
165                    }
166                    EntityData::Edge(edge) => {
167                        if let EntityKind::GraphEdge(edge_kind) = entity.kind {
168                            graph_edges.push((
169                                entity.id,
170                                edge_kind.from_node,
171                                edge_kind.to_node,
172                                edge_kind.label,
173                                edge.weight,
174                            ));
175                        }
176                    }
177                    data => {
178                        let text = Self::native_fulltext_text_for_entity(&data);
179                        if !text.trim().is_empty() {
180                            fulltext_documents.push((entity.id, text));
181                        }
182                        if let Some(document) =
183                            Self::native_document_pathvalue_for_entity(entity.id, &data)
184                        {
185                            document_records.push(document);
186                        }
187                    }
188                }
189            }
190            if !vectors.is_empty() {
191                let dimension = vectors[0].1.len();
192                let mut hnsw = HnswIndex::with_dimension(dimension);
193                for (id, vector) in vectors
194                    .into_iter()
195                    .filter(|(_, vector)| vector.len() == dimension)
196                {
197                    hnsw.insert_with_id(id.raw(), vector);
198                }
199                let stats = hnsw.stats();
200                let bytes = hnsw.to_bytes();
201                let summary = NativeVectorArtifactSummary {
202                    collection: collection.clone(),
203                    artifact_kind: "hnsw".to_string(),
204                    vector_count: stats.node_count as u64,
205                    dimension: stats.dimension as u32,
206                    max_layer: stats.max_layer as u32,
207                    serialized_bytes: bytes.len() as u64,
208                    checksum: crate::storage::engine::crc32(&bytes) as u64,
209                };
210                artifacts.push((summary, bytes));
211
212                let n_lists = ((stats.node_count as f64).sqrt().ceil() as usize).max(1);
213                let mut ivf = IvfIndex::new(IvfConfig::new(dimension, n_lists));
214                let training = manager
215                    .query_all(|_| true)
216                    .into_iter()
217                    .filter_map(|entity| match entity.data {
218                        EntityData::Vector(vector) if vector.dense.len() == dimension => {
219                            Some(vector.dense)
220                        }
221                        _ => None,
222                    })
223                    .collect::<Vec<_>>();
224                ivf.train(&training);
225                let items = manager
226                    .query_all(|_| true)
227                    .into_iter()
228                    .filter_map(|entity| match entity.data {
229                        EntityData::Vector(vector) if vector.dense.len() == dimension => {
230                            Some((entity.id.raw(), vector.dense))
231                        }
232                        _ => None,
233                    })
234                    .collect::<Vec<_>>();
235                ivf.add_batch_with_ids(items);
236                let ivf_stats = ivf.stats();
237                let ivf_bytes = ivf.to_bytes();
238                let ivf_summary = NativeVectorArtifactSummary {
239                    collection: collection.clone(),
240                    artifact_kind: "ivf".to_string(),
241                    vector_count: ivf_stats.total_vectors as u64,
242                    dimension: ivf_stats.dimension as u32,
243                    max_layer: ivf_stats.n_lists as u32,
244                    serialized_bytes: ivf_bytes.len() as u64,
245                    checksum: crate::storage::engine::crc32(&ivf_bytes) as u64,
246                };
247                artifacts.push((ivf_summary, ivf_bytes));
248            }
249
250            if !graph_edges.is_empty() {
251                let bytes = Self::serialize_native_graph_adjacency_artifact(&graph_edges);
252                let (edge_count, node_count, label_count) =
253                    Self::inspect_native_graph_adjacency_artifact(&bytes).unwrap_or((0, 0, 0));
254                let summary = NativeVectorArtifactSummary {
255                    collection: collection.clone(),
256                    artifact_kind: "graph.adjacency".to_string(),
257                    vector_count: edge_count,
258                    dimension: node_count as u32,
259                    max_layer: label_count,
260                    serialized_bytes: bytes.len() as u64,
261                    checksum: crate::storage::engine::crc32(&bytes) as u64,
262                };
263                artifacts.push((summary, bytes));
264            }
265
266            if !fulltext_documents.is_empty() {
267                let bytes =
268                    Self::serialize_native_fulltext_artifact(&collection, &fulltext_documents);
269                let (doc_count, term_count, posting_count) =
270                    Self::inspect_native_fulltext_artifact(&bytes).unwrap_or((0, 0, 0));
271                let summary = NativeVectorArtifactSummary {
272                    collection: collection.clone(),
273                    artifact_kind: "text.fulltext".to_string(),
274                    vector_count: posting_count,
275                    dimension: doc_count as u32,
276                    max_layer: term_count as u32,
277                    serialized_bytes: bytes.len() as u64,
278                    checksum: crate::storage::engine::crc32(&bytes) as u64,
279                };
280                artifacts.push((summary, bytes));
281            }
282
283            if !document_records.is_empty() {
284                let bytes = Self::serialize_native_document_pathvalue_artifact(
285                    &collection,
286                    &document_records,
287                );
288                let (doc_count, path_count, value_count, unique_value_count) =
289                    Self::inspect_native_document_pathvalue_artifact(&bytes)
290                        .unwrap_or((0, 0, 0, 0));
291                let _ = unique_value_count;
292                let summary = NativeVectorArtifactSummary {
293                    collection: collection.clone(),
294                    artifact_kind: "document.pathvalue".to_string(),
295                    vector_count: value_count,
296                    dimension: doc_count as u32,
297                    max_layer: path_count as u32,
298                    serialized_bytes: bytes.len() as u64,
299                    checksum: crate::storage::engine::crc32(&bytes) as u64,
300                };
301                artifacts.push((summary, bytes));
302            }
303        }
304        artifacts
305    }
306
307    fn serialize_native_graph_adjacency_artifact(
308        edges: &[(EntityId, String, String, String, f32)],
309    ) -> Vec<u8> {
310        let mut data = Vec::with_capacity(32 + edges.len() * 48);
311        data.extend_from_slice(b"RDGA");
312        data.extend_from_slice(&(edges.len() as u32).to_le_bytes());
313        for (edge_id, from_node, to_node, label, weight) in edges {
314            data.extend_from_slice(&edge_id.raw().to_le_bytes());
315            Self::push_native_artifact_string(&mut data, from_node);
316            Self::push_native_artifact_string(&mut data, to_node);
317            Self::push_native_artifact_string(&mut data, label);
318            data.extend_from_slice(&weight.to_le_bytes());
319        }
320        data
321    }
322
323    pub(crate) fn inspect_native_graph_adjacency_artifact(
324        bytes: &[u8],
325    ) -> Result<(u64, u64, u32), String> {
326        if bytes.len() < 8 || &bytes[0..4] != b"RDGA" {
327            return Err("invalid graph adjacency artifact".to_string());
328        }
329        let mut pos = 4usize;
330        let edge_count =
331            u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]])
332                as usize;
333        pos += 4;
334        let mut nodes = BTreeSet::new();
335        let mut labels = BTreeSet::new();
336        for _ in 0..edge_count {
337            if pos + 8 > bytes.len() {
338                return Err("truncated graph adjacency artifact".to_string());
339            }
340            pos += 8;
341            let from = Self::read_native_artifact_string(bytes, &mut pos)?;
342            let to = Self::read_native_artifact_string(bytes, &mut pos)?;
343            let label = Self::read_native_artifact_string(bytes, &mut pos)?;
344            if pos + 4 > bytes.len() {
345                return Err("truncated graph adjacency artifact weight".to_string());
346            }
347            pos += 4;
348            nodes.insert(from);
349            nodes.insert(to);
350            labels.insert(label);
351        }
352        Ok((edge_count as u64, nodes.len() as u64, labels.len() as u32))
353    }
354
355    fn serialize_native_fulltext_artifact(
356        collection: &str,
357        documents: &[(EntityId, String)],
358    ) -> Vec<u8> {
359        let mut postings: BTreeMap<String, Vec<(u64, u32)>> = BTreeMap::new();
360        for (entity_id, text) in documents {
361            let mut frequencies: BTreeMap<String, u32> = BTreeMap::new();
362            for token in Self::native_fulltext_tokenize(text) {
363                *frequencies.entry(token).or_insert(0) += 1;
364            }
365            for (token, count) in frequencies {
366                postings
367                    .entry(token)
368                    .or_default()
369                    .push((entity_id.raw(), count));
370            }
371        }
372
373        let mut data = Vec::with_capacity(64 + postings.len() * 32);
374        data.extend_from_slice(b"RDFT");
375        Self::push_native_artifact_string(&mut data, collection);
376        data.extend_from_slice(&(documents.len() as u32).to_le_bytes());
377        data.extend_from_slice(&(postings.len() as u32).to_le_bytes());
378        for (term, entries) in postings {
379            Self::push_native_artifact_string(&mut data, &term);
380            data.extend_from_slice(&(entries.len() as u32).to_le_bytes());
381            for (entity_id, term_count) in entries {
382                data.extend_from_slice(&entity_id.to_le_bytes());
383                data.extend_from_slice(&term_count.to_le_bytes());
384            }
385        }
386        data
387    }
388
389    pub(crate) fn inspect_native_fulltext_artifact(
390        bytes: &[u8],
391    ) -> Result<(u64, u64, u64), String> {
392        if bytes.len() < 12 || &bytes[0..4] != b"RDFT" {
393            return Err("invalid fulltext artifact".to_string());
394        }
395        let mut pos = 4usize;
396        let _collection = Self::read_native_artifact_string(bytes, &mut pos)?;
397        if pos + 8 > bytes.len() {
398            return Err("truncated fulltext artifact".to_string());
399        }
400        let doc_count =
401            u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]]) as u64;
402        pos += 4;
403        let term_count =
404            u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]]) as u64;
405        pos += 4;
406        let mut posting_count = 0u64;
407        for _ in 0..term_count {
408            let _term = Self::read_native_artifact_string(bytes, &mut pos)?;
409            if pos + 4 > bytes.len() {
410                return Err("truncated fulltext posting count".to_string());
411            }
412            let entry_count =
413                u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]])
414                    as u64;
415            pos += 4;
416            posting_count += entry_count;
417            let bytes_needed = entry_count as usize * 12;
418            if pos + bytes_needed > bytes.len() {
419                return Err("truncated fulltext postings".to_string());
420            }
421            pos += bytes_needed;
422        }
423        Ok((doc_count, term_count, posting_count))
424    }
425
426    fn serialize_native_document_pathvalue_artifact(
427        collection: &str,
428        documents: &[(EntityId, Vec<(String, String)>)],
429    ) -> Vec<u8> {
430        let total_entries: usize = documents.iter().map(|(_, entries)| entries.len()).sum();
431        let mut data = Vec::with_capacity(64 + total_entries * 48);
432        data.extend_from_slice(b"RDDP");
433        Self::push_native_artifact_string(&mut data, collection);
434        data.extend_from_slice(&(documents.len() as u32).to_le_bytes());
435        data.extend_from_slice(&(total_entries as u32).to_le_bytes());
436        for (entity_id, entries) in documents {
437            data.extend_from_slice(&entity_id.raw().to_le_bytes());
438            data.extend_from_slice(&(entries.len() as u32).to_le_bytes());
439            for (path, value) in entries {
440                Self::push_native_artifact_string(&mut data, path);
441                Self::push_native_artifact_string(&mut data, value);
442            }
443        }
444        data
445    }
446
447    pub(crate) fn inspect_native_document_pathvalue_artifact(
448        bytes: &[u8],
449    ) -> Result<(u64, u64, u64, u64), String> {
450        if bytes.len() < 12 || &bytes[0..4] != b"RDDP" {
451            return Err("invalid document path/value artifact".to_string());
452        }
453        let mut pos = 4usize;
454        let _collection = Self::read_native_artifact_string(bytes, &mut pos)?;
455        if pos + 8 > bytes.len() {
456            return Err("truncated document path/value artifact".to_string());
457        }
458        let doc_count =
459            u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]]) as u64;
460        pos += 4;
461        let total_entries =
462            u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]]) as u64;
463        pos += 4;
464        let mut paths = BTreeSet::new();
465        let mut values = BTreeSet::new();
466        let mut seen_entries = 0u64;
467        for _ in 0..doc_count {
468            if pos + 12 > bytes.len() {
469                return Err("truncated document path/value record".to_string());
470            }
471            pos += 8;
472            let entry_count =
473                u32::from_le_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]])
474                    as usize;
475            pos += 4;
476            for _ in 0..entry_count {
477                let path = Self::read_native_artifact_string(bytes, &mut pos)?;
478                let value = Self::read_native_artifact_string(bytes, &mut pos)?;
479                paths.insert(path);
480                values.insert(value);
481                seen_entries += 1;
482            }
483        }
484        if seen_entries != total_entries {
485            return Err("document path/value artifact entry count mismatch".to_string());
486        }
487        Ok((
488            doc_count,
489            paths.len() as u64,
490            total_entries,
491            values.len() as u64,
492        ))
493    }
494
495    fn native_document_pathvalue_for_entity(
496        entity_id: EntityId,
497        data: &EntityData,
498    ) -> Option<(EntityId, Vec<(String, String)>)> {
499        let mut entries = Vec::new();
500        match data {
501            EntityData::Row(row) => {
502                if let Some(named) = &row.named {
503                    for (key, value) in named {
504                        Self::collect_native_document_entries_from_value(key, value, &mut entries);
505                    }
506                }
507                for (idx, value) in row.columns.iter().enumerate() {
508                    let path = format!("columns[{idx}]");
509                    Self::collect_native_document_entries_from_value(&path, value, &mut entries);
510                }
511            }
512            EntityData::Node(node) => {
513                for (key, value) in &node.properties {
514                    Self::collect_native_document_entries_from_value(key, value, &mut entries);
515                }
516            }
517            EntityData::Edge(edge) => {
518                for (key, value) in &edge.properties {
519                    Self::collect_native_document_entries_from_value(key, value, &mut entries);
520                }
521            }
522            EntityData::Vector(_) => {}
523            EntityData::TimeSeries(_) => {}
524            EntityData::QueueMessage(_) => {}
525        }
526        if entries.is_empty() {
527            None
528        } else {
529            Some((entity_id, entries))
530        }
531    }
532
533    fn collect_native_document_entries_from_value(
534        path: &str,
535        value: &Value,
536        out: &mut Vec<(String, String)>,
537    ) {
538        match value {
539            Value::Json(bytes) | Value::Blob(bytes) => {
540                if let Ok(json) = crate::json::from_slice::<JsonValue>(bytes) {
541                    Self::collect_native_document_entries_from_json(path, &json, out);
542                }
543            }
544            _ => {}
545        }
546    }
547
548    fn collect_native_document_entries_from_json(
549        path: &str,
550        value: &JsonValue,
551        out: &mut Vec<(String, String)>,
552    ) {
553        match value {
554            JsonValue::Object(entries) => {
555                for (key, value) in entries {
556                    let next = if path.is_empty() {
557                        key.clone()
558                    } else {
559                        format!("{path}.{key}")
560                    };
561                    Self::collect_native_document_entries_from_json(&next, value, out);
562                }
563            }
564            JsonValue::Array(items) => {
565                for (idx, value) in items.iter().enumerate() {
566                    let next = format!("{path}[{idx}]");
567                    Self::collect_native_document_entries_from_json(&next, value, out);
568                }
569            }
570            _ => {
571                if let Some(text) = Self::native_json_scalar_text(value) {
572                    out.push((path.to_string(), text));
573                }
574            }
575        }
576    }
577
578    fn native_json_scalar_text(value: &JsonValue) -> Option<String> {
579        match value {
580            JsonValue::Null => None,
581            JsonValue::Bool(value) => Some(value.to_string()),
582            JsonValue::Number(value) => Some(value.to_string()),
583            JsonValue::String(value) => Some(value.clone()),
584            JsonValue::Array(_) | JsonValue::Object(_) => None,
585        }
586    }
587
588    fn native_fulltext_text_for_entity(data: &EntityData) -> String {
589        match data {
590            EntityData::Row(row) => {
591                let mut parts = Vec::new();
592                if let Some(named) = &row.named {
593                    for value in named.values() {
594                        if let Some(text) = Self::native_value_text(value) {
595                            parts.push(text);
596                        }
597                    }
598                }
599                for value in &row.columns {
600                    if let Some(text) = Self::native_value_text(value) {
601                        parts.push(text);
602                    }
603                }
604                parts.join(" ")
605            }
606            EntityData::Node(node) => node
607                .properties
608                .values()
609                .filter_map(Self::native_value_text)
610                .collect::<Vec<_>>()
611                .join(" "),
612            EntityData::Edge(edge) => edge
613                .properties
614                .values()
615                .filter_map(Self::native_value_text)
616                .collect::<Vec<_>>()
617                .join(" "),
618            EntityData::Vector(vector) => vector.content.clone().unwrap_or_default(),
619            EntityData::TimeSeries(ts) => ts.metric.clone(),
620            EntityData::QueueMessage(_) => String::new(),
621        }
622    }
623
624    fn native_value_text(value: &Value) -> Option<String> {
625        match value {
626            Value::Text(value) => Some(value.to_string()),
627            Value::Json(value) => String::from_utf8(value.clone()).ok(),
628            Value::Blob(value) => String::from_utf8(value.clone()).ok(),
629            Value::Integer(value) => Some(value.to_string()),
630            Value::UnsignedInteger(value) => Some(value.to_string()),
631            Value::Float(value) => Some(value.to_string()),
632            Value::Boolean(value) => Some(value.to_string()),
633            Value::IpAddr(value) => Some(value.to_string()),
634            Value::NodeRef(value) => Some(value.clone()),
635            Value::EdgeRef(value) => Some(value.clone()),
636            Value::RowRef(table, row_id) => Some(format!("{table}:{row_id}")),
637            Value::VectorRef(collection, vector_id) => Some(format!("{collection}:{vector_id}")),
638            Value::Timestamp(value) => Some(value.to_string()),
639            Value::Duration(value) => Some(value.to_string()),
640            Value::Uuid(_) | Value::MacAddr(_) | Value::Vector(_) | Value::Null => None,
641            Value::Color([r, g, b]) => Some(format!("#{:02X}{:02X}{:02X}", r, g, b)),
642            Value::Email(s) => Some(s.clone()),
643            Value::Url(s) => Some(s.clone()),
644            Value::Phone(n) => Some(format!("+{}", n)),
645            Value::Semver(packed) => Some(format!(
646                "{}.{}.{}",
647                packed / 1_000_000,
648                (packed / 1_000) % 1_000,
649                packed % 1_000
650            )),
651            Value::Cidr(ip, prefix) => Some(format!(
652                "{}.{}.{}.{}/{}",
653                (ip >> 24) & 0xFF,
654                (ip >> 16) & 0xFF,
655                (ip >> 8) & 0xFF,
656                ip & 0xFF,
657                prefix
658            )),
659            Value::Date(days) => Some(days.to_string()),
660            Value::Time(ms) => {
661                let total_secs = ms / 1000;
662                Some(format!(
663                    "{:02}:{:02}:{:02}",
664                    total_secs / 3600,
665                    (total_secs / 60) % 60,
666                    total_secs % 60
667                ))
668            }
669            Value::Decimal(v) => Some(Value::Decimal(*v).display_string()),
670            Value::EnumValue(i) => Some(format!("enum({})", i)),
671            Value::Array(_) => None,
672            Value::TimestampMs(ms) => Some(ms.to_string()),
673            Value::Ipv4(ip) => Some(format!(
674                "{}.{}.{}.{}",
675                (ip >> 24) & 0xFF,
676                (ip >> 16) & 0xFF,
677                (ip >> 8) & 0xFF,
678                ip & 0xFF
679            )),
680            Value::Ipv6(bytes) => Some(format!("{}", std::net::Ipv6Addr::from(*bytes))),
681            Value::Subnet(ip, mask) => {
682                let prefix = mask.leading_ones();
683                Some(format!(
684                    "{}.{}.{}.{}/{}",
685                    (ip >> 24) & 0xFF,
686                    (ip >> 16) & 0xFF,
687                    (ip >> 8) & 0xFF,
688                    ip & 0xFF,
689                    prefix
690                ))
691            }
692            Value::Port(p) => Some(p.to_string()),
693            Value::Latitude(micro) => Some(format!("{:.6}", *micro as f64 / 1_000_000.0)),
694            Value::Longitude(micro) => Some(format!("{:.6}", *micro as f64 / 1_000_000.0)),
695            Value::GeoPoint(lat, lon) => Some(format!(
696                "{:.6},{:.6}",
697                *lat as f64 / 1_000_000.0,
698                *lon as f64 / 1_000_000.0
699            )),
700            Value::Country2(c) => Some(String::from_utf8_lossy(c).to_string()),
701            Value::Country3(c) => Some(String::from_utf8_lossy(c).to_string()),
702            Value::Lang2(c) => Some(String::from_utf8_lossy(c).to_string()),
703            Value::Lang5(c) => Some(String::from_utf8_lossy(c).to_string()),
704            Value::Currency(c) => Some(String::from_utf8_lossy(c).to_string()),
705            Value::AssetCode(code) => Some(code.clone()),
706            Value::Money { .. } => Some(value.display_string()),
707            Value::ColorAlpha([r, g, b, a]) => {
708                Some(format!("#{:02X}{:02X}{:02X}{:02X}", r, g, b, a))
709            }
710            Value::BigInt(v) => Some(v.to_string()),
711            Value::KeyRef(col, key) => Some(format!("{}:{}", col, key)),
712            Value::DocRef(col, id) => Some(format!("{}#{}", col, id)),
713            Value::TableRef(name) => Some(name.clone()),
714            Value::PageRef(page_id) => Some(format!("page:{}", page_id)),
715            Value::Secret(_) | Value::Password(_) => None,
716        }
717    }
718
719    fn native_fulltext_tokenize(text: &str) -> Vec<String> {
720        text.to_lowercase()
721            .split(|c: char| !c.is_alphanumeric())
722            .filter(|s| s.len() >= 2)
723            .map(|s| s.to_string())
724            .collect()
725    }
726
727    fn push_native_artifact_string(buf: &mut Vec<u8>, value: &str) {
728        let bytes = value.as_bytes();
729        buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
730        buf.extend_from_slice(bytes);
731    }
732
733    fn read_native_artifact_string(bytes: &[u8], pos: &mut usize) -> Result<String, String> {
734        if *pos + 4 > bytes.len() {
735            return Err("truncated native artifact string length".to_string());
736        }
737        let len = u32::from_le_bytes([
738            bytes[*pos],
739            bytes[*pos + 1],
740            bytes[*pos + 2],
741            bytes[*pos + 3],
742        ]) as usize;
743        *pos += 4;
744        if *pos + len > bytes.len() {
745            return Err("truncated native artifact string content".to_string());
746        }
747        let value = std::str::from_utf8(&bytes[*pos..*pos + len])
748            .map_err(|_| "invalid utf-8 in native artifact".to_string())?
749            .to_string();
750        *pos += len;
751        Ok(value)
752    }
753
754    pub(crate) fn native_recovery_summary_from_metadata(
755        metadata: &PhysicalMetadataFile,
756    ) -> NativeRecoverySummary {
757        const SAMPLE_LIMIT: usize = 16;
758
759        let snapshots: Vec<_> = metadata
760            .snapshots
761            .iter()
762            .rev()
763            .take(SAMPLE_LIMIT)
764            .map(|snapshot| NativeSnapshotSummary {
765                snapshot_id: snapshot.snapshot_id,
766                created_at_unix_ms: snapshot.created_at_unix_ms,
767                superblock_sequence: snapshot.superblock_sequence,
768                collection_count: snapshot.collection_count as u32,
769                total_entities: snapshot.total_entities as u64,
770            })
771            .collect();
772        let exports: Vec<_> = metadata
773            .exports
774            .iter()
775            .rev()
776            .take(SAMPLE_LIMIT)
777            .map(|export| NativeExportSummary {
778                name: export.name.clone(),
779                created_at_unix_ms: export.created_at_unix_ms,
780                snapshot_id: export.snapshot_id,
781                superblock_sequence: export.superblock_sequence,
782                collection_count: export.collection_count as u32,
783                total_entities: export.total_entities as u64,
784            })
785            .collect();
786
787        NativeRecoverySummary {
788            snapshot_count: metadata.snapshots.len() as u32,
789            export_count: metadata.exports.len() as u32,
790            snapshots_complete: metadata.snapshots.len() <= SAMPLE_LIMIT,
791            exports_complete: metadata.exports.len() <= SAMPLE_LIMIT,
792            omitted_snapshot_count: metadata.snapshots.len().saturating_sub(snapshots.len()) as u32,
793            omitted_export_count: metadata.exports.len().saturating_sub(exports.len()) as u32,
794            snapshots,
795            exports,
796        }
797    }
798
799    pub(crate) fn native_catalog_summary_from_metadata(
800        metadata: &PhysicalMetadataFile,
801    ) -> NativeCatalogSummary {
802        const SAMPLE_LIMIT: usize = 32;
803
804        let collections: Vec<_> = metadata
805            .catalog
806            .stats_by_collection
807            .iter()
808            .take(SAMPLE_LIMIT)
809            .map(|(name, stats)| NativeCatalogCollectionSummary {
810                name: name.clone(),
811                entities: stats.entities as u64,
812                cross_refs: stats.cross_refs as u64,
813                segments: stats.segments as u32,
814            })
815            .collect();
816
817        NativeCatalogSummary {
818            collection_count: metadata.catalog.total_collections as u32,
819            total_entities: metadata.catalog.total_entities as u64,
820            collections_complete: metadata.catalog.stats_by_collection.len() <= SAMPLE_LIMIT,
821            omitted_collection_count: metadata
822                .catalog
823                .stats_by_collection
824                .len()
825                .saturating_sub(collections.len()) as u32,
826            collections,
827        }
828    }
829
830    pub(crate) fn native_metadata_state_summary_from_metadata(
831        metadata: &PhysicalMetadataFile,
832    ) -> NativeMetadataStateSummary {
833        NativeMetadataStateSummary {
834            protocol_version: metadata.protocol_version.clone(),
835            generated_at_unix_ms: metadata.generated_at_unix_ms,
836            last_loaded_from: metadata.last_loaded_from.clone(),
837            last_healed_at_unix_ms: metadata.last_healed_at_unix_ms,
838        }
839    }
840
841    pub(crate) fn inspect_native_header_against_metadata(
842        native: PhysicalFileHeader,
843        metadata: &PhysicalMetadataFile,
844    ) -> NativeHeaderInspection {
845        let expected = Self::native_header_from_metadata(metadata);
846        let mut mismatches = Vec::new();
847
848        if native.format_version != expected.format_version {
849            mismatches.push(NativeHeaderMismatch {
850                field: "format_version",
851                native: native.format_version.to_string(),
852                expected: expected.format_version.to_string(),
853            });
854        }
855        if native.sequence != expected.sequence {
856            mismatches.push(NativeHeaderMismatch {
857                field: "sequence",
858                native: native.sequence.to_string(),
859                expected: expected.sequence.to_string(),
860            });
861        }
862        if native.manifest_oldest_root != expected.manifest_oldest_root {
863            mismatches.push(NativeHeaderMismatch {
864                field: "manifest_oldest_root",
865                native: native.manifest_oldest_root.to_string(),
866                expected: expected.manifest_oldest_root.to_string(),
867            });
868        }
869        if native.manifest_root != expected.manifest_root {
870            mismatches.push(NativeHeaderMismatch {
871                field: "manifest_root",
872                native: native.manifest_root.to_string(),
873                expected: expected.manifest_root.to_string(),
874            });
875        }
876        if native.free_set_root != expected.free_set_root {
877            mismatches.push(NativeHeaderMismatch {
878                field: "free_set_root",
879                native: native.free_set_root.to_string(),
880                expected: expected.free_set_root.to_string(),
881            });
882        }
883        if native.collection_root_count != expected.collection_root_count {
884            mismatches.push(NativeHeaderMismatch {
885                field: "collection_root_count",
886                native: native.collection_root_count.to_string(),
887                expected: expected.collection_root_count.to_string(),
888            });
889        }
890        if native.snapshot_count != expected.snapshot_count {
891            mismatches.push(NativeHeaderMismatch {
892                field: "snapshot_count",
893                native: native.snapshot_count.to_string(),
894                expected: expected.snapshot_count.to_string(),
895            });
896        }
897        if native.index_count != expected.index_count {
898            mismatches.push(NativeHeaderMismatch {
899                field: "index_count",
900                native: native.index_count.to_string(),
901                expected: expected.index_count.to_string(),
902            });
903        }
904        if native.catalog_collection_count != expected.catalog_collection_count {
905            mismatches.push(NativeHeaderMismatch {
906                field: "catalog_collection_count",
907                native: native.catalog_collection_count.to_string(),
908                expected: expected.catalog_collection_count.to_string(),
909            });
910        }
911        if native.catalog_total_entities != expected.catalog_total_entities {
912            mismatches.push(NativeHeaderMismatch {
913                field: "catalog_total_entities",
914                native: native.catalog_total_entities.to_string(),
915                expected: expected.catalog_total_entities.to_string(),
916            });
917        }
918        if native.export_count != expected.export_count {
919            mismatches.push(NativeHeaderMismatch {
920                field: "export_count",
921                native: native.export_count.to_string(),
922                expected: expected.export_count.to_string(),
923            });
924        }
925        if native.graph_projection_count != expected.graph_projection_count {
926            mismatches.push(NativeHeaderMismatch {
927                field: "graph_projection_count",
928                native: native.graph_projection_count.to_string(),
929                expected: expected.graph_projection_count.to_string(),
930            });
931        }
932        if native.analytics_job_count != expected.analytics_job_count {
933            mismatches.push(NativeHeaderMismatch {
934                field: "analytics_job_count",
935                native: native.analytics_job_count.to_string(),
936                expected: expected.analytics_job_count.to_string(),
937            });
938        }
939        if native.manifest_event_count != expected.manifest_event_count {
940            mismatches.push(NativeHeaderMismatch {
941                field: "manifest_event_count",
942                native: native.manifest_event_count.to_string(),
943                expected: expected.manifest_event_count.to_string(),
944            });
945        }
946
947        NativeHeaderInspection {
948            native,
949            expected,
950            consistent: mismatches.is_empty(),
951            mismatches,
952        }
953    }
954
955    pub(crate) fn repair_policy_for_inspection(
956        inspection: &NativeHeaderInspection,
957    ) -> NativeHeaderRepairPolicy {
958        if inspection.consistent {
959            return NativeHeaderRepairPolicy::InSync;
960        }
961
962        if inspection.expected.sequence >= inspection.native.sequence {
963            NativeHeaderRepairPolicy::RepairNativeFromMetadata
964        } else {
965            NativeHeaderRepairPolicy::NativeAheadOfMetadata
966        }
967    }
968
969    pub(crate) fn prune_export_registry(&self, exports: &mut Vec<ExportDescriptor>) {
970        let retention = self.options.export_retention.max(1);
971        if exports.len() <= retention {
972            return;
973        }
974
975        exports.sort_by_key(|export| export.created_at_unix_ms);
976        let removed: Vec<ExportDescriptor> =
977            exports.drain(0..(exports.len() - retention)).collect();
978
979        for export in removed {
980            let _ = fs::remove_file(&export.data_path);
981            let _ = fs::remove_file(&export.metadata_path);
982            let binary_path = PhysicalMetadataFile::metadata_binary_path_for(std::path::Path::new(
983                &export.data_path,
984            ));
985            let _ = fs::remove_file(binary_path);
986        }
987    }
988
989    pub(crate) fn runtime_index_catalog(&self) -> IndexCatalog {
990        let mut catalog = IndexCatalog::register_default_vector_graph(
991            self.options.has_capability(Capability::Table),
992            self.options.has_capability(Capability::Graph),
993        );
994        if self.options.has_capability(Capability::FullText) {
995            catalog.register(RuntimeIndexConfig::new(
996                "text-fulltext",
997                IndexKind::FullText,
998            ));
999            catalog.register(RuntimeIndexConfig::new(
1000                "document-pathvalue",
1001                IndexKind::DocumentPathValue,
1002            ));
1003        }
1004        catalog.register(RuntimeIndexConfig::new(
1005            "search-hybrid",
1006            IndexKind::HybridSearch,
1007        ));
1008        catalog
1009    }
1010
1011    pub(crate) fn physical_index_state(&self) -> Vec<PhysicalIndexState> {
1012        // Use a lightweight catalog snapshot that does NOT call physical_metadata()
1013        // to avoid infinite recursion: physical_metadata → metadata_from_native_state
1014        // → physical_index_state → catalog_model_snapshot → physical_metadata → ...
1015        let catalog = self.runtime_index_catalog();
1016        let snapshot = crate::catalog::snapshot_store_with_declarations(
1017            "reddb",
1018            self.store.as_ref(),
1019            Some(&catalog),
1020            None, // No declarations — breaks the recursive cycle
1021            None, // No contracts — avoids pulling physical metadata during bootstrap
1022        );
1023        let mut metrics_by_name = std::collections::BTreeMap::new();
1024        for metric in &snapshot.indices {
1025            metrics_by_name.insert(metric.name.clone(), metric.clone());
1026        }
1027
1028        let mut states = Vec::new();
1029        for collection in snapshot.collections {
1030            for index_name in &collection.indices {
1031                let metric = metrics_by_name.get(index_name);
1032                let kind = metric
1033                    .map(|metric| metric.kind)
1034                    .unwrap_or_else(|| infer_collection_index_kind(collection.model, index_name));
1035                let entries = estimate_index_entries(&collection, kind);
1036                states.push(PhysicalIndexState {
1037                    name: format!("{}::{}", collection.name, index_name),
1038                    kind,
1039                    collection: Some(collection.name.clone()),
1040                    enabled: metric.map(|metric| metric.enabled).unwrap_or(true),
1041                    entries,
1042                    estimated_memory_bytes: estimate_index_memory(entries, kind),
1043                    last_refresh_ms: metric.and_then(|metric| metric.last_refresh_ms),
1044                    backend: index_backend_name(kind).to_string(),
1045                    artifact_kind: None,
1046                    artifact_root_page: None,
1047                    artifact_checksum: None,
1048                    build_state: "catalog-derived".to_string(),
1049                });
1050            }
1051        }
1052
1053        states
1054    }
1055
1056    pub(crate) fn physical_collection_roots(&self) -> BTreeMap<String, u64> {
1057        let mut roots = BTreeMap::new();
1058
1059        for name in self.store.list_collections() {
1060            let Some(manager) = self.store.get_collection(&name) else {
1061                continue;
1062            };
1063
1064            let stats = manager.stats();
1065            let mut root = fnv1a_seed();
1066            fnv1a_hash_value(&mut root, &name);
1067            fnv1a_hash_value(&mut root, &stats.total_entities);
1068            fnv1a_hash_value(&mut root, &stats.growing_count);
1069            fnv1a_hash_value(&mut root, &stats.sealed_count);
1070            fnv1a_hash_value(&mut root, &stats.archived_count);
1071            fnv1a_hash_value(&mut root, &stats.total_memory_bytes);
1072            fnv1a_hash_value(&mut root, &stats.seal_ops);
1073            fnv1a_hash_value(&mut root, &stats.compact_ops);
1074
1075            let mut entities = manager.query_all(|_| true);
1076            entities.sort_by_key(|entity| entity.id.raw());
1077
1078            for entity in entities {
1079                fnv1a_hash_value(&mut root, &entity.id.raw());
1080                fnv1a_hash_value(&mut root, &entity.kind);
1081                fnv1a_hash_value(&mut root, &entity.created_at);
1082                fnv1a_hash_value(&mut root, &entity.updated_at);
1083                fnv1a_hash_value(&mut root, &entity.data);
1084                fnv1a_hash_value(&mut root, &entity.sequence_id);
1085                fnv1a_hash_value(&mut root, &entity.embeddings().len());
1086                fnv1a_hash_value(&mut root, &entity.cross_refs().len());
1087            }
1088
1089            roots.insert(name, root);
1090        }
1091
1092        roots
1093    }
1094
1095    // ========================================================================
1096    // Reference Helpers - For Metadata Linking
1097    // ========================================================================
1098
1099    /// Create a reference to a table row
1100    pub fn table_ref(&self, table: impl Into<String>, row_id: u64) -> TableRef {
1101        TableRef::new(table, row_id)
1102    }
1103
1104    /// Create a reference to a graph node
1105    pub fn node_ref(&self, collection: impl Into<String>, node_id: EntityId) -> NodeRef {
1106        NodeRef::new(collection, node_id)
1107    }
1108
1109    /// Create a reference to a vector
1110    pub fn vector_ref(&self, collection: impl Into<String>, vector_id: EntityId) -> VectorRef {
1111        VectorRef::new(collection, vector_id)
1112    }
1113
1114    // ========================================================================
1115    // Query API
1116    // ========================================================================
1117
1118    /// Start building a query
1119    pub fn query(&self) -> QueryBuilder {
1120        QueryBuilder::new(self.store.clone())
1121    }
1122
1123    /// Quick vector similarity search.
1124    ///
1125    /// For collections with >= 100 vectors a lazily-built HNSW index is used
1126    /// for fast approximate nearest-neighbor lookup.  Smaller collections fall
1127    /// back to an exact brute-force scan so that the overhead of building an
1128    /// index is avoided when it would not pay off.
1129    pub fn similar(&self, collection: &str, vector: &[f32], k: usize) -> Vec<SimilarResult> {
1130        if self.store.get_collection(collection).is_none() {
1131            return Vec::new();
1132        }
1133
1134        // Try the HNSW fast path for collections with enough vectors.
1135        if let Some(index) = self.get_or_build_hnsw_index(collection, vector.len()) {
1136            let hnsw = index.read().unwrap_or_else(|e| e.into_inner());
1137            let results = hnsw.search(vector, k);
1138            let mapped = self.hnsw_results_to_similar(collection, &results);
1139            if !mapped.is_empty() {
1140                return mapped;
1141            }
1142        }
1143
1144        // Fallback: brute-force scan (small / mixed-type collections).
1145        self.similar_brute_force(collection, vector, k)
1146    }
1147
1148    /// Brute-force cosine similarity scan (exact results, O(n)).
1149    fn similar_brute_force(
1150        &self,
1151        collection: &str,
1152        vector: &[f32],
1153        k: usize,
1154    ) -> Vec<SimilarResult> {
1155        let manager = match self.store.get_collection(collection) {
1156            Some(m) => m,
1157            None => return Vec::new(),
1158        };
1159
1160        let entities = manager.query_all(|_| true);
1161        let mut results: Vec<SimilarResult> = entities
1162            .iter()
1163            .filter_map(|e| {
1164                let score = match &e.data {
1165                    EntityData::Vector(v) => cosine_similarity(vector, &v.dense),
1166                    _ => e
1167                        .embeddings()
1168                        .iter()
1169                        .map(|emb| cosine_similarity(vector, &emb.vector))
1170                        .fold(0.0f32, f32::max),
1171                };
1172                let distance = (1.0 - score).max(0.0);
1173                if score > 0.0 {
1174                    Some(SimilarResult {
1175                        entity_id: e.id,
1176                        score,
1177                        distance,
1178                        entity: e.clone(),
1179                    })
1180                } else {
1181                    None
1182                }
1183            })
1184            .collect();
1185
1186        results.sort_by(|a, b| {
1187            b.score
1188                .partial_cmp(&a.score)
1189                .unwrap_or(std::cmp::Ordering::Equal)
1190        });
1191        results.truncate(k);
1192        results
1193    }
1194
1195    /// Return (or lazily build) a per-collection HNSW index.
1196    ///
1197    /// Returns `None` when the collection has fewer than 100 dense vectors
1198    /// (the brute-force path is cheaper for tiny collections) or when there
1199    /// is a dimension mismatch with the query vector.
1200    ///
1201    /// The cached index is automatically invalidated when the live entity
1202    /// count in the collection changes, so inserts and deletes are picked up
1203    /// transparently without requiring explicit invalidation calls.
1204    fn get_or_build_hnsw_index(
1205        &self,
1206        collection: &str,
1207        query_dim: usize,
1208    ) -> Option<Arc<RwLock<HnswIndex>>> {
1209        let manager = self.store.get_collection(collection)?;
1210        let live_count = manager.count();
1211
1212        // Fast path: check if a fresh index already exists.
1213        {
1214            let indexes = self
1215                .vector_indexes
1216                .read()
1217                .unwrap_or_else(|e| e.into_inner());
1218            if let Some(cached) = indexes.get(collection) {
1219                if cached.entity_count == live_count {
1220                    return Some(Arc::clone(&cached.index));
1221                }
1222            }
1223        }
1224
1225        // Either no cached index exists or it is stale -- (re)build it.
1226        let entities = manager.query_all(|_| true);
1227
1228        let vectors: Vec<(u64, Vec<f32>)> = entities
1229            .iter()
1230            .filter_map(|e| match &e.data {
1231                EntityData::Vector(v) if !v.dense.is_empty() && v.dense.len() == query_dim => {
1232                    Some((e.id.raw(), v.dense.clone()))
1233                }
1234                _ => None,
1235            })
1236            .collect();
1237
1238        // Only build the HNSW index when there are enough vectors to justify it.
1239        const MIN_VECTORS_FOR_HNSW: usize = 100;
1240        if vectors.len() < MIN_VECTORS_FOR_HNSW {
1241            return None;
1242        }
1243
1244        // Build the HNSW index with cosine distance (matching the brute-force path).
1245        let config = crate::storage::engine::HnswConfig::with_m(16)
1246            .with_metric(crate::storage::engine::DistanceMetric::Cosine)
1247            .with_ef_construction(100)
1248            .with_ef_search(50);
1249        let mut hnsw = HnswIndex::new(query_dim, config);
1250
1251        for (id, vec) in &vectors {
1252            hnsw.insert_with_id(*id, vec.clone());
1253        }
1254
1255        let index = Arc::new(RwLock::new(hnsw));
1256
1257        // Store in the cache (double-check pattern to avoid duplicate builds).
1258        let mut indexes = self
1259            .vector_indexes
1260            .write()
1261            .unwrap_or_else(|e| e.into_inner());
1262        // Re-check under write lock: another thread may have built in the meantime.
1263        if let Some(cached) = indexes.get(collection) {
1264            if cached.entity_count == live_count {
1265                return Some(Arc::clone(&cached.index));
1266            }
1267        }
1268        indexes.insert(
1269            collection.to_string(),
1270            CachedVectorIndex {
1271                index: Arc::clone(&index),
1272                entity_count: live_count,
1273            },
1274        );
1275        Some(index)
1276    }
1277
1278    /// Convert HNSW `DistanceResult`s back to the DevX `SimilarResult` type.
1279    fn hnsw_results_to_similar(
1280        &self,
1281        collection: &str,
1282        results: &[crate::storage::engine::DistanceResult],
1283    ) -> Vec<SimilarResult> {
1284        results
1285            .iter()
1286            .filter_map(|dr| {
1287                let entity_id = EntityId::new(dr.id);
1288                let entity = self.store.get(collection, entity_id)?;
1289                // Cosine distance = 1 - similarity.
1290                let score = (1.0 - dr.distance).max(0.0);
1291                if score > 0.0 {
1292                    Some(SimilarResult {
1293                        entity_id,
1294                        score,
1295                        distance: dr.distance,
1296                        entity,
1297                    })
1298                } else {
1299                    None
1300                }
1301            })
1302            .collect()
1303    }
1304
1305    /// Invalidate the cached HNSW index for a collection.
1306    ///
1307    /// Called after vector inserts / deletes so the next search lazily rebuilds
1308    /// a fresh index that includes the new data.
1309    pub(crate) fn invalidate_vector_index(&self, collection: &str) {
1310        let mut indexes = self
1311            .vector_indexes
1312            .write()
1313            .unwrap_or_else(|e| e.into_inner());
1314        indexes.remove(collection);
1315    }
1316
1317    /// Get entity by ID from any collection
1318    pub fn get(&self, id: EntityId) -> Option<UnifiedEntity> {
1319        self.store.get_any(id).map(|(_, e)| e)
1320    }
1321
1322    /// Get entity with its collection name
1323    pub fn get_with_collection(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1324        self.store.get_any(id)
1325    }
1326
1327    // ========================================================================
1328    // Batch Operations - Performance
1329    // ========================================================================
1330
1331    /// Batch get multiple entities by ID. More efficient than N individual get() calls.
1332    pub fn batch_get(&self, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1333        ids.iter().map(|id| self.get(*id)).collect()
1334    }
1335
1336    /// Start a batch operation for bulk inserts
1337    pub fn batch(&self) -> BatchBuilder {
1338        BatchBuilder::new(self.store.clone(), self.preprocessors.clone())
1339    }
1340
1341    // ========================================================================
1342    // Preprocessing
1343    // ========================================================================
1344
1345    /// Add a preprocessor hook
1346    pub fn add_preprocessor(&mut self, preprocessor: Box<dyn Preprocessor>) {
1347        let mut preprocessors = self
1348            .preprocessors
1349            .write()
1350            .unwrap_or_else(|poisoned| poisoned.into_inner());
1351        preprocessors.push(Arc::from(preprocessor));
1352    }
1353
1354    // ========================================================================
1355    // Cross-Reference Navigation
1356    // ========================================================================
1357
1358    /// Get all entities linked FROM the given entity
1359    pub fn linked_from(&self, id: EntityId) -> Vec<LinkedEntity> {
1360        self.store
1361            .get_refs_from(id)
1362            .into_iter()
1363            .filter_map(|(target_id, ref_type, collection)| {
1364                self.store
1365                    .get(&collection, target_id)
1366                    .map(|entity| LinkedEntity {
1367                        entity,
1368                        ref_type,
1369                        collection,
1370                    })
1371            })
1372            .collect()
1373    }
1374
1375    /// Get all entities linked TO the given entity
1376    pub fn linked_to(&self, id: EntityId) -> Vec<LinkedEntity> {
1377        self.store
1378            .get_refs_to(id)
1379            .into_iter()
1380            .filter_map(|(source_id, ref_type, collection)| {
1381                self.store
1382                    .get(&collection, source_id)
1383                    .map(|entity| LinkedEntity {
1384                        entity,
1385                        ref_type,
1386                        collection,
1387                    })
1388            })
1389            .collect()
1390    }
1391
1392    /// Get the underlying store (for advanced operations)
1393    pub fn store(&self) -> Arc<UnifiedStore> {
1394        self.store.clone()
1395    }
1396
1397    /// Lazily-initialised ML runtime. First caller wins; subsequent
1398    /// callers observe the same instance. The runtime is created
1399    /// with an in-memory persistence backend by default — production
1400    /// deployments that need durable model state will swap this for
1401    /// a store-backed backend when the persistence integration lands.
1402    pub fn ml_runtime(&self) -> &crate::storage::ml::MlRuntime {
1403        self.ml_runtime.get_or_init(|| {
1404            crate::storage::ml::MlRuntime::in_memory(std::sync::Arc::new(
1405                // No-op work fn — the SQL scalars call predict
1406                // synchronously; async training jobs come in a later
1407                // sprint alongside `CREATE MODEL`.
1408                |_handle| Ok(String::new()),
1409            ))
1410        })
1411    }
1412
1413    /// Shared semantic cache for `SEMANTIC_CACHE_*` scalars. Uses
1414    /// default config until a `SET SEMANTIC_CACHE` admin surface
1415    /// lands.
1416    pub fn semantic_cache(&self) -> &Arc<crate::storage::ml::SemanticCache> {
1417        self.semantic_cache.get_or_init(|| {
1418            Arc::new(crate::storage::ml::SemanticCache::new(
1419                crate::storage::ml::SemanticCacheConfig::default(),
1420            ))
1421        })
1422    }
1423
1424    /// Lazily-initialised hypertable registry. Populated by `CREATE
1425    /// HYPERTABLE` DDL; consumed by chunk routing + retention.
1426    pub fn hypertables(&self) -> &Arc<crate::storage::timeseries::HypertableRegistry> {
1427        self.hypertables
1428            .get_or_init(|| Arc::new(crate::storage::timeseries::HypertableRegistry::new()))
1429    }
1430
1431    /// Lazily-initialised continuous-aggregate engine. Populated by
1432    /// `CA_REGISTER` and read by `CA_REFRESH` / `CA_STATE` scalars.
1433    pub fn continuous_aggregates(
1434        &self,
1435    ) -> &Arc<crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine> {
1436        self.continuous_aggregates.get_or_init(|| {
1437            Arc::new(
1438                crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine::new(),
1439            )
1440        })
1441    }
1442
1443    pub(crate) fn is_binary_dump(path: &Path) -> Result<bool, std::io::Error> {
1444        let mut file = File::open(path)?;
1445        let mut magic = [0u8; 4];
1446        let read = file.read(&mut magic)?;
1447        Ok(read == 4 && &magic == b"RDST")
1448    }
1449}