Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_access.rs

1use super::*;
2
3impl RedDB {
4    /// Access the quorum coordinator (Phase 2.6 PG parity).
5    ///
6    /// `None` when this instance is not a primary. Write paths call
7    /// `quorum.wait_for_quorum(lsn)` after the primary WAL commit to
8    /// block until the configured replica quorum has acked; `Async`
9    /// mode (default) returns instantly so no behavioural change.
10    pub fn quorum_coordinator(
11        &self,
12    ) -> Option<&Arc<crate::replication::quorum::QuorumCoordinator>> {
13        self.quorum.as_ref()
14    }
15
16    /// Wait for the configured replica quorum on a given LSN.
17    ///
18    /// Convenience wrapper that returns `Ok(())` when:
19    /// * there is no quorum coordinator (non-primary instance), OR
20    /// * the coordinator is in `Async` mode (historical behaviour).
21    ///
22    /// Callers in the write path (INSERT/UPDATE/DELETE dispatch) invoke
23    /// this after logging the commit record. Returning `Err` signals
24    /// that replication did NOT reach quorum — the primary still has
25    /// the record but the client ack should be deferred or rejected.
26    pub fn wait_for_replication_quorum(
27        &self,
28        target_lsn: u64,
29    ) -> Result<(), crate::replication::QuorumError> {
30        match &self.quorum {
31            Some(q) => q.wait_for_quorum(target_lsn),
32            None => Ok(()),
33        }
34    }
35}
36
37impl RedDB {
38    pub(crate) fn native_registry_summary_from_metadata(
39        &self,
40        metadata: &PhysicalMetadataFile,
41    ) -> NativeRegistrySummary {
42        const SAMPLE_LIMIT: usize = 32;
43
44        let collection_names: Vec<_> = metadata
45            .catalog
46            .stats_by_collection
47            .keys()
48            .take(SAMPLE_LIMIT)
49            .cloned()
50            .collect();
51        let indexes: Vec<_> = metadata
52            .indexes
53            .iter()
54            .take(SAMPLE_LIMIT)
55            .map(|index| NativeRegistryIndexSummary {
56                name: index.name.clone(),
57                kind: index.kind.as_str().to_string(),
58                collection: index.collection.clone(),
59                enabled: index.enabled,
60                entries: index.entries as u64,
61                estimated_memory_bytes: index.estimated_memory_bytes,
62                last_refresh_ms: index.last_refresh_ms,
63                backend: index.backend.clone(),
64            })
65            .collect();
66        let graph_projections: Vec<_> = metadata
67            .graph_projections
68            .iter()
69            .take(SAMPLE_LIMIT)
70            .map(|projection| NativeRegistryProjectionSummary {
71                name: projection.name.clone(),
72                source: projection.source.clone(),
73                created_at_unix_ms: projection.created_at_unix_ms,
74                updated_at_unix_ms: projection.updated_at_unix_ms,
75                node_labels: projection.node_labels.clone(),
76                node_types: projection.node_types.clone(),
77                edge_labels: projection.edge_labels.clone(),
78                last_materialized_sequence: projection.last_materialized_sequence,
79            })
80            .collect();
81        let analytics_jobs = metadata
82            .analytics_jobs
83            .iter()
84            .take(SAMPLE_LIMIT)
85            .map(|job| NativeRegistryJobSummary {
86                id: job.id.clone(),
87                kind: job.kind.clone(),
88                projection: job.projection.clone(),
89                state: job.state.clone(),
90                created_at_unix_ms: job.created_at_unix_ms,
91                updated_at_unix_ms: job.updated_at_unix_ms,
92                last_run_sequence: job.last_run_sequence,
93                metadata: job.metadata.clone(),
94            })
95            .collect::<Vec<_>>();
96        let vector_artifacts = self
97            .native_vector_artifact_records()
98            .into_iter()
99            .map(|(summary, _)| summary)
100            .take(SAMPLE_LIMIT)
101            .collect::<Vec<_>>();
102        let vector_artifact_count = self.native_vector_artifact_collection_count() as u32;
103
104        NativeRegistrySummary {
105            collection_count: metadata.catalog.total_collections as u32,
106            index_count: metadata.indexes.len() as u32,
107            graph_projection_count: metadata.graph_projections.len() as u32,
108            analytics_job_count: metadata.analytics_jobs.len() as u32,
109            vector_artifact_count,
110            collections_complete: metadata.catalog.stats_by_collection.len() <= SAMPLE_LIMIT,
111            indexes_complete: metadata.indexes.len() <= SAMPLE_LIMIT,
112            graph_projections_complete: metadata.graph_projections.len() <= SAMPLE_LIMIT,
113            analytics_jobs_complete: metadata.analytics_jobs.len() <= SAMPLE_LIMIT,
114            vector_artifacts_complete: vector_artifact_count as usize <= SAMPLE_LIMIT,
115            omitted_collection_count: metadata
116                .catalog
117                .stats_by_collection
118                .len()
119                .saturating_sub(collection_names.len())
120                as u32,
121            omitted_index_count: metadata.indexes.len().saturating_sub(indexes.len()) as u32,
122            omitted_graph_projection_count: metadata
123                .graph_projections
124                .len()
125                .saturating_sub(graph_projections.len())
126                as u32,
127            omitted_analytics_job_count: metadata
128                .analytics_jobs
129                .len()
130                .saturating_sub(analytics_jobs.len())
131                as u32,
132            omitted_vector_artifact_count: vector_artifact_count
133                .saturating_sub(vector_artifacts.len() as u32),
134            collection_names,
135            indexes,
136            graph_projections,
137            analytics_jobs,
138            vector_artifacts,
139        }
140    }
141
142    fn native_vector_artifact_collection_count(&self) -> usize {
143        self.native_vector_artifact_records().len()
144    }
145
146    pub(crate) fn native_vector_artifact_records(
147        &self,
148    ) -> Vec<(NativeVectorArtifactSummary, Vec<u8>)> {
149        let mut artifacts = Vec::new();
150        for collection in self.store.list_collections() {
151            let Some(manager) = self.store.get_collection(&collection) else {
152                continue;
153            };
154            let entities = manager.query_all(|_| true);
155            let mut vectors = Vec::new();
156            let mut graph_edges = Vec::new();
157            let mut fulltext_documents = Vec::new();
158            let mut document_records = Vec::new();
159            for entity in entities {
160                match entity.data {
161                    EntityData::Vector(vector) => {
162                        if !vector.dense.is_empty() {
163                            vectors.push((entity.id, vector.dense));
164                        }
165                    }
166                    EntityData::Edge(edge) => {
167                        if let EntityKind::GraphEdge(edge_kind) = entity.kind {
168                            graph_edges.push((
169                                entity.id,
170                                edge_kind.from_node,
171                                edge_kind.to_node,
172                                edge_kind.label,
173                                edge.weight,
174                            ));
175                        }
176                    }
177                    data => {
178                        let text = Self::native_fulltext_text_for_entity(&data);
179                        if !text.trim().is_empty() {
180                            fulltext_documents.push((entity.id, text));
181                        }
182                        if let Some(document) =
183                            Self::native_document_pathvalue_for_entity(entity.id, &data)
184                        {
185                            document_records.push(document);
186                        }
187                    }
188                }
189            }
190            if !vectors.is_empty() {
191                let dimension = vectors[0].1.len();
192                let mut hnsw = HnswIndex::with_dimension(dimension);
193                for (id, vector) in vectors
194                    .into_iter()
195                    .filter(|(_, vector)| vector.len() == dimension)
196                {
197                    hnsw.insert_with_id(id.raw(), vector);
198                }
199                let stats = hnsw.stats();
200                let bytes = hnsw.to_bytes();
201                let summary = NativeVectorArtifactSummary {
202                    collection: collection.clone(),
203                    artifact_kind: "hnsw".to_string(),
204                    vector_count: stats.node_count as u64,
205                    dimension: stats.dimension as u32,
206                    max_layer: stats.max_layer as u32,
207                    serialized_bytes: bytes.len() as u64,
208                    checksum: crate::storage::engine::crc32(&bytes) as u64,
209                };
210                artifacts.push((summary, bytes));
211
212                let n_lists = ((stats.node_count as f64).sqrt().ceil() as usize).max(1);
213                let mut ivf = IvfIndex::new(IvfConfig::new(dimension, n_lists));
214                let training = manager
215                    .query_all(|_| true)
216                    .into_iter()
217                    .filter_map(|entity| match entity.data {
218                        EntityData::Vector(vector) if vector.dense.len() == dimension => {
219                            Some(vector.dense)
220                        }
221                        _ => None,
222                    })
223                    .collect::<Vec<_>>();
224                ivf.train(&training);
225                let items = manager
226                    .query_all(|_| true)
227                    .into_iter()
228                    .filter_map(|entity| match entity.data {
229                        EntityData::Vector(vector) if vector.dense.len() == dimension => {
230                            Some((entity.id.raw(), vector.dense))
231                        }
232                        _ => None,
233                    })
234                    .collect::<Vec<_>>();
235                ivf.add_batch_with_ids(items);
236                let ivf_stats = ivf.stats();
237                let ivf_bytes = ivf.to_bytes();
238                let ivf_summary = NativeVectorArtifactSummary {
239                    collection: collection.clone(),
240                    artifact_kind: "ivf".to_string(),
241                    vector_count: ivf_stats.total_vectors as u64,
242                    dimension: ivf_stats.dimension as u32,
243                    max_layer: ivf_stats.n_lists as u32,
244                    serialized_bytes: ivf_bytes.len() as u64,
245                    checksum: crate::storage::engine::crc32(&ivf_bytes) as u64,
246                };
247                artifacts.push((ivf_summary, ivf_bytes));
248            }
249
250            if !graph_edges.is_empty() {
251                let bytes = Self::serialize_native_graph_adjacency_artifact(&graph_edges);
252                let (edge_count, node_count, label_count) =
253                    Self::inspect_native_graph_adjacency_artifact(&bytes).unwrap_or((0, 0, 0));
254                let summary = NativeVectorArtifactSummary {
255                    collection: collection.clone(),
256                    artifact_kind: "graph.adjacency".to_string(),
257                    vector_count: edge_count,
258                    dimension: node_count as u32,
259                    max_layer: label_count,
260                    serialized_bytes: bytes.len() as u64,
261                    checksum: crate::storage::engine::crc32(&bytes) as u64,
262                };
263                artifacts.push((summary, bytes));
264            }
265
266            if !fulltext_documents.is_empty() {
267                let bytes =
268                    Self::serialize_native_fulltext_artifact(&collection, &fulltext_documents);
269                let (doc_count, term_count, posting_count) =
270                    Self::inspect_native_fulltext_artifact(&bytes).unwrap_or((0, 0, 0));
271                let summary = NativeVectorArtifactSummary {
272                    collection: collection.clone(),
273                    artifact_kind: "text.fulltext".to_string(),
274                    vector_count: posting_count,
275                    dimension: doc_count as u32,
276                    max_layer: term_count as u32,
277                    serialized_bytes: bytes.len() as u64,
278                    checksum: crate::storage::engine::crc32(&bytes) as u64,
279                };
280                artifacts.push((summary, bytes));
281            }
282
283            if !document_records.is_empty() {
284                let bytes = Self::serialize_native_document_pathvalue_artifact(
285                    &collection,
286                    &document_records,
287                );
288                let (doc_count, path_count, value_count, unique_value_count) =
289                    Self::inspect_native_document_pathvalue_artifact(&bytes)
290                        .unwrap_or((0, 0, 0, 0));
291                let _ = unique_value_count;
292                let summary = NativeVectorArtifactSummary {
293                    collection: collection.clone(),
294                    artifact_kind: "document.pathvalue".to_string(),
295                    vector_count: value_count,
296                    dimension: doc_count as u32,
297                    max_layer: path_count as u32,
298                    serialized_bytes: bytes.len() as u64,
299                    checksum: crate::storage::engine::crc32(&bytes) as u64,
300                };
301                artifacts.push((summary, bytes));
302            }
303        }
304        artifacts
305    }
306
307    fn serialize_native_graph_adjacency_artifact(
308        edges: &[(EntityId, String, String, String, f32)],
309    ) -> Vec<u8> {
310        let edges: Vec<reddb_file::NativeGraphEdge> = edges
311            .iter()
312            .map(
313                |(edge_id, from_node, to_node, label, weight)| reddb_file::NativeGraphEdge {
314                    edge_id: edge_id.raw(),
315                    from_node: from_node.clone(),
316                    to_node: to_node.clone(),
317                    label: label.clone(),
318                    weight: *weight,
319                },
320            )
321            .collect();
322        reddb_file::encode_native_graph_adjacency_frame(&reddb_file::NativeGraphAdjacencyFrame {
323            edges,
324        })
325    }
326
327    pub(crate) fn inspect_native_graph_adjacency_artifact(
328        bytes: &[u8],
329    ) -> Result<(u64, u64, u32), String> {
330        let frame =
331            reddb_file::decode_native_graph_adjacency_frame(bytes).map_err(|e| e.to_string())?;
332        let edge_count = frame.edges.len() as u64;
333        let mut nodes = BTreeSet::new();
334        let mut labels = BTreeSet::new();
335        for edge in frame.edges {
336            nodes.insert(edge.from_node);
337            nodes.insert(edge.to_node);
338            labels.insert(edge.label);
339        }
340        Ok((edge_count, nodes.len() as u64, labels.len() as u32))
341    }
342
343    fn serialize_native_fulltext_artifact(
344        collection: &str,
345        documents: &[(EntityId, String)],
346    ) -> Vec<u8> {
347        let mut postings: BTreeMap<String, Vec<(u64, u32)>> = BTreeMap::new();
348        for (entity_id, text) in documents {
349            let mut frequencies: BTreeMap<String, u32> = BTreeMap::new();
350            for token in Self::native_fulltext_tokenize(text) {
351                *frequencies.entry(token).or_insert(0) += 1;
352            }
353            for (token, count) in frequencies {
354                postings
355                    .entry(token)
356                    .or_default()
357                    .push((entity_id.raw(), count));
358            }
359        }
360
361        let terms = postings
362            .into_iter()
363            .map(|(term, postings)| reddb_file::NativeFulltextTerm {
364                term,
365                postings: postings
366                    .into_iter()
367                    .map(
368                        |(entity_id, term_count)| reddb_file::NativeFulltextPosting {
369                            entity_id,
370                            term_count,
371                        },
372                    )
373                    .collect(),
374            })
375            .collect();
376        reddb_file::encode_native_fulltext_frame(&reddb_file::NativeFulltextFrame {
377            collection: collection.to_string(),
378            doc_count: documents.len() as u32,
379            terms,
380        })
381    }
382
383    pub(crate) fn inspect_native_fulltext_artifact(
384        bytes: &[u8],
385    ) -> Result<(u64, u64, u64), String> {
386        let frame = reddb_file::decode_native_fulltext_frame(bytes).map_err(|e| e.to_string())?;
387        let posting_count: u64 = frame
388            .terms
389            .iter()
390            .map(|term| term.postings.len() as u64)
391            .sum();
392        Ok((
393            frame.doc_count as u64,
394            frame.terms.len() as u64,
395            posting_count,
396        ))
397    }
398
399    fn serialize_native_document_pathvalue_artifact(
400        collection: &str,
401        documents: &[(EntityId, Vec<(String, String)>)],
402    ) -> Vec<u8> {
403        let docs: Vec<reddb_file::NativeDocPathValue> = documents
404            .iter()
405            .map(|(entity_id, entries)| reddb_file::NativeDocPathValue {
406                entity_id: entity_id.raw(),
407                entries: entries
408                    .iter()
409                    .map(|(path, value)| reddb_file::NativeDocPathValueEntry {
410                        path: path.clone(),
411                        value: value.clone(),
412                    })
413                    .collect(),
414            })
415            .collect();
416        reddb_file::encode_native_doc_pathvalue_frame(&reddb_file::NativeDocPathValueFrame {
417            collection: collection.to_string(),
418            docs,
419        })
420    }
421
422    pub(crate) fn inspect_native_document_pathvalue_artifact(
423        bytes: &[u8],
424    ) -> Result<(u64, u64, u64, u64), String> {
425        let frame =
426            reddb_file::decode_native_doc_pathvalue_frame(bytes).map_err(|e| e.to_string())?;
427        let doc_count = frame.docs.len() as u64;
428        let mut paths = BTreeSet::new();
429        let mut values = BTreeSet::new();
430        let mut total_entries = 0u64;
431        for doc in frame.docs {
432            for entry in doc.entries {
433                paths.insert(entry.path);
434                values.insert(entry.value);
435                total_entries += 1;
436            }
437        }
438        Ok((
439            doc_count,
440            paths.len() as u64,
441            total_entries,
442            values.len() as u64,
443        ))
444    }
445
446    fn native_document_pathvalue_for_entity(
447        entity_id: EntityId,
448        data: &EntityData,
449    ) -> Option<(EntityId, Vec<(String, String)>)> {
450        let mut entries = Vec::new();
451        match data {
452            EntityData::Row(row) => {
453                if let Some(named) = &row.named {
454                    for (key, value) in named {
455                        Self::collect_native_document_entries_from_value(key, value, &mut entries);
456                    }
457                }
458                for (idx, value) in row.columns.iter().enumerate() {
459                    let path = format!("columns[{idx}]");
460                    Self::collect_native_document_entries_from_value(&path, value, &mut entries);
461                }
462            }
463            EntityData::Node(node) => {
464                for (key, value) in &node.properties {
465                    Self::collect_native_document_entries_from_value(key, value, &mut entries);
466                }
467            }
468            EntityData::Edge(edge) => {
469                for (key, value) in &edge.properties {
470                    Self::collect_native_document_entries_from_value(key, value, &mut entries);
471                }
472            }
473            EntityData::Vector(_) => {}
474            EntityData::TimeSeries(_) => {}
475            EntityData::QueueMessage(_) => {}
476        }
477        if entries.is_empty() {
478            None
479        } else {
480            Some((entity_id, entries))
481        }
482    }
483
484    fn collect_native_document_entries_from_value(
485        path: &str,
486        value: &Value,
487        out: &mut Vec<(String, String)>,
488    ) {
489        match value {
490            Value::Json(bytes) | Value::Blob(bytes) => {
491                if let Ok(json) = crate::json::from_slice::<JsonValue>(bytes) {
492                    Self::collect_native_document_entries_from_json(path, &json, out);
493                }
494            }
495            _ => {}
496        }
497    }
498
499    fn collect_native_document_entries_from_json(
500        path: &str,
501        value: &JsonValue,
502        out: &mut Vec<(String, String)>,
503    ) {
504        match value {
505            JsonValue::Object(entries) => {
506                for (key, value) in entries {
507                    let next = if path.is_empty() {
508                        key.clone()
509                    } else {
510                        format!("{path}.{key}")
511                    };
512                    Self::collect_native_document_entries_from_json(&next, value, out);
513                }
514            }
515            JsonValue::Array(items) => {
516                for (idx, value) in items.iter().enumerate() {
517                    let next = format!("{path}[{idx}]");
518                    Self::collect_native_document_entries_from_json(&next, value, out);
519                }
520            }
521            _ => {
522                if let Some(text) = Self::native_json_scalar_text(value) {
523                    out.push((path.to_string(), text));
524                }
525            }
526        }
527    }
528
529    fn native_json_scalar_text(value: &JsonValue) -> Option<String> {
530        match value {
531            JsonValue::Null => None,
532            JsonValue::Bool(value) => Some(value.to_string()),
533            JsonValue::Number(value) => Some(value.to_string()),
534            JsonValue::String(value) => Some(value.clone()),
535            JsonValue::Array(_) | JsonValue::Object(_) => None,
536        }
537    }
538
539    fn native_fulltext_text_for_entity(data: &EntityData) -> String {
540        match data {
541            EntityData::Row(row) => {
542                let mut parts = Vec::new();
543                if let Some(named) = &row.named {
544                    for value in named.values() {
545                        if let Some(text) = Self::native_value_text(value) {
546                            parts.push(text);
547                        }
548                    }
549                }
550                for value in &row.columns {
551                    if let Some(text) = Self::native_value_text(value) {
552                        parts.push(text);
553                    }
554                }
555                parts.join(" ")
556            }
557            EntityData::Node(node) => node
558                .properties
559                .values()
560                .filter_map(Self::native_value_text)
561                .collect::<Vec<_>>()
562                .join(" "),
563            EntityData::Edge(edge) => edge
564                .properties
565                .values()
566                .filter_map(Self::native_value_text)
567                .collect::<Vec<_>>()
568                .join(" "),
569            EntityData::Vector(vector) => vector.content.clone().unwrap_or_default(),
570            EntityData::TimeSeries(ts) => ts.metric.clone(),
571            EntityData::QueueMessage(_) => String::new(),
572        }
573    }
574
    /// Renders a single [`Value`] as plain text for full-text indexing.
    ///
    /// Returns `None` in two kinds of case:
    /// * values with no textual form here: `Null`, `Uuid`, `MacAddr`,
    ///   `Vector`, `Array`, and `Json`/`Blob` bytes that are not valid UTF-8;
    /// * `Secret` and `Password`.
    ///
    /// None of these values appear in the text that
    /// `native_fulltext_text_for_entity` assembles from row, node and edge
    /// values.
    fn native_value_text(value: &Value) -> Option<String> {
        match value {
            Value::Text(value) => Some(value.to_string()),
            // JSON and blob payloads are used verbatim only if they are valid UTF-8.
            Value::Json(value) => String::from_utf8(value.clone()).ok(),
            Value::Blob(value) => String::from_utf8(value.clone()).ok(),
            Value::Integer(value) => Some(value.to_string()),
            Value::UnsignedInteger(value) => Some(value.to_string()),
            Value::Float(value) => Some(value.to_string()),
            Value::Boolean(value) => Some(value.to_string()),
            Value::IpAddr(value) => Some(value.to_string()),
            Value::NodeRef(value) => Some(value.clone()),
            Value::EdgeRef(value) => Some(value.clone()),
            Value::RowRef(table, row_id) => Some(format!("{table}:{row_id}")),
            Value::VectorRef(collection, vector_id) => Some(format!("{collection}:{vector_id}")),
            Value::Timestamp(value) => Some(value.to_string()),
            Value::Duration(value) => Some(value.to_string()),
            Value::Uuid(_) | Value::MacAddr(_) | Value::Vector(_) | Value::Null => None,
            Value::Color([r, g, b]) => Some(format!("#{:02X}{:02X}{:02X}", r, g, b)),
            Value::Email(s) => Some(s.clone()),
            Value::Url(s) => Some(s.clone()),
            Value::Phone(n) => Some(format!("+{}", n)),
            // Decoded as major * 1_000_000 + minor * 1_000 + patch.
            Value::Semver(packed) => Some(format!(
                "{}.{}.{}",
                packed / 1_000_000,
                (packed / 1_000) % 1_000,
                packed % 1_000
            )),
            // IPv4 address packed into an integer, most significant octet first.
            Value::Cidr(ip, prefix) => Some(format!(
                "{}.{}.{}.{}/{}",
                (ip >> 24) & 0xFF,
                (ip >> 16) & 0xFF,
                (ip >> 8) & 0xFF,
                ip & 0xFF,
                prefix
            )),
            // Raw day count. NOTE(review): the epoch is not visible here; presumably the Unix epoch.
            Value::Date(days) => Some(days.to_string()),
            // Milliseconds rendered as HH:MM:SS. Hours are not wrapped at 24 and sub-second
            // precision is dropped.
            Value::Time(ms) => {
                let total_secs = ms / 1000;
                Some(format!(
                    "{:02}:{:02}:{:02}",
                    total_secs / 3600,
                    (total_secs / 60) % 60,
                    total_secs % 60
                ))
            }
            Value::Decimal(v) => Some(Value::Decimal(*v).display_string()),
            Value::EnumValue(i) => Some(format!("enum({})", i)),
            Value::Array(_) => None,
            Value::TimestampMs(ms) => Some(ms.to_string()),
            // IPv4 address packed into an integer, most significant octet first.
            Value::Ipv4(ip) => Some(format!(
                "{}.{}.{}.{}",
                (ip >> 24) & 0xFF,
                (ip >> 16) & 0xFF,
                (ip >> 8) & 0xFF,
                ip & 0xFF
            )),
            Value::Ipv6(bytes) => Some(format!("{}", std::net::Ipv6Addr::from(*bytes))),
            // The prefix length is the number of leading one bits in the mask.
            Value::Subnet(ip, mask) => {
                let prefix = mask.leading_ones();
                Some(format!(
                    "{}.{}.{}.{}/{}",
                    (ip >> 24) & 0xFF,
                    (ip >> 16) & 0xFF,
                    (ip >> 8) & 0xFF,
                    ip & 0xFF,
                    prefix
                ))
            }
            Value::Port(p) => Some(p.to_string()),
            // Fixed-point coordinates scaled by 1e6, rendered with six decimal places.
            Value::Latitude(micro) => Some(format!("{:.6}", *micro as f64 / 1_000_000.0)),
            Value::Longitude(micro) => Some(format!("{:.6}", *micro as f64 / 1_000_000.0)),
            Value::GeoPoint(lat, lon) => Some(format!(
                "{:.6},{:.6}",
                *lat as f64 / 1_000_000.0,
                *lon as f64 / 1_000_000.0
            )),
            // Fixed-size code bytes. Invalid UTF-8 is replaced lossily rather than dropped.
            Value::Country2(c) => Some(String::from_utf8_lossy(c).to_string()),
            Value::Country3(c) => Some(String::from_utf8_lossy(c).to_string()),
            Value::Lang2(c) => Some(String::from_utf8_lossy(c).to_string()),
            Value::Lang5(c) => Some(String::from_utf8_lossy(c).to_string()),
            Value::Currency(c) => Some(String::from_utf8_lossy(c).to_string()),
            Value::AssetCode(code) => Some(code.clone()),
            Value::Money { .. } => Some(value.display_string()),
            Value::ColorAlpha([r, g, b, a]) => {
                Some(format!("#{:02X}{:02X}{:02X}{:02X}", r, g, b, a))
            }
            Value::BigInt(v) => Some(v.to_string()),
            Value::KeyRef(col, key) => Some(format!("{}:{}", col, key)),
            Value::DocRef(col, id) => Some(format!("{}#{}", col, id)),
            Value::TableRef(name) => Some(name.clone()),
            Value::PageRef(page_id) => Some(format!("page:{}", page_id)),
            // Excluded on purpose. Presumably this keeps sensitive data out of the full-text index.
            Value::Secret(_) | Value::Password(_) => None,
        }
    }
669
670    fn native_fulltext_tokenize(text: &str) -> Vec<String> {
671        text.to_lowercase()
672            .split(|c: char| !c.is_alphanumeric())
673            .filter(|s| s.len() >= 2)
674            .map(|s| s.to_string())
675            .collect()
676    }
677
678    pub(crate) fn native_recovery_summary_from_metadata(
679        metadata: &PhysicalMetadataFile,
680    ) -> NativeRecoverySummary {
681        const SAMPLE_LIMIT: usize = 16;
682
683        let snapshots: Vec<_> = metadata
684            .snapshots
685            .iter()
686            .rev()
687            .take(SAMPLE_LIMIT)
688            .map(|snapshot| NativeSnapshotSummary {
689                snapshot_id: snapshot.snapshot_id,
690                created_at_unix_ms: snapshot.created_at_unix_ms,
691                superblock_sequence: snapshot.superblock_sequence,
692                collection_count: snapshot.collection_count as u32,
693                total_entities: snapshot.total_entities as u64,
694            })
695            .collect();
696        let exports: Vec<_> = metadata
697            .exports
698            .iter()
699            .rev()
700            .take(SAMPLE_LIMIT)
701            .map(|export| NativeExportSummary {
702                name: export.name.clone(),
703                created_at_unix_ms: export.created_at_unix_ms,
704                snapshot_id: export.snapshot_id,
705                superblock_sequence: export.superblock_sequence,
706                collection_count: export.collection_count as u32,
707                total_entities: export.total_entities as u64,
708            })
709            .collect();
710
711        NativeRecoverySummary {
712            snapshot_count: metadata.snapshots.len() as u32,
713            export_count: metadata.exports.len() as u32,
714            snapshots_complete: metadata.snapshots.len() <= SAMPLE_LIMIT,
715            exports_complete: metadata.exports.len() <= SAMPLE_LIMIT,
716            omitted_snapshot_count: metadata.snapshots.len().saturating_sub(snapshots.len()) as u32,
717            omitted_export_count: metadata.exports.len().saturating_sub(exports.len()) as u32,
718            snapshots,
719            exports,
720        }
721    }
722
723    pub(crate) fn native_catalog_summary_from_metadata(
724        metadata: &PhysicalMetadataFile,
725    ) -> NativeCatalogSummary {
726        const SAMPLE_LIMIT: usize = 32;
727
728        let collections: Vec<_> = metadata
729            .catalog
730            .stats_by_collection
731            .iter()
732            .take(SAMPLE_LIMIT)
733            .map(|(name, stats)| NativeCatalogCollectionSummary {
734                name: name.clone(),
735                entities: stats.entities as u64,
736                cross_refs: stats.cross_refs as u64,
737                segments: stats.segments as u32,
738            })
739            .collect();
740
741        NativeCatalogSummary {
742            collection_count: metadata.catalog.total_collections as u32,
743            total_entities: metadata.catalog.total_entities as u64,
744            collections_complete: metadata.catalog.stats_by_collection.len() <= SAMPLE_LIMIT,
745            omitted_collection_count: metadata
746                .catalog
747                .stats_by_collection
748                .len()
749                .saturating_sub(collections.len()) as u32,
750            collections,
751        }
752    }
753
754    pub(crate) fn native_metadata_state_summary_from_metadata(
755        metadata: &PhysicalMetadataFile,
756    ) -> NativeMetadataStateSummary {
757        NativeMetadataStateSummary {
758            protocol_version: metadata.protocol_version.clone(),
759            generated_at_unix_ms: metadata.generated_at_unix_ms,
760            last_loaded_from: metadata.last_loaded_from.clone(),
761            last_healed_at_unix_ms: metadata.last_healed_at_unix_ms,
762        }
763    }
764
765    pub(crate) fn inspect_native_header_against_metadata(
766        native: PhysicalFileHeader,
767        metadata: &PhysicalMetadataFile,
768    ) -> NativeHeaderInspection {
769        let expected = Self::native_header_from_metadata(metadata);
770        let mut mismatches = Vec::new();
771
772        if native.format_version != expected.format_version {
773            mismatches.push(NativeHeaderMismatch {
774                field: "format_version",
775                native: native.format_version.to_string(),
776                expected: expected.format_version.to_string(),
777            });
778        }
779        if native.sequence != expected.sequence {
780            mismatches.push(NativeHeaderMismatch {
781                field: "sequence",
782                native: native.sequence.to_string(),
783                expected: expected.sequence.to_string(),
784            });
785        }
786        if native.manifest_oldest_root != expected.manifest_oldest_root {
787            mismatches.push(NativeHeaderMismatch {
788                field: "manifest_oldest_root",
789                native: native.manifest_oldest_root.to_string(),
790                expected: expected.manifest_oldest_root.to_string(),
791            });
792        }
793        if native.manifest_root != expected.manifest_root {
794            mismatches.push(NativeHeaderMismatch {
795                field: "manifest_root",
796                native: native.manifest_root.to_string(),
797                expected: expected.manifest_root.to_string(),
798            });
799        }
800        if native.free_set_root != expected.free_set_root {
801            mismatches.push(NativeHeaderMismatch {
802                field: "free_set_root",
803                native: native.free_set_root.to_string(),
804                expected: expected.free_set_root.to_string(),
805            });
806        }
807        if native.collection_root_count != expected.collection_root_count {
808            mismatches.push(NativeHeaderMismatch {
809                field: "collection_root_count",
810                native: native.collection_root_count.to_string(),
811                expected: expected.collection_root_count.to_string(),
812            });
813        }
814        if native.snapshot_count != expected.snapshot_count {
815            mismatches.push(NativeHeaderMismatch {
816                field: "snapshot_count",
817                native: native.snapshot_count.to_string(),
818                expected: expected.snapshot_count.to_string(),
819            });
820        }
821        if native.index_count != expected.index_count {
822            mismatches.push(NativeHeaderMismatch {
823                field: "index_count",
824                native: native.index_count.to_string(),
825                expected: expected.index_count.to_string(),
826            });
827        }
828        if native.catalog_collection_count != expected.catalog_collection_count {
829            mismatches.push(NativeHeaderMismatch {
830                field: "catalog_collection_count",
831                native: native.catalog_collection_count.to_string(),
832                expected: expected.catalog_collection_count.to_string(),
833            });
834        }
835        if native.catalog_total_entities != expected.catalog_total_entities {
836            mismatches.push(NativeHeaderMismatch {
837                field: "catalog_total_entities",
838                native: native.catalog_total_entities.to_string(),
839                expected: expected.catalog_total_entities.to_string(),
840            });
841        }
842        if native.export_count != expected.export_count {
843            mismatches.push(NativeHeaderMismatch {
844                field: "export_count",
845                native: native.export_count.to_string(),
846                expected: expected.export_count.to_string(),
847            });
848        }
849        if native.graph_projection_count != expected.graph_projection_count {
850            mismatches.push(NativeHeaderMismatch {
851                field: "graph_projection_count",
852                native: native.graph_projection_count.to_string(),
853                expected: expected.graph_projection_count.to_string(),
854            });
855        }
856        if native.analytics_job_count != expected.analytics_job_count {
857            mismatches.push(NativeHeaderMismatch {
858                field: "analytics_job_count",
859                native: native.analytics_job_count.to_string(),
860                expected: expected.analytics_job_count.to_string(),
861            });
862        }
863        if native.manifest_event_count != expected.manifest_event_count {
864            mismatches.push(NativeHeaderMismatch {
865                field: "manifest_event_count",
866                native: native.manifest_event_count.to_string(),
867                expected: expected.manifest_event_count.to_string(),
868            });
869        }
870
871        NativeHeaderInspection {
872            native,
873            expected,
874            consistent: mismatches.is_empty(),
875            mismatches,
876        }
877    }
878
879    pub(crate) fn repair_policy_for_inspection(
880        inspection: &NativeHeaderInspection,
881    ) -> NativeHeaderRepairPolicy {
882        if inspection.consistent {
883            return NativeHeaderRepairPolicy::InSync;
884        }
885
886        if inspection.expected.sequence >= inspection.native.sequence {
887            NativeHeaderRepairPolicy::RepairNativeFromMetadata
888        } else {
889            NativeHeaderRepairPolicy::NativeAheadOfMetadata
890        }
891    }
892
893    pub(crate) fn prune_export_registry(&self, exports: &mut Vec<ExportDescriptor>) {
894        let retention = self.options.export_retention.max(1);
895        if exports.len() <= retention {
896            return;
897        }
898
899        exports.sort_by_key(|export| export.created_at_unix_ms);
900        let removed: Vec<ExportDescriptor> =
901            exports.drain(0..(exports.len() - retention)).collect();
902
903        for export in removed {
904            let _ = fs::remove_file(&export.data_path);
905            let _ = fs::remove_file(&export.metadata_path);
906            let binary_path = PhysicalMetadataFile::metadata_binary_path_for(std::path::Path::new(
907                &export.data_path,
908            ));
909            let _ = fs::remove_file(binary_path);
910        }
911    }
912
913    pub(crate) fn runtime_index_catalog(&self) -> IndexCatalog {
914        let mut catalog = IndexCatalog::register_default_vector_graph(
915            self.options.has_capability(Capability::Table),
916            self.options.has_capability(Capability::Graph),
917        );
918        if self.options.has_capability(Capability::FullText) {
919            catalog.register(RuntimeIndexConfig::new(
920                "text-fulltext",
921                IndexKind::FullText,
922            ));
923            catalog.register(RuntimeIndexConfig::new(
924                "document-pathvalue",
925                IndexKind::DocumentPathValue,
926            ));
927        }
928        catalog.register(RuntimeIndexConfig::new(
929            "search-hybrid",
930            IndexKind::HybridSearch,
931        ));
932        catalog
933    }
934
935    pub(crate) fn physical_index_state(&self) -> Vec<PhysicalIndexState> {
936        // Use a lightweight catalog snapshot that does NOT call physical_metadata()
937        // to avoid infinite recursion: physical_metadata → metadata_from_native_state
938        // → physical_index_state → catalog_model_snapshot → physical_metadata → ...
939        let catalog = self.runtime_index_catalog();
940        let snapshot = crate::catalog::snapshot_store_with_declarations(
941            "reddb",
942            self.store.as_ref(),
943            Some(&catalog),
944            None, // No declarations — breaks the recursive cycle
945            None, // No contracts — avoids pulling physical metadata during bootstrap
946        );
947        let mut metrics_by_name = std::collections::BTreeMap::new();
948        for metric in &snapshot.indices {
949            metrics_by_name.insert(metric.name.clone(), metric.clone());
950        }
951
952        let mut states = Vec::new();
953        for collection in snapshot.collections {
954            for index_name in &collection.indices {
955                let metric = metrics_by_name.get(index_name);
956                let kind = metric
957                    .map(|metric| metric.kind)
958                    .unwrap_or_else(|| infer_collection_index_kind(collection.model, index_name));
959                let entries = estimate_index_entries(&collection, kind);
960                states.push(PhysicalIndexState {
961                    name: format!("{}::{}", collection.name, index_name),
962                    kind,
963                    collection: Some(collection.name.clone()),
964                    enabled: metric.map(|metric| metric.enabled).unwrap_or(true),
965                    entries,
966                    estimated_memory_bytes: estimate_index_memory(entries, kind),
967                    last_refresh_ms: metric.and_then(|metric| metric.last_refresh_ms),
968                    backend: index_backend_name(kind).to_string(),
969                    artifact_kind: None,
970                    artifact_root_page: None,
971                    artifact_checksum: None,
972                    build_state: "catalog-derived".to_string(),
973                });
974            }
975        }
976
977        states
978    }
979
980    pub(crate) fn physical_collection_roots(&self) -> BTreeMap<String, u64> {
981        let mut roots = BTreeMap::new();
982
983        for name in self.store.list_collections() {
984            let Some(manager) = self.store.get_collection(&name) else {
985                continue;
986            };
987
988            let stats = manager.stats();
989            let mut root = fnv1a_seed();
990            fnv1a_hash_value(&mut root, &name);
991            fnv1a_hash_value(&mut root, &stats.total_entities);
992            fnv1a_hash_value(&mut root, &stats.growing_count);
993            fnv1a_hash_value(&mut root, &stats.sealed_count);
994            fnv1a_hash_value(&mut root, &stats.archived_count);
995            fnv1a_hash_value(&mut root, &stats.total_memory_bytes);
996            fnv1a_hash_value(&mut root, &stats.seal_ops);
997            fnv1a_hash_value(&mut root, &stats.compact_ops);
998
999            let mut entities = manager.query_all(|_| true);
1000            entities.sort_by_key(|entity| entity.id.raw());
1001
1002            for entity in entities {
1003                fnv1a_hash_value(&mut root, &entity.id.raw());
1004                fnv1a_hash_value(&mut root, &entity.kind);
1005                fnv1a_hash_value(&mut root, &entity.created_at);
1006                fnv1a_hash_value(&mut root, &entity.updated_at);
1007                fnv1a_hash_value(&mut root, &entity.data);
1008                fnv1a_hash_value(&mut root, &entity.sequence_id);
1009                fnv1a_hash_value(&mut root, &entity.embeddings().len());
1010                fnv1a_hash_value(&mut root, &entity.cross_refs().len());
1011            }
1012
1013            roots.insert(name, root);
1014        }
1015
1016        roots
1017    }
1018
1019    // ========================================================================
1020    // Reference Helpers - For Metadata Linking
1021    // ========================================================================
1022
1023    /// Create a reference to a table row
1024    pub fn table_ref(&self, table: impl Into<String>, row_id: u64) -> TableRef {
1025        TableRef::new(table, row_id)
1026    }
1027
1028    /// Create a reference to a graph node
1029    pub fn node_ref(&self, collection: impl Into<String>, node_id: EntityId) -> NodeRef {
1030        NodeRef::new(collection, node_id)
1031    }
1032
1033    /// Create a reference to a vector
1034    pub fn vector_ref(&self, collection: impl Into<String>, vector_id: EntityId) -> VectorRef {
1035        VectorRef::new(collection, vector_id)
1036    }
1037
1038    // ========================================================================
1039    // Query API
1040    // ========================================================================
1041
1042    /// Start building a query
1043    pub fn query(&self) -> QueryBuilder {
1044        QueryBuilder::new(self.store.clone())
1045    }
1046
1047    /// Quick vector similarity search.
1048    ///
1049    /// For collections with >= 100 vectors a lazily-built HNSW index is used
1050    /// for fast approximate nearest-neighbor lookup.  Smaller collections fall
1051    /// back to an exact brute-force scan so that the overhead of building an
1052    /// index is avoided when it would not pay off.
1053    pub fn similar(&self, collection: &str, vector: &[f32], k: usize) -> Vec<SimilarResult> {
1054        if self.store.get_collection(collection).is_none() {
1055            return Vec::new();
1056        }
1057
1058        // Try the HNSW fast path for collections with enough vectors.
1059        if let Some(index) = self.get_or_build_hnsw_index(collection, vector.len()) {
1060            let hnsw = index.read().unwrap_or_else(|e| e.into_inner());
1061            let results = hnsw.search(vector, k);
1062            let mapped = self.hnsw_results_to_similar(collection, &results);
1063            if !mapped.is_empty() {
1064                return mapped;
1065            }
1066        }
1067
1068        // Fallback: brute-force scan (small / mixed-type collections).
1069        self.similar_brute_force(collection, vector, k)
1070    }
1071
1072    /// Brute-force cosine similarity scan (exact results, O(n)).
1073    fn similar_brute_force(
1074        &self,
1075        collection: &str,
1076        vector: &[f32],
1077        k: usize,
1078    ) -> Vec<SimilarResult> {
1079        let manager = match self.store.get_collection(collection) {
1080            Some(m) => m,
1081            None => return Vec::new(),
1082        };
1083
1084        let entities = manager.query_all(|_| true);
1085        let mut results: Vec<SimilarResult> = entities
1086            .iter()
1087            .filter_map(|e| {
1088                let score = match &e.data {
1089                    EntityData::Vector(v) => cosine_similarity(vector, &v.dense),
1090                    _ => e
1091                        .embeddings()
1092                        .iter()
1093                        .map(|emb| cosine_similarity(vector, &emb.vector))
1094                        .fold(0.0f32, f32::max),
1095                };
1096                let distance = (1.0 - score).max(0.0);
1097                if score > 0.0 {
1098                    Some(SimilarResult {
1099                        entity_id: e.id,
1100                        score,
1101                        distance,
1102                        entity: e.clone(),
1103                    })
1104                } else {
1105                    None
1106                }
1107            })
1108            .collect();
1109
1110        results.sort_by(|a, b| {
1111            b.score
1112                .partial_cmp(&a.score)
1113                .unwrap_or(std::cmp::Ordering::Equal)
1114        });
1115        results.truncate(k);
1116        results
1117    }
1118
1119    /// Return (or lazily build) a per-collection HNSW index.
1120    ///
1121    /// Returns `None` when the collection has fewer than 100 dense vectors
1122    /// (the brute-force path is cheaper for tiny collections) or when there
1123    /// is a dimension mismatch with the query vector.
1124    ///
1125    /// The cached index is automatically invalidated when the live entity
1126    /// count in the collection changes, so inserts and deletes are picked up
1127    /// transparently without requiring explicit invalidation calls.
1128    fn get_or_build_hnsw_index(
1129        &self,
1130        collection: &str,
1131        query_dim: usize,
1132    ) -> Option<Arc<RwLock<HnswIndex>>> {
1133        let manager = self.store.get_collection(collection)?;
1134        let live_count = manager.count();
1135
1136        // Fast path: check if a fresh index already exists.
1137        {
1138            let indexes = self
1139                .vector_indexes
1140                .read()
1141                .unwrap_or_else(|e| e.into_inner());
1142            if let Some(cached) = indexes.get(collection) {
1143                if cached.entity_count == live_count {
1144                    return Some(Arc::clone(&cached.index));
1145                }
1146            }
1147        }
1148
1149        // Either no cached index exists or it is stale -- (re)build it.
1150        let entities = manager.query_all(|_| true);
1151
1152        let vectors: Vec<(u64, Vec<f32>)> = entities
1153            .iter()
1154            .filter_map(|e| match &e.data {
1155                EntityData::Vector(v) if !v.dense.is_empty() && v.dense.len() == query_dim => {
1156                    Some((e.id.raw(), v.dense.clone()))
1157                }
1158                _ => None,
1159            })
1160            .collect();
1161
1162        // Only build the HNSW index when there are enough vectors to justify it.
1163        const MIN_VECTORS_FOR_HNSW: usize = 100;
1164        if vectors.len() < MIN_VECTORS_FOR_HNSW {
1165            return None;
1166        }
1167
1168        // Build the HNSW index with cosine distance (matching the brute-force path).
1169        let config = crate::storage::engine::HnswConfig::with_m(16)
1170            .with_metric(crate::storage::engine::DistanceMetric::Cosine)
1171            .with_ef_construction(100)
1172            .with_ef_search(50);
1173        let mut hnsw = HnswIndex::new(query_dim, config);
1174
1175        for (id, vec) in &vectors {
1176            hnsw.insert_with_id(*id, vec.clone());
1177        }
1178
1179        let index = Arc::new(RwLock::new(hnsw));
1180
1181        // Store in the cache (double-check pattern to avoid duplicate builds).
1182        let mut indexes = self
1183            .vector_indexes
1184            .write()
1185            .unwrap_or_else(|e| e.into_inner());
1186        // Re-check under write lock: another thread may have built in the meantime.
1187        if let Some(cached) = indexes.get(collection) {
1188            if cached.entity_count == live_count {
1189                return Some(Arc::clone(&cached.index));
1190            }
1191        }
1192        indexes.insert(
1193            collection.to_string(),
1194            CachedVectorIndex {
1195                index: Arc::clone(&index),
1196                entity_count: live_count,
1197            },
1198        );
1199        Some(index)
1200    }
1201
1202    /// Convert HNSW `DistanceResult`s back to the DevX `SimilarResult` type.
1203    fn hnsw_results_to_similar(
1204        &self,
1205        collection: &str,
1206        results: &[crate::storage::engine::DistanceResult],
1207    ) -> Vec<SimilarResult> {
1208        results
1209            .iter()
1210            .filter_map(|dr| {
1211                let entity_id = EntityId::new(dr.id);
1212                let entity = self.store.get(collection, entity_id)?;
1213                // Cosine distance = 1 - similarity.
1214                let score = (1.0 - dr.distance).max(0.0);
1215                if score > 0.0 {
1216                    Some(SimilarResult {
1217                        entity_id,
1218                        score,
1219                        distance: dr.distance,
1220                        entity,
1221                    })
1222                } else {
1223                    None
1224                }
1225            })
1226            .collect()
1227    }
1228
1229    /// Invalidate the cached HNSW index for a collection.
1230    ///
1231    /// Called after vector inserts / deletes so the next search lazily rebuilds
1232    /// a fresh index that includes the new data.
1233    pub(crate) fn invalidate_vector_index(&self, collection: &str) {
1234        let mut indexes = self
1235            .vector_indexes
1236            .write()
1237            .unwrap_or_else(|e| e.into_inner());
1238        indexes.remove(collection);
1239    }
1240
1241    /// Get entity by ID from any collection
1242    pub fn get(&self, id: EntityId) -> Option<UnifiedEntity> {
1243        self.store.get_any(id).map(|(_, e)| e)
1244    }
1245
1246    /// Get entity with its collection name
1247    pub fn get_with_collection(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1248        self.store.get_any(id)
1249    }
1250
1251    // ========================================================================
1252    // Batch Operations - Performance
1253    // ========================================================================
1254
1255    /// Batch get multiple entities by ID. More efficient than N individual get() calls.
1256    pub fn batch_get(&self, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1257        ids.iter().map(|id| self.get(*id)).collect()
1258    }
1259
1260    /// Start a batch operation for bulk inserts
1261    pub fn batch(&self) -> BatchBuilder {
1262        BatchBuilder::new(self.store.clone(), self.preprocessors.clone())
1263    }
1264
1265    // ========================================================================
1266    // Preprocessing
1267    // ========================================================================
1268
1269    /// Add a preprocessor hook
1270    pub fn add_preprocessor(&mut self, preprocessor: Box<dyn Preprocessor>) {
1271        let mut preprocessors = self
1272            .preprocessors
1273            .write()
1274            .unwrap_or_else(|poisoned| poisoned.into_inner());
1275        preprocessors.push(Arc::from(preprocessor));
1276    }
1277
1278    // ========================================================================
1279    // Cross-Reference Navigation
1280    // ========================================================================
1281
1282    /// Get all entities linked FROM the given entity
1283    pub fn linked_from(&self, id: EntityId) -> Vec<LinkedEntity> {
1284        self.store
1285            .get_refs_from(id)
1286            .into_iter()
1287            .filter_map(|(target_id, ref_type, collection)| {
1288                self.store
1289                    .get(&collection, target_id)
1290                    .map(|entity| LinkedEntity {
1291                        entity,
1292                        ref_type,
1293                        collection,
1294                    })
1295            })
1296            .collect()
1297    }
1298
1299    /// Get all entities linked TO the given entity
1300    pub fn linked_to(&self, id: EntityId) -> Vec<LinkedEntity> {
1301        self.store
1302            .get_refs_to(id)
1303            .into_iter()
1304            .filter_map(|(source_id, ref_type, collection)| {
1305                self.store
1306                    .get(&collection, source_id)
1307                    .map(|entity| LinkedEntity {
1308                        entity,
1309                        ref_type,
1310                        collection,
1311                    })
1312            })
1313            .collect()
1314    }
1315
1316    /// Get the underlying store (for advanced operations)
1317    pub fn store(&self) -> Arc<UnifiedStore> {
1318        self.store.clone()
1319    }
1320
1321    /// Per-process registry of `vector.turbo` collection state (issue
1322    /// #693). Lazily allocated on first access; the inner map holds an
1323    /// `Arc<TurboCollectionState>` per turbo collection so concurrent
1324    /// SEARCHers and INSERTers can share the in-memory index without
1325    /// holding the outer map lock for the whole operation.
1326    pub(crate) fn turbo_collections(
1327        &self,
1328    ) -> &Arc<
1329        parking_lot::Mutex<
1330            std::collections::HashMap<
1331                String,
1332                Arc<crate::runtime::vector_turbo_kind::TurboCollectionState>,
1333            >,
1334        >,
1335    > {
1336        self.turbo_collections
1337            .get_or_init(|| Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new())))
1338    }
1339
1340    /// Look up (or lazily create) the runtime state for a turbo
1341    /// collection. Returns `None` for collections that are not marked
1342    /// `turbo` in the catalog.
1343    ///
1344    /// Lazy creation matters for two reasons: (a) a fresh process
1345    /// reopens a persistent DB without any turbo state in memory; (b)
1346    /// the contract (dim + metric) is read here instead of being
1347    /// stored separately, which keeps the catalog as the single source
1348    /// of truth.
1349    pub(crate) fn turbo_state(
1350        &self,
1351        collection: &str,
1352    ) -> Option<Arc<crate::runtime::vector_turbo_kind::TurboCollectionState>> {
1353        if !crate::runtime::vector_turbo_kind::is_turbo(&self.store, collection) {
1354            return None;
1355        }
1356        let map = self.turbo_collections();
1357        {
1358            let guard = map.lock();
1359            if let Some(state) = guard.get(collection) {
1360                return Some(Arc::clone(state));
1361            }
1362        }
1363        let contract = self.collection_contract(collection)?;
1364        let dim = contract.vector_dimension?;
1365        let metric = contract
1366            .vector_metric
1367            .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine);
1368        let state = Arc::new(
1369            crate::runtime::vector_turbo_kind::TurboCollectionState::new(
1370                dim,
1371                metric,
1372                self.store.pager(),
1373            ),
1374        );
1375        // Issue #674 — derive the `.tv` snapshot path from the
1376        // resolved tier layout if the active layout enables snapshot
1377        // files. `Minimal` (embedded) returns `None` and the state
1378        // never writes a `.tv` for this collection.
1379        if let Some((_, paths)) = self.options.resolve_tiered_layout() {
1380            state.set_snapshot_path(paths.turbo_snapshot_path(collection));
1381        }
1382        let mut guard = map.lock();
1383        let inserted_now = !guard.contains_key(collection);
1384        let entry = guard
1385            .entry(collection.to_string())
1386            .or_insert_with(|| Arc::clone(&state));
1387        let handle = Arc::clone(entry);
1388        drop(guard);
1389        // Issue #673 — first materialisation of this collection's
1390        // state kicks off a background rebuild worker. Non-vector
1391        // traffic is unaffected (it never reaches `turbo_state`);
1392        // vector SEARCH/INSERT observes the readiness flag via
1393        // `wait_until_ready` instead of blocking inline on the
1394        // synchronous lazy populate.
1395        if inserted_now {
1396            let join = crate::runtime::vector_turbo_kind::spawn_background_rebuild(
1397                Arc::clone(&self.store),
1398                collection.to_string(),
1399                Arc::clone(&handle),
1400            );
1401            self.turbo_rebuild_workers.lock().push(join);
1402        }
1403        Some(handle)
1404    }
1405
1406    /// Lazily-initialised ML runtime. First caller wins; subsequent
1407    /// callers observe the same instance. The runtime is created
1408    /// with an in-memory persistence backend by default — production
1409    /// deployments that need durable model state will swap this for
1410    /// a store-backed backend when the persistence integration lands.
1411    pub fn ml_runtime(&self) -> &crate::storage::ml::MlRuntime {
1412        self.ml_runtime.get_or_init(|| {
1413            crate::storage::ml::MlRuntime::in_memory(std::sync::Arc::new(
1414                // No-op work fn — the SQL scalars call predict
1415                // synchronously; async training jobs come in a later
1416                // sprint alongside `CREATE MODEL`.
1417                |_handle| Ok(String::new()),
1418            ))
1419        })
1420    }
1421
1422    /// Shared semantic cache for `SEMANTIC_CACHE_*` scalars. Uses
1423    /// default config until a `SET SEMANTIC_CACHE` admin surface
1424    /// lands.
1425    pub fn semantic_cache(&self) -> &Arc<crate::storage::ml::SemanticCache> {
1426        self.semantic_cache.get_or_init(|| {
1427            Arc::new(crate::storage::ml::SemanticCache::new(
1428                crate::storage::ml::SemanticCacheConfig::default(),
1429            ))
1430        })
1431    }
1432
1433    /// Lazily-initialised hypertable registry. Populated by `CREATE
1434    /// HYPERTABLE` DDL; consumed by chunk routing + retention.
1435    pub fn hypertables(&self) -> &Arc<crate::storage::timeseries::HypertableRegistry> {
1436        self.hypertables
1437            .get_or_init(|| Arc::new(crate::storage::timeseries::HypertableRegistry::new()))
1438    }
1439
1440    /// Lazily-initialised continuous-aggregate engine. Populated by
1441    /// `CA_REGISTER` and read by `CA_REFRESH` / `CA_STATE` scalars.
1442    pub fn continuous_aggregates(
1443        &self,
1444    ) -> &Arc<crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine> {
1445        self.continuous_aggregates.get_or_init(|| {
1446            Arc::new(
1447                crate::storage::timeseries::continuous_aggregate::ContinuousAggregateEngine::new(),
1448            )
1449        })
1450    }
1451
1452    pub(crate) fn is_binary_dump(path: &Path) -> Result<bool, std::io::Error> {
1453        let mut file = File::open(path)?;
1454        let mut magic = [0u8; 4];
1455        let read = file.read(&mut magic)?;
1456        Ok(read == 4 && reddb_file::native_store_magic_matches(&magic))
1457    }
1458}