1mod candidate_changes;
2mod comparison_index;
3pub mod fetch_gap;
4mod property_path;
5mod subscription;
6mod subscription_state;
7mod update;
8mod watcherset;
9
10pub(crate) use self::{
11 candidate_changes::CandidateChanges,
12 subscription::{ReactorSubscription, ReactorSubscriptionId},
13 update::{MembershipChange, ReactorUpdate, ReactorUpdateItem},
14 watcherset::{WatcherChange, WatcherSet},
15};
16
17pub(crate) use self::fetch_gap::GapFetcher;
19
20use crate::{
21 entity::Entity,
22 error::SubscriptionError,
23 indexing::{IndexDirection, IndexKeyPart, KeySpec, NullsOrder},
24 reactor::{subscription::ReactorSubInner, subscription_state::Subscription, watcherset::WatcherOp},
25 resultset::EntityResultSet,
26 selection::filter::Filterable,
27 value::{Value, ValueType},
28};
29use ankurah_proto::{self as proto};
30use futures::future::join_all;
31use std::{
32 collections::HashMap,
33 sync::{Arc, Mutex},
34};
35
36pub trait AbstractEntity: Clone + std::fmt::Debug {
38 fn collection(&self) -> proto::CollectionId;
39 fn id(&self) -> &proto::EntityId;
40 fn value(&self, field: &str) -> Option<Value>;
41}
42
43pub trait ChangeNotification: std::fmt::Debug + std::fmt::Display {
45 type Entity: AbstractEntity;
46 type Event: Clone + std::fmt::Debug;
47
48 fn into_parts(self) -> (Self::Entity, Vec<Self::Event>);
49 fn entity(&self) -> &Self::Entity;
50 fn events(&self) -> &[Self::Event];
51}
52
/// Callback invoked by the reactor immediately before it sends a query update
/// to a subscription.
///
/// `add_query_and_notify` and `update_query_and_notify` call
/// [`PreNotifyHook::pre_notify`] with the query version being announced.
pub trait PreNotifyHook {
    /// Called with the query `version` right before the update is sent.
    fn pre_notify(&self, version: u32);
}
57
58impl PreNotifyHook for () {
60 fn pre_notify(&self, _version: u32) {}
61}
62
63pub struct Reactor<
65 E: AbstractEntity + Filterable + Send + 'static = Entity,
66 Ev: Clone + Send + 'static = ankurah_proto::Attested<ankurah_proto::Event>,
67>(Arc<ReactorInner<E, Ev>>);
68
69struct ReactorInner<E: AbstractEntity + Filterable, Ev> {
70 subscriptions: std::sync::Mutex<HashMap<ReactorSubscriptionId, Subscription<E, Ev>>>,
71 watcher_set: Arc<std::sync::Mutex<WatcherSet>>,
73 notify_lock: tokio::sync::Mutex<()>,
75}
76impl<E: AbstractEntity + Filterable + Send + 'static, Ev: Clone + Send + 'static> Clone for Reactor<E, Ev> {
78 fn clone(&self) -> Self { Self(self.0.clone()) }
79}
80
81impl<E: AbstractEntity + Filterable + Send + 'static, Ev: Clone + Send + 'static> Default for Reactor<E, Ev> {
82 fn default() -> Self { Self::new() }
83}
84
85impl<E: AbstractEntity + Filterable + Send + 'static, Ev: Clone + Send + 'static> Reactor<E, Ev> {
86 pub fn new() -> Self {
87 Self(Arc::new(ReactorInner {
88 subscriptions: Mutex::new(HashMap::new()),
89 watcher_set: Arc::new(Mutex::new(WatcherSet::new())),
90 notify_lock: tokio::sync::Mutex::new(()),
91 }))
92 }
93
94 pub fn subscribe(&self) -> ReactorSubscription<E, Ev> {
96 let broadcast = ankurah_signals::broadcast::Broadcast::new();
97 let subscription = Subscription::new(broadcast.clone(), self.0.watcher_set.clone());
98 let subscription_id = subscription.id();
99 self.0.subscriptions.lock().unwrap().insert(subscription_id, subscription);
100 ReactorSubscription(Arc::new(ReactorSubInner { subscription_id, reactor: self.clone(), broadcast }))
101 }
102
103 pub(crate) fn unsubscribe(&self, sub_id: ReactorSubscriptionId) -> Result<(), SubscriptionError> {
105 let subscription = {
106 let mut subscriptions = self.0.subscriptions.lock().unwrap();
107 subscriptions.remove(&sub_id).ok_or(SubscriptionError::SubscriptionNotFound)?
108 };
109
110 let queries = subscription.take_all_queries();
112
113 let mut watcher_set = self.0.watcher_set.lock().unwrap();
115 for (query_id, query_state) in queries {
116 if let Some(selection) = &query_state.selection {
118 watcher_set.recurse_predicate_watchers(
119 &query_state.collection_id,
120 &selection.predicate,
121 (sub_id, query_id),
122 WatcherOp::Remove,
123 );
124 }
125
126 let entity_ids: Vec<_> = query_state.resultset.keys().collect();
128 watcher_set.remove_entity_subscriptions(sub_id, entity_ids);
129 }
130
131 Ok(())
132 }
133
134 pub fn remove_query(&self, subscription_id: ReactorSubscriptionId, query_id: proto::QueryId) -> Result<(), SubscriptionError> {
136 let subscription = {
137 let subscriptions = self.0.subscriptions.lock().unwrap();
138 subscriptions.get(&subscription_id).cloned().ok_or(SubscriptionError::SubscriptionNotFound)?
139 };
140
141 let query_state = subscription.remove_query(query_id).ok_or(SubscriptionError::PredicateNotFound)?;
143
144 if let Some(selection) = &query_state.selection {
146 let mut watcher_set = self.0.watcher_set.lock().unwrap();
147 let watcher_id = (subscription_id, query_id);
148 watcher_set.recurse_predicate_watchers(&query_state.collection_id, &selection.predicate, watcher_id, WatcherOp::Remove);
149 }
150
151 Ok(())
152 }
153
154 pub fn add_entity_subscriptions(&self, subscription_id: ReactorSubscriptionId, entity_ids: impl IntoIterator<Item = proto::EntityId>) {
156 let subscription = {
157 let subscriptions = self.0.subscriptions.lock().unwrap();
158 subscriptions.get(&subscription_id).cloned()
159 };
160
161 if let Some(subscription) = subscription {
162 let mut watcher_set = self.0.watcher_set.lock().unwrap();
163 for entity_id in entity_ids {
164 subscription.add_entity_subscription(entity_id);
165 watcher_set.add_entity_subscription(subscription_id, entity_id);
166 }
167 }
168 }
169
170 pub fn remove_entity_subscriptions(
172 &self,
173 subscription_id: ReactorSubscriptionId,
174 entity_ids: impl IntoIterator<Item = proto::EntityId>,
175 ) {
176 let mut subscriptions = self.0.subscriptions.lock().unwrap();
177 let mut watcher_set = self.0.watcher_set.lock().unwrap();
178
179 if let Some(subscription) = subscriptions.get_mut(&subscription_id) {
180 for entity_id in entity_ids {
181 subscription.remove_entity_subscription(entity_id);
182
183 let should_remove = !subscription.any_query_matches(&entity_id);
186
187 if should_remove {
188 watcher_set.remove_entity_subscription(subscription_id, entity_id);
189 }
190 }
191 }
192 }
193}
194
195pub(crate) fn build_key_spec_from_selection<E: AbstractEntity>(
197 order_by: &[ankql::ast::OrderByItem],
198 resultset: &EntityResultSet<E>,
199) -> anyhow::Result<KeySpec> {
200 let mut keyparts = Vec::new();
201
202 let read = resultset.read();
203 for item in order_by {
204 let column = item.path.property().to_string();
206
207 let value_type = read.iter_entities().find_map(|(_, e)| e.value(&column).map(|v| ValueType::of(&v))).unwrap_or(ValueType::String); let direction: IndexDirection = match item.direction {
211 ankql::ast::OrderDirection::Asc => IndexDirection::Asc,
212 ankql::ast::OrderDirection::Desc => IndexDirection::Desc,
213 };
214
215 keyparts.push(IndexKeyPart { column, sub_path: None, direction, value_type, nulls: Some(NullsOrder::Last), collation: None });
216 }
217
218 Ok(KeySpec { keyparts })
219}
220
221impl<E: AbstractEntity + Filterable + Send + 'static, Ev: Clone + Send + 'static> Reactor<E, Ev> {
222 pub async fn add_query_and_notify<H: PreNotifyHook>(
229 &self,
230 subscription_id: ReactorSubscriptionId,
231 query_id: proto::QueryId,
232 collection_id: proto::CollectionId,
233 selection: ankql::ast::Selection,
234 node: &dyn crate::node::TNodeErased<E>,
235 resultset: EntityResultSet<E>,
236 gap_fetcher: std::sync::Arc<dyn GapFetcher<E>>,
237 pre_notify_hook: H,
238 ) -> anyhow::Result<()> {
239 let subscription = {
241 let subscriptions = self.0.subscriptions.lock().unwrap();
242 subscriptions.get(&subscription_id).cloned().ok_or_else(|| anyhow::anyhow!("Subscription {:?} not found", subscription_id))?
243 };
244
245 let included_entities = node.fetch_entities_from_local(&collection_id, &selection).await?;
247
248 subscription.register_query(query_id, collection_id.clone(), resultset.clone(), gap_fetcher)?;
250
251 let mut reactor_update_items = Vec::new();
254 let _newly_added = subscription.update_query(
255 query_id,
256 collection_id.clone(),
257 selection.clone(),
258 included_entities,
259 1, &mut reactor_update_items,
261 )?;
262
263 subscription.fill_gaps_for_query(query_id, &mut reactor_update_items).await;
268
269 resultset.set_loaded(true);
271
272 pre_notify_hook.pre_notify(1);
274
275 subscription.send_update(reactor_update_items);
277
278 Ok(())
279 }
280
281 pub async fn update_query_and_notify<H: PreNotifyHook>(
286 &self,
287 subscription_id: ReactorSubscriptionId,
288 query_id: proto::QueryId,
289 collection_id: proto::CollectionId,
290 selection: ankql::ast::Selection,
291 node: &dyn crate::node::TNodeErased<E>,
292 version: u32,
293 pre_notify_hook: H,
294 ) -> anyhow::Result<()> {
295 let included_entities = node.fetch_entities_from_local(&collection_id, &selection).await?;
296
297 let subscription = {
298 let subscriptions = self.0.subscriptions.lock().unwrap();
299 subscriptions.get(&subscription_id).cloned().ok_or_else(|| anyhow::anyhow!("Subscription {:?} not found", subscription_id))?
300 };
301
302 let mut reactor_update_items = Vec::new();
303 let _newly_added = subscription.update_query(
305 query_id,
306 collection_id.clone(),
307 selection.clone(),
308 included_entities,
309 version,
310 &mut reactor_update_items,
311 )?;
312
313 subscription.fill_gaps_for_query(query_id, &mut reactor_update_items).await;
317
318 pre_notify_hook.pre_notify(version);
320
321 if !reactor_update_items.is_empty() {
323 subscription.send_update(reactor_update_items);
324 }
325
326 Ok(())
327 }
328
329 pub async fn notify_change<C: ChangeNotification<Entity = E, Event = Ev> + Clone>(&self, changes: Vec<C>) {
331 let _notify_guard = self.0.notify_lock.lock().await;
333
334 let changes: Arc<Vec<C>> = Arc::from(changes);
336
337 tracing::debug!("Reactor.notify_change({} changes)", changes.len());
338
339 let mut candidates_by_sub: HashMap<ReactorSubscriptionId, CandidateChanges<C>> = HashMap::new();
341 {
342 let watcher_set = self.0.watcher_set.lock().unwrap();
343 for (offset, change) in changes.iter().enumerate() {
344 watcher_set.accumulate_interested_watchers(change.entity(), offset, &changes, &mut candidates_by_sub);
345 }
346 }
347
348 let evaluations = {
351 let subscriptions = self.0.subscriptions.lock().unwrap();
352 candidates_by_sub
353 .into_iter()
354 .filter_map(|(sub_id, candidates)| {
355 subscriptions.get(&sub_id).map(|subscription| subscription.clone().evaluate_changes(candidates))
356 })
357 .collect::<Vec<_>>()
358 };
359
360 let all_watcher_changes: Vec<WatcherChange> = join_all(evaluations).await.into_iter().flatten().collect();
362
363 let mut watcher_set = self.0.watcher_set.lock().unwrap();
366 for change in all_watcher_changes {
367 watcher_set.apply_watcher_change(change);
368 }
369 }
370
371 pub fn system_reset(&self) {
373 {
376 let mut watcher_set = self.0.watcher_set.lock().unwrap();
377 watcher_set.clear_entity_watchers();
378 }
379
380 let subscriptions = self.0.subscriptions.lock().unwrap();
381 for subscription in subscriptions.values() {
382 subscription.system_reset();
383 }
384 }
385}
386
387impl Reactor<Entity, ankurah_proto::Attested<ankurah_proto::Event>> {
389 pub async fn upsert_query<SE, PA>(
394 &self,
395 subscription_id: ReactorSubscriptionId,
396 query_id: proto::QueryId,
397 collection_id: proto::CollectionId,
398 selection: ankql::ast::Selection,
399 node: &crate::node::Node<SE, PA>,
400 cdata: &PA::ContextData,
401 version: u32,
402 ) -> anyhow::Result<Vec<Entity>>
403 where
404 SE: crate::storage::StorageEngine + Send + Sync + 'static,
405 PA: crate::policy::PolicyAgent + Send + Sync + 'static,
406 {
407 let subscription = {
408 let subscriptions = self.0.subscriptions.lock().unwrap();
409 subscriptions.get(&subscription_id).cloned().ok_or_else(|| anyhow::anyhow!("Subscription {:?} not found", subscription_id))?
410 };
411
412 let included_entities = node.fetch_entities_from_local(&collection_id, &selection).await?;
413
414 let resultset = subscription.upsert_query(query_id, collection_id.clone(), node, cdata);
417
418 let mut all_entities =
420 subscription.update_query(query_id, collection_id.clone(), selection.clone(), included_entities, version, &mut ())?;
421
422 subscription.fill_gaps_for_query_entities(query_id, &mut all_entities).await;
426
427 resultset.set_loaded(true);
428
429 Ok(all_entities)
431 }
432}
433
434impl<E: AbstractEntity + Filterable + Send + 'static, Ev: Clone + Send + 'static> std::fmt::Debug for Reactor<E, Ev> {
435 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
436 let watcher_set = self.0.watcher_set.lock().unwrap();
437 let subscriptions = self.0.subscriptions.lock().unwrap();
438 let (index_watchers, wildcard_watchers, entity_watchers) = watcher_set.debug_data();
439 write!(
440 f,
441 "Reactor {{ subscriptions: {:?}, index_watchers: {:?}, wildcard_watchers: {:?}, entity_watchers: {:?} }}",
442 subscriptions, index_watchers, wildcard_watchers, entity_watchers
443 )
444 }
445}
446
447impl<E: AbstractEntity + Filterable + Send + 'static, Ev: Clone + Send + 'static> std::fmt::Debug for Subscription<E, Ev> {
448 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
449 write!(f, "Subscription {{ id: {:?}, queries: {} }}", self.id(), self.queries_len())
450 }
451}
452
#[cfg(test)]
mod tests {
    use super::*;
    use crate::selection::filter::Filterable;
    use ankurah_signals::Subscribe;
    use proto::{CollectionId, QueryId};
    use std::sync::Arc;

    /// Returns a pair of closures sharing one buffer: the first appends a
    /// value, the second drains and returns everything recorded so far.
    pub fn watcher<T: Clone + Send + 'static>() -> (Box<dyn Fn(T) + Send + Sync>, Box<dyn Fn() -> Vec<T> + Send + Sync>) {
        let recorded: Arc<Mutex<Vec<T>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&recorded);

        let accumulate: Box<dyn Fn(T) + Send + Sync> = Box::new(move |value: T| {
            sink.lock().unwrap().push(value);
        });
        let check: Box<dyn Fn() -> Vec<T> + Send + Sync> = Box::new(move || recorded.lock().unwrap().drain(..).collect());

        (accumulate, check)
    }

    /// In-memory entity whose fields are plain strings. Equality and ordering
    /// consider only the id.
    #[derive(Debug, Clone)]
    struct TestEntity {
        id: proto::EntityId,
        collection: proto::CollectionId,
        state: Arc<Mutex<HashMap<String, String>>>,
    }

    impl PartialEq for TestEntity {
        fn eq(&self, rhs: &Self) -> bool {
            self.id == rhs.id
        }
    }

    impl Eq for TestEntity {}

    impl PartialOrd for TestEntity {
        fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
            Some(self.id.cmp(&rhs.id))
        }
    }

    /// Event type used to instantiate the reactor in tests.
    #[derive(Debug, Clone, PartialEq)]
    struct TestEvent {
        id: proto::EventId,
        collection: proto::CollectionId,
        changes: HashMap<String, String>,
    }

    impl TestEntity {
        /// Creates an entity in the "album" collection with `name` and `status` fields.
        fn new(name: &str, status: &str) -> Self {
            let mut fields = HashMap::new();
            fields.insert("name".to_string(), name.to_string());
            fields.insert("status".to_string(), status.to_string());

            Self {
                id: proto::EntityId::new(),
                collection: proto::CollectionId::fixed_name("album"),
                state: Arc::new(Mutex::new(fields)),
            }
        }

        /// Shared field lookup behind both `value` trait methods.
        fn lookup(&self, field: &str) -> Option<crate::value::Value> {
            let state = self.state.lock().unwrap();
            state.get(field).cloned().map(crate::value::Value::String)
        }
    }

    impl Filterable for TestEntity {
        fn collection(&self) -> &str {
            self.collection.as_str()
        }

        fn value(&self, field: &str) -> Option<crate::value::Value> {
            self.lookup(field)
        }
    }

    impl AbstractEntity for TestEntity {
        fn collection(&self) -> proto::CollectionId {
            self.collection.clone()
        }

        fn id(&self) -> &proto::EntityId {
            &self.id
        }

        fn value(&self, field: &str) -> Option<crate::value::Value> {
            self.lookup(field)
        }
    }

    /// Gap fetcher that returns a fixed list of entities regardless of its arguments.
    struct MockGapFetcher {
        entities: Vec<TestEntity>,
    }

    impl MockGapFetcher {
        /// Gap fetcher that never yields any entity.
        fn new() -> Self {
            Self::with_entities(Vec::new())
        }

        /// Gap fetcher that yields `entities` on every call.
        fn with_entities(entities: Vec<TestEntity>) -> Self {
            Self { entities }
        }
    }

    #[async_trait::async_trait]
    impl GapFetcher<TestEntity> for MockGapFetcher {
        async fn fetch_gap(
            &self,
            _collection_id: &proto::CollectionId,
            _selection: &ankql::ast::Selection,
            _last_entity: Option<&TestEntity>,
            _gap_size: usize,
        ) -> Result<Vec<TestEntity>, crate::error::RetrievalError> {
            let canned = self.entities.clone();
            Ok(canned)
        }
    }

    /// Node stub whose local fetch returns a fixed list of entities.
    struct MockNode {
        entities: Vec<TestEntity>,
    }

    #[async_trait::async_trait]
    impl crate::node::TNodeErased<TestEntity> for MockNode {
        fn unsubscribe_remote_predicate(&self, _query_id: proto::QueryId) {}

        fn update_remote_query(
            &self,
            _query_id: proto::QueryId,
            _selection: ankql::ast::Selection,
            _version: u32,
        ) -> Result<(), anyhow::Error> {
            Ok(())
        }

        async fn fetch_entities_from_local(
            &self,
            _collection_id: &proto::CollectionId,
            _selection: &ankql::ast::Selection,
        ) -> Result<Vec<TestEntity>, crate::error::RetrievalError> {
            let canned = self.entities.clone();
            Ok(canned)
        }

        fn reactor(&self) -> &Reactor<TestEntity> {
            panic!("MockNode::reactor() should not be called in this test")
        }

        fn has_subscription_relay(&self) -> bool {
            false
        }
    }

    /// Adds a query matching one pending entity and checks the initial update.
    ///
    /// NOTE(review): despite its name, this test currently only asserts the
    /// initial `MembershipChange::Initial` notification.
    #[tokio::test]
    async fn test_entity_remains_watched_after_predicate_stops_matching() {
        let reactor = Reactor::<TestEntity, TestEvent>::new();

        let rsub = reactor.subscribe();
        let (record, check) = watcher::<ReactorUpdate<TestEntity, TestEvent>>();
        let _guard = rsub.subscribe(record);

        let query_id = QueryId::new();
        let collection_id = CollectionId::fixed_name("album");
        let selection: ankql::ast::Selection = "status = 'pending'".try_into().unwrap();
        let entity1 = TestEntity::new("Test Album", "pending");
        let resultset: EntityResultSet<TestEntity> = EntityResultSet::empty();
        let mock_gap_fetcher = Arc::new(MockGapFetcher::new());
        let mock_node = MockNode { entities: vec![entity1.clone()] };

        let added = reactor
            .add_query_and_notify(rsub.id(), query_id, collection_id, selection, &mock_node, resultset, mock_gap_fetcher, ())
            .await;
        added.unwrap();

        // Exactly one update, carrying the single matching entity as an initial member.
        let expected_item =
            ReactorUpdateItem { entity: entity1.clone(), events: vec![], predicate_relevance: vec![(query_id, MembershipChange::Initial)] };
        let expected = vec![ReactorUpdate { items: vec![expected_item] }];
        assert_eq!(check(), expected);
    }
}