Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_access.rs

1use super::*;
2
3impl RedDB {
4    /// Access the quorum coordinator (Phase 2.6 PG parity).
5    ///
6    /// `None` when this instance is not a primary. Write paths call
7    /// `quorum.wait_for_quorum(lsn)` after the primary WAL commit to
8    /// block until the configured replica quorum has acked; `Async`
9    /// mode (default) returns instantly so no behavioural change.
10    pub fn quorum_coordinator(
11        &self,
12    ) -> Option<&Arc<crate::replication::quorum::QuorumCoordinator>> {
13        self.quorum.as_ref()
14    }
15
16    /// Wait for the configured replica quorum on a given LSN.
17    ///
18    /// Convenience wrapper that returns `Ok(())` when:
19    /// * there is no quorum coordinator (non-primary instance), OR
20    /// * the coordinator is in `Async` mode (historical behaviour).
21    ///
22    /// Callers in the write path (INSERT/UPDATE/DELETE dispatch) invoke
23    /// this after logging the commit record. Returning `Err` signals
24    /// that replication did NOT reach quorum — the primary still has
25    /// the record but the client ack should be deferred or rejected.
26    pub fn wait_for_replication_quorum(
27        &self,
28        target_lsn: u64,
29    ) -> Result<(), crate::replication::QuorumError> {
30        match &self.quorum {
31            Some(q) => q.wait_for_quorum(target_lsn),
32            None => Ok(()),
33        }
34    }
35}
36
37impl RedDB {
38    pub(crate) fn native_registry_summary_from_metadata(
39        &self,
40        metadata: &PhysicalMetadataFile,
41    ) -> NativeRegistrySummary {
42        const SAMPLE_LIMIT: usize = 32;
43
44        let collection_names: Vec<_> = metadata
45            .catalog
46            .stats_by_collection
47            .keys()
48            .take(SAMPLE_LIMIT)
49            .cloned()
50            .collect();
51        let indexes: Vec<_> = metadata
52            .indexes
53            .iter()
54            .take(SAMPLE_LIMIT)
55            .map(|index| NativeRegistryIndexSummary {
56                name: index.name.clone(),
57                kind: index.kind.as_str().to_string(),
58                collection: index.collection.clone(),
59                enabled: index.enabled,
60                entries: index.entries as u64,
61                estimated_memory_bytes: index.estimated_memory_bytes,
62                last_refresh_ms: index.last_refresh_ms,
63                backend: index.backend.clone(),
64            })
65            .collect();
66        let graph_projections: Vec<_> = metadata
67            .graph_projections
68            .iter()
69            .take(SAMPLE_LIMIT)
70            .map(|projection| NativeRegistryProjectionSummary {
71                name: projection.name.clone(),
72                source: projection.source.clone(),
73                created_at_unix_ms: projection.created_at_unix_ms,
74                updated_at_unix_ms: projection.updated_at_unix_ms,
75                node_labels: projection.node_labels.clone(),
76                node_types: projection.node_types.clone(),
77                edge_labels: projection.edge_labels.clone(),
78                last_materialized_sequence: projection.last_materialized_sequence,
79            })
80            .collect();
81        let analytics_jobs = metadata
82            .analytics_jobs
83            .iter()
84            .take(SAMPLE_LIMIT)
85            .map(|job| NativeRegistryJobSummary {
86                id: job.id.clone(),
87                kind: job.kind.clone(),
88                projection: job.projection.clone(),
89                state: job.state.clone(),
90                created_at_unix_ms: job.created_at_unix_ms,
91                updated_at_unix_ms: job.updated_at_unix_ms,
92                last_run_sequence: job.last_run_sequence,
93                metadata: job.metadata.clone(),
94            })
95            .collect::<Vec<_>>();
96        let vector_artifacts = self
97            .native_vector_artifact_records()
98            .into_iter()
99            .map(|(summary, _)| summary)
100            .take(SAMPLE_LIMIT)
101            .collect::<Vec<_>>();
102        let vector_artifact_count = self.native_vector_artifact_collection_count() as u32;
103
104        NativeRegistrySummary {
105            collection_count: metadata.catalog.total_collections as u32,
106            index_count: metadata.indexes.len() as u32,
107            graph_projection_count: metadata.graph_projections.len() as u32,
108            analytics_job_count: metadata.analytics_jobs.len() as u32,
109            vector_artifact_count,
110            collections_complete: metadata.catalog.stats_by_collection.len() <= SAMPLE_LIMIT,
111            indexes_complete: metadata.indexes.len() <= SAMPLE_LIMIT,
112            graph_projections_complete: metadata.graph_projections.len() <= SAMPLE_LIMIT,
113            analytics_jobs_complete: metadata.analytics_jobs.len() <= SAMPLE_LIMIT,
114            vector_artifacts_complete: vector_artifact_count as usize <= SAMPLE_LIMIT,
115            omitted_collection_count: metadata
116                .catalog
117                .stats_by_collection
118                .len()
119                .saturating_sub(collection_names.len())
120                as u32,
121            omitted_index_count: metadata.indexes.len().saturating_sub(indexes.len()) as u32,
122            omitted_graph_projection_count: metadata
123                .graph_projections
124                .len()
125                .saturating_sub(graph_projections.len())
126                as u32,
127            omitted_analytics_job_count: metadata
128                .analytics_jobs
129                .len()
130                .saturating_sub(analytics_jobs.len())
131                as u32,
132            omitted_vector_artifact_count: vector_artifact_count
133                .saturating_sub(vector_artifacts.len() as u32),
134            collection_names,
135            indexes,
136            graph_projections,
137            analytics_jobs,
138            vector_artifacts,
139        }
140    }
141
142    fn native_vector_artifact_collection_count(&self) -> usize {
143        self.native_vector_artifact_records().len()
144    }
145
146    pub(crate) fn native_vector_artifact_records(
147        &self,
148    ) -> Vec<(NativeVectorArtifactSummary, Vec<u8>)> {
149        let mut artifacts = Vec::new();
150        for collection in self.store.list_collections() {
151            let Some(manager) = self.store.get_collection(&collection) else {
152                continue;
153            };
154            let entities = manager.query_all(|_| true);
155            let mut vectors = Vec::new();
156            let mut graph_edges = Vec::new();
157            let mut fulltext_documents = Vec::new();
158            let mut document_records = Vec::new();
159            for entity in entities {
160                match entity.data {
161                    EntityData::Vector(vector) => {
162                        if !vector.dense.is_empty() {
163                            vectors.push((entity.id, vector.dense));
164                        }
165                    }
166                    EntityData::Edge(edge) => {
167                        if let EntityKind::GraphEdge(edge_kind) = entity.kind {
168                            graph_edges.push((
169                                entity.id,
170                                edge_kind.from_node,
171                                edge_kind.to_node,
172                                edge_kind.label,
173                                edge.weight,
174                            ));
175                        }
176                    }
177                    data => {
178                        let text = Self::native_fulltext_text_for_entity(&data);
179                        if !text.trim().is_empty() {
180                            fulltext_documents.push((entity.id, text));
181                        }
182                        if let Some(document) =
183                            Self::native_document_pathvalue_for_entity(entity.id, &data)
184                        {
185                            document_records.push(document);
186                        }
187                    }
188                }
189            }
190            if !vectors.is_empty() {
191                let dimension = vectors[0].1.len();
192                let mut hnsw = HnswIndex::with_dimension(dimension);
193                for (id, vector) in vectors
194                    .into_iter()
195                    .filter(|(_, vector)| vector.len() == dimension)
196                {
197                    hnsw.insert_with_id(id.raw(), vector);
198                }
199                let stats = hnsw.stats();
200                let bytes = hnsw.to_bytes();
201                let summary = NativeVectorArtifactSummary {
202                    collection: collection.clone(),
203                    artifact_kind: "hnsw".to_string(),
204                    vector_count: stats.node_count as u64,
205                    dimension: stats.dimension as u32,
206                    max_layer: stats.max_layer as u32,
207                    serialized_bytes: bytes.len() as u64,
208                    checksum: crate::storage::engine::crc32(&bytes) as u64,
209                };
210                artifacts.push((summary, bytes));
211
212                let n_lists = ((stats.node_count as f64).sqrt().ceil() as usize).max(1);
213                let mut ivf = IvfIndex::new(IvfConfig::new(dimension, n_lists));
214                let training = manager
215                    .query_all(|_| true)
216                    .into_iter()
217                    .filter_map(|entity| match entity.data {
218                        EntityData::Vector(vector) if vector.dense.len() == dimension => {
219                            Some(vector.dense)
220                        }
221                        _ => None,
222                    })
223                    .collect::<Vec<_>>();
224                ivf.train(&training);
225                let items = manager
226                    .query_all(|_| true)
227                    .into_iter()
228                    .filter_map(|entity| match entity.data {
229                        EntityData::Vector(vector) if vector.dense.len() == dimension => {
230                            Some((entity.id.raw(), vector.dense))
231                        }
232                        _ => None,
233                    })
234                    .collect::<Vec<_>>();
235                ivf.add_batch_with_ids(items);
236                let ivf_stats = ivf.stats();
237                let ivf_bytes = ivf.to_bytes();
238                let ivf_summary = NativeVectorArtifactSummary {
239                    collection: collection.clone(),
240                    artifact_kind: "ivf".to_string(),
241                    vector_count: ivf_stats.total_vectors as u64,
242                    dimension: ivf_stats.dimension as u32,
243                    max_layer: ivf_stats.n_lists as u32,
244                    serialized_bytes: ivf_bytes.len() as u64,
245                    checksum: crate::storage::engine::crc32(&ivf_bytes) as u64,
246                };
247                artifacts.push((ivf_summary, ivf_bytes));
248            }
249
250            if !graph_edges.is_empty() {
251                let bytes = Self::serialize_native_graph_adjacency_artifact(&graph_edges);
252                let (edge_count, node_count, label_count) =
253                    Self::inspect_native_graph_adjacency_artifact(&bytes).unwrap_or((0, 0, 0));
254                let summary = NativeVectorArtifactSummary {
255                    collection: collection.clone(),
256                    artifact_kind: "graph.adjacency".to_string(),
257                    vector_count: edge_count,
258                    dimension: node_count as u32,
259                    max_layer: label_count,
260                    serialized_bytes: bytes.len() as u64,
261                    checksum: crate::storage::engine::crc32(&bytes) as u64,
262                };
263                artifacts.push((summary, bytes));
264            }
265
266            if !fulltext_documents.is_empty() {
267                let bytes =
268                    Self::serialize_native_fulltext_artifact(&collection, &fulltext_documents);
269                let (doc_count, term_count, posting_count) =
270                    Self::inspect_native_fulltext_artifact(&bytes).unwrap_or((0, 0, 0));
271                let summary = NativeVectorArtifactSummary {
272                    collection: collection.clone(),
273                    artifact_kind: "text.fulltext".to_string(),
274                    vector_count: posting_count,
275                    dimension: doc_count as u32,
276                    max_layer: term_count as u32,
277                    serialized_bytes: bytes.len() as u64,
278                    checksum: crate::storage::engine::crc32(&bytes) as u64,
279                };
280                artifacts.push((summary, bytes));
281            }
282
283            if !document_records.is_empty() {
284                let bytes = Self::serialize_native_document_pathvalue_artifact(
285                    &collection,
286                    &document_records,
287                );
288                let (doc_count, path_count, value_count, unique_value_count) =
289                    Self::inspect_native_document_pathvalue_artifact(&bytes)
290                        .unwrap_or((0, 0, 0, 0));
291                let _ = unique_value_count;
292                let summary = NativeVectorArtifactSummary {
293                    collection: collection.clone(),
294                    artifact_kind: "document.pathvalue".to_string(),
295                    vector_count: value_count,
296                    dimension: doc_count as u32,
297                    max_layer: path_count as u32,
298                    serialized_bytes: bytes.len() as u64,
299                    checksum: crate::storage::engine::crc32(&bytes) as u64,
300                };
301                artifacts.push((summary, bytes));
302            }
303        }
304        artifacts
305    }
306
307    fn serialize_native_graph_adjacency_artifact(
308        edges: &[(EntityId, String, String, String, f32)],
309    ) -> Vec<u8> {
310        let edges: Vec<reddb_file::GraphAdjacencyEdge> = edges
311            .iter()
312            .map(
313                |(edge_id, from_node, to_node, label, weight)| reddb_file::GraphAdjacencyEdge {
314                    edge_id: edge_id.raw(),
315                    from_node: from_node.clone(),
316                    to_node: to_node.clone(),
317                    label: label.clone(),
318                    weight: *weight,
319                },
320            )
321            .collect();
322        reddb_file::encode_graph_adjacency(&edges)
323    }
324
325    pub(crate) fn inspect_native_graph_adjacency_artifact(
326        bytes: &[u8],
327    ) -> Result<(u64, u64, u32), String> {
328        let edges = reddb_file::decode_graph_adjacency(bytes).map_err(|e| e.to_string())?;
329        let edge_count = edges.len() as u64;
330        let mut nodes = BTreeSet::new();
331        let mut labels = BTreeSet::new();
332        for edge in edges {
333            nodes.insert(edge.from_node);
334            nodes.insert(edge.to_node);
335            labels.insert(edge.label);
336        }
337        Ok((edge_count, nodes.len() as u64, labels.len() as u32))
338    }
339
340    fn serialize_native_fulltext_artifact(
341        collection: &str,
342        documents: &[(EntityId, String)],
343    ) -> Vec<u8> {
344        let mut postings: BTreeMap<String, Vec<(u64, u32)>> = BTreeMap::new();
345        for (entity_id, text) in documents {
346            let mut frequencies: BTreeMap<String, u32> = BTreeMap::new();
347            for token in Self::native_fulltext_tokenize(text) {
348                *frequencies.entry(token).or_insert(0) += 1;
349            }
350            for (token, count) in frequencies {
351                postings
352                    .entry(token)
353                    .or_default()
354                    .push((entity_id.raw(), count));
355            }
356        }
357
358        reddb_file::encode_fulltext_index(collection, documents.len(), &postings)
359    }
360
361    pub(crate) fn inspect_native_fulltext_artifact(
362        bytes: &[u8],
363    ) -> Result<(u64, u64, u64), String> {
364        let index = reddb_file::decode_fulltext_index(bytes).map_err(|e| e.to_string())?;
365        let posting_count: u64 = index.postings.values().map(|e| e.len() as u64).sum();
366        Ok((
367            index.doc_count as u64,
368            index.postings.len() as u64,
369            posting_count,
370        ))
371    }
372
373    fn serialize_native_document_pathvalue_artifact(
374        collection: &str,
375        documents: &[(EntityId, Vec<(String, String)>)],
376    ) -> Vec<u8> {
377        let documents: Vec<reddb_file::DocPathValueRecord> = documents
378            .iter()
379            .map(|(entity_id, entries)| reddb_file::DocPathValueRecord {
380                entity_id: entity_id.raw(),
381                entries: entries.clone(),
382            })
383            .collect();
384        reddb_file::encode_document_pathvalue(collection, &documents)
385    }
386
387    pub(crate) fn inspect_native_document_pathvalue_artifact(
388        bytes: &[u8],
389    ) -> Result<(u64, u64, u64, u64), String> {
390        let index = reddb_file::decode_document_pathvalue(bytes).map_err(|e| e.to_string())?;
391        let doc_count = index.documents.len() as u64;
392        let mut paths = BTreeSet::new();
393        let mut values = BTreeSet::new();
394        let mut total_entries = 0u64;
395        for doc in index.documents {
396            for (path, value) in doc.entries {
397                paths.insert(path);
398                values.insert(value);
399                total_entries += 1;
400            }
401        }
402        Ok((
403            doc_count,
404            paths.len() as u64,
405            total_entries,
406            values.len() as u64,
407        ))
408    }
409
410    fn native_document_pathvalue_for_entity(
411        entity_id: EntityId,
412        data: &EntityData,
413    ) -> Option<(EntityId, Vec<(String, String)>)> {
414        let mut entries = Vec::new();
415        match data {
416            EntityData::Row(row) => {
417                if let Some(named) = &row.named {
418                    for (key, value) in named {
419                        Self::collect_native_document_entries_from_value(key, value, &mut entries);
420                    }
421                }
422                for (idx, value) in row.columns.iter().enumerate() {
423                    let path = format!("columns[{idx}]");
424                    Self::collect_native_document_entries_from_value(&path, value, &mut entries);
425                }
426            }
427            EntityData::Node(node) => {
428                for (key, value) in &node.properties {
429                    Self::collect_native_document_entries_from_value(key, value, &mut entries);
430                }
431            }
432            EntityData::Edge(edge) => {
433                for (key, value) in &edge.properties {
434                    Self::collect_native_document_entries_from_value(key, value, &mut entries);
435                }
436            }
437            EntityData::Vector(_) => {}
438            EntityData::TimeSeries(_) => {}
439            EntityData::QueueMessage(_) => {}
440        }
441        if entries.is_empty() {
442            None
443        } else {
444            Some((entity_id, entries))
445        }
446    }
447
448    fn collect_native_document_entries_from_value(
449        path: &str,
450        value: &Value,
451        out: &mut Vec<(String, String)>,
452    ) {
453        match value {
454            Value::Json(bytes) | Value::Blob(bytes) => {
455                if let Ok(json) = crate::json::from_slice::<JsonValue>(bytes) {
456                    Self::collect_native_document_entries_from_json(path, &json, out);
457                }
458            }
459            _ => {}
460        }
461    }
462
463    fn collect_native_document_entries_from_json(
464        path: &str,
465        value: &JsonValue,
466        out: &mut Vec<(String, String)>,
467    ) {
468        match value {
469            JsonValue::Object(entries) => {
470                for (key, value) in entries {
471                    let next = if path.is_empty() {
472                        key.clone()
473                    } else {
474                        format!("{path}.{key}")
475                    };
476                    Self::collect_native_document_entries_from_json(&next, value, out);
477                }
478            }
479            JsonValue::Array(items) => {
480                for (idx, value) in items.iter().enumerate() {
481                    let next = format!("{path}[{idx}]");
482                    Self::collect_native_document_entries_from_json(&next, value, out);
483                }
484            }
485            _ => {
486                if let Some(text) = Self::native_json_scalar_text(value) {
487                    out.push((path.to_string(), text));
488                }
489            }
490        }
491    }
492
493    fn native_json_scalar_text(value: &JsonValue) -> Option<String> {
494        match value {
495            JsonValue::Null => None,
496            JsonValue::Bool(value) => Some(value.to_string()),
497            JsonValue::Number(value) => Some(value.to_string()),
498            JsonValue::String(value) => Some(value.clone()),
499            JsonValue::Array(_) | JsonValue::Object(_) => None,
500        }
501    }
502
503    fn native_fulltext_text_for_entity(data: &EntityData) -> String {
504        match data {
505            EntityData::Row(row) => {
506                let mut parts = Vec::new();
507                if let Some(named) = &row.named {
508                    for value in named.values() {
509                        if let Some(text) = Self::native_value_text(value) {
510                            parts.push(text);
511                        }
512                    }
513                }
514                for value in &row.columns {
515                    if let Some(text) = Self::native_value_text(value) {
516                        parts.push(text);
517                    }
518                }
519                parts.join(" ")
520            }
521            EntityData::Node(node) => node
522                .properties
523                .values()
524                .filter_map(Self::native_value_text)
525                .collect::<Vec<_>>()
526                .join(" "),
527            EntityData::Edge(edge) => edge
528                .properties
529                .values()
530                .filter_map(Self::native_value_text)
531                .collect::<Vec<_>>()
532                .join(" "),
533            EntityData::Vector(vector) => vector.content.clone().unwrap_or_default(),
534            EntityData::TimeSeries(ts) => ts.metric.clone(),
535            EntityData::QueueMessage(_) => String::new(),
536        }
537    }
538
539    fn native_value_text(value: &Value) -> Option<String> {
540        match value {
541            Value::Text(value) => Some(value.to_string()),
542            Value::Json(value) => String::from_utf8(value.clone()).ok(),
543            Value::Blob(value) => String::from_utf8(value.clone()).ok(),
544            Value::Integer(value) => Some(value.to_string()),
545            Value::UnsignedInteger(value) => Some(value.to_string()),
546            Value::Float(value) => Some(value.to_string()),
547            Value::Boolean(value) => Some(value.to_string()),
548            Value::IpAddr(value) => Some(value.to_string()),
549            Value::NodeRef(value) => Some(value.clone()),
550            Value::EdgeRef(value) => Some(value.clone()),
551            Value::RowRef(table, row_id) => Some(format!("{table}:{row_id}")),
552            Value::VectorRef(collection, vector_id) => Some(format!("{collection}:{vector_id}")),
553            Value::Timestamp(value) => Some(value.to_string()),
554            Value::Duration(value) => Some(value.to_string()),
555            Value::Uuid(_) | Value::MacAddr(_) | Value::Vector(_) | Value::Null => None,
556            Value::Color([r, g, b]) => Some(format!("#{:02X}{:02X}{:02X}", r, g, b)),
557            Value::Email(s) => Some(s.clone()),
558            Value::Url(s) => Some(s.clone()),
559            Value::Phone(n) => Some(format!("+{}", n)),
560            Value::Semver(packed) => Some(format!(
561                "{}.{}.{}",
562                packed / 1_000_000,
563                (packed / 1_000) % 1_000,
564                packed % 1_000
565            )),
566            Value::Cidr(ip, prefix) => Some(format!(
567                "{}.{}.{}.{}/{}",
568                (ip >> 24) & 0xFF,
569                (ip >> 16) & 0xFF,
570                (ip >> 8) & 0xFF,
571                ip & 0xFF,
572                prefix
573            )),
574            Value::Date(days) => Some(days.to_string()),
575            Value::Time(ms) => {
576                let total_secs = ms / 1000;
577                Some(format!(
578                    "{:02}:{:02}:{:02}",
579                    total_secs / 3600,
580                    (total_secs / 60) % 60,
581                    total_secs % 60
582                ))
583            }
584            Value::Decimal(v) => Some(Value::Decimal(*v).display_string()),
585            Value::EnumValue(i) => Some(format!("enum({})", i)),
586            Value::Array(_) => None,
587            Value::TimestampMs(ms) => Some(ms.to_string()),
588            Value::Ipv4(ip) => Some(format!(
589                "{}.{}.{}.{}",
590                (ip >> 24) & 0xFF,
591                (ip >> 16) & 0xFF,
592                (ip >> 8) & 0xFF,
593                ip & 0xFF
594            )),
595            Value::Ipv6(bytes) => Some(format!("{}", std::net::Ipv6Addr::from(*bytes))),
596            Value::Subnet(ip, mask) => {
597                let prefix = mask.leading_ones();
598                Some(format!(
599                    "{}.{}.{}.{}/{}",
600                    (ip >> 24) & 0xFF,
601                    (ip >> 16) & 0xFF,
602                    (ip >> 8) & 0xFF,
603                    ip & 0xFF,
604                    prefix
605                ))
606            }
607            Value::Port(p) => Some(p.to_string()),
608            Value::Latitude(micro) => Some(format!("{:.6}", *micro as f64 / 1_000_000.0)),
609            Value::Longitude(micro) => Some(format!("{:.6}", *micro as f64 / 1_000_000.0)),
610            Value::GeoPoint(lat, lon) => Some(format!(
611                "{:.6},{:.6}",
612                *lat as f64 / 1_000_000.0,
613                *lon as f64 / 1_000_000.0
614            )),
615            Value::Country2(c) => Some(String::from_utf8_lossy(c).to_string()),
616            Value::Country3(c) => Some(String::from_utf8_lossy(c).to_string()),
617            Value::Lang2(c) => Some(String::from_utf8_lossy(c).to_string()),
618            Value::Lang5(c) => Some(String::from_utf8_lossy(c).to_string()),
619            Value::Currency(c) => Some(String::from_utf8_lossy(c).to_string()),
620            Value::AssetCode(code) => Some(code.clone()),
621            Value::Money { .. } => Some(value.display_string()),
622            Value::ColorAlpha([r, g, b, a]) => {
623                Some(format!("#{:02X}{:02X}{:02X}{:02X}", r, g, b, a))
624            }
625            Value::BigInt(v) => Some(v.to_string()),
626            Value::KeyRef(col, key) => Some(format!("{}:{}", col, key)),
627            Value::DocRef(col, id) => Some(format!("{}#{}", col, id)),
628            Value::TableRef(name) => Some(name.clone()),
629            Value::PageRef(page_id) => Some(format!("page:{}", page_id)),
630            Value::Secret(_) | Value::Password(_) => None,
631        }
632    }
633
634    fn native_fulltext_tokenize(text: &str) -> Vec<String> {
635        text.to_lowercase()
636            .split(|c: char| !c.is_alphanumeric())
637            .filter(|s| s.len() >= 2)
638            .map(|s| s.to_string())
639            .collect()
640    }
641
642    pub(crate) fn native_recovery_summary_from_metadata(
643        metadata: &PhysicalMetadataFile,
644    ) -> NativeRecoverySummary {
645        const SAMPLE_LIMIT: usize = 16;
646
647        let snapshots: Vec<_> = metadata
648            .snapshots
649            .iter()
650            .rev()
651            .take(SAMPLE_LIMIT)
652            .map(|snapshot| NativeSnapshotSummary {
653                snapshot_id: snapshot.snapshot_id,
654                created_at_unix_ms: snapshot.created_at_unix_ms,
655                superblock_sequence: snapshot.superblock_sequence,
656                collection_count: snapshot.collection_count as u32,
657                total_entities: snapshot.total_entities as u64,
658            })
659            .collect();
660        let exports: Vec<_> = metadata
661            .exports
662            .iter()
663            .rev()
664            .take(SAMPLE_LIMIT)
665            .map(|export| NativeExportSummary {
666                name: export.name.clone(),
667                created_at_unix_ms: export.created_at_unix_ms,
668                snapshot_id: export.snapshot_id,
669                superblock_sequence: export.superblock_sequence,
670                collection_count: export.collection_count as u32,
671                total_entities: export.total_entities as u64,
672            })
673            .collect();
674
675        NativeRecoverySummary {
676            snapshot_count: metadata.snapshots.len() as u32,
677            export_count: metadata.exports.len() as u32,
678            snapshots_complete: metadata.snapshots.len() <= SAMPLE_LIMIT,
679            exports_complete: metadata.exports.len() <= SAMPLE_LIMIT,
680            omitted_snapshot_count: metadata.snapshots.len().saturating_sub(snapshots.len()) as u32,
681            omitted_export_count: metadata.exports.len().saturating_sub(exports.len()) as u32,
682            snapshots,
683            exports,
684        }
685    }
686
687    pub(crate) fn native_catalog_summary_from_metadata(
688        metadata: &PhysicalMetadataFile,
689    ) -> NativeCatalogSummary {
690        const SAMPLE_LIMIT: usize = 32;
691
692        let collections: Vec<_> = metadata
693            .catalog
694            .stats_by_collection
695            .iter()
696            .take(SAMPLE_LIMIT)
697            .map(|(name, stats)| NativeCatalogCollectionSummary {
698                name: name.clone(),
699                entities: stats.entities as u64,
700                cross_refs: stats.cross_refs as u64,
701                segments: stats.segments as u32,
702            })
703            .collect();
704
705        NativeCatalogSummary {
706            collection_count: metadata.catalog.total_collections as u32,
707            total_entities: metadata.catalog.total_entities as u64,
708            collections_complete: metadata.catalog.stats_by_collection.len() <= SAMPLE_LIMIT,
709            omitted_collection_count: metadata
710                .catalog
711                .stats_by_collection
712                .len()
713                .saturating_sub(collections.len()) as u32,
714            collections,
715        }
716    }
717
718    pub(crate) fn native_metadata_state_summary_from_metadata(
719        metadata: &PhysicalMetadataFile,
720    ) -> NativeMetadataStateSummary {
721        NativeMetadataStateSummary {
722            protocol_version: metadata.protocol_version.clone(),
723            generated_at_unix_ms: metadata.generated_at_unix_ms,
724            last_loaded_from: metadata.last_loaded_from.clone(),
725            last_healed_at_unix_ms: metadata.last_healed_at_unix_ms,
726        }
727    }
728
729    pub(crate) fn inspect_native_header_against_metadata(
730        native: PhysicalFileHeader,
731        metadata: &PhysicalMetadataFile,
732    ) -> NativeHeaderInspection {
733        let expected = Self::native_header_from_metadata(metadata);
734        let mut mismatches = Vec::new();
735
736        if native.format_version != expected.format_version {
737            mismatches.push(NativeHeaderMismatch {
738                field: "format_version",
739                native: native.format_version.to_string(),
740                expected: expected.format_version.to_string(),
741            });
742        }
743        if native.sequence != expected.sequence {
744            mismatches.push(NativeHeaderMismatch {
745                field: "sequence",
746                native: native.sequence.to_string(),
747                expected: expected.sequence.to_string(),
748            });
749        }
750        if native.manifest_oldest_root != expected.manifest_oldest_root {
751            mismatches.push(NativeHeaderMismatch {
752                field: "manifest_oldest_root",
753                native: native.manifest_oldest_root.to_string(),
754                expected: expected.manifest_oldest_root.to_string(),
755            });
756        }
757        if native.manifest_root != expected.manifest_root {
758            mismatches.push(NativeHeaderMismatch {
759                field: "manifest_root",
760                native: native.manifest_root.to_string(),
761                expected: expected.manifest_root.to_string(),
762            });
763        }
764        if native.free_set_root != expected.free_set_root {
765            mismatches.push(NativeHeaderMismatch {
766                field: "free_set_root",
767                native: native.free_set_root.to_string(),
768                expected: expected.free_set_root.to_string(),
769            });
770        }
771        if native.collection_root_count != expected.collection_root_count {
772            mismatches.push(NativeHeaderMismatch {
773                field: "collection_root_count",
774                native: native.collection_root_count.to_string(),
775                expected: expected.collection_root_count.to_string(),
776            });
777        }
778        if native.snapshot_count != expected.snapshot_count {
779            mismatches.push(NativeHeaderMismatch {
780                field: "snapshot_count",
781                native: native.snapshot_count.to_string(),
782                expected: expected.snapshot_count.to_string(),
783            });
784        }
785        if native.index_count != expected.index_count {
786            mismatches.push(NativeHeaderMismatch {
787                field: "index_count",
788                native: native.index_count.to_string(),
789                expected: expected.index_count.to_string(),
790            });
791        }
792        if native.catalog_collection_count != expected.catalog_collection_count {
793            mismatches.push(NativeHeaderMismatch {
794                field: "catalog_collection_count",
795                native: native.catalog_collection_count.to_string(),
796                expected: expected.catalog_collection_count.to_string(),
797            });
798        }
799        if native.catalog_total_entities != expected.catalog_total_entities {
800            mismatches.push(NativeHeaderMismatch {
801                field: "catalog_total_entities",
802                native: native.catalog_total_entities.to_string(),
803                expected: expected.catalog_total_entities.to_string(),
804            });
805        }
806        if native.export_count != expected.export_count {
807            mismatches.push(NativeHeaderMismatch {
808                field: "export_count",
809                native: native.export_count.to_string(),
810                expected: expected.export_count.to_string(),
811            });
812        }
813        if native.graph_projection_count != expected.graph_projection_count {
814            mismatches.push(NativeHeaderMismatch {
815                field: "graph_projection_count",
816                native: native.graph_projection_count.to_string(),
817                expected: expected.graph_projection_count.to_string(),
818            });
819        }
820        if native.analytics_job_count != expected.analytics_job_count {
821            mismatches.push(NativeHeaderMismatch {
822                field: "analytics_job_count",
823                native: native.analytics_job_count.to_string(),
824                expected: expected.analytics_job_count.to_string(),
825            });
826        }
827        if native.manifest_event_count != expected.manifest_event_count {
828            mismatches.push(NativeHeaderMismatch {
829                field: "manifest_event_count",
830                native: native.manifest_event_count.to_string(),
831                expected: expected.manifest_event_count.to_string(),
832            });
833        }
834
835        NativeHeaderInspection {
836            native,
837            expected,
838            consistent: mismatches.is_empty(),
839            mismatches,
840        }
841    }
842
843    pub(crate) fn repair_policy_for_inspection(
844        inspection: &NativeHeaderInspection,
845    ) -> NativeHeaderRepairPolicy {
846        if inspection.consistent {
847            return NativeHeaderRepairPolicy::InSync;
848        }
849
850        if inspection.expected.sequence >= inspection.native.sequence {
851            NativeHeaderRepairPolicy::RepairNativeFromMetadata
852        } else {
853            NativeHeaderRepairPolicy::NativeAheadOfMetadata
854        }
855    }
856
857    pub(crate) fn prune_export_registry(&self, exports: &mut Vec<ExportDescriptor>) {
858        let retention = self.options.export_retention.max(1);
859        if exports.len() <= retention {
860            return;
861        }
862
863        exports.sort_by_key(|export| export.created_at_unix_ms);
864        let removed: Vec<ExportDescriptor> =
865            exports.drain(0..(exports.len() - retention)).collect();
866
867        for export in removed {
868            let _ = fs::remove_file(&export.data_path);
869            let _ = fs::remove_file(&export.metadata_path);
870            let binary_path = PhysicalMetadataFile::metadata_binary_path_for(std::path::Path::new(
871                &export.data_path,
872            ));
873            let _ = fs::remove_file(binary_path);
874        }
875    }
876
877    pub(crate) fn runtime_index_catalog(&self) -> IndexCatalog {
878        let mut catalog = IndexCatalog::register_default_vector_graph(
879            self.options.has_capability(Capability::Table),
880            self.options.has_capability(Capability::Graph),
881        );
882        if self.options.has_capability(Capability::FullText) {
883            catalog.register(RuntimeIndexConfig::new(
884                "text-fulltext",
885                IndexKind::FullText,
886            ));
887            catalog.register(RuntimeIndexConfig::new(
888                "document-pathvalue",
889                IndexKind::DocumentPathValue,
890            ));
891        }
892        catalog.register(RuntimeIndexConfig::new(
893            "search-hybrid",
894            IndexKind::HybridSearch,
895        ));
896        catalog
897    }
898
899    pub(crate) fn physical_index_state(&self) -> Vec<PhysicalIndexState> {
900        // Use a lightweight catalog snapshot that does NOT call physical_metadata()
901        // to avoid infinite recursion: physical_metadata → metadata_from_native_state
902        // → physical_index_state → catalog_model_snapshot → physical_metadata → ...
903        let catalog = self.runtime_index_catalog();
904        let snapshot = crate::catalog::snapshot_store_with_declarations(
905            "reddb",
906            self.store.as_ref(),
907            Some(&catalog),
908            None, // No declarations — breaks the recursive cycle
909            None, // No contracts — avoids pulling physical metadata during bootstrap
910        );
911        let mut metrics_by_name = std::collections::BTreeMap::new();
912        for metric in &snapshot.indices {
913            metrics_by_name.insert(metric.name.clone(), metric.clone());
914        }
915
916        let mut states = Vec::new();
917        for collection in snapshot.collections {
918            for index_name in &collection.indices {
919                let metric = metrics_by_name.get(index_name);
920                let kind = metric
921                    .map(|metric| metric.kind)
922                    .unwrap_or_else(|| infer_collection_index_kind(collection.model, index_name));
923                let entries = estimate_index_entries(&collection, kind);
924                states.push(PhysicalIndexState {
925                    name: format!("{}::{}", collection.name, index_name),
926                    kind,
927                    collection: Some(collection.name.clone()),
928                    enabled: metric.map(|metric| metric.enabled).unwrap_or(true),
929                    entries,
930                    estimated_memory_bytes: estimate_index_memory(entries, kind),
931                    last_refresh_ms: metric.and_then(|metric| metric.last_refresh_ms),
932                    backend: index_backend_name(kind).to_string(),
933                    artifact_kind: None,
934                    artifact_root_page: None,
935                    artifact_checksum: None,
936                    build_state: "catalog-derived".to_string(),
937                });
938            }
939        }
940
941        states
942    }
943
944    pub(crate) fn physical_collection_roots(&self) -> BTreeMap<String, u64> {
945        let mut roots = BTreeMap::new();
946
947        for name in self.store.list_collections() {
948            let Some(manager) = self.store.get_collection(&name) else {
949                continue;
950            };
951
952            let stats = manager.stats();
953            let mut root = fnv1a_seed();
954            fnv1a_hash_value(&mut root, &name);
955            fnv1a_hash_value(&mut root, &stats.total_entities);
956            fnv1a_hash_value(&mut root, &stats.growing_count);
957            fnv1a_hash_value(&mut root, &stats.sealed_count);
958            fnv1a_hash_value(&mut root, &stats.archived_count);
959            fnv1a_hash_value(&mut root, &stats.total_memory_bytes);
960            fnv1a_hash_value(&mut root, &stats.seal_ops);
961            fnv1a_hash_value(&mut root, &stats.compact_ops);
962
963            let mut entities = manager.query_all(|_| true);
964            entities.sort_by_key(|entity| entity.id.raw());
965
966            for entity in entities {
967                fnv1a_hash_value(&mut root, &entity.id.raw());
968                fnv1a_hash_value(&mut root, &entity.kind);
969                fnv1a_hash_value(&mut root, &entity.created_at);
970                fnv1a_hash_value(&mut root, &entity.updated_at);
971                fnv1a_hash_value(&mut root, &entity.data);
972                fnv1a_hash_value(&mut root, &entity.sequence_id);
973                fnv1a_hash_value(&mut root, &entity.embeddings().len());
974                fnv1a_hash_value(&mut root, &entity.cross_refs().len());
975            }
976
977            roots.insert(name, root);
978        }
979
980        roots
981    }
982
983    // ========================================================================
984    // Reference Helpers - For Metadata Linking
985    // ========================================================================
986
987    /// Create a reference to a table row
988    pub fn table_ref(&self, table: impl Into<String>, row_id: u64) -> TableRef {
989        TableRef::new(table, row_id)
990    }
991
992    /// Create a reference to a graph node
993    pub fn node_ref(&self, collection: impl Into<String>, node_id: EntityId) -> NodeRef {
994        NodeRef::new(collection, node_id)
995    }
996
997    /// Create a reference to a vector
998    pub fn vector_ref(&self, collection: impl Into<String>, vector_id: EntityId) -> VectorRef {
999        VectorRef::new(collection, vector_id)
1000    }
1001
1002    // ========================================================================
1003    // Query API
1004    // ========================================================================
1005
1006    /// Start building a query
1007    pub fn query(&self) -> QueryBuilder {
1008        QueryBuilder::new(self.store.clone())
1009    }
1010
1011    /// Quick vector similarity search.
1012    ///
1013    /// For collections with >= 100 vectors a lazily-built HNSW index is used
1014    /// for fast approximate nearest-neighbor lookup.  Smaller collections fall
1015    /// back to an exact brute-force scan so that the overhead of building an
1016    /// index is avoided when it would not pay off.
1017    pub fn similar(&self, collection: &str, vector: &[f32], k: usize) -> Vec<SimilarResult> {
1018        if self.store.get_collection(collection).is_none() {
1019            return Vec::new();
1020        }
1021
1022        // Try the HNSW fast path for collections with enough vectors.
1023        if let Some(index) = self.get_or_build_hnsw_index(collection, vector.len()) {
1024            let hnsw = index.read().unwrap_or_else(|e| e.into_inner());
1025            let results = hnsw.search(vector, k);
1026            let mapped = self.hnsw_results_to_similar(collection, &results);
1027            if !mapped.is_empty() {
1028                return mapped;
1029            }
1030        }
1031
1032        // Fallback: brute-force scan (small / mixed-type collections).
1033        self.similar_brute_force(collection, vector, k)
1034    }
1035
1036    /// Brute-force cosine similarity scan (exact results, O(n)).
1037    fn similar_brute_force(
1038        &self,
1039        collection: &str,
1040        vector: &[f32],
1041        k: usize,
1042    ) -> Vec<SimilarResult> {
1043        let manager = match self.store.get_collection(collection) {
1044            Some(m) => m,
1045            None => return Vec::new(),
1046        };
1047
1048        let entities = manager.query_all(|_| true);
1049        let mut results: Vec<SimilarResult> = entities
1050            .iter()
1051            .filter_map(|e| {
1052                let score = match &e.data {
1053                    EntityData::Vector(v) => cosine_similarity(vector, &v.dense),
1054                    _ => e
1055                        .embeddings()
1056                        .iter()
1057                        .map(|emb| cosine_similarity(vector, &emb.vector))
1058                        .fold(0.0f32, f32::max),
1059                };
1060                let distance = (1.0 - score).max(0.0);
1061                if score > 0.0 {
1062                    Some(SimilarResult {
1063                        entity_id: e.id,
1064                        score,
1065                        distance,
1066                        entity: e.clone(),
1067                    })
1068                } else {
1069                    None
1070                }
1071            })
1072            .collect();
1073
1074        results.sort_by(|a, b| {
1075            b.score
1076                .partial_cmp(&a.score)
1077                .unwrap_or(std::cmp::Ordering::Equal)
1078        });
1079        results.truncate(k);
1080        results
1081    }
1082
1083    /// Return (or lazily build) a per-collection HNSW index.
1084    ///
1085    /// Returns `None` when the collection has fewer than 100 dense vectors
1086    /// (the brute-force path is cheaper for tiny collections) or when there
1087    /// is a dimension mismatch with the query vector.
1088    ///
1089    /// The cached index is automatically invalidated when the live entity
1090    /// count in the collection changes, so inserts and deletes are picked up
1091    /// transparently without requiring explicit invalidation calls.
1092    fn get_or_build_hnsw_index(
1093        &self,
1094        collection: &str,
1095        query_dim: usize,
1096    ) -> Option<Arc<RwLock<HnswIndex>>> {
1097        let manager = self.store.get_collection(collection)?;
1098        let live_count = manager.count();
1099
1100        // Fast path: check if a fresh index already exists.
1101        {
1102            let indexes = self
1103                .vector_indexes
1104                .read()
1105                .unwrap_or_else(|e| e.into_inner());
1106            if let Some(cached) = indexes.get(collection) {
1107                if cached.entity_count == live_count {
1108                    return Some(Arc::clone(&cached.index));
1109                }
1110            }
1111        }
1112
1113        // Either no cached index exists or it is stale -- (re)build it.
1114        let entities = manager.query_all(|_| true);
1115
1116        let vectors: Vec<(u64, Vec<f32>)> = entities
1117            .iter()
1118            .filter_map(|e| match &e.data {
1119                EntityData::Vector(v) if !v.dense.is_empty() && v.dense.len() == query_dim => {
1120                    Some((e.id.raw(), v.dense.clone()))
1121                }
1122                _ => None,
1123            })
1124            .collect();
1125
1126        // Only build the HNSW index when there are enough vectors to justify it.
1127        const MIN_VECTORS_FOR_HNSW: usize = 100;
1128        if vectors.len() < MIN_VECTORS_FOR_HNSW {
1129            return None;
1130        }
1131
1132        // Build the HNSW index with cosine distance (matching the brute-force path).
1133        let config = crate::storage::engine::HnswConfig::with_m(16)
1134            .with_metric(crate::storage::engine::DistanceMetric::Cosine)
1135            .with_ef_construction(100)
1136            .with_ef_search(50);
1137        let mut hnsw = HnswIndex::new(query_dim, config);
1138
1139        for (id, vec) in &vectors {
1140            hnsw.insert_with_id(*id, vec.clone());
1141        }
1142
1143        let index = Arc::new(RwLock::new(hnsw));
1144
1145        // Store in the cache (double-check pattern to avoid duplicate builds).
1146        let mut indexes = self
1147            .vector_indexes
1148            .write()
1149            .unwrap_or_else(|e| e.into_inner());
1150        // Re-check under write lock: another thread may have built in the meantime.
1151        if let Some(cached) = indexes.get(collection) {
1152            if cached.entity_count == live_count {
1153                return Some(Arc::clone(&cached.index));
1154            }
1155        }
1156        indexes.insert(
1157            collection.to_string(),
1158            CachedVectorIndex {
1159                index: Arc::clone(&index),
1160                entity_count: live_count,
1161            },
1162        );
1163        Some(index)
1164    }
1165
1166    /// Convert HNSW `DistanceResult`s back to the DevX `SimilarResult` type.
1167    fn hnsw_results_to_similar(
1168        &self,
1169        collection: &str,
1170        results: &[crate::storage::engine::DistanceResult],
1171    ) -> Vec<SimilarResult> {
1172        results
1173            .iter()
1174            .filter_map(|dr| {
1175                let entity_id = EntityId::new(dr.id);
1176                let entity = self.store.get(collection, entity_id)?;
1177                // Cosine distance = 1 - similarity.
1178                let score = (1.0 - dr.distance).max(0.0);
1179                if score > 0.0 {
1180                    Some(SimilarResult {
1181                        entity_id,
1182                        score,
1183                        distance: dr.distance,
1184                        entity,
1185                    })
1186                } else {
1187                    None
1188                }
1189            })
1190            .collect()
1191    }
1192
1193    /// Invalidate the cached HNSW index for a collection.
1194    ///
1195    /// Called after vector inserts / deletes so the next search lazily rebuilds
1196    /// a fresh index that includes the new data.
1197    pub(crate) fn invalidate_vector_index(&self, collection: &str) {
1198        let mut indexes = self
1199            .vector_indexes
1200            .write()
1201            .unwrap_or_else(|e| e.into_inner());
1202        indexes.remove(collection);
1203    }
1204
1205    /// Get entity by ID from any collection
1206    pub fn get(&self, id: EntityId) -> Option<UnifiedEntity> {
1207        self.store.get_any(id).map(|(_, e)| e)
1208    }
1209
1210    /// Get entity with its collection name
1211    pub fn get_with_collection(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1212        self.store.get_any(id)
1213    }
1214
1215    // ========================================================================
1216    // Batch Operations - Performance
1217    // ========================================================================
1218
1219    /// Batch get multiple entities by ID. More efficient than N individual get() calls.
1220    pub fn batch_get(&self, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1221        ids.iter().map(|id| self.get(*id)).collect()
1222    }
1223
1224    /// Start a batch operation for bulk inserts
1225    pub fn batch(&self) -> BatchBuilder {
1226        BatchBuilder::new(self.store.clone(), self.preprocessors.clone())
1227    }
1228
1229    // ========================================================================
1230    // Preprocessing
1231    // ========================================================================
1232
1233    /// Add a preprocessor hook
1234    pub fn add_preprocessor(&mut self, preprocessor: Box<dyn Preprocessor>) {
1235        let mut preprocessors = self
1236            .preprocessors
1237            .write()
1238            .unwrap_or_else(|poisoned| poisoned.into_inner());
1239        preprocessors.push(Arc::from(preprocessor));
1240    }
1241
1242    // ========================================================================
1243    // Cross-Reference Navigation
1244    // ========================================================================
1245
1246    /// Get all entities linked FROM the given entity
1247    pub fn linked_from(&self, id: EntityId) -> Vec<LinkedEntity> {
1248        self.store
1249            .get_refs_from(id)
1250            .into_iter()
1251            .filter_map(|(target_id, ref_type, collection)| {
1252                self.store
1253                    .get(&collection, target_id)
1254                    .map(|entity| LinkedEntity {
1255                        entity,
1256                        ref_type,
1257                        collection,
1258                    })
1259            })
1260            .collect()
1261    }
1262
1263    /// Get all entities linked TO the given entity
1264    pub fn linked_to(&self, id: EntityId) -> Vec<LinkedEntity> {
1265        self.store
1266            .get_refs_to(id)
1267            .into_iter()
1268            .filter_map(|(source_id, ref_type, collection)| {
1269                self.store
1270                    .get(&collection, source_id)
1271                    .map(|entity| LinkedEntity {
1272                        entity,
1273                        ref_type,
1274                        collection,
1275                    })
1276            })
1277            .collect()
1278    }
1279
1280    /// Get the underlying store (for advanced operations)
1281    pub fn store(&self) -> Arc<UnifiedStore> {
1282        self.store.clone()
1283    }
1284
1285    /// Per-process registry of `vector.turbo` collection state (issue
1286    /// #693). Lazily allocated on first access; the inner map holds an
1287    /// `Arc<TurboCollectionState>` per turbo collection so concurrent
1288    /// SEARCHers and INSERTers can share the in-memory index without
1289    /// holding the outer map lock for the whole operation.
1290    pub(crate) fn turbo_collections(
1291        &self,
1292    ) -> &Arc<
1293        parking_lot::Mutex<
1294            std::collections::HashMap<
1295                String,
1296                Arc<crate::runtime::vector_turbo_kind::TurboCollectionState>,
1297            >,
1298        >,
1299    > {
1300        self.turbo_collections
1301            .get_or_init(|| Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new())))
1302    }
1303
1304    /// Look up (or lazily create) the runtime state for a turbo
1305    /// collection. Returns `None` for collections that are not marked
1306    /// `turbo` in the catalog.
1307    ///
1308    /// Lazy creation matters for two reasons: (a) a fresh process
1309    /// reopens a persistent DB without any turbo state in memory; (b)
1310    /// the contract (dim + metric) is read here instead of being
1311    /// stored separately, which keeps the catalog as the single source
1312    /// of truth.
1313    pub(crate) fn turbo_state(
1314        &self,
1315        collection: &str,
1316    ) -> Option<Arc<crate::runtime::vector_turbo_kind::TurboCollectionState>> {
1317        if !crate::runtime::vector_turbo_kind::is_turbo(&self.store, collection) {
1318            return None;
1319        }
1320        let map = self.turbo_collections();
1321        {
1322            let guard = map.lock();
1323            if let Some(state) = guard.get(collection) {
1324                return Some(Arc::clone(state));
1325            }
1326        }
1327        let contract = self.collection_contract(collection)?;
1328        let dim = contract.vector_dimension?;
1329        let metric = contract
1330            .vector_metric
1331            .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine);
1332        let state = Arc::new(
1333            crate::runtime::vector_turbo_kind::TurboCollectionState::new(
1334                dim,
1335                metric,
1336                self.store.pager(),
1337            ),
1338        );
1339        // Issue #674 — derive the `.tv` snapshot path from the
1340        // resolved tier layout if the active layout enables snapshot
1341        // files. `Minimal` (embedded) returns `None` and the state
1342        // never writes a `.tv` for this collection.
1343        if let Some((_, paths)) = self.options.resolve_tiered_layout() {
1344            state.set_snapshot_path(paths.turbo_snapshot_path(collection));
1345        }
1346        let mut guard = map.lock();
1347        let inserted_now = !guard.contains_key(collection);
1348        let entry = guard
1349            .entry(collection.to_string())
1350            .or_insert_with(|| Arc::clone(&state));
1351        let handle = Arc::clone(entry);
1352        drop(guard);
1353        // Issue #673 — first materialisation of this collection's
1354        // state kicks off a background rebuild worker. Non-vector
1355        // traffic is unaffected (it never reaches `turbo_state`);
1356        // vector SEARCH/INSERT observes the readiness flag via
1357        // `wait_until_ready` instead of blocking inline on the
1358        // synchronous lazy populate.
1359        if inserted_now {
1360            let join = crate::runtime::vector_turbo_kind::spawn_background_rebuild(
1361                Arc::clone(&self.store),
1362                collection.to_string(),
1363                Arc::clone(&handle),
1364            );
1365            self.turbo_rebuild_workers.lock().push(join);
1366        }
1367        Some(handle)
1368    }
1369
1370    /// Lazily-initialised ML runtime. First caller wins; subsequent
1371    /// callers observe the same instance. The runtime is created
1372    /// with an in-memory persistence backend by default — production
1373    /// deployments that need durable model state will swap this for
1374    /// a store-backed backend when the persistence integration lands.
1375    pub fn ml_runtime(&self) -> &crate::storage::ml::MlRuntime {
1376        self.ml_runtime.get_or_init(|| {
1377            crate::storage::ml::MlRuntime::in_memory(std::sync::Arc::new(
1378                // No-op work fn — the SQL scalars call predict
1379                // synchronously; async training jobs come in a later
1380                // sprint alongside `CREATE MODEL`.
1381                |_handle| Ok(String::new()),
1382            ))
1383        })
1384    }
1385
1386    /// Shared semantic cache for `SEMANTIC_CACHE_*` scalars. Uses
1387    /// default config until a `SET SEMANTIC_CACHE` admin surface
1388    /// lands.
1389    pub fn semantic_cache(&self) -> &Arc<crate::storage::ml::SemanticCache> {
1390        self.semantic_cache.get_or_init(|| {
1391            Arc::new(crate::storage::ml::SemanticCache::new(
1392                crate::storage::ml::SemanticCacheConfig::default(),
1393            ))
1394        })
1395    }
1396
1397    /// Lazily-initialised hypertable registry. Populated by `CREATE
1398    /// HYPERTABLE` DDL; consumed by chunk routing + retention.
1399    pub fn hypertables(&self) -> &Arc<crate::storage::timeseries::HypertableRegistry> {
1400        self.hypertables
1401            .get_or_init(|| Arc::new(crate::storage::timeseries::HypertableRegistry::new()))
1402    }
1403
1404    /// Lazily-initialised continuous-aggregate engine. Populated by
1405    /// `CA_REGISTER` and read by `CA_REFRESH` / `CA_STATE` scalars.
1406    pub fn continuous_aggregates(
1407        &self,
1408    ) -> &Arc<crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine> {
1409        self.continuous_aggregates.get_or_init(|| {
1410            Arc::new(
1411                crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine::new(),
1412            )
1413        })
1414    }
1415
1416    pub(crate) fn is_binary_dump(path: &Path) -> Result<bool, std::io::Error> {
1417        let mut file = File::open(path)?;
1418        let mut magic = [0u8; 4];
1419        let read = file.read(&mut magic)?;
1420        Ok(read == 4 && reddb_file::native_store_magic_matches(&magic))
1421    }
1422}