use std::{
collections::HashSet,
sync::{Arc, Mutex},
};
use ahash::{HashMap, RandomState};
use tokio::sync::RwLock;
use crate::{
catalog::{Collections, fetch_catalog},
error::MotivaError,
index::{EntityHandle, IndexProvider},
matching::MatchParams,
model::{Entity, HasProperties, SearchEntity},
prelude::MatchingAlgorithm,
schemas::SCHEMAS,
scoring,
};
/// Controls how much of an entity's graph [`Motiva::get_entity`] loads.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GetEntityBehavior {
    /// Return only the requested entity, without looking up related entities.
    RootOnly,
    /// Also look up related entities and attach them to the returned entity.
    FetchNestedEntity,
}
/// Facade combining an index backend with a shared, refreshable collection catalog.
#[derive(Clone, Debug)]
pub struct Motiva<P>
where
    P: IndexProvider,
{
    /// Optional yente base URL; the catalog is requested from `{yente}/catalog`.
    yente: Option<String>,
    /// Collection catalog behind an `Arc`, so clones of `Motiva` share one copy.
    catalog: Arc<RwLock<Collections>>,
    /// Index backend used for health checks, searches and entity lookups.
    index: P,
}
impl<P: IndexProvider> Motiva<P> {
    /// Builds the catalog URL (`{base}/catalog`) from an optional yente base URL.
    ///
    /// Shared by [`Motiva::new`] and [`Motiva::refresh_catalog`] so both always
    /// request the catalog from the same location.
    fn catalog_url(yente: Option<&str>) -> Option<String> {
        yente.map(|base| format!("{base}/catalog"))
    }

    /// Creates a new `Motiva` backed by `provider`.
    ///
    /// Calls `crate::init()` and then fetches the initial catalog. When `yente`
    /// is `Some(base)`, the catalog is requested from `{base}/catalog`.
    /// Otherwise `None` is passed to `fetch_catalog`. NOTE(review): what
    /// `fetch_catalog` does with `None` is defined elsewhere — confirm.
    ///
    /// # Errors
    ///
    /// Returns any error produced by `fetch_catalog`.
    pub async fn new(provider: P, yente: Option<String>) -> Result<Self, MotivaError> {
        crate::init();

        let catalog = fetch_catalog(&Self::catalog_url(yente.as_deref())).await?;

        Ok(Self {
            index: provider,
            yente,
            catalog: Arc::new(RwLock::new(catalog)),
        })
    }

    /// Reports the index backend's health by delegating to `IndexProvider::health`.
    pub async fn health(&self) -> Result<bool, MotivaError> {
        self.index.health().await
    }

    /// Searches the index for candidates matching `entity`.
    ///
    /// Delegates to `IndexProvider::search`, passing along the shared catalog.
    pub async fn search(&self, entity: &SearchEntity, params: &MatchParams) -> Result<Vec<Entity>, MotivaError> {
        self.index.search(&self.catalog, entity, params).await
    }

    /// Fetches the entity with the given `id` from the index.
    ///
    /// If the index returns a referent handle, it is returned unchanged.
    /// Otherwise the nominal entity is returned, with `behavior` deciding how
    /// much more is loaded:
    ///
    /// - With [`GetEntityBehavior::RootOnly`], no further lookups are made.
    /// - With [`GetEntityBehavior::FetchNestedEntity`], related entities are
    ///   fetched round by round via `IndexProvider::get_related_entities`. Each
    ///   round looks up the ids collected from the previous round. Results are
    ///   attached under `properties.entities`:
    ///   - an entity referenced by one of the root's entity-typed properties is
    ///     attached to the root under that property's name;
    ///   - an entity referenced by an already-fetched association is attached
    ///     to that association under the referencing property's name. If
    ///     several associations reference the same id, the last one recorded
    ///     wins.
    ///   - an association whose entity-typed property (with a schema `reverse`)
    ///     contains the root's id is attached to the root under the reverse
    ///     property's name.
    ///
    /// Associations whose schema is not found in `SCHEMAS` are skipped. If the
    /// root's schema reports no properties, no related entities are fetched.
    ///
    /// # Errors
    ///
    /// Propagates errors from `IndexProvider::get_entity` and
    /// `IndexProvider::get_related_entities`.
    pub async fn get_entity(&self, id: &str, behavior: GetEntityBehavior) -> Result<EntityHandle, MotivaError> {
        let mut entity = match self.index.get_entity(id).await? {
            EntityHandle::Referent(referent) => return Ok(EntityHandle::Referent(referent)),
            EntityHandle::Nominal(entity) => entity,
        };

        if matches!(behavior, GetEntityBehavior::RootOnly) {
            return Ok(EntityHandle::Nominal(entity));
        }

        let id = id.to_string();
        // Only the first `get_related_entities` call receives the root id;
        // later rounds pass `None`.
        let mut root = Some(&id);
        // Root id plus every processed association id, handed to the provider
        // on each round (presumably so it can exclude them — TODO confirm).
        let mut seen = HashSet::<_, RandomState>::from_iter([id.clone()]);
        // Ids to look up in the next round.
        let mut ids: Vec<String> = Vec::new();
        // Id referenced by the root -> name of the root property referencing it.
        let mut root_arena: HashMap<String, String> = Default::default();
        // Id referenced by a fetched association -> (that association, property name).
        let mut arena: HashMap<String, (Arc<Mutex<Entity>>, String)> = Default::default();

        if let Some(properties) = entity.schema.properties() {
            // Seed the first round with every id the root references through an
            // entity-typed property, remembering which property it came from.
            for (name, property) in properties {
                if property._type != "entity" {
                    continue;
                }
                for assoc in entity.property(&name) {
                    root_arena.insert(assoc.to_string(), name.clone());
                }
                ids.extend(entity.property(&name).iter().cloned());
            }

            // NOTE(review): `root` is only passed on the first call, and that
            // call happens only if the root references at least one entity. A
            // root without outgoing entity references never triggers a lookup
            // with `root` set. If the provider relies on `root` to find
            // entities pointing back at the root, those are skipped — confirm
            // this is intended.
            while !ids.is_empty() {
                let associations = self.index.get_related_entities(root, &ids, &seen).await?;
                root = None;
                ids.clear();

                for association in associations {
                    let Some(schema) = SCHEMAS.get(association.schema.as_str()) else {
                        continue;
                    };
                    // Shared handle: children found in later rounds are pushed
                    // into this association through the `arena` entry below.
                    let ptr = Arc::new(Mutex::new(association.clone()));

                    if let Some(attr) = root_arena.get(&association.id) {
                        entity
                            .properties
                            .entities
                            .entry(attr.clone())
                            .or_default()
                            .push(Arc::clone(&ptr));
                    } else if let Some((parent, attr)) = arena.get(&association.id)
                        && let Ok(mut e) = parent.lock()
                    {
                        // A poisoned parent lock skips the attachment silently.
                        e.properties.entities.entry(attr.clone()).or_default().push(Arc::clone(&ptr));
                    }

                    for (name, values) in &association.properties.strings {
                        let Some(property) = schema.properties.get(name) else {
                            continue;
                        };
                        if property._type != "entity" {
                            continue;
                        }
                        // Queue referenced ids for the next round and remember
                        // which association/property references each one.
                        ids.extend(values.iter().cloned());
                        for value in values {
                            arena.insert(value.clone(), (Arc::clone(&ptr), name.clone()));
                        }
                        // An association pointing back at the root is attached
                        // to the root under the schema's reverse property name.
                        if let Some(reverse) = property.reverse.as_ref()
                            && values.contains(&entity.id)
                        {
                            entity
                                .properties
                                .entities
                                .entry(reverse.name.clone())
                                .or_default()
                                .push(Arc::clone(&ptr));
                        }
                    }
                    seen.insert(association.id);
                }
            }
        }

        Ok(EntityHandle::Nominal(entity))
    }

    /// Re-fetches the catalog and replaces the shared copy on success.
    ///
    /// The fetch completes before the write lock is taken, so readers are not
    /// blocked while `fetch_catalog` runs. On failure the error is logged via
    /// `tracing::error!` and the previously loaded catalog is kept.
    pub async fn refresh_catalog(&self) {
        match fetch_catalog(&Self::catalog_url(self.yente.as_deref())).await {
            Ok(catalog) => {
                *self.catalog.write().await = catalog;
            }
            Err(err) => tracing::error!(error = err.to_string(), "could not refresh catalog"),
        }
    }

    /// Scores `hits` against `entity` using matching algorithm `A`.
    ///
    /// Thin wrapper over `scoring::score::<A>`. `cutoff` is forwarded unchanged;
    /// its exact effect on the result is defined by `scoring::score`.
    pub fn score<A: MatchingAlgorithm>(&self, entity: &SearchEntity, hits: Vec<Entity>, cutoff: f64) -> anyhow::Result<Vec<(Entity, f64)>> {
        scoring::score::<A>(entity, hits, cutoff)
    }
}