1use ankurah_proto::{self as proto, Attested, Clock, CollectionId, EntityState};
2use anyhow::{anyhow, Result};
3use std::collections::BTreeMap;
4use std::marker::PhantomData;
5use std::sync::{Arc, OnceLock, RwLock};
6use tokio::sync::Notify;
7use tracing::{error, warn};
8
9use crate::collectionset::CollectionSet;
10use crate::entity::{Entity, WeakEntitySet};
11use crate::error::MutationError;
12use crate::error::RetrievalError;
13use crate::notice_info;
14use crate::policy::PolicyAgent;
15use crate::property::{Property, PropertyError};
16use crate::reactor::Reactor;
17use crate::retrieval::LocalRetriever;
18use crate::storage::{StorageCollectionWrapper, StorageEngine};
19use crate::{property::backend::LWWBackend, value::Value};
20pub const SYSTEM_COLLECTION_ID: &str = "_ankurah_system";
21pub const PROTECTED_COLLECTIONS: &[&str] = &[SYSTEM_COLLECTION_ID];
22
23pub struct SystemManager<SE, PA>(Arc<Inner<SE, PA>>);
29impl<SE, PA> Clone for SystemManager<SE, PA> {
30 fn clone(&self) -> Self { Self(self.0.clone()) }
31}
32
33struct Inner<SE, PA> {
34 collectionset: CollectionSet<SE>,
35 collection_map: RwLock<BTreeMap<CollectionId, Entity>>,
36 entities: WeakEntitySet,
37 durable: bool,
38 root: RwLock<Option<Attested<EntityState>>>,
39 items: RwLock<Vec<Entity>>,
40 loaded: OnceLock<()>,
41 loading: Notify,
42 system_ready: RwLock<bool>,
43 system_ready_notify: Notify,
44 reactor: Reactor,
45 _phantom: PhantomData<PA>,
46}
47
48impl<SE, PA> SystemManager<SE, PA>
49where
50 SE: StorageEngine + Send + Sync + 'static,
51 PA: PolicyAgent + Send + Sync + 'static,
52{
53 pub(crate) fn new(collections: CollectionSet<SE>, entities: WeakEntitySet, reactor: Reactor, durable: bool) -> Self {
54 let me = Self(Arc::new(Inner {
55 collectionset: collections,
56 entities,
57 durable,
58 items: RwLock::new(Vec::new()),
59 root: RwLock::new(None),
60 loaded: OnceLock::new(),
61 loading: Notify::new(),
62 collection_map: RwLock::new(BTreeMap::new()),
63 system_ready: RwLock::new(false),
64 system_ready_notify: Notify::new(),
65 reactor,
66 _phantom: PhantomData,
67 }));
68 {
69 let me = me.clone();
70 crate::task::spawn(async move {
71 if let Err(e) = me.load_system_catalog().await {
72 error!("Failed to load system catalog: {}", e);
73 }
74 });
75 }
76 me
77 }
78
79 pub fn root(&self) -> Option<Attested<EntityState>> { self.0.root.read().unwrap().as_ref().map(|r| r.clone()) }
80
81 pub fn items(&self) -> Vec<Entity> { self.0.items.read().unwrap().clone() }
82
83 pub async fn collection(&self, id: &CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
86 self.wait_loaded().await;
87 self.0.collectionset.get(id).await
91 }
92
93 pub fn is_system_ready(&self) -> bool { *self.0.system_ready.read().unwrap() }
95
96 pub async fn wait_system_ready(&self) {
98 if !self.is_system_ready() {
99 self.0.system_ready_notify.notified().await;
100 }
101 }
102
103 pub async fn create(&self) -> Result<()> {
106 if !self.0.durable {
107 return Err(anyhow!("Only durable nodes can create a new system"));
108 }
109
110 self.wait_loaded().await;
112
113 {
114 let items = self.0.items.read().unwrap();
115 if !items.is_empty() {
116 return Err(anyhow!("System root already exists"));
117 }
118 }
119
120 let collection_id = CollectionId::fixed_name(SYSTEM_COLLECTION_ID);
122 let storage = self.0.collectionset.get(&collection_id).await?;
123
124 let system_entity = self.0.entities.create(collection_id.clone());
125
126 let lww_backend = system_entity.get_backend::<LWWBackend>().expect("LWW Backend should exist");
127 lww_backend.set("item".into(), proto::sys::Item::SysRoot.into_value()?);
128
129 let event = system_entity.generate_commit_event()?.ok_or(anyhow!("Expected event"))?;
130 let root: Clock = event.id().into();
131
132 storage.add_event(&event.into()).await?;
134
135 system_entity.commit_head(root.clone());
137 let attested_state: Attested<EntityState> = system_entity.to_entity_state()?.into();
139 storage.set_state(attested_state.clone()).await?;
140
141 let mut items = self.0.items.write().unwrap();
143 items.push(system_entity);
144 *self.0.root.write().unwrap() = Some(attested_state);
145
146 *self.0.system_ready.write().unwrap() = true;
148 self.0.system_ready_notify.notify_waiters();
149
150 Ok(())
151 }
152
153 pub async fn join_system(&self, state: Attested<EntityState>) -> Result<(), MutationError> {
155 self.wait_loaded().await;
157
158 if self.0.durable {
160 warn!("Durable node attempted to join system - this is not allowed");
161 return Err(MutationError::General(Box::new(std::io::Error::other("Durable nodes cannot join an existing system"))));
162 }
163
164 let root_state = self.root();
165
166 if let Some(root) = root_state {
168 if root.payload.state.head == state.payload.state.head {
169 notice_info!("Found matching root - Node is part of the same system");
170 *self.0.system_ready.write().unwrap() = true;
171 self.0.system_ready_notify.notify_waiters();
172 return Ok(());
173 }
174 tracing::warn!("Mismatched root state during join: local={:?}, remote={:?}", root, state.payload.state.head);
175
176 tracing::info!("Resetting storage to replace mismatched root");
178 {
180 let mut root = self.0.root.write().expect("Root lock poisoned");
181 *root = None;
182 }
183 self.hard_reset().await.map_err(|e| MutationError::General(Box::new(std::io::Error::other(e.to_string()))))?;
184 }
185
186 let collection_id = CollectionId::fixed_name(SYSTEM_COLLECTION_ID);
187 let storage = self.0.collectionset.get(&collection_id).await?;
188
189 storage.set_state(state.clone()).await?;
191
192 {
194 let mut root = self.0.root.write().expect("Root lock poisoned");
195 *root = Some(state);
196 }
197 *self.0.system_ready.write().unwrap() = true;
198 self.0.system_ready_notify.notify_waiters();
199
200 Ok(())
201 }
202
203 pub async fn hard_reset(&self) -> Result<()> {
207 self.0.collectionset.delete_all_collections().await?;
209
210 {
212 let mut items = self.0.items.write().unwrap();
213 items.clear();
214 }
215 {
216 let mut root = self.0.root.write().unwrap();
217 *root = None;
218 }
219 {
220 let mut collection_map = self.0.collection_map.write().unwrap();
221 collection_map.clear();
222 }
223 {
224 let mut system_ready = self.0.system_ready.write().unwrap();
225 *system_ready = false;
226 }
227
228 self.0.reactor.system_reset();
230
231 Ok(())
232 }
233
234 pub fn is_loaded(&self) -> bool { self.0.loaded.get().is_some() }
236
237 pub async fn wait_loaded(&self) {
239 if !self.is_loaded() {
240 self.0.loading.notified().await;
241 }
242 }
243
244 async fn load_system_catalog(&self) -> Result<()> {
245 if self.is_loaded() {
246 return Err(anyhow!("System catalog already loaded"));
247 }
248
249 let collection_id = CollectionId::fixed_name(SYSTEM_COLLECTION_ID);
250 let storage = self.0.collectionset.get(&collection_id).await?;
251
252 let mut entities = Vec::new();
253 let mut root_state = None;
254
255 let retriever = LocalRetriever::new(storage.clone());
256
257 for state in
258 storage.fetch_states(&ankql::ast::Selection { predicate: ankql::ast::Predicate::True, order_by: None, limit: None }).await?
259 {
260 let (_entity_changed, entity) =
261 self.0.entities.with_state(&retriever, state.payload.entity_id, collection_id.clone(), state.payload.state.clone()).await?;
262 let lww_backend = entity.get_backend::<LWWBackend>().expect("LWW Backend should exist");
263 if let Some(value) = lww_backend.get(&"item".to_string()) {
264 let item = proto::sys::Item::from_value(Some(value)).expect("Invalid sys item");
265
266 if let proto::sys::Item::SysRoot = &item {
267 root_state = Some(state);
268 }
269 entities.push(entity);
270 }
271 }
272
273 {
275 let mut items = self.0.items.write().unwrap();
276 items.extend(entities);
277 }
278
279 let has_root = root_state.is_some();
281 {
282 let mut root = self.0.root.write().expect("Root lock poisoned");
283 *root = root_state;
284 }
285
286 if has_root && self.0.durable {
289 *self.0.system_ready.write().unwrap() = true;
290 self.0.system_ready_notify.notify_waiters();
291 }
292
293 self.0.loaded.set(()).expect("Loading flag already set");
295 self.0.loading.notify_waiters();
296 Ok(())
297 }
298}
299
300impl Property for proto::sys::Item {
301 fn into_value(&self) -> std::result::Result<Option<Value>, crate::property::PropertyError> {
302 Ok(Some(Value::String(
303 serde_json::to_string(self).map_err(|_| PropertyError::InvalidValue { value: "".to_string(), ty: "sys::Item".to_string() })?,
304 )))
305 }
306
307 fn from_value(value: Option<Value>) -> std::result::Result<Self, crate::property::PropertyError> {
308 if let Some(Value::String(string)) = value {
309 let item: proto::sys::Item = serde_json::from_str(&string)
310 .map_err(|_| PropertyError::InvalidValue { value: "".to_string(), ty: "sys::Item".to_string() })?;
311 Ok(item)
312 } else {
313 Err(PropertyError::InvalidValue { value: "".to_string(), ty: "sys::Item".to_string() })
314 }
315 }
316}