use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use crate::catalog::Catalog;
use crate::config::OpenRecoveryInfo;
use crate::error::{DbError, SchemaError};
use crate::index::IndexState;
use crate::record::{RowValue, ScalarValue};
use crate::schema::CollectionId;
use crate::storage::Store;
use super::{validate_subset_model, Collection, Database, LatestMap};
/// Read-side accessors for [`Database`].
///
/// Three sources of state are visible from this block:
///
/// * the shared mirror (`shared_mirror`), consulted when `read_only_attached` is set,
/// * the staged shadow state (`txn_staging`), consulted when it is `Some`,
/// * the handle's own `catalog` / `indexes` / `latest` fields otherwise.
///
/// Methods built on `with_live_snapshot` honor all three. The borrowing
/// accessors (`catalog`, `index_state` and the `*_for_read` helpers) only
/// distinguish staged vs. own state and never consult the shared mirror.
impl<S: Store> Database<S> {
    /// Returns a clone of the `Arc` currently stored behind `shared_mirror`.
    ///
    /// The read lock is held only long enough to clone the `Arc`.
    ///
    /// # Errors
    ///
    /// Returns `DbError::Io` when this handle has no shared mirror or when the
    /// shared lock is poisoned.
    pub(crate) fn live_shared_snapshot(
        &self,
    ) -> Result<Arc<super::handle_registry::SharedDbState>, DbError> {
        let shared = self.shared_mirror.as_ref().ok_or_else(|| {
            DbError::Io(std::io::Error::other(
                "read-only attached handle missing shared state",
            ))
        })?;
        let guard = shared
            .read()
            .map_err(|_| DbError::Io(std::io::Error::other("shared database lock poisoned")))?;
        Ok(Arc::clone(&*guard))
    }

    /// Runs `f` against the catalog, index state and latest-row map that this
    /// handle should currently read from, and returns its result.
    ///
    /// Source selection, in order:
    ///
    /// 1. `read_only_attached` set: a snapshot taken via `live_shared_snapshot`.
    /// 2. `txn_staging` is `Some`: the staged shadow state.
    /// 3. Otherwise: the handle's own fields.
    ///
    /// # Errors
    ///
    /// Only the read-only attached path can fail; it propagates the error from
    /// `live_shared_snapshot`. The other two paths always return `Ok`.
    pub(crate) fn with_live_snapshot<R>(
        &self,
        f: impl FnOnce(&Catalog, &IndexState, &LatestMap) -> R,
    ) -> Result<R, DbError> {
        if self.read_only_attached {
            let state = self.live_shared_snapshot()?;
            return Ok(f(&state.catalog, &state.indexes, &state.latest));
        }
        if let Some(st) = &self.txn_staging {
            Ok(f(&st.shadow_catalog, &st.shadow_indexes, &st.shadow_latest))
        } else {
            Ok(f(&self.catalog, &self.indexes, &self.latest))
        }
    }

    /// Borrows the staged shadow catalog when `txn_staging` is `Some`, otherwise
    /// the handle's own catalog.
    ///
    /// Does not consult the shared mirror, even when `read_only_attached` is set.
    pub(crate) fn catalog_for_read(&self) -> &Catalog {
        if let Some(st) = &self.txn_staging {
            &st.shadow_catalog
        } else {
            &self.catalog
        }
    }

    /// Borrows the staged shadow index state when `txn_staging` is `Some`,
    /// otherwise the handle's own index state.
    ///
    /// Does not consult the shared mirror, even when `read_only_attached` is set.
    pub(crate) fn indexes_for_read(&self) -> &IndexState {
        if let Some(st) = &self.txn_staging {
            &st.shadow_indexes
        } else {
            &self.indexes
        }
    }

    /// Borrows the staged shadow latest-row map when `txn_staging` is `Some`,
    /// otherwise the handle's own map.
    ///
    /// Does not consult the shared mirror, even when `read_only_attached` is set.
    pub(crate) fn latest_for_read(&self) -> &LatestMap {
        if let Some(st) = &self.txn_staging {
            &st.shadow_latest
        } else {
            &self.latest
        }
    }

    /// Returns the path stored on this handle.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Returns the `OpenRecoveryInfo` stored on this handle.
    pub fn recovery_info(&self) -> &OpenRecoveryInfo {
        &self.recovery_info
    }

    /// Checks the live snapshot by delegating to
    /// `super::verify_indexes_match_rows(catalog, latest, indexes)`.
    ///
    /// # Errors
    ///
    /// Propagates a snapshot acquisition failure (see `with_live_snapshot`) or
    /// whatever error the verification routine reports.
    pub fn verify_index_consistency(&self) -> Result<(), DbError> {
        self.with_live_snapshot(|catalog, indexes, latest| {
            super::verify_indexes_match_rows(catalog, latest, indexes)
        })?
    }

    /// Borrows the catalog via `catalog_for_read`.
    ///
    /// This accessor never consults the shared mirror. NOTE(review): on a
    /// read-only attached handle the borrowed catalog may therefore differ from
    /// what `snapshot_catalog` returns; prefer `snapshot_catalog` there.
    pub fn catalog(&self) -> &Catalog {
        self.catalog_for_read()
    }

    /// Returns an owned clone of the catalog from the live snapshot.
    ///
    /// # Panics
    ///
    /// Panics if the live snapshot cannot be obtained. That can only happen on
    /// a read-only attached handle whose shared state is missing or whose
    /// shared lock is poisoned.
    pub fn snapshot_catalog(&self) -> Catalog {
        self.with_live_snapshot(|catalog, _, _| catalog.clone())
            .expect("live snapshot")
    }

    /// Returns the collection names reported by the live snapshot's catalog.
    ///
    /// # Panics
    ///
    /// Panics if the live snapshot cannot be obtained. That can only happen on
    /// a read-only attached handle whose shared state is missing or whose
    /// shared lock is poisoned.
    pub fn collection_names(&self) -> Vec<String> {
        self.with_live_snapshot(|catalog, _, _| catalog.collection_names())
            .expect("live snapshot")
    }

    /// Borrows the index state via `indexes_for_read`.
    ///
    /// This accessor never consults the shared mirror. NOTE(review): on a
    /// read-only attached handle the borrowed state may therefore differ from
    /// what `snapshot_index_state` returns; prefer `snapshot_index_state` there.
    pub fn index_state(&self) -> &IndexState {
        self.indexes_for_read()
    }

    /// Returns an owned clone of the index state from the live snapshot.
    ///
    /// # Panics
    ///
    /// Panics if the live snapshot cannot be obtained. That can only happen on
    /// a read-only attached handle whose shared state is missing or whose
    /// shared lock is poisoned.
    pub fn snapshot_index_state(&self) -> IndexState {
        self.with_live_snapshot(|_, indexes, _| indexes.clone())
            .expect("live snapshot")
    }

    /// Executes `q` against the live snapshot and returns the resulting rows.
    ///
    /// # Errors
    ///
    /// Propagates a snapshot acquisition failure or any error from
    /// `crate::query::execute_query`.
    pub fn query(
        &self,
        q: &crate::query::Query,
    ) -> Result<Vec<std::collections::BTreeMap<String, RowValue>>, DbError> {
        self.with_live_snapshot(|catalog, indexes, latest| {
            crate::query::execute_query(catalog, indexes, latest, q)
        })?
    }

    /// Returns the explanation string produced by `crate::query::explain_query`
    /// for `q`, using only the live snapshot's catalog.
    ///
    /// # Errors
    ///
    /// Propagates a snapshot acquisition failure or any error from
    /// `crate::query::explain_query`.
    pub fn explain_query(&self, q: &crate::query::Query) -> Result<String, DbError> {
        self.with_live_snapshot(|catalog, _, _| crate::query::explain_query(catalog, q))?
    }

    /// Returns a row iterator for `q`.
    ///
    /// On a read-only attached handle the iterator is built from an owned
    /// shared snapshot. Otherwise it borrows the state selected by the
    /// `*_for_read` helpers (staged shadow state, or the handle's own fields).
    /// In both cases this handle's path is passed along to the query layer.
    ///
    /// # Errors
    ///
    /// Propagates a snapshot acquisition failure or any error from the query
    /// layer.
    pub fn query_iter(
        &self,
        q: &crate::query::Query,
    ) -> Result<crate::query::QueryRowIter<'_>, DbError> {
        if self.read_only_attached {
            let snapshot = self.live_shared_snapshot()?;
            // NOTE(review): the third argument is presumably the spill path, as
            // in `execute_query_iter_with_spill_path` below; confirm.
            return crate::query::execute_query_iter_owned(snapshot, q, Some(self.path.as_path()));
        }
        crate::query::execute_query_iter_with_spill_path(
            self.catalog_for_read(),
            self.indexes_for_read(),
            self.latest_for_read(),
            q,
            Some(self.path.as_path()),
        )
    }

    /// Returns a typed `Collection` handle for model `T`.
    ///
    /// The collection id is resolved from `T::collection_name()`, then the
    /// catalog entry for that id is checked with `validate_subset_model::<T>`.
    ///
    /// # Errors
    ///
    /// * `SchemaError::UnknownCollectionName` if the name cannot be resolved.
    /// * `SchemaError::UnknownCollection` if the resolved id is absent from the
    ///   catalog.
    /// * Any error from `validate_subset_model`, or a snapshot acquisition
    ///   failure.
    pub fn collection<'a, T: crate::schema::DbModel>(
        &'a self,
    ) -> Result<Collection<'a, S, T>, DbError> {
        let cid = self.collection_id_named(T::collection_name())?;
        self.with_live_snapshot(|catalog, _, _| {
            let col = catalog
                .get(cid)
                .ok_or_else(|| DbError::Schema(SchemaError::UnknownCollection { id: cid.0 }))?;
            validate_subset_model::<T>(col)?;
            Ok(Collection {
                db: self,
                collection_id: cid,
                _marker: PhantomData,
            })
        })?
    }

    /// Resolves a collection name to its id using the live snapshot's catalog.
    ///
    /// # Errors
    ///
    /// Returns `SchemaError::UnknownCollectionName` (carrying the trimmed name)
    /// when the catalog has no such collection, or a snapshot acquisition
    /// failure.
    pub fn collection_id_named(&self, name: &str) -> Result<CollectionId, DbError> {
        self.with_live_snapshot(|catalog, _, _| {
            // Build the error lazily so the success path does not allocate.
            catalog.lookup_name(name).ok_or_else(|| {
                DbError::Schema(SchemaError::UnknownCollectionName {
                    name: name.trim().to_string(),
                })
            })
        })?
    }

    /// Looks up the latest row stored under primary key `pk` in
    /// `collection_id`.
    ///
    /// Returns `Ok(None)` when no entry exists for the key.
    ///
    /// # Errors
    ///
    /// * `SchemaError::UnknownCollection` if the id is absent from the catalog.
    /// * `SchemaError::NoPrimaryKey` if the collection has no primary field.
    /// * `SchemaError::PrimaryFieldNotFound` if no single-segment field path
    ///   matches the primary field name.
    /// * `SchemaError::PrimaryKeyTypeMismatch` if `pk` does not match the
    ///   primary field's type.
    /// * A snapshot acquisition failure.
    pub fn get(
        &self,
        collection_id: CollectionId,
        pk: &ScalarValue,
    ) -> Result<Option<std::collections::BTreeMap<String, RowValue>>, DbError> {
        self.with_live_snapshot(|catalog, _, latest| {
            let col = catalog.get(collection_id).ok_or_else(|| {
                DbError::Schema(SchemaError::UnknownCollection {
                    id: collection_id.0,
                })
            })?;
            let pk_name = col.primary_field.as_deref().ok_or_else(|| {
                DbError::Schema(SchemaError::NoPrimaryKey {
                    collection_id: collection_id.0,
                })
            })?;
            // Only a single-segment path can name the primary field. The error
            // is built lazily so a successful lookup does not allocate a
            // `String` for the field name.
            let pk_ty = col
                .fields
                .iter()
                .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
                .map(|f| &f.ty)
                .ok_or_else(|| {
                    DbError::Schema(SchemaError::PrimaryFieldNotFound {
                        name: pk_name.to_string(),
                    })
                })?;
            if !pk.ty_matches(pk_ty) {
                return Err(DbError::Schema(SchemaError::PrimaryKeyTypeMismatch {
                    collection_id: collection_id.0,
                }));
            }
            // Entries in `latest` are keyed by (collection id, canonical key bytes).
            let key = (collection_id.0, pk.canonical_key_bytes());
            Ok(latest.get(&key).cloned())
        })?
    }
}