ankurah_core/
system.rs

1use ankurah_proto::{self as proto, Attested, Clock, CollectionId, EntityState};
2use anyhow::{anyhow, Result};
3use std::collections::BTreeMap;
4use std::marker::PhantomData;
5use std::sync::{Arc, OnceLock, RwLock};
6use tokio::sync::Notify;
7use tracing::{error, warn};
8
9use crate::collectionset::CollectionSet;
10use crate::entity::{Entity, WeakEntitySet};
11use crate::error::MutationError;
12use crate::error::RetrievalError;
13use crate::notice_info;
14use crate::policy::PolicyAgent;
15use crate::property::{Property, PropertyError};
16use crate::reactor::Reactor;
17use crate::retrieval::LocalRetriever;
18use crate::storage::{StorageCollectionWrapper, StorageEngine};
19use crate::{property::backend::LWWBackend, value::Value};
/// Name of the storage collection that holds the system catalog entities.
pub const SYSTEM_COLLECTION_ID: &str = "_ankurah_system";

/// Ids of collections treated as protected; currently only [`SYSTEM_COLLECTION_ID`].
pub const PROTECTED_COLLECTIONS: &[&str] = &[SYSTEM_COLLECTION_ID];
22
23/// System catalog manager for storing various metadata about the system
24/// * root clock
25/// * valid collections (TODO)
26/// * property definitions (TODO)
27
28pub struct SystemManager<SE, PA>(Arc<Inner<SE, PA>>);
29impl<SE, PA> Clone for SystemManager<SE, PA> {
30    fn clone(&self) -> Self { Self(self.0.clone()) }
31}
32
33struct Inner<SE, PA> {
34    collectionset: CollectionSet<SE>,
35    collection_map: RwLock<BTreeMap<CollectionId, Entity>>,
36    entities: WeakEntitySet,
37    durable: bool,
38    root: RwLock<Option<Attested<EntityState>>>,
39    items: RwLock<Vec<Entity>>,
40    loaded: OnceLock<()>,
41    loading: Notify,
42    system_ready: RwLock<bool>,
43    system_ready_notify: Notify,
44    reactor: Reactor,
45    _phantom: PhantomData<PA>,
46}
47
48impl<SE, PA> SystemManager<SE, PA>
49where
50    SE: StorageEngine + Send + Sync + 'static,
51    PA: PolicyAgent + Send + Sync + 'static,
52{
53    pub(crate) fn new(collections: CollectionSet<SE>, entities: WeakEntitySet, reactor: Reactor, durable: bool) -> Self {
54        let me = Self(Arc::new(Inner {
55            collectionset: collections,
56            entities,
57            durable,
58            items: RwLock::new(Vec::new()),
59            root: RwLock::new(None),
60            loaded: OnceLock::new(),
61            loading: Notify::new(),
62            collection_map: RwLock::new(BTreeMap::new()),
63            system_ready: RwLock::new(false),
64            system_ready_notify: Notify::new(),
65            reactor,
66            _phantom: PhantomData,
67        }));
68        {
69            let me = me.clone();
70            crate::task::spawn(async move {
71                if let Err(e) = me.load_system_catalog().await {
72                    error!("Failed to load system catalog: {}", e);
73                }
74            });
75        }
76        me
77    }
78
79    pub fn root(&self) -> Option<Attested<EntityState>> { self.0.root.read().unwrap().as_ref().map(|r| r.clone()) }
80
81    pub fn items(&self) -> Vec<Entity> { self.0.items.read().unwrap().clone() }
82
83    /// get an existing collection if it's defined in the system catalog, else insert a SysItem::Collection
84    /// then return collections.get to get the StorageCollectionWrapper
85    pub async fn collection(&self, id: &CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
86        self.wait_loaded().await;
87        // TODO - update the system catalog to create an entity for this collection
88
89        // Return the collection wrapper
90        self.0.collectionset.get(id).await
91    }
92
93    /// Returns true if we've successfully initialized or joined a system
94    pub fn is_system_ready(&self) -> bool { *self.0.system_ready.read().unwrap() }
95
96    /// Waits until we've successfully initialized or joined a system
97    pub async fn wait_system_ready(&self) {
98        if !self.is_system_ready() {
99            self.0.system_ready_notify.notified().await;
100        }
101    }
102
103    /// Creates a new system root. This should only be called once per system by durable nodes
104    /// The rest of the nodes must "join" this system.
105    pub async fn create(&self) -> Result<()> {
106        if !self.0.durable {
107            return Err(anyhow!("Only durable nodes can create a new system"));
108        }
109
110        // Wait for local system catalog to be loaded
111        self.wait_loaded().await;
112
113        {
114            let items = self.0.items.read().unwrap();
115            if !items.is_empty() {
116                return Err(anyhow!("System root already exists"));
117            }
118        }
119
120        // TODO - see if we can use the Model derive macro for a SysCatalogItem model rather than doing this manually
121        let collection_id = CollectionId::fixed_name(SYSTEM_COLLECTION_ID);
122        let storage = self.0.collectionset.get(&collection_id).await?;
123
124        let system_entity = self.0.entities.create(collection_id.clone());
125
126        let lww_backend = system_entity.get_backend::<LWWBackend>().expect("LWW Backend should exist");
127        lww_backend.set("item".into(), proto::sys::Item::SysRoot.into_value()?);
128
129        let event = system_entity.generate_commit_event()?.ok_or(anyhow!("Expected event"))?;
130        let root: Clock = event.id().into();
131
132        // Add the event to storage first
133        storage.add_event(&event.into()).await?;
134
135        // Update the entity's head clock
136        system_entity.commit_head(root.clone());
137        // Now get the entity state after the head is updated
138        let attested_state: Attested<EntityState> = system_entity.to_entity_state()?.into();
139        storage.set_state(attested_state.clone()).await?;
140
141        // Update our system state
142        let mut items = self.0.items.write().unwrap();
143        items.push(system_entity);
144        *self.0.root.write().unwrap() = Some(attested_state);
145
146        // Mark system as ready and notify waiters
147        *self.0.system_ready.write().unwrap() = true;
148        self.0.system_ready_notify.notify_waiters();
149
150        Ok(())
151    }
152
153    /// Joins an existing system. This should only be called by ephemeral nodes.
154    pub async fn join_system(&self, state: Attested<EntityState>) -> Result<(), MutationError> {
155        // Wait for catalog to be loaded before proceeding
156        self.wait_loaded().await;
157
158        // If node is durable, fail - durable nodes should not join an existing system
159        if self.0.durable {
160            warn!("Durable node attempted to join system - this is not allowed");
161            return Err(MutationError::General(Box::new(std::io::Error::other("Durable nodes cannot join an existing system"))));
162        }
163
164        let root_state = self.root();
165
166        // If we have a matching root, we're already in sync - just mark ready and return
167        if let Some(root) = root_state {
168            if root.payload.state.head == state.payload.state.head {
169                notice_info!("Found matching root - Node is part of the same system");
170                *self.0.system_ready.write().unwrap() = true;
171                self.0.system_ready_notify.notify_waiters();
172                return Ok(());
173            }
174            tracing::warn!("Mismatched root state during join: local={:?}, remote={:?}", root, state.payload.state.head);
175
176            // Only reset storage if we have a root that needs to be replaced
177            tracing::info!("Resetting storage to replace mismatched root");
178            // Drop locks before reset
179            {
180                let mut root = self.0.root.write().expect("Root lock poisoned");
181                *root = None;
182            }
183            self.hard_reset().await.map_err(|e| MutationError::General(Box::new(std::io::Error::other(e.to_string()))))?;
184        }
185
186        let collection_id = CollectionId::fixed_name(SYSTEM_COLLECTION_ID);
187        let storage = self.0.collectionset.get(&collection_id).await?;
188
189        // Set the state
190        storage.set_state(state.clone()).await?;
191
192        // Set root and mark system as ready
193        {
194            let mut root = self.0.root.write().expect("Root lock poisoned");
195            *root = Some(state);
196        }
197        *self.0.system_ready.write().unwrap() = true;
198        self.0.system_ready_notify.notify_waiters();
199
200        Ok(())
201    }
202
203    /// Resets all storage by deleting all collections, including the system collection.
204    /// This is used when an ephemeral node needs to join a system with a different root.
205    /// **This is a destructive operation and should be used with extreme caution.**
206    pub async fn hard_reset(&self) -> Result<()> {
207        // Delete all collections from storage
208        self.0.collectionset.delete_all_collections().await?;
209
210        // Reset our state
211        {
212            let mut items = self.0.items.write().unwrap();
213            items.clear();
214        }
215        {
216            let mut root = self.0.root.write().unwrap();
217            *root = None;
218        }
219        {
220            let mut collection_map = self.0.collection_map.write().unwrap();
221            collection_map.clear();
222        }
223        {
224            let mut system_ready = self.0.system_ready.write().unwrap();
225            *system_ready = false;
226        }
227
228        // Reset the reactor state to notify subscriptions
229        self.0.reactor.system_reset();
230
231        Ok(())
232    }
233
234    /// Returns true if the local system catalog is loaded
235    pub fn is_loaded(&self) -> bool { self.0.loaded.get().is_some() }
236
237    /// Waits for the local system catalog to be loaded
238    pub async fn wait_loaded(&self) {
239        if !self.is_loaded() {
240            self.0.loading.notified().await;
241        }
242    }
243
244    async fn load_system_catalog(&self) -> Result<()> {
245        if self.is_loaded() {
246            return Err(anyhow!("System catalog already loaded"));
247        }
248
249        let collection_id = CollectionId::fixed_name(SYSTEM_COLLECTION_ID);
250        let storage = self.0.collectionset.get(&collection_id).await?;
251
252        let mut entities = Vec::new();
253        let mut root_state = None;
254
255        let retriever = LocalRetriever::new(storage.clone());
256
257        for state in
258            storage.fetch_states(&ankql::ast::Selection { predicate: ankql::ast::Predicate::True, order_by: None, limit: None }).await?
259        {
260            let (_entity_changed, entity) =
261                self.0.entities.with_state(&retriever, state.payload.entity_id, collection_id.clone(), state.payload.state.clone()).await?;
262            let lww_backend = entity.get_backend::<LWWBackend>().expect("LWW Backend should exist");
263            if let Some(value) = lww_backend.get(&"item".to_string()) {
264                let item = proto::sys::Item::from_value(Some(value)).expect("Invalid sys item");
265
266                if let proto::sys::Item::SysRoot = &item {
267                    root_state = Some(state);
268                }
269                entities.push(entity);
270            }
271        }
272
273        // Update our system state
274        {
275            let mut items = self.0.items.write().unwrap();
276            items.extend(entities);
277        }
278
279        // If we loaded a system root and we're a durable node, we're ready
280        let has_root = root_state.is_some();
281        {
282            let mut root = self.0.root.write().expect("Root lock poisoned");
283            *root = root_state;
284        }
285
286        // Only mark ready if we're a durable node and found a root
287        // Ephemeral nodes must explicitly join via join_system()
288        if has_root && self.0.durable {
289            *self.0.system_ready.write().unwrap() = true;
290            self.0.system_ready_notify.notify_waiters();
291        }
292
293        // Set loaded state and notify waiters
294        self.0.loaded.set(()).expect("Loading flag already set");
295        self.0.loading.notify_waiters();
296        Ok(())
297    }
298}
299
300impl Property for proto::sys::Item {
301    fn into_value(&self) -> std::result::Result<Option<Value>, crate::property::PropertyError> {
302        Ok(Some(Value::String(
303            serde_json::to_string(self).map_err(|_| PropertyError::InvalidValue { value: "".to_string(), ty: "sys::Item".to_string() })?,
304        )))
305    }
306
307    fn from_value(value: Option<Value>) -> std::result::Result<Self, crate::property::PropertyError> {
308        if let Some(Value::String(string)) = value {
309            let item: proto::sys::Item = serde_json::from_str(&string)
310                .map_err(|_| PropertyError::InvalidValue { value: "".to_string(), ty: "sys::Item".to_string() })?;
311            Ok(item)
312        } else {
313            Err(PropertyError::InvalidValue { value: "".to_string(), ty: "sys::Item".to_string() })
314        }
315    }
316}