Skip to main content

modelvault_core/db/
read.rs

1//! Read paths: queries, catalog views, and collection handles.
2
3use std::marker::PhantomData;
4use std::path::Path;
5use std::sync::Arc;
6
7use crate::catalog::Catalog;
8use crate::config::OpenRecoveryInfo;
9use crate::error::{DbError, SchemaError};
10use crate::index::IndexState;
11use crate::record::{RowValue, ScalarValue};
12use crate::schema::CollectionId;
13use crate::storage::Store;
14
15use super::{validate_subset_model, Collection, Database, LatestMap};
16
17impl<S: Store> Database<S> {
18    pub(crate) fn live_shared_snapshot(
19        &self,
20    ) -> Result<Arc<super::handle_registry::SharedDbState>, DbError> {
21        let shared = self.shared_mirror.as_ref().ok_or_else(|| {
22            DbError::Io(std::io::Error::other(
23                "read-only attached handle missing shared state",
24            ))
25        })?;
26        let g = shared
27            .read()
28            .map_err(|_| DbError::Io(std::io::Error::other("shared database lock poisoned")))?;
29        Ok(Arc::clone(&*g))
30    }
31
32    pub(crate) fn with_live_snapshot<R>(
33        &self,
34        f: impl FnOnce(&Catalog, &IndexState, &LatestMap) -> R,
35    ) -> Result<R, DbError> {
36        if self.read_only_attached {
37            let state = self.live_shared_snapshot()?;
38            return Ok(f(&state.catalog, &state.indexes, &state.latest));
39        }
40        if let Some(st) = &self.txn_staging {
41            Ok(f(&st.shadow_catalog, &st.shadow_indexes, &st.shadow_latest))
42        } else {
43            Ok(f(&self.catalog, &self.indexes, &self.latest))
44        }
45    }
46
47    pub(crate) fn catalog_for_read(&self) -> &Catalog {
48        if let Some(st) = &self.txn_staging {
49            &st.shadow_catalog
50        } else {
51            &self.catalog
52        }
53    }
54
55    pub(crate) fn indexes_for_read(&self) -> &IndexState {
56        if let Some(st) = &self.txn_staging {
57            &st.shadow_indexes
58        } else {
59            &self.indexes
60        }
61    }
62
63    pub(crate) fn latest_for_read(&self) -> &LatestMap {
64        if let Some(st) = &self.txn_staging {
65            &st.shadow_latest
66        } else {
67            &self.latest
68        }
69    }
70
71    /// Path passed to [`Database::open`](super::Database::<crate::storage::FileStore>::open), or `":memory:"` for [`crate::storage::VecStore`].
72    pub fn path(&self) -> &Path {
73        &self.path
74    }
75
    /// Recovery metadata from the most recent open (truncation, etc.).
    ///
    /// Borrows the `recovery_info` value stored on this handle; nothing is recomputed here.
    pub fn recovery_info(&self) -> &OpenRecoveryInfo {
        &self.recovery_info
    }
80
81    /// Rebuild secondary indexes from `latest` rows and compare to replayed index state.
82    pub fn verify_index_consistency(&self) -> Result<(), DbError> {
83        self.with_live_snapshot(|catalog, indexes, latest| {
84            super::verify_indexes_match_rows(catalog, latest, indexes)
85        })?
86    }
87
    /// Read-only view of the schema catalog built from `Schema` segments.
    ///
    /// While a transaction is staged on this handle (`txn_staging` is set) this is the
    /// transaction's shadow catalog; otherwise it is the handle's own catalog.
    ///
    /// On same-process attached read-only handles, prefer [`Self::collection_names`] and
    /// [`Self::collection_id_named`] for live metadata; this borrows the open-time snapshot.
    pub fn catalog(&self) -> &Catalog {
        self.catalog_for_read()
    }
95
96    /// Clone of the live schema catalog (always current on attached read-only handles).
97    pub fn snapshot_catalog(&self) -> Catalog {
98        self.with_live_snapshot(|c, _, _| c.clone())
99            .expect("live snapshot")
100    }
101
102    /// All registered collection names in lexicographic order.
103    pub fn collection_names(&self) -> Vec<String> {
104        self.with_live_snapshot(|catalog, _, _| catalog.collection_names())
105            .expect("live snapshot")
106    }
107
    /// Read-only access to the in-memory secondary index state (rebuilt from `Index` segments).
    ///
    /// While a transaction is staged on this handle (`txn_staging` is set) this is the
    /// transaction's shadow index state; otherwise it is the handle's own index state.
    ///
    /// On same-process attached read-only handles this returns the index state captured at open;
    /// use [`Self::snapshot_index_state`] for a live clone after writer updates.
    pub fn index_state(&self) -> &IndexState {
        self.indexes_for_read()
    }
115
116    /// Clone of the live secondary index state (always current on attached read-only handles).
117    pub fn snapshot_index_state(&self) -> IndexState {
118        self.with_live_snapshot(|_, indexes, _| indexes.clone())
119            .expect("live snapshot")
120    }
121
122    /// Execute a query against the current in-memory snapshot of the database.
123    pub fn query(
124        &self,
125        q: &crate::query::Query,
126    ) -> Result<Vec<std::collections::BTreeMap<String, RowValue>>, DbError> {
127        self.with_live_snapshot(|catalog, indexes, latest| {
128            crate::query::execute_query(catalog, indexes, latest, q)
129        })?
130    }
131
132    /// Return a human-readable explanation of the chosen plan for `q`.
133    pub fn explain_query(&self, q: &crate::query::Query) -> Result<String, DbError> {
134        self.with_live_snapshot(|catalog, _, _| crate::query::explain_query(catalog, q))?
135    }
136
137    /// Lazy iterator over query rows (same semantics as [`Self::query`]).
138    ///
139    /// See [`crate::query::QueryRowIter`] — this is the v0.7 pull-based execution boundary, not a
140    /// full operator graph.
141    pub fn query_iter(
142        &self,
143        q: &crate::query::Query,
144    ) -> Result<crate::query::QueryRowIter<'_>, DbError> {
145        if self.read_only_attached {
146            let snapshot = self.live_shared_snapshot()?;
147            return crate::query::execute_query_iter_owned(snapshot, q, Some(self.path.as_path()));
148        }
149        crate::query::execute_query_iter_with_spill_path(
150            self.catalog_for_read(),
151            self.indexes_for_read(),
152            self.latest_for_read(),
153            q,
154            Some(self.path.as_path()),
155        )
156    }
157
158    /// Typed handle over a registered collection; `T` may be a *subset model*.
159    pub fn collection<'a, T: crate::schema::DbModel>(
160        &'a self,
161    ) -> Result<Collection<'a, S, T>, DbError> {
162        let cid = self.collection_id_named(T::collection_name())?;
163        self.with_live_snapshot(|catalog, _, _| {
164            let col = catalog
165                .get(cid)
166                .ok_or(DbError::Schema(SchemaError::UnknownCollection {
167                    id: cid.0,
168                }))?;
169            validate_subset_model::<T>(col)?;
170            Ok(Collection {
171                db: self,
172                collection_id: cid,
173                _marker: PhantomData,
174            })
175        })?
176    }
177
178    /// Look up [`CollectionId`] by collection name (leading/trailing whitespace trimmed).
179    ///
180    /// Returns [`SchemaError::UnknownCollectionName`] when the name is not registered.
181    pub fn collection_id_named(&self, name: &str) -> Result<CollectionId, DbError> {
182        self.with_live_snapshot(|catalog, _, _| {
183            catalog
184                .lookup_name(name)
185                .ok_or(DbError::Schema(SchemaError::UnknownCollectionName {
186                    name: name.trim().to_string(),
187                }))
188        })?
189    }
190
191    /// Return the latest stored row for `pk`, or `None` if no insert has been replayed for that key.
192    ///
193    /// `pk` must match the declared primary field’s [`crate::schema::Type`].
194    pub fn get(
195        &self,
196        collection_id: CollectionId,
197        pk: &ScalarValue,
198    ) -> Result<Option<std::collections::BTreeMap<String, RowValue>>, DbError> {
199        self.with_live_snapshot(|catalog, _, latest| {
200            let col = catalog.get(collection_id).ok_or(DbError::Schema(
201                SchemaError::UnknownCollection {
202                    id: collection_id.0,
203                },
204            ))?;
205            let pk_name =
206                col.primary_field
207                    .as_deref()
208                    .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
209                        collection_id: collection_id.0,
210                    }))?;
211            let pk_ty = col
212                .fields
213                .iter()
214                .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
215                .map(|f| &f.ty)
216                .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
217                    name: pk_name.to_string(),
218                }))?;
219            if !pk.ty_matches(pk_ty) {
220                return Err(DbError::Schema(SchemaError::PrimaryKeyTypeMismatch {
221                    collection_id: collection_id.0,
222                }));
223            }
224            let key = (collection_id.0, pk.canonical_key_bytes());
225            Ok(latest.get(&key).cloned())
226        })?
227    }
228}