modelvault_core/db/
read.rs1use std::marker::PhantomData;
4use std::path::Path;
5use std::sync::Arc;
6
7use crate::catalog::Catalog;
8use crate::config::OpenRecoveryInfo;
9use crate::error::{DbError, SchemaError};
10use crate::index::IndexState;
11use crate::record::{RowValue, ScalarValue};
12use crate::schema::CollectionId;
13use crate::storage::Store;
14
15use super::{validate_subset_model, Collection, Database, LatestMap};
16
17impl<S: Store> Database<S> {
18 pub(crate) fn live_shared_snapshot(
19 &self,
20 ) -> Result<Arc<super::handle_registry::SharedDbState>, DbError> {
21 let shared = self.shared_mirror.as_ref().ok_or_else(|| {
22 DbError::Io(std::io::Error::other(
23 "read-only attached handle missing shared state",
24 ))
25 })?;
26 let g = shared
27 .read()
28 .map_err(|_| DbError::Io(std::io::Error::other("shared database lock poisoned")))?;
29 Ok(Arc::clone(&*g))
30 }
31
32 pub(crate) fn with_live_snapshot<R>(
33 &self,
34 f: impl FnOnce(&Catalog, &IndexState, &LatestMap) -> R,
35 ) -> Result<R, DbError> {
36 if self.read_only_attached {
37 let state = self.live_shared_snapshot()?;
38 return Ok(f(&state.catalog, &state.indexes, &state.latest));
39 }
40 if let Some(st) = &self.txn_staging {
41 Ok(f(&st.shadow_catalog, &st.shadow_indexes, &st.shadow_latest))
42 } else {
43 Ok(f(&self.catalog, &self.indexes, &self.latest))
44 }
45 }
46
47 pub(crate) fn catalog_for_read(&self) -> &Catalog {
48 if let Some(st) = &self.txn_staging {
49 &st.shadow_catalog
50 } else {
51 &self.catalog
52 }
53 }
54
55 pub(crate) fn indexes_for_read(&self) -> &IndexState {
56 if let Some(st) = &self.txn_staging {
57 &st.shadow_indexes
58 } else {
59 &self.indexes
60 }
61 }
62
63 pub(crate) fn latest_for_read(&self) -> &LatestMap {
64 if let Some(st) = &self.txn_staging {
65 &st.shadow_latest
66 } else {
67 &self.latest
68 }
69 }
70
71 pub fn path(&self) -> &Path {
73 &self.path
74 }
75
76 pub fn recovery_info(&self) -> &OpenRecoveryInfo {
78 &self.recovery_info
79 }
80
81 pub fn verify_index_consistency(&self) -> Result<(), DbError> {
83 self.with_live_snapshot(|catalog, indexes, latest| {
84 super::verify_indexes_match_rows(catalog, latest, indexes)
85 })?
86 }
87
88 pub fn catalog(&self) -> &Catalog {
93 self.catalog_for_read()
94 }
95
96 pub fn snapshot_catalog(&self) -> Catalog {
98 self.with_live_snapshot(|c, _, _| c.clone())
99 .expect("live snapshot")
100 }
101
102 pub fn collection_names(&self) -> Vec<String> {
104 self.with_live_snapshot(|catalog, _, _| catalog.collection_names())
105 .expect("live snapshot")
106 }
107
108 pub fn index_state(&self) -> &IndexState {
113 self.indexes_for_read()
114 }
115
116 pub fn snapshot_index_state(&self) -> IndexState {
118 self.with_live_snapshot(|_, indexes, _| indexes.clone())
119 .expect("live snapshot")
120 }
121
122 pub fn query(
124 &self,
125 q: &crate::query::Query,
126 ) -> Result<Vec<std::collections::BTreeMap<String, RowValue>>, DbError> {
127 self.with_live_snapshot(|catalog, indexes, latest| {
128 crate::query::execute_query(catalog, indexes, latest, q)
129 })?
130 }
131
132 pub fn explain_query(&self, q: &crate::query::Query) -> Result<String, DbError> {
134 self.with_live_snapshot(|catalog, _, _| crate::query::explain_query(catalog, q))?
135 }
136
137 pub fn query_iter(
142 &self,
143 q: &crate::query::Query,
144 ) -> Result<crate::query::QueryRowIter<'_>, DbError> {
145 if self.read_only_attached {
146 let snapshot = self.live_shared_snapshot()?;
147 return crate::query::execute_query_iter_owned(snapshot, q, Some(self.path.as_path()));
148 }
149 crate::query::execute_query_iter_with_spill_path(
150 self.catalog_for_read(),
151 self.indexes_for_read(),
152 self.latest_for_read(),
153 q,
154 Some(self.path.as_path()),
155 )
156 }
157
158 pub fn collection<'a, T: crate::schema::DbModel>(
160 &'a self,
161 ) -> Result<Collection<'a, S, T>, DbError> {
162 let cid = self.collection_id_named(T::collection_name())?;
163 self.with_live_snapshot(|catalog, _, _| {
164 let col = catalog
165 .get(cid)
166 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
167 id: cid.0,
168 }))?;
169 validate_subset_model::<T>(col)?;
170 Ok(Collection {
171 db: self,
172 collection_id: cid,
173 _marker: PhantomData,
174 })
175 })?
176 }
177
178 pub fn collection_id_named(&self, name: &str) -> Result<CollectionId, DbError> {
182 self.with_live_snapshot(|catalog, _, _| {
183 catalog
184 .lookup_name(name)
185 .ok_or(DbError::Schema(SchemaError::UnknownCollectionName {
186 name: name.trim().to_string(),
187 }))
188 })?
189 }
190
191 pub fn get(
195 &self,
196 collection_id: CollectionId,
197 pk: &ScalarValue,
198 ) -> Result<Option<std::collections::BTreeMap<String, RowValue>>, DbError> {
199 self.with_live_snapshot(|catalog, _, latest| {
200 let col = catalog.get(collection_id).ok_or(DbError::Schema(
201 SchemaError::UnknownCollection {
202 id: collection_id.0,
203 },
204 ))?;
205 let pk_name =
206 col.primary_field
207 .as_deref()
208 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
209 collection_id: collection_id.0,
210 }))?;
211 let pk_ty = col
212 .fields
213 .iter()
214 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
215 .map(|f| &f.ty)
216 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
217 name: pk_name.to_string(),
218 }))?;
219 if !pk.ty_matches(pk_ty) {
220 return Err(DbError::Schema(SchemaError::PrimaryKeyTypeMismatch {
221 collection_id: collection_id.0,
222 }));
223 }
224 let key = (collection_id.0, pk.canonical_key_bytes());
225 Ok(latest.get(&key).cloned())
226 })?
227 }
228}