pub mod adapters;
pub mod edge;
pub mod error;
pub mod object;
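/// Serializes a value to MessagePack with named fields.
/// Hidden from the public docs; presumably invoked by generated code rather than called directly.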
#[doc(hidden)]
pub fn __msgpack_serialize<T: serde::Serialize>(v: &T) -> Vec<u8> {
rmp_serde::to_vec_named(v).expect("msgpack serialization failed")
}
pub mod query;
#[cfg(feature = "ledger")]
pub use ledger;
use metrics::histogram;
use std::sync::Arc;
use std::time::Instant;
pub use crate::adapters::{
Adapter, EdgeRecord, MultiEdgeContext, MultiOwnedContext, MultiPreloadContext, ObjectRecord,
Query, QueryContext,
};
pub use crate::edge::meta::*;
pub use crate::edge::query::EdgeQuery;
pub use crate::edge::traits::*;
pub use crate::error::Error;
pub use crate::object::*;
use crate::query::{IndexKind, QueryFilter};
use chrono::Utc;
pub use query::IndexQuery;
use uuid::Uuid;
#[cfg(feature = "derive")]
pub use ousia_derive::*;
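/// Connection configuration for a replica endpoint, currently just its URL.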
pub struct ReplicaConfig {
pub url: String,
}
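/// Cheaply cloneable handle to the storage engine.
///
/// Clones share the same underlying [`Ousia`] instance through an [`Arc`],
/// so the adapter (and ledger, when the `ledger` feature is enabled) is
/// created once and shared.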
#[derive(Clone)]
pub struct Engine {
inner: Arc<Ousia>,
}
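/// Engine internals: the storage adapter plus, behind the `ledger` feature,
/// an optional ledger adapter.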
pub struct Ousia {
adapter: Box<dyn Adapter>,
#[cfg(feature = "ledger")]
ledger: Option<Arc<dyn ledger::LedgerAdapter>>,
}
impl Engine {
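    /// Builds an [`Engine`] from a boxed storage adapter.
    ///
    /// With the `ledger` feature enabled, the adapter is asked for its ledger
    /// adapter up front; adapters that do not provide one yield `None`.
    ///
    /// A minimal sketch, assuming you already have some `Box<dyn Adapter>`
    /// (the concrete adapter type and its constructor are not defined in this
    /// module, so `build_adapter` below is hypothetical):
    ///
    /// ```ignore
    /// let adapter: Box<dyn Adapter> = build_adapter();
    /// let engine = Engine::new(adapter);
    /// ```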
pub fn new(adapter: Box<dyn Adapter>) -> Self {
#[cfg(feature = "ledger")]
let ledger = adapter.ledger_adapter();
Self {
inner: Arc::new(Ousia {
                adapter,
#[cfg(feature = "ledger")]
ledger,
}),
}
}
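    /// Inserts a new object.
    ///
    /// For types with unique fields, the unique hashes are reserved first so
    /// that a uniqueness conflict is reported before the object row is written.
    ///
    /// A usage sketch, assuming a `User` type that implements [`Object`]:
    ///
    /// ```ignore
    /// let user = User { /* ... */ };
    /// engine.create_object(&user).await?;
    /// ```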
pub async fn create_object<T: Object>(&self, obj: &T) -> Result<(), Error> {
if !T::HAS_UNIQUE_FIELDS {
self.inner
.adapter
.insert_object(ObjectRecord::from_object(obj))
.await?;
} else {
let unique_hashes = obj.derive_unique_hashes();
self.inner
.adapter
.insert_unique_hashes(obj.type_name(), obj.id(), unique_hashes)
.await?;
self.inner
.adapter
.insert_object(ObjectRecord::from_object(obj))
.await?;
}
Ok(())
}
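    /// Fetches a single object of type `T` by id; returns `Ok(None)` if no row exists.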
pub async fn fetch_object<T: Object>(&self, id: Uuid) -> Result<Option<T>, Error> {
let val = self.inner.adapter.fetch_object(T::TYPE, id).await?;
match val {
Some(record) => record.to_object().map(Some),
None => Ok(None),
}
}
pub async fn fetch_objects<T: Object>(&self, ids: Vec<Uuid>) -> Result<Vec<T>, Error> {
let records = self.inner.adapter.fetch_bulk_objects(T::TYPE, ids).await?;
records.into_iter().map(|r| r.to_object()).collect()
}
pub async fn fetch_objects_batch(
&self,
pairs: Vec<(&'static str, Vec<Uuid>)>,
) -> Result<Vec<ObjectRecord>, Error> {
self.inner.adapter.fetch_objects_batch(pairs).await
}
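    /// Updates an existing object, bumping its `updated_at` timestamp.
    ///
    /// For types with unique fields the unique-hash index is reconciled:
    /// newly derived hashes are inserted first, the object row is then
    /// updated (rolling the fresh hashes back on failure), and hashes the
    /// object no longer derives are removed afterwards.
    ///
    /// A sketch, assuming `User` implements [`Object`] and `email` is a
    /// hypothetical field on it:
    ///
    /// ```ignore
    /// let mut user: User = engine.fetch_object(id).await?.expect("missing user");
    /// user.email = "new@example.com".into();
    /// engine.update_object(&mut user).await?;
    /// ```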
pub async fn update_object<T: Object>(&self, obj: &mut T) -> Result<(), Error> {
let meta = obj.meta_mut();
meta.updated_at = Utc::now();
if !T::HAS_UNIQUE_FIELDS {
self.inner
.adapter
.update_object(ObjectRecord::from_object(obj))
.await?;
} else {
let object_id = obj.id();
let type_name = obj.type_name();
let old_hashes = self.inner.adapter.get_hashes_for_object(object_id).await?;
let new_hashes = obj.derive_unique_hashes();
let hashes_to_add: Vec<_> = new_hashes
.iter()
.filter(|(hash, _)| !old_hashes.contains(hash))
.cloned()
.collect();
let hashes_to_remove: Vec<_> = old_hashes
.iter()
.filter(|hash| !new_hashes.iter().any(|(h, _)| h == *hash))
.cloned()
.collect();
if hashes_to_add.is_empty() && hashes_to_remove.is_empty() {
self.inner
.adapter
.update_object(ObjectRecord::from_object(obj))
.await?;
} else {
if !hashes_to_add.is_empty() {
self.inner
.adapter
.insert_unique_hashes(
type_name,
object_id,
hashes_to_add.iter().cloned().collect(),
)
.await?;
}
                if let Err(err) = self
                    .inner
                    .adapter
                    .update_object(ObjectRecord::from_object(obj))
                    .await
                {
                    // Roll back the unique hashes inserted above so the index
                    // does not reference an object state that was never written.
                    if !hashes_to_add.is_empty() {
                        let hashes = hashes_to_add
                            .into_iter()
                            .map(|(hash, _)| hash)
                            .collect::<Vec<String>>();
                        self.inner.adapter.delete_unique_hashes(hashes).await?;
                    }
                    return Err(err);
                }
if !hashes_to_remove.is_empty() {
for hash in hashes_to_remove {
self.inner.adapter.delete_unique(&hash).await?;
}
}
}
}
Ok(())
}
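    /// Deletes an object owned by `owner`, returning the deleted object if it existed.
    ///
    /// For types with unique fields, the object's unique hashes are removed first.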
pub async fn delete_object<T: Object>(
&self,
id: Uuid,
owner: Uuid,
) -> Result<Option<T>, Error> {
if T::HAS_UNIQUE_FIELDS {
let hashes = self.inner.adapter.get_hashes_for_object(id).await?;
if !hashes.is_empty() {
self.inner.adapter.delete_unique_hashes(hashes).await?;
}
}
let record = self.inner.adapter.delete_object(T::TYPE, id, owner).await?;
match record {
Some(r) => r.to_object().map(Some),
None => Ok(None),
}
}
pub async fn delete_objects<T: Object>(
&self,
ids: Vec<Uuid>,
owner: Uuid,
) -> Result<u64, Error> {
if T::HAS_UNIQUE_FIELDS && !ids.is_empty() {
let hashes = self
.inner
.adapter
.get_hashes_for_objects(ids.clone())
.await?;
if !hashes.is_empty() {
self.inner.adapter.delete_unique_hashes(hashes).await?;
}
}
self.inner
.adapter
.delete_bulk_objects(T::TYPE, ids, owner)
.await
}
pub async fn delete_owned_objects<T: Object>(&self, owner: Uuid) -> Result<u64, Error> {
        let count = self
            .inner
            .adapter
            .delete_owned_objects(T::TYPE, owner)
            .await?;
        Ok(count)
}
pub async fn transfer_object<T: Object>(
&self,
id: Uuid,
from_owner: Uuid,
to_owner: Uuid,
) -> Result<T, Error> {
let record = self
.inner
.adapter
.transfer_object(T::TYPE, id, from_owner, to_owner)
.await?;
record.to_object()
}
pub async fn find_object<T: Object>(
&self,
filters: &[QueryFilter],
) -> Result<Option<T>, Error> {
let record = self
.inner
.adapter
.find_object(T::TYPE, SYSTEM_OWNER, filters)
.await?;
match record {
Some(r) => r.to_object().map(Some),
None => Ok(None),
}
}
pub async fn find_object_with_owner<T: Object>(
&self,
owner: Uuid,
filters: &[QueryFilter],
) -> Result<Option<T>, Error> {
let record = self
.inner
.adapter
.find_object(T::TYPE, owner, filters)
.await?;
match record {
Some(r) => r.to_object().map(Some),
None => Ok(None),
}
}
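    /// Runs a filtered query for objects of type `T` and records the elapsed
    /// time in the `ousia.query.duration_ms` histogram, labelled by type.
    ///
    /// A sketch, assuming `query` is a [`Query`] built elsewhere and `User`
    /// implements [`Object`]:
    ///
    /// ```ignore
    /// let users: Vec<User> = engine.query_objects(query).await?;
    /// ```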
pub async fn query_objects<T: Object>(&self, query: Query) -> Result<Vec<T>, Error> {
let start = Instant::now();
let records = self.inner.adapter.query_objects(T::TYPE, query).await?;
histogram!("ousia.query.duration_ms",
"type" => T::TYPE
)
.record(start.elapsed().as_millis() as f64);
records.into_iter().map(|r| r.to_object()).collect()
}
pub async fn count_objects<T: Object>(&self, query: Option<Query>) -> Result<u64, Error> {
self.inner.adapter.count_objects(T::TYPE, query).await
}
pub async fn fetch_owned_objects<T: Object>(&self, owner: Uuid) -> Result<Vec<T>, Error> {
let records = self
.inner
.adapter
.fetch_owned_objects(T::TYPE, owner)
.await?;
records.into_iter().map(|r| r.to_object()).collect()
}
pub async fn fetch_owned_object<T: Object>(&self, owner: Uuid) -> Result<Option<T>, Error> {
let record = self
.inner
.adapter
.fetch_owned_object(T::TYPE, owner)
.await?;
match record {
Some(r) => r.to_object().map(Some),
None => Ok(None),
}
}
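    /// Fetches an object by id that may be stored as either type `A` or type `B`,
    /// returning it as a [`Union`] of the two.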
pub async fn fetch_union_object<A: Object, B: Object>(
&self,
id: Uuid,
) -> Result<Option<Union<A, B>>, Error> {
let record = self
.inner
.adapter
.fetch_union_object(A::TYPE, B::TYPE, id)
.await?;
match record {
Some(r) => Ok(Some(r.into())),
None => Ok(None),
}
}
pub async fn fetch_union_objects<A: Object, B: Object>(
&self,
        ids: Vec<Uuid>,
) -> Result<Vec<Union<A, B>>, Error> {
let records = self
.inner
.adapter
            .fetch_union_objects(A::TYPE, B::TYPE, ids)
.await?;
records.into_iter().map(|r| Ok(r.into())).collect()
}
pub async fn fetch_owned_union_object<A: Object, B: Object>(
&self,
owner: Uuid,
) -> Result<Option<Union<A, B>>, Error> {
let record = self
.inner
.adapter
.fetch_owned_union_object(A::TYPE, B::TYPE, owner)
.await?;
match record {
Some(r) => Ok(Some(r.into())),
None => Ok(None),
}
}
pub async fn fetch_owned_union_objects<A: Object, B: Object>(
&self,
owner: Uuid,
) -> Result<Vec<Union<A, B>>, Error> {
let records = self
.inner
.adapter
.fetch_owned_union_objects(A::TYPE, B::TYPE, owner)
.await?;
records.into_iter().map(|r| Ok(r.into())).collect()
}
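    /// Inserts a new edge.
    ///
    /// A sketch, assuming a `Follows` type that implements [`Edge`] and has
    /// already been constructed with its `from`/`to` endpoints:
    ///
    /// ```ignore
    /// engine.create_edge(&follows).await?;
    /// ```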
pub async fn create_edge<E: Edge>(&self, edge: &E) -> Result<(), Error> {
self.inner
.adapter
.insert_edge(EdgeRecord::from_edge(edge))
.await
}
pub async fn update_edge<E: Edge>(&self, edge: &mut E, to: Option<Uuid>) -> Result<(), Error> {
let old_link_id = edge.to();
if let Some(to) = to {
edge.meta_mut().to = to;
}
let _ = self
.inner
.adapter
.update_edge(EdgeRecord::from_edge(edge), old_link_id, to)
.await?;
Ok(())
}
pub async fn delete_edge<E: Edge>(&self, from: Uuid, to: Uuid) -> Result<(), Error> {
self.inner.adapter.delete_edge(E::TYPE, from, to).await
}
pub async fn delete_object_edge<E: Edge>(&self, from: Uuid) -> Result<(), Error> {
self.inner.adapter.delete_object_edge(E::TYPE, from).await
}
pub async fn fetch_edge<E: Edge>(&self, from: Uuid, to: Uuid) -> Result<Option<E>, Error> {
let edge_record = self.inner.adapter.fetch_edge(E::TYPE, from, to).await?;
let Some(edge_record) = edge_record else {
return Ok(None);
};
        edge_record.to_edge().map(Some)
}
pub async fn query_edges<E: Edge>(
&self,
from: Uuid,
query: EdgeQuery,
) -> Result<Vec<E>, Error> {
let start = Instant::now();
let records = self.inner.adapter.query_edges(E::TYPE, from, query).await?;
histogram!("ousia.query_edges.duration_ms",
"type" => E::TYPE
)
.record(start.elapsed().as_millis() as f64);
records.into_iter().map(|r| r.to_edge()).collect()
}
pub async fn query_reverse_edges<E: Edge>(
&self,
to: Uuid,
query: EdgeQuery,
) -> Result<Vec<E>, Error> {
let start = Instant::now();
let records = self
.inner
.adapter
.query_reverse_edges(E::TYPE, to, query)
.await?;
histogram!("ousia.query_edges.duration_ms",
"type" => E::TYPE
)
.record(start.elapsed().as_millis() as f64);
records.into_iter().map(|r| r.to_edge()).collect()
}
pub async fn count_edges<E: Edge>(
&self,
from: Uuid,
query: Option<EdgeQuery>,
) -> Result<u64, Error> {
self.inner.adapter.count_edges(E::TYPE, from, query).await
}
pub async fn count_reverse_edges<E: Edge>(
&self,
to: Uuid,
query: Option<EdgeQuery>,
) -> Result<u64, Error> {
self.inner
.adapter
.count_reverse_edges(E::TYPE, to, query)
.await
}
pub async fn counter_value(&self, key: String) -> u64 {
self.inner.adapter.sequence_value(key).await
}
pub async fn counter_next_value(&self, key: String) -> u64 {
self.inner.adapter.sequence_next_value(key).await
}
pub fn preload_object<'a, T: Object>(&'a self, id: Uuid) -> QueryContext<'a, T> {
self.inner.adapter.preload_object(id)
}
pub fn preload_objects<'a, P: Object>(&'a self, query: Query) -> MultiPreloadContext<'a, P> {
self.inner.adapter.preload_objects(query)
}
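    /// Verifies that the stored index schema for `T` matches the current code.
    ///
    /// The current schema is identified by a `"major.minor:hash"` entry, where
    /// the hash covers `T`'s indexed fields (see `compute_schema_hash`). A
    /// missing entry is written as-is; a matching entry is accepted; a changed
    /// hash under the same major version is upserted after a drift warning on
    /// stderr; a different major version fails with
    /// [`Error::SchemaMigrationRequired`].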
pub async fn check_schema<T: Object + IndexQuery>(&self) -> Result<(), Error> {
let current_hash = compute_schema_hash::<T>();
let current_prefix = schema_version_prefix();
let current_entry = format!("{}:{}", current_prefix, current_hash);
let stored = self.inner.adapter.read_schema_hash(T::TYPE).await?;
match stored {
None => {
self.inner
.adapter
                    .upsert_schema_hash(T::TYPE, &current_entry)
.await?;
}
            Some(ref s) if s == &current_entry => {}
Some(stored_entry) => {
let (stored_major, stored_hash) = parse_schema_entry(&stored_entry);
let current_major = current_prefix
.split('.')
.next()
.unwrap_or("0");
if stored_major != current_major {
return Err(Error::SchemaMigrationRequired(format!(
"type '{}': stored schema version '{}' is incompatible with current \
version prefix '{}'. Run a manual migration then update ousia_meta.",
T::TYPE, stored_major, current_major
)));
}
eprintln!(
"[ousia warn] Schema drift detected for '{}': \
index structure changed (stored: {}…, current: {}…). \
Rows written before this change will not appear in queries on new indexes.",
T::TYPE,
&stored_hash[..stored_hash.len().min(12)],
                    &current_hash[..current_hash.len().min(12)],
);
self.inner
.adapter
                    .upsert_schema_hash(T::TYPE, &current_entry)
.await?;
}
}
Ok(())
}
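    /// Returns the shared ledger adapter.
    ///
    /// Panics if the configured storage adapter does not provide one.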
#[cfg(feature = "ledger")]
pub fn ledger(&self) -> &Arc<dyn ledger::LedgerAdapter> {
let ledger = self
.inner
.ledger
.as_ref()
.expect("This adapter does not support the ledger. Use PostgresAdapter.");
ledger
}
#[cfg(feature = "ledger")]
pub fn ledger_ctx(&self) -> ledger::LedgerContext {
let arc = self
.inner
.ledger
.as_ref()
.expect("This adapter does not support the ledger. Use PostgresAdapter.");
ledger::LedgerContext::new(Arc::clone(arc))
}
}
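/// Computes a canonical BLAKE3 hash over `T`'s indexed fields.
///
/// Each field contributes its name and the sorted set of its index kinds;
/// fields are sorted by name so the hash does not depend on declaration order.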
fn compute_schema_hash<T: Object + IndexQuery>() -> String {
let mut fields: Vec<_> = T::indexed_fields()
.iter()
.map(|f| {
let mut kinds: Vec<&str> = f
.kinds
.iter()
.map(|k| match k {
IndexKind::Search => "Search",
IndexKind::Sort => "Sort",
})
.collect();
kinds.sort_unstable();
(f.name, kinds.join("+"))
})
.collect();
fields.sort_unstable_by_key(|(name, _)| *name);
let canonical = fields
.iter()
.map(|(name, kinds)| format!("{}:{}", name, kinds))
.collect::<Vec<_>>()
.join(",");
let input = format!("{}:{}", T::TYPE, canonical);
blake3::hash(input.as_bytes()).to_hex().to_string()
}
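/// Returns this crate's `major.minor` version, used to prefix stored schema hashes.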
fn schema_version_prefix() -> String {
let ver = env!("CARGO_PKG_VERSION");
let mut parts = ver.splitn(3, '.');
let major = parts.next().unwrap_or("0");
let minor = parts.next().unwrap_or("0");
format!("{}.{}", major, minor)
}
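/// Splits a stored `"major.minor:hash"` entry into its major component and hash.
/// Legacy entries without a version prefix are treated as major version `"0"`.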
fn parse_schema_entry(entry: &str) -> (&str, &str) {
match entry.find(':') {
Some(pos) => {
let prefix = &entry[..pos];
let hash = &entry[pos + 1..];
let major = prefix.split('.').next().unwrap_or("0");
(major, hash)
}
None => ("0", entry),
}
}