Skip to main content

ankurah_core/
reactor.rs

1mod candidate_changes;
2mod comparison_index;
3pub mod fetch_gap;
4mod property_path;
5mod subscription;
6mod subscription_state;
7mod update;
8mod watcherset;
9
10pub(crate) use self::{
11    candidate_changes::CandidateChanges,
12    subscription::{ReactorSubscription, ReactorSubscriptionId},
13    update::{MembershipChange, ReactorUpdate, ReactorUpdateItem},
14    watcherset::{WatcherChange, WatcherSet},
15};
16
17// Re-export fetch_gap items
18pub(crate) use self::fetch_gap::GapFetcher;
19
20use crate::{
21    entity::Entity,
22    error::SubscriptionError,
23    indexing::{IndexDirection, IndexKeyPart, KeySpec, NullsOrder},
24    reactor::{subscription::ReactorSubInner, subscription_state::Subscription, watcherset::WatcherOp},
25    resultset::EntityResultSet,
26    selection::filter::Filterable,
27    value::{Value, ValueType},
28};
29use ankurah_proto::{self as proto};
30use futures::future::join_all;
31use std::{
32    collections::HashMap,
33    sync::{Arc, Mutex},
34};
35
36/// Trait for entities that can be used in reactor notifications
37pub trait AbstractEntity: Clone + std::fmt::Debug {
38    fn collection(&self) -> proto::CollectionId;
39    fn id(&self) -> &proto::EntityId;
40    fn value(&self, field: &str) -> Option<Value>;
41}
42
43/// Trait for types that can be used in notify_change
44pub trait ChangeNotification: std::fmt::Debug + std::fmt::Display {
45    type Entity: AbstractEntity;
46    type Event: Clone + std::fmt::Debug;
47
48    fn into_parts(self) -> (Self::Entity, Vec<Self::Event>);
49    fn entity(&self) -> &Self::Entity;
50    fn events(&self) -> &[Self::Event];
51}
52
53/// Hook trait for performing actions before notification is sent
54pub trait PreNotifyHook {
55    fn pre_notify(&self, version: u32);
56}
57
58/// No-op implementation for unit type
59impl PreNotifyHook for () {
60    fn pre_notify(&self, _version: u32) {}
61}
62
63/// A Reactor is a collection of subscriptions, which are to be notified of changes to a set of entities
64pub struct Reactor<
65    E: AbstractEntity + Filterable + Send + 'static = Entity,
66    Ev: Clone + Send + 'static = ankurah_proto::Attested<ankurah_proto::Event>,
67>(Arc<ReactorInner<E, Ev>>);
68
69struct ReactorInner<E: AbstractEntity + Filterable, Ev> {
70    subscriptions: std::sync::Mutex<HashMap<ReactorSubscriptionId, Subscription<E, Ev>>>,
71    // Shared with all subscriptions to allow them to manage their own watchers
72    watcher_set: Arc<std::sync::Mutex<WatcherSet>>,
73    /// Serializes notify_change invocations to ensure consistent watcher state
74    notify_lock: tokio::sync::Mutex<()>,
75}
76// don't require Clone SE or PA, because we have an Arc
77impl<E: AbstractEntity + Filterable + Send + 'static, Ev: Clone + Send + 'static> Clone for Reactor<E, Ev> {
78    fn clone(&self) -> Self { Self(self.0.clone()) }
79}
80
81impl<E: AbstractEntity + Filterable + Send + 'static, Ev: Clone + Send + 'static> Default for Reactor<E, Ev> {
82    fn default() -> Self { Self::new() }
83}
84
85impl<E: AbstractEntity + Filterable + Send + 'static, Ev: Clone + Send + 'static> Reactor<E, Ev> {
86    pub fn new() -> Self {
87        Self(Arc::new(ReactorInner {
88            subscriptions: Mutex::new(HashMap::new()),
89            watcher_set: Arc::new(Mutex::new(WatcherSet::new())),
90            notify_lock: tokio::sync::Mutex::new(()),
91        }))
92    }
93
94    /// Create a new subscription container
95    pub fn subscribe(&self) -> ReactorSubscription<E, Ev> {
96        let broadcast = ankurah_signals::broadcast::Broadcast::new();
97        let subscription = Subscription::new(broadcast.clone(), self.0.watcher_set.clone());
98        let subscription_id = subscription.id();
99        self.0.subscriptions.lock().unwrap().insert(subscription_id, subscription);
100        ReactorSubscription(Arc::new(ReactorSubInner { subscription_id, reactor: self.clone(), broadcast }))
101    }
102
103    /// Remove a subscription and all its predicates
104    pub(crate) fn unsubscribe(&self, sub_id: ReactorSubscriptionId) -> Result<(), SubscriptionError> {
105        let subscription = {
106            let mut subscriptions = self.0.subscriptions.lock().unwrap();
107            subscriptions.remove(&sub_id).ok_or(SubscriptionError::SubscriptionNotFound)?
108        };
109
110        // Get all queries for cleanup
111        let queries = subscription.take_all_queries();
112
113        // Remove all predicates from watchers
114        let mut watcher_set = self.0.watcher_set.lock().unwrap();
115        for (query_id, query_state) in queries {
116            // Remove from index watcher (only if selection was set)
117            if let Some(selection) = &query_state.selection {
118                watcher_set.recurse_predicate_watchers(
119                    &query_state.collection_id,
120                    &selection.predicate,
121                    (sub_id, query_id),
122                    WatcherOp::Remove,
123                );
124            }
125
126            // Remove from entity watchers using predicate's matching entities
127            let entity_ids: Vec<_> = query_state.resultset.keys().collect();
128            watcher_set.remove_entity_subscriptions(sub_id, entity_ids);
129        }
130
131        Ok(())
132    }
133
134    /// Remove a predicate from a subscription
135    pub fn remove_query(&self, subscription_id: ReactorSubscriptionId, query_id: proto::QueryId) -> Result<(), SubscriptionError> {
136        let subscription = {
137            let subscriptions = self.0.subscriptions.lock().unwrap();
138            subscriptions.get(&subscription_id).cloned().ok_or(SubscriptionError::SubscriptionNotFound)?
139        };
140
141        // Remove the query from the subscription
142        let query_state = subscription.remove_query(query_id).ok_or(SubscriptionError::PredicateNotFound)?;
143
144        // Remove from watchers (only if selection was set)
145        if let Some(selection) = &query_state.selection {
146            let mut watcher_set = self.0.watcher_set.lock().unwrap();
147            let watcher_id = (subscription_id, query_id);
148            watcher_set.recurse_predicate_watchers(&query_state.collection_id, &selection.predicate, watcher_id, WatcherOp::Remove);
149        }
150
151        Ok(())
152    }
153
154    /// Add entity subscriptions to a subscription
155    pub fn add_entity_subscriptions(&self, subscription_id: ReactorSubscriptionId, entity_ids: impl IntoIterator<Item = proto::EntityId>) {
156        let subscription = {
157            let subscriptions = self.0.subscriptions.lock().unwrap();
158            subscriptions.get(&subscription_id).cloned()
159        };
160
161        if let Some(subscription) = subscription {
162            let mut watcher_set = self.0.watcher_set.lock().unwrap();
163            for entity_id in entity_ids {
164                subscription.add_entity_subscription(entity_id);
165                watcher_set.add_entity_subscription(subscription_id, entity_id);
166            }
167        }
168    }
169
170    /// Remove entity subscriptions from a subscription
171    pub fn remove_entity_subscriptions(
172        &self,
173        subscription_id: ReactorSubscriptionId,
174        entity_ids: impl IntoIterator<Item = proto::EntityId>,
175    ) {
176        let mut subscriptions = self.0.subscriptions.lock().unwrap();
177        let mut watcher_set = self.0.watcher_set.lock().unwrap();
178
179        if let Some(subscription) = subscriptions.get_mut(&subscription_id) {
180            for entity_id in entity_ids {
181                subscription.remove_entity_subscription(entity_id);
182
183                // TODO: Check if any predicates match this entity before removing from entity_watchers
184                // For now, only remove if no predicates match
185                let should_remove = !subscription.any_query_matches(&entity_id);
186
187                if should_remove {
188                    watcher_set.remove_entity_subscription(subscription_id, entity_id);
189                }
190            }
191        }
192    }
193}
194
195/// Build KeySpec from Selection's ORDER BY clause with type inference from sample entities
196pub(crate) fn build_key_spec_from_selection<E: AbstractEntity>(
197    order_by: &[ankql::ast::OrderByItem],
198    resultset: &EntityResultSet<E>,
199) -> anyhow::Result<KeySpec> {
200    let mut keyparts = Vec::new();
201
202    let read = resultset.read();
203    for item in order_by {
204        // Use the property name from the path (currently only simple paths are supported in ORDER BY)
205        let column = item.path.property().to_string();
206
207        // Infer type from first non-null value in resultset entities
208        let value_type = read.iter_entities().find_map(|(_, e)| e.value(&column).map(|v| ValueType::of(&v))).unwrap_or(ValueType::String); // TODO: Get type from system catalog instead of defaulting to String
209
210        let direction: IndexDirection = match item.direction {
211            ankql::ast::OrderDirection::Asc => IndexDirection::Asc,
212            ankql::ast::OrderDirection::Desc => IndexDirection::Desc,
213        };
214
215        keyparts.push(IndexKeyPart { column, sub_path: None, direction, value_type, nulls: Some(NullsOrder::Last), collation: None });
216    }
217
218    Ok(KeySpec { keyparts })
219}
220
221impl<E: AbstractEntity + Filterable + Send + 'static, Ev: Clone + Send + 'static> Reactor<E, Ev> {
222    /// Add a new predicate to a subscription (initial subscription only)
223    /// Fails if query_id already exists - use update_query for updates
224    ///
225    /// Add a query and send initialization notification (for local subscriptions)
226    /// Collects ReactorUpdateItems and sends them
227    /// pre_notify_hook is called before sending notification (e.g., to mark LiveQuery initialized)
228    pub async fn add_query_and_notify<H: PreNotifyHook>(
229        &self,
230        subscription_id: ReactorSubscriptionId,
231        query_id: proto::QueryId,
232        collection_id: proto::CollectionId,
233        selection: ankql::ast::Selection,
234        node: &dyn crate::node::TNodeErased<E>,
235        resultset: EntityResultSet<E>,
236        gap_fetcher: std::sync::Arc<dyn GapFetcher<E>>,
237        pre_notify_hook: H,
238    ) -> anyhow::Result<()> {
239        // Get subscription reference
240        let subscription = {
241            let subscriptions = self.0.subscriptions.lock().unwrap();
242            subscriptions.get(&subscription_id).cloned().ok_or_else(|| anyhow::anyhow!("Subscription {:?} not found", subscription_id))?
243        };
244
245        // Fetch initial entities from local storage (do this first to avoid holding locks across await)
246        let included_entities = node.fetch_entities_from_local(&collection_id, &selection).await?;
247
248        // Register empty query state with subscription (will be populated by update_query)
249        subscription.register_query(query_id, collection_id.clone(), resultset.clone(), gap_fetcher)?;
250
251        // Populate the resultset and collect ReactorUpdateItems
252        // update_query now handles all watcher management internally (predicate + entity)
253        let mut reactor_update_items = Vec::new();
254        let _newly_added = subscription.update_query(
255            query_id,
256            collection_id.clone(),
257            selection.clone(),
258            included_entities,
259            1, // version 1 for initial add
260            &mut reactor_update_items,
261        )?;
262
263        // Fill gaps if needed for this specific query
264        // FIXME: Open question — is there a window where entity edits land between the local fetch
265        // above and downstream notification handling (reactor.notify_change + evaluate_changes)
266        // such that we need this gap fill to catch the missed edit-driven gap?
267        subscription.fill_gaps_for_query(query_id, &mut reactor_update_items).await;
268
269        // Mark as loaded
270        resultset.set_loaded(true);
271
272        // Call pre-notify hook (e.g., mark LiveQuery as initialized) with version 1
273        pre_notify_hook.pre_notify(1);
274
275        // Send the notification with collected items. We always notify because we're initializing the query.
276        subscription.send_update(reactor_update_items);
277
278        Ok(())
279    }
280
281    /// Update an existing predicate (v>0) and send notifications
282    /// Does diffing against the current resultset
283    /// Used by local LiveQuery updates
284    /// pre_notify_hook is called before sending notification (e.g., to mark LiveQuery initialized)
285    pub async fn update_query_and_notify<H: PreNotifyHook>(
286        &self,
287        subscription_id: ReactorSubscriptionId,
288        query_id: proto::QueryId,
289        collection_id: proto::CollectionId,
290        selection: ankql::ast::Selection,
291        node: &dyn crate::node::TNodeErased<E>,
292        version: u32,
293        pre_notify_hook: H,
294    ) -> anyhow::Result<()> {
295        let included_entities = node.fetch_entities_from_local(&collection_id, &selection).await?;
296
297        let subscription = {
298            let subscriptions = self.0.subscriptions.lock().unwrap();
299            subscriptions.get(&subscription_id).cloned().ok_or_else(|| anyhow::anyhow!("Subscription {:?} not found", subscription_id))?
300        };
301
302        let mut reactor_update_items = Vec::new();
303        // Update query - watcher management is handled internally
304        let _newly_added = subscription.update_query(
305            query_id,
306            collection_id.clone(),
307            selection.clone(),
308            included_entities,
309            version,
310            &mut reactor_update_items,
311        )?;
312
313        // Fill gaps if needed for this specific query
314        // FIXME: Same open question as add_query_and_notify — do edits that slip in between the
315        // storage fetch and subsequent notify_change path require this gap fill to keep limits tight?
316        subscription.fill_gaps_for_query(query_id, &mut reactor_update_items).await;
317
318        // Call pre-notify hook (e.g., mark LiveQuery as initialized)
319        pre_notify_hook.pre_notify(version);
320
321        // Send reactor update
322        if !reactor_update_items.is_empty() {
323            subscription.send_update(reactor_update_items);
324        }
325
326        Ok(())
327    }
328
329    /// Notify subscriptions about an entity change
330    pub async fn notify_change<C: ChangeNotification<Entity = E, Event = Ev> + Clone>(&self, changes: Vec<C>) {
331        // Serialize notify_change invocations
332        let _notify_guard = self.0.notify_lock.lock().await;
333
334        // Wrap changes in Arc for sharing across subscriptions
335        let changes: Arc<Vec<C>> = Arc::from(changes);
336
337        tracing::debug!("Reactor.notify_change({} changes)", changes.len());
338
339        // Build per-subscription candidate accumulators (first lock of watcher_set)
340        let mut candidates_by_sub: HashMap<ReactorSubscriptionId, CandidateChanges<C>> = HashMap::new();
341        {
342            let watcher_set = self.0.watcher_set.lock().unwrap();
343            for (offset, change) in changes.iter().enumerate() {
344                watcher_set.accumulate_interested_watchers(change.entity(), offset, &changes, &mut candidates_by_sub);
345            }
346        }
347
348        // Parallelize evaluate_changes calls across subscriptions
349        // First, collect all the evaluation futures while holding the lock
350        let evaluations = {
351            let subscriptions = self.0.subscriptions.lock().unwrap();
352            candidates_by_sub
353                .into_iter()
354                .filter_map(|(sub_id, candidates)| {
355                    subscriptions.get(&sub_id).map(|subscription| subscription.clone().evaluate_changes(candidates))
356                })
357                .collect::<Vec<_>>()
358        };
359
360        // Now await all evaluations (lock is dropped)
361        let all_watcher_changes: Vec<WatcherChange> = join_all(evaluations).await.into_iter().flatten().collect();
362
363        // Apply all watcher changes to watcher_set (second lock of watcher_set)
364
365        let mut watcher_set = self.0.watcher_set.lock().unwrap();
366        for change in all_watcher_changes {
367            watcher_set.apply_watcher_change(change);
368        }
369    }
370
371    /// Notify all subscriptions that their entities have been removed but do not remove the subscriptions
372    pub fn system_reset(&self) {
373        // Clear entity watchers first - no entities are being watched after reset, because any previously existing entities "stopped existing"
374        // as part of the system reset.
375        {
376            let mut watcher_set = self.0.watcher_set.lock().unwrap();
377            watcher_set.clear_entity_watchers();
378        }
379
380        let subscriptions = self.0.subscriptions.lock().unwrap();
381        for subscription in subscriptions.values() {
382            subscription.system_reset();
383        }
384    }
385}
386
387// Entity-specific methods for remote subscriptions
388impl Reactor<Entity, ankurah_proto::Attested<ankurah_proto::Event>> {
389    /// Add or update a query for remote subscriptions (server-side)
390    /// This method is idempotent - it works whether the query exists or not
391    /// Constructs gap_fetcher internally using the provided Node and ContextData
392    /// Returns all entities that currently match the selection (for delta generation)
393    pub async fn upsert_query<SE, PA>(
394        &self,
395        subscription_id: ReactorSubscriptionId,
396        query_id: proto::QueryId,
397        collection_id: proto::CollectionId,
398        selection: ankql::ast::Selection,
399        node: &crate::node::Node<SE, PA>,
400        cdata: &PA::ContextData,
401        version: u32,
402    ) -> anyhow::Result<Vec<Entity>>
403    where
404        SE: crate::storage::StorageEngine + Send + Sync + 'static,
405        PA: crate::policy::PolicyAgent + Send + Sync + 'static,
406    {
407        let subscription = {
408            let subscriptions = self.0.subscriptions.lock().unwrap();
409            subscriptions.get(&subscription_id).cloned().ok_or_else(|| anyhow::anyhow!("Subscription {:?} not found", subscription_id))?
410        };
411
412        let included_entities = node.fetch_entities_from_local(&collection_id, &selection).await?;
413
414        // Upsert query - register if new or get existing resultset
415        // Gap fetcher is only created if query doesn't exist yet
416        let resultset = subscription.upsert_query(query_id, collection_id.clone(), node, cdata);
417
418        // Update query - watcher management is handled internally
419        let mut all_entities =
420            subscription.update_query(query_id, collection_id.clone(), selection.clone(), included_entities, version, &mut ())?;
421
422        // Fill gaps if needed for this specific query (also registers entity watchers)
423        // FIXME: Same follow-up — we should confirm whether edit-driven gaps can occur between the
424        // storage fetch and notify_change handling, which would make this gap fill mandatory.
425        subscription.fill_gaps_for_query_entities(query_id, &mut all_entities).await;
426
427        resultset.set_loaded(true);
428
429        // Return all entities (newly added + gap-filled)
430        Ok(all_entities)
431    }
432}
433
434impl<E: AbstractEntity + Filterable + Send + 'static, Ev: Clone + Send + 'static> std::fmt::Debug for Reactor<E, Ev> {
435    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
436        let watcher_set = self.0.watcher_set.lock().unwrap();
437        let subscriptions = self.0.subscriptions.lock().unwrap();
438        let (index_watchers, wildcard_watchers, entity_watchers) = watcher_set.debug_data();
439        write!(
440            f,
441            "Reactor {{ subscriptions: {:?}, index_watchers: {:?}, wildcard_watchers: {:?}, entity_watchers: {:?} }}",
442            subscriptions, index_watchers, wildcard_watchers, entity_watchers
443        )
444    }
445}
446
447impl<E: AbstractEntity + Filterable + Send + 'static, Ev: Clone + Send + 'static> std::fmt::Debug for Subscription<E, Ev> {
448    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
449        write!(f, "Subscription {{ id: {:?}, queries: {} }}", self.id(), self.queries_len())
450    }
451}
452
453#[cfg(test)]
454mod tests {
455    use super::*;
456    use crate::selection::filter::Filterable;
457    use ankurah_signals::Subscribe;
458    use proto::{CollectionId, QueryId};
459    use std::sync::Arc;
460
461    pub fn watcher<T: Clone + Send + 'static>() -> (Box<dyn Fn(T) + Send + Sync>, Box<dyn Fn() -> Vec<T> + Send + Sync>) {
462        let values = Arc::new(Mutex::new(Vec::new()));
463        let accumulate = {
464            let values = values.clone();
465            Box::new(move |value: T| {
466                values.lock().unwrap().push(value);
467            })
468        };
469
470        let check = Box::new(move || values.lock().unwrap().drain(..).collect());
471
472        (accumulate, check)
473    }
474
475    #[derive(Debug, Clone)]
476    struct TestEntity {
477        id: proto::EntityId,
478        collection: proto::CollectionId,
479        state: Arc<Mutex<HashMap<String, String>>>,
480    }
481    impl Eq for TestEntity {}
482    impl PartialEq for TestEntity {
483        fn eq(&self, other: &Self) -> bool { self.id == other.id }
484    }
485    impl PartialOrd for TestEntity {
486        fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { Some(self.id.cmp(&other.id)) }
487    }
488    #[derive(Debug, Clone, PartialEq)]
489    struct TestEvent {
490        id: proto::EventId,
491        collection: proto::CollectionId,
492        changes: HashMap<String, String>,
493    }
494    impl TestEntity {
495        fn new(name: &str, status: &str) -> Self {
496            Self {
497                id: proto::EntityId::new(),
498                collection: proto::CollectionId::fixed_name("album"),
499                state: Arc::new(Mutex::new(HashMap::from([
500                    ("name".to_string(), name.to_string()),
501                    ("status".to_string(), status.to_string()),
502                ]))),
503            }
504        }
505    }
506    impl Filterable for TestEntity {
507        fn collection(&self) -> &str { self.collection.as_str() }
508        fn value(&self, field: &str) -> Option<crate::value::Value> {
509            self.state.lock().unwrap().get(field).cloned().map(crate::value::Value::String)
510        }
511    }
512    impl AbstractEntity for TestEntity {
513        fn collection(&self) -> proto::CollectionId { self.collection.clone() }
514        fn id(&self) -> &proto::EntityId { &self.id }
515        fn value(&self, field: &str) -> Option<crate::value::Value> {
516            self.state.lock().unwrap().get(field).cloned().map(crate::value::Value::String)
517        }
518    }
519
520    /// Mock gap fetcher for testing
521    struct MockGapFetcher {
522        entities: Vec<TestEntity>,
523    }
524
525    impl MockGapFetcher {
526        fn new() -> Self { Self { entities: Vec::new() } }
527
528        fn with_entities(entities: Vec<TestEntity>) -> Self { Self { entities } }
529    }
530
531    #[async_trait::async_trait]
532    impl GapFetcher<TestEntity> for MockGapFetcher {
533        async fn fetch_gap(
534            &self,
535            _collection_id: &proto::CollectionId,
536            _selection: &ankql::ast::Selection,
537            _last_entity: Option<&TestEntity>,
538            _gap_size: usize,
539        ) -> Result<Vec<TestEntity>, crate::error::RetrievalError> {
540            // For testing, just return the pre-configured entities
541            Ok(self.entities.clone())
542        }
543    }
544
545    /// Mock node for testing
546    struct MockNode {
547        entities: Vec<TestEntity>,
548    }
549
550    #[async_trait::async_trait]
551    impl crate::node::TNodeErased<TestEntity> for MockNode {
552        fn unsubscribe_remote_predicate(&self, _query_id: proto::QueryId) {}
553        fn update_remote_query(
554            &self,
555            _query_id: proto::QueryId,
556            _selection: ankql::ast::Selection,
557            _version: u32,
558        ) -> Result<(), anyhow::Error> {
559            Ok(())
560        }
561        async fn fetch_entities_from_local(
562            &self,
563            _collection_id: &proto::CollectionId,
564            _selection: &ankql::ast::Selection,
565        ) -> Result<Vec<TestEntity>, crate::error::RetrievalError> {
566            Ok(self.entities.clone())
567        }
568        fn reactor(&self) -> &Reactor<TestEntity> { panic!("MockNode::reactor() should not be called in this test") }
569        fn has_subscription_relay(&self) -> bool { false }
570    }
571
572    /// Test that once a predicate matches an entity, that entity continues to be watched
573    /// by the ReactorSubscriptionId until the user explicitly unwatches it
574    #[tokio::test]
575    async fn test_entity_remains_watched_after_predicate_stops_matching() {
576        let reactor = Reactor::<TestEntity, TestEvent>::new();
577
578        // Set up a subscription with a predicate that matches status="pending"
579        let rsub = reactor.subscribe();
580        let (w, check) = watcher::<ReactorUpdate<TestEntity, TestEvent>>();
581        let _guard = rsub.subscribe(w);
582
583        let query_id = QueryId::new();
584        let collection_id = CollectionId::fixed_name("album");
585        let selection: ankql::ast::Selection = "status = 'pending'".try_into().unwrap();
586        let entity1 = TestEntity::new("Test Album", "pending");
587        let resultset: EntityResultSet<TestEntity> = EntityResultSet::empty();
588        let mock_gap_fetcher = Arc::new(MockGapFetcher::new());
589        let mock_node = MockNode { entities: vec![entity1.clone()] };
590
591        // Add query using the reactor - this should send Initial notification
592        reactor
593            .add_query_and_notify(rsub.id(), query_id, collection_id, selection, &mock_node, resultset, mock_gap_fetcher, ())
594            .await
595            .unwrap();
596
597        // something like this
598        assert_eq!(
599            check(),
600            vec![ReactorUpdate {
601                items: vec![ReactorUpdateItem {
602                    entity: entity1.clone(),
603                    events: vec![],
604                    predicate_relevance: vec![(query_id, MembershipChange::Initial)],
605                }],
606            }]
607        );
608
609        // TODO: For now, this test validates the setup. The actual notify_change test
610        // will require fixing the remaining compilation issues with Entity creation
611        // and the generic type constraints.
612
613        // The key behavior we want to test:
614        // 1. When notify_change is called with an entity that no longer matches the predicate
615        // 2. The Predicate watcher should be removed (entity no longer matches)
616        // 3. The Subscription watcher should remain (entity should stay watched)
617    }
618
619    // TODO: Add more test cases:
620    // 2. A watched entity _shall not_ become unwatched simply because a predicate stops matching
621    //    (partially covered above, but could be more explicit)
622    // 3. When the user expressly requests (via a pub method on reactor) that an entity be unwatched,
623    //    that request should be ignored if any predicates on that subscription still match the entity
624    // 4. Test consolidation of multiple predicates from same subscription in notify_change
625    // 5. Test that wildcard watchers work correctly
626    // 6. Test index_watchers for field-specific comparisons
627    // 7. Test proper cleanup when unsubscribing (all watchers removed)
628    // 8. Test multiple subscriptions watching the same entity
629}