use super::delta::Delta;
use super::molecule::{Molecule, PrimaryKey};
use crate::syntax::ast::MergeFn;
use crate::value::{MoleculeKindId, Type, Value};
use crate::Error;
use dashmap::DashMap;
use parking_lot::RwLock;
use std::collections::BTreeMap;
use std::sync::Arc;
/// In-memory store of molecules, grouped by molecule kind.
///
/// Each kind has its own [`Arrangement`]; arrangements are created on demand
/// the first time `apply` inserts a molecule of that kind.
pub struct MoleculeStore {
/// One arrangement per molecule kind id.
arrangements: DashMap<MoleculeKindId, Arrangement>,
/// Shared registry used to resolve kind names, ids, and schemas.
type_registry: Arc<TypeRegistry>,
}
/// Outcome reported by `MoleculeStore::apply`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ApplyResult {
/// No molecule was stored under the primary key, so the incoming one was
/// stored as-is. `apply` also returns this variant for every retraction.
Inserted,
/// A molecule with different fields already existed under the primary key
/// and was replaced by the result of the schema's merge clause.
Merged,
/// A molecule with identical fields already existed; nothing was changed.
NoOp,
}
impl MoleculeStore {
pub fn new(type_registry: Arc<TypeRegistry>) -> Self {
Self {
arrangements: DashMap::new(),
type_registry,
}
}
pub fn type_registry(&self) -> &Arc<TypeRegistry> {
&self.type_registry
}
pub fn apply<F>(&self, delta: &Delta, merge_eval: F) -> Result<ApplyResult, Error>
where
F: FnOnce(&MergeFn, &Molecule, &Molecule) -> Result<Molecule, Error>,
{
match delta {
Delta::Insert(m) => {
let pk = self.primary_key_of(m)?;
let kind = m.kind;
let schema = self
.type_registry
.schema_by_id(kind)
.ok_or_else(|| Error::Runtime(format!("unknown kind for {}", m.kind_name)))?;
let arrangement = self
.arrangements
.entry(kind)
.or_insert_with(Arrangement::new);
let mut by_pk = arrangement.by_pk.write();
let (new_molecule, result) = if let Some(existing) = by_pk.get(&pk) {
if existing.fields == m.fields {
return Ok(ApplyResult::NoOp);
}
if let Some(merge) = &schema.merge {
(merge_eval(merge, existing, m)?, ApplyResult::Merged)
} else {
return Err(Error::Runtime(format!(
"primary-key conflict on {} (no merge clause)",
m.kind_name
)));
}
} else {
(m.clone(), ApplyResult::Inserted)
};
by_pk.insert(pk, new_molecule);
Ok(result)
}
Delta::Retract(kind, pk, _ts) => {
if let Some(arrangement) = self.arrangements.get(kind) {
arrangement.by_pk.write().remove(pk);
}
Ok(ApplyResult::Inserted) }
}
}
pub fn scan(&self, kind: MoleculeKindId) -> Vec<Molecule> {
self.arrangements
.get(&kind)
.map(|a| a.by_pk.read().values().cloned().collect())
.unwrap_or_default()
}
pub fn get(&self, kind: MoleculeKindId, pk: &PrimaryKey) -> Option<Molecule> {
self.arrangements
.get(&kind)
.and_then(|a| a.by_pk.read().get(pk).cloned())
}
pub fn get_singleton(&self, kind_name: &str) -> Option<Molecule> {
let kind = self.type_registry.id_by_name(kind_name)?;
self.arrangements
.get(&kind)
.and_then(|a| a.by_pk.read().values().next().cloned())
}
pub fn scan_by_name(&self, kind_name: &str) -> Vec<Molecule> {
match self.type_registry.id_by_name(kind_name) {
Some(kind) => self.scan(kind),
None => Vec::new(),
}
}
fn primary_key_of(&self, m: &Molecule) -> Result<PrimaryKey, Error> {
let schema = self.type_registry.schema_by_id(m.kind).ok_or_else(|| {
Error::Runtime(format!("unknown kind for {}", m.kind_name))
})?;
let mut parts = Vec::with_capacity(schema.primary_key.len());
for field in &schema.primary_key {
let v = m.fields.get(field).cloned().ok_or_else(|| {
Error::Runtime(format!(
"{} missing primary-key field {field}",
m.kind_name
))
})?;
parts.push(v);
}
Ok(PrimaryKey(parts))
}
}
/// The molecules stored for a single kind.
pub struct Arrangement {
/// Molecules keyed and ordered by primary key, behind a read-write lock.
pub by_pk: RwLock<BTreeMap<PrimaryKey, Molecule>>,
}
impl Arrangement {
    /// Builds an arrangement that holds no molecules yet.
    fn new() -> Self {
        let empty = BTreeMap::new();
        Self {
            by_pk: RwLock::new(empty),
        }
    }
}
/// Registry mapping molecule kind names and ids to their schemas.
///
/// All state sits behind a read-write lock, so schemas can be registered
/// through a shared reference.
#[derive(Default)]
pub struct TypeRegistry {
/// Lock-protected registry state.
inner: parking_lot::RwLock<TypeRegistryInner>,
}
/// Lock-protected state of a `TypeRegistry`.
#[derive(Default)]
struct TypeRegistryInner {
/// Schema registered for each kind id.
by_id: BTreeMap<MoleculeKindId, MoleculeSchema>,
/// Kind id assigned to each schema name.
by_name: BTreeMap<String, MoleculeKindId>,
/// Raw value of the next kind id to hand out; starts at 0 via `Default`.
next_id: u32,
}
impl TypeRegistry {
    /// Creates a registry with no kinds registered.
    pub fn new() -> Self {
        Default::default()
    }

    /// Registers `schema` and returns the kind id it is stored under.
    ///
    /// A name that is already registered keeps its existing id and has its
    /// stored schema replaced; a new name is assigned the next unused id.
    pub fn register(&self, schema: MoleculeSchema) -> MoleculeKindId {
        let mut inner = self.inner.write();
        let known = inner.by_name.get(&schema.name).copied();
        let id = match known {
            Some(existing) => existing,
            None => {
                let fresh = MoleculeKindId(inner.next_id);
                inner.next_id += 1;
                inner.by_name.insert(schema.name.clone(), fresh);
                fresh
            }
        };
        // Both paths end by (re)binding the id to the supplied schema.
        inner.by_id.insert(id, schema);
        id
    }

    /// Returns the kind id registered under `name`, if any.
    pub fn id_by_name(&self, name: &str) -> Option<MoleculeKindId> {
        let inner = self.inner.read();
        inner.by_name.get(name).copied()
    }

    /// Returns a clone of the schema registered for `id`, if any.
    pub fn schema_by_id(&self, id: MoleculeKindId) -> Option<MoleculeSchema> {
        let inner = self.inner.read();
        inner.by_id.get(&id).cloned()
    }

    /// Returns a clone of the schema registered under `name`, if any.
    ///
    /// Name and schema are resolved under a single read lock.
    pub fn schema_by_name(&self, name: &str) -> Option<MoleculeSchema> {
        let inner = self.inner.read();
        let id = inner.by_name.get(name)?;
        inner.by_id.get(id).cloned()
    }
}
/// Declared shape of one molecule kind.
#[derive(Clone, Debug)]
pub struct MoleculeSchema {
/// Kind name; `TypeRegistry::register` keys schemas by this name.
pub name: String,
/// Declared fields of the kind.
pub fields: Vec<FieldSchema>,
/// Names of the fields whose values, in this order, form the primary key.
pub primary_key: Vec<String>,
/// Merge clause used by `MoleculeStore::apply` when an insert hits an
/// existing, different molecule; with `None` such a conflict is an error.
pub merge: Option<MergeFn>,
/// NOTE(review): not read anywhere in this file (`get_singleton` does not
/// check it); presumably marks kinds meant to hold at most one molecule.
pub is_singleton: bool,
}
/// Declaration of a single field within a `MoleculeSchema`.
#[derive(Clone, Debug)]
pub struct FieldSchema {
/// Field name.
pub name: String,
/// Declared type of the field.
pub ty: Type,
/// Optional default expression; presumably evaluated when the field is
/// omitted. It is not evaluated in this file, so confirm against the caller.
pub default: Option<crate::syntax::ast::Expr>,
}
// Never called. It mentions `Value` in a signature, presumably so that the
// `Value` import at the top of the file is not reported as unused; confirm
// before removing either one.
#[allow(dead_code)]
fn _value_unused(_: &Value) {}