use ankurah_proto::{self as proto, Attested, Clock, CollectionId, EntityState};
use anyhow::{anyhow, Result};
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::sync::{Arc, OnceLock, RwLock};
use tokio::sync::Notify;
use tracing::{error, warn};
use crate::collectionset::CollectionSet;
use crate::entity::{Entity, WeakEntitySet};
use crate::error::MutationError;
use crate::error::RetrievalError;
use crate::notice_info;
use crate::policy::PolicyAgent;
use crate::property::{Property, PropertyError};
use crate::reactor::Reactor;
use crate::retrieval::LocalRetriever;
use crate::storage::{StorageCollectionWrapper, StorageEngine};
use crate::{property::backend::LWWBackend, value::Value};
/// Name of the collection that stores the system catalog. It is converted into a
/// `CollectionId` with `CollectionId::fixed_name` wherever the system storage is opened.
pub const SYSTEM_COLLECTION_ID: &str = "_ankurah_system";
/// Collection names treated as protected; currently only the system collection.
/// NOTE(review): where this list is enforced is not visible in this file — confirm at the call sites.
pub const PROTECTED_COLLECTIONS: &[&str] = &[SYSTEM_COLLECTION_ID];
/// Handle to the shared system-catalog state, stored behind an `Arc`.
///
/// The shared state tracks the system root, the in-memory system items and the
/// "loaded" / "system ready" flags. `SE` is the storage engine type used by the
/// collection set; `PA` is the policy agent type, which is only carried as a marker.
pub struct SystemManager<SE, PA>(Arc<Inner<SE, PA>>);
impl<SE, PA> Clone for SystemManager<SE, PA> {
fn clone(&self) -> Self { Self(self.0.clone()) }
}
/// Shared state behind every `SystemManager` handle.
struct Inner<SE, PA> {
/// Source of storage collections; also used to delete all collections on a hard reset.
collectionset: CollectionSet<SE>,
/// Map from collection id to entity. In this file it is only ever cleared (in `hard_reset`).
/// NOTE(review): no inserts or reads are visible here — confirm whether it is still in use.
collection_map: RwLock<BTreeMap<CollectionId, Entity>>,
/// Entity set used to create the root entity and to materialize entities from stored state.
entities: WeakEntitySet,
/// Whether this node is durable: only durable nodes may `create` a system, and
/// durable nodes are refused by `join_system`.
durable: bool,
/// Attested state of the system root; `None` until it is loaded, created or joined,
/// and reset to `None` by `hard_reset`.
root: RwLock<Option<Attested<EntityState>>>,
/// Entities from the system collection that carry an `"item"` property.
items: RwLock<Vec<Entity>>,
/// Set exactly once, when `load_system_catalog` finishes successfully.
loaded: OnceLock<()>,
/// Notified (all waiters) right after `loaded` is set.
loading: Notify,
/// True once the system root is available (created, joined, or loaded on a durable node);
/// cleared again by `hard_reset`.
system_ready: RwLock<bool>,
/// Notified (all waiters) each time `system_ready` is set to true.
system_ready_notify: Notify,
/// Reactor that is told about a hard reset via `system_reset()`.
reactor: Reactor,
/// Marker tying the policy agent type `PA` to this struct without storing a value of it.
_phantom: PhantomData<PA>,
}
impl<SE, PA> SystemManager<SE, PA>
where
SE: StorageEngine + Send + Sync + 'static,
PA: PolicyAgent + Send + Sync + 'static,
{
pub(crate) fn new(collections: CollectionSet<SE>, entities: WeakEntitySet, reactor: Reactor, durable: bool) -> Self {
let me = Self(Arc::new(Inner {
collectionset: collections,
entities,
durable,
items: RwLock::new(Vec::new()),
root: RwLock::new(None),
loaded: OnceLock::new(),
loading: Notify::new(),
collection_map: RwLock::new(BTreeMap::new()),
system_ready: RwLock::new(false),
system_ready_notify: Notify::new(),
reactor,
_phantom: PhantomData,
}));
{
let me = me.clone();
crate::task::spawn(async move {
if let Err(e) = me.load_system_catalog().await {
error!("Failed to load system catalog: {}", e);
}
});
}
me
}
/// Returns a clone of the current system root state, or `None` when no root is set
/// (nothing loaded, created or joined yet, or the root was cleared by a reset).
///
/// # Panics
/// Panics if the root lock is poisoned.
pub fn root(&self) -> Option<Attested<EntityState>> { self.0.root.read().unwrap().clone() }
/// Returns a snapshot (clone) of the system items currently held in memory.
///
/// # Panics
/// Panics if the items lock is poisoned.
pub fn items(&self) -> Vec<Entity> {
    let guard = self.0.items.read().unwrap();
    guard.to_vec()
}
pub async fn collection(&self, id: &CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
self.wait_loaded().await;
self.0.collectionset.get(id).await
}
/// Reports the current value of the "system ready" flag.
///
/// # Panics
/// Panics if the flag's lock is poisoned.
pub fn is_system_ready(&self) -> bool {
    let ready = self.0.system_ready.read().unwrap();
    *ready
}
/// Waits until the system has been marked ready.
///
/// Returns immediately when the ready flag is already set; otherwise it completes on the
/// next `system_ready_notify` notification.
pub async fn wait_system_ready(&self) {
    // Register for the notification *before* looking at the flag. `notify_waiters` only
    // wakes `Notified` futures that already exist, so checking the flag first leaves a
    // window in which the flag is set and the notification is sent before this task has
    // started waiting — and the wake-up would be missed.
    let notified = self.0.system_ready_notify.notified();
    if self.is_system_ready() {
        return;
    }
    notified.await;
}
/// Creates a brand-new system root on this node.
///
/// Steps: wait for the catalog load, refuse if any system item already exists, create the
/// root entity with its `"item"` property set to `SysRoot`, persist the commit event and the
/// resulting attested state, then publish the root in memory and mark the system ready.
///
/// # Errors
/// - the node is not durable;
/// - a system item already exists;
/// - storage, event generation or property conversion fails.
///
/// # Panics
/// Panics if the entity has no LWW backend or if one of the internal locks is poisoned.
pub async fn create(&self) -> Result<()> {
    if !self.0.durable {
        return Err(anyhow!("Only durable nodes can create a new system"));
    }
    self.wait_loaded().await;
    {
        let items = self.0.items.read().unwrap();
        if !items.is_empty() {
            return Err(anyhow!("System root already exists"));
        }
    }

    let collection_id = CollectionId::fixed_name(SYSTEM_COLLECTION_ID);
    let storage = self.0.collectionset.get(&collection_id).await?;
    // `collection_id` is not needed afterwards, so it is moved rather than cloned.
    let system_entity = self.0.entities.create(collection_id);
    let lww_backend = system_entity.get_backend::<LWWBackend>().expect("LWW Backend should exist");
    lww_backend.set("item".into(), proto::sys::Item::SysRoot.into_value()?);

    // Build the error lazily so nothing is allocated on the success path.
    let event = system_entity.generate_commit_event()?.ok_or_else(|| anyhow!("Expected event"))?;
    // The head clock is derived from the event id before the event is consumed below.
    let root: Clock = event.id().into();
    storage.add_event(&event.into()).await?;
    system_entity.commit_head(root);

    let attested_state: Attested<EntityState> = system_entity.to_entity_state()?.into();
    storage.set_state(attested_state.clone()).await?;

    // No `.await` happens past this point, so holding the write guard to the end is fine.
    let mut items = self.0.items.write().unwrap();
    items.push(system_entity);
    *self.0.root.write().unwrap() = Some(attested_state);
    *self.0.system_ready.write().unwrap() = true;
    self.0.system_ready_notify.notify_waiters();
    Ok(())
}
/// Joins an existing system whose root is described by `state`.
///
/// - If the local root head equals the head in `state`, the node is already part of that
///   system: it is marked ready and nothing else changes.
/// - If a different local root exists, all local storage is wiped via `hard_reset` first.
/// - The given state is then stored in the system collection, published as the root, and
///   the system is marked ready.
///
/// # Errors
/// - the node is durable (durable nodes may not join);
/// - the hard reset or the storage operations fail.
///
/// # Panics
/// Panics if one of the internal locks is poisoned.
pub async fn join_system(&self, state: Attested<EntityState>) -> Result<(), MutationError> {
    // Reject durable nodes up front, before waiting for the catalog load — the same order
    // `create` uses for its durability check. `durable` never changes after construction.
    if self.0.durable {
        warn!("Durable node attempted to join system - this is not allowed");
        return Err(MutationError::General(Box::new(std::io::Error::other("Durable nodes cannot join an existing system"))));
    }
    self.wait_loaded().await;

    if let Some(root) = self.root() {
        if root.payload.state.head == state.payload.state.head {
            notice_info!("Found matching root - Node is part of the same system");
            *self.0.system_ready.write().unwrap() = true;
            self.0.system_ready_notify.notify_waiters();
            return Ok(());
        }
        // Log head against head so the two sides of the message are directly comparable.
        tracing::warn!(
            "Mismatched root state during join: local={:?}, remote={:?}",
            root.payload.state.head,
            state.payload.state.head
        );
        tracing::info!("Resetting storage to replace mismatched root");
        // Clear the root now: `hard_reset` only clears it after its storage wipe has been awaited.
        {
            let mut root = self.0.root.write().expect("Root lock poisoned");
            *root = None;
        }
        self.hard_reset().await.map_err(|e| MutationError::General(Box::new(std::io::Error::other(e.to_string()))))?;
    }

    let collection_id = CollectionId::fixed_name(SYSTEM_COLLECTION_ID);
    let storage = self.0.collectionset.get(&collection_id).await?;
    storage.set_state(state.clone()).await?;
    {
        let mut root = self.0.root.write().expect("Root lock poisoned");
        *root = Some(state);
    }
    *self.0.system_ready.write().unwrap() = true;
    self.0.system_ready_notify.notify_waiters();
    Ok(())
}
pub async fn hard_reset(&self) -> Result<()> {
self.0.collectionset.delete_all_collections().await?;
{
let mut items = self.0.items.write().unwrap();
items.clear();
}
{
let mut root = self.0.root.write().unwrap();
*root = None;
}
{
let mut collection_map = self.0.collection_map.write().unwrap();
collection_map.clear();
}
{
let mut system_ready = self.0.system_ready.write().unwrap();
*system_ready = false;
}
self.0.reactor.system_reset();
Ok(())
}
pub fn is_loaded(&self) -> bool { self.0.loaded.get().is_some() }
/// Waits until the initial system-catalog load has completed.
///
/// Returns immediately when the catalog is already loaded. Note that `loaded` is only set
/// when `load_system_catalog` succeeds, so this does not complete if that load fails.
pub async fn wait_loaded(&self) {
    // Register for the notification *before* checking the flag. `notify_waiters` only wakes
    // `Notified` futures that already exist; checking first would leave a window in which
    // the load finishes and notifies before this task is waiting, losing the wake-up.
    let notified = self.0.loading.notified();
    if self.is_loaded() {
        return;
    }
    notified.await;
}
/// Loads the system catalog from the system collection into memory.
///
/// Every stored state in the system collection is turned into an entity. Entities that
/// carry an `"item"` property are kept as system items, and the state whose item is
/// `SysRoot` becomes the root. On a durable node that has a root, the system is marked
/// ready. Finally the `loaded` flag is set and all `wait_loaded` waiters are woken.
///
/// # Errors
/// - the catalog was already loaded;
/// - storage access or entity materialization fails;
/// - a stored `"item"` value cannot be decoded as a `sys::Item`.
///
/// On any error the `loaded` flag is left unset.
///
/// # Panics
/// Panics if an entity has no LWW backend, if a lock is poisoned, or if the `loaded`
/// flag was set concurrently.
async fn load_system_catalog(&self) -> Result<()> {
    if self.is_loaded() {
        return Err(anyhow!("System catalog already loaded"));
    }
    let collection_id = CollectionId::fixed_name(SYSTEM_COLLECTION_ID);
    let storage = self.0.collectionset.get(&collection_id).await?;
    let retriever = LocalRetriever::new(storage.clone());

    // Select every state in the system collection.
    let selection = ankql::ast::Selection { predicate: ankql::ast::Predicate::True, order_by: None, limit: None };
    let states = storage.fetch_states(&selection).await?;

    let mut entities = Vec::new();
    let mut root_state = None;
    for state in states {
        let (_entity_changed, entity) =
            self.0.entities.with_state(&retriever, state.payload.entity_id, collection_id.clone(), state.payload.state.clone()).await?;
        let lww_backend = entity.get_backend::<LWWBackend>().expect("LWW Backend should exist");
        if let Some(value) = lww_backend.get(&"item".to_string()) {
            // Stored data may be corrupt or written by another version: report it as an
            // error (which the spawning task logs) instead of panicking the loader.
            let item = proto::sys::Item::from_value(Some(value)).map_err(|e| anyhow!("Invalid sys item: {}", e))?;
            if let proto::sys::Item::SysRoot = &item {
                root_state = Some(state);
            }
            entities.push(entity);
        }
    }

    {
        let mut items = self.0.items.write().unwrap();
        items.extend(entities);
    }
    let has_root = root_state.is_some();
    {
        let mut root = self.0.root.write().expect("Root lock poisoned");
        *root = root_state;
    }
    // Only a durable node is ready from its stored root alone; other nodes become ready
    // through `join_system`.
    if has_root && self.0.durable {
        *self.0.system_ready.write().unwrap() = true;
        self.0.system_ready_notify.notify_waiters();
    }
    // Set the flag before notifying so that woken waiters observe `is_loaded() == true`.
    self.0.loaded.set(()).expect("Loading flag already set");
    self.0.loading.notify_waiters();
    Ok(())
}
}
/// Stores a `sys::Item` as a property by encoding it as a JSON string inside `Value::String`.
impl Property for proto::sys::Item {
    /// Serializes the item to JSON and wraps it in `Value::String`.
    ///
    /// # Errors
    /// Returns `PropertyError::InvalidValue` if JSON serialization fails.
    fn into_value(&self) -> std::result::Result<Option<Value>, crate::property::PropertyError> {
        let json = serde_json::to_string(self)
            .map_err(|_| PropertyError::InvalidValue { value: "".to_string(), ty: "sys::Item".to_string() })?;
        Ok(Some(Value::String(json)))
    }

    /// Decodes an item from a `Value::String` holding its JSON encoding.
    ///
    /// # Errors
    /// Returns `PropertyError::InvalidValue` when the value is absent, is not a string, or
    /// does not parse as a `sys::Item`. In the parse-failure case the error carries the
    /// offending string so the bad data can be identified.
    fn from_value(value: Option<Value>) -> std::result::Result<Self, crate::property::PropertyError> {
        let Some(Value::String(json)) = value else {
            return Err(PropertyError::InvalidValue { value: "".to_string(), ty: "sys::Item".to_string() });
        };
        serde_json::from_str::<proto::sys::Item>(&json)
            .map_err(|_| PropertyError::InvalidValue { value: json.clone(), ty: "sys::Item".to_string() })
    }
}