Skip to main content

ankurah_core/
livequery.rs

1use std::{
2    marker::PhantomData,
3    sync::{Arc, Weak},
4};
5
6use ankurah_proto::{self as proto, CollectionId};
7
8use ankurah_signals::{
9    broadcast::BroadcastId,
10    porcelain::subscribe::{IntoSubscribeListener, SubscriptionGuard},
11    signal::{Listener, ListenerGuard},
12    Get, Mut, Peek, Read, Signal, Subscribe,
13};
14use tracing::{debug, warn};
15
16use crate::{
17    changes::ChangeSet,
18    entity::Entity,
19    error::RetrievalError,
20    model::View,
21    node::{MatchArgs, TNodeErased},
22    policy::PolicyAgent,
23    reactor::{
24        fetch_gap::{GapFetcher, QueryGapFetcher},
25        ReactorSubscription, ReactorUpdate,
26    },
27    resultset::{EntityResultSet, ResultSet},
28    storage::StorageEngine,
29    Node,
30};
31
/// A local subscription that handles both reactor subscription and remote cleanup.
/// This is a type-erased version that can be used in the TContext trait.
///
/// The handle is a thin wrapper around a reference-counted `Inner`, so cloning it
/// yields another handle to the same underlying query state rather than a new query.
#[derive(Clone)]
pub struct EntityLiveQuery(Arc<Inner>);
36struct Inner {
37    pub(crate) query_id: proto::QueryId,
38    pub(crate) node: Box<dyn TNodeErased>,
39    // Store the actual subscription - now non-generic!
40    pub(crate) subscription: ReactorSubscription,
41    pub(crate) resultset: EntityResultSet,
42    pub(crate) error: Mut<Option<RetrievalError>>,
43    pub(crate) initialized: tokio::sync::Notify,
44    pub(crate) initialized_version: std::sync::atomic::AtomicU32,
45    // Version tracking for predicate updates
46    pub(crate) current_version: std::sync::atomic::AtomicU32,
47    // Store selection with its version (starts with version 1, updated on selection changes)
48    // This represents user intent (client-side state), separate from reactor's QueryState.selection (reactor-side state)
49    // Using Mut for reactive updates that can be observed in WASM
50    pub(crate) selection: Mut<(ankql::ast::Selection, u32)>,
51    // Store collection_id for selection updates
52    pub(crate) collection_id: CollectionId,
53    // Gap fetcher for reactor.add_query (type-erased)
54    pub(crate) gap_fetcher: std::sync::Arc<dyn GapFetcher<Entity>>,
55}
56
/// Weak reference to EntityLiveQuery for breaking circular dependencies.
///
/// Holding one of these does not keep the query alive; call `upgrade` to obtain a
/// strong `EntityLiveQuery` handle while the query still exists.
pub struct WeakEntityLiveQuery(Weak<Inner>);
59
60impl WeakEntityLiveQuery {
61    pub fn upgrade(&self) -> Option<EntityLiveQuery> { self.0.upgrade().map(EntityLiveQuery) }
62}
63
64impl Clone for WeakEntityLiveQuery {
65    fn clone(&self) -> Self { Self(self.0.clone()) }
66}
67
/// A typed view over an [`EntityLiveQuery`].
///
/// The `PhantomData<R>` marker records which `View` type the entities are presented as;
/// no value of `R` is stored in the handle itself.
#[derive(Clone)]
pub struct LiveQuery<R: View>(EntityLiveQuery, PhantomData<R>);
70
71impl<R: View> std::ops::Deref for LiveQuery<R> {
72    type Target = EntityLiveQuery;
73    fn deref(&self) -> &Self::Target { &self.0 }
74}
75
76impl crate::reactor::PreNotifyHook for &EntityLiveQuery {
77    fn pre_notify(&self, version: u32) {
78        // Mark as initialized before notification is sent
79        self.mark_initialized(version);
80    }
81}
82
83impl EntityLiveQuery {
84    pub fn new<SE, PA>(
85        node: &Node<SE, PA>,
86        collection_id: CollectionId,
87        mut args: MatchArgs,
88        cdata: PA::ContextData,
89    ) -> Result<Self, RetrievalError>
90    where
91        SE: StorageEngine + Send + Sync + 'static,
92        PA: PolicyAgent + Send + Sync + 'static,
93    {
94        node.policy_agent.can_access_collection(&cdata, &collection_id)?;
95        args.selection.predicate = node.policy_agent.filter_predicate(&cdata, &collection_id, args.selection.predicate)?;
96
97        // Resolve types in the AST (converts literals for JSON path comparisons)
98        args.selection = node.type_resolver.resolve_selection_types(args.selection);
99
100        let subscription = node.reactor.subscribe();
101
102        let resultset = EntityResultSet::empty();
103        let query_id = proto::QueryId::new();
104        let gap_fetcher: std::sync::Arc<dyn GapFetcher<Entity>> = std::sync::Arc::new(QueryGapFetcher::new(&node, cdata.clone()));
105
106        let me = Self(Arc::new(Inner {
107            query_id,
108            node: Box::new(node.clone()),
109            subscription,
110            resultset: resultset.clone(),
111            error: Mut::new(None),
112            initialized: tokio::sync::Notify::new(),
113            initialized_version: std::sync::atomic::AtomicU32::new(0), // 0 means uninitialized
114            current_version: std::sync::atomic::AtomicU32::new(1),     // Start at version 1
115            selection: Mut::new((args.selection.clone(), 1)),          // Start with version 1
116            collection_id: collection_id.clone(),
117            gap_fetcher,
118        }));
119
120        // Check if this is a durable node (no relay) or ephemeral node (has relay)
121        let has_relay = node.subscription_relay.is_some();
122
123        if args.cached || !has_relay {
124            // Durable node: spawn initialization task directly (no remote subscription needed)
125            let me2 = me.clone();
126
127            debug!("LiveQuery::new() spawning initialization task for durable node predicate {}", query_id);
128            crate::task::spawn(async move {
129                debug!("LiveQuery initialization task starting for predicate {}", query_id);
130                if let Err(e) = me2.activate(1).await {
131                    debug!("LiveQuery initialization failed for predicate {}: {}", query_id, e);
132
133                    me2.0.error.set(Some(e));
134                } else {
135                    debug!("LiveQuery initialization completed for predicate {}", query_id);
136                }
137            });
138        }
139
140        // Ephemeral node: register with relay for remote subscription
141        // Remote will call activate() after applying deltas via subscription_established
142        if has_relay {
143            node.subscribe_remote_query(query_id, collection_id.clone(), args.selection.clone(), cdata.clone(), 1, me.weak());
144        }
145
146        Ok(me)
147    }
148    pub fn map<R: View>(self) -> LiveQuery<R> { LiveQuery(self, PhantomData) }
149
150    /// Wait for the LiveQuery to be fully initialized with initial states
151    pub async fn wait_initialized(&self) {
152        // If already initialized, return immediately
153        if self.0.initialized_version.load(std::sync::atomic::Ordering::Relaxed)
154            >= self.0.current_version.load(std::sync::atomic::Ordering::Relaxed)
155        {
156            return;
157        }
158
159        // FIXME - this should be waiting for the correct version, not any version
160        // Otherwise wait for the notification
161        self.0.initialized.notified().await;
162    }
163
164    pub fn update_selection(
165        &self,
166        new_selection: impl TryInto<ankql::ast::Selection, Error = impl Into<RetrievalError>>,
167    ) -> Result<(), RetrievalError> {
168        let new_selection = new_selection.try_into().map_err(|e| e.into())?;
169
170        // Increment current_version atomically and get the new version number
171        let new_version = self.0.current_version.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
172
173        // Mark resultset as not loaded since we're changing the selection
174        self.0.resultset.set_loaded(false);
175
176        // Store new selection and version
177        self.0.selection.set((new_selection.clone(), new_version));
178
179        // Check if this node has a relay (ephemeral) or not (durable)
180        let has_relay = self.0.node.has_subscription_relay();
181
182        if has_relay {
183            // Ephemeral node: delegate to relay, which will call update_selection_init after applying deltas
184            self.0.node.update_remote_query(self.0.query_id, new_selection.clone(), new_version)?;
185        } else {
186            // Durable node: spawn task to call update_selection_init directly
187            let me2 = self.clone();
188            let query_id = self.0.query_id;
189
190            crate::task::spawn(async move {
191                if let Err(e) = me2.activate(new_version).await {
192                    tracing::error!("LiveQuery update failed for predicate {}: {}", query_id, e);
193                    me2.0.error.set(Some(e));
194                }
195            });
196        }
197
198        Ok(())
199    }
200
201    pub async fn update_selection_wait(
202        &self,
203        new_selection: impl TryInto<ankql::ast::Selection, Error = impl Into<RetrievalError>>,
204    ) -> Result<(), RetrievalError> {
205        self.update_selection(new_selection)?;
206        self.wait_initialized().await;
207        Ok(())
208    }
209
210    /// Activate the LiveQuery by fetching entities and calling reactor.add_query or reactor.update_query
211    /// Called after deltas have been applied for both initial subscription and selection updates
212    /// Gets all parameters from self.0 (collection_id, query_id, selection)
213    /// Marks initialization as complete regardless of success/failure
214    /// Rejects activation if the version is older than the current selection to prevent regression
215    async fn activate(&self, version: u32) -> Result<(), RetrievalError> {
216        // Get the current selection and its version
217        let (selection, stored_version) = self.0.selection.value();
218
219        // Reject activation if this is an older version than what's currently stored
220        // This prevents out-of-order activations from regressing the state
221        if version < stored_version {
222            warn!("LiveQuery - Dropped stale activation request for version {} (current version is {})", version, stored_version);
223            return Ok(());
224        }
225
226        debug!("LiveQuery.activate() for predicate {} (version {})", self.0.query_id, version);
227
228        let reactor = self.0.node.reactor();
229        let initialized_version = self.0.initialized_version.load(std::sync::atomic::Ordering::Relaxed);
230
231        // Determine if this is the first activation (query not yet in reactor)
232        if initialized_version == 0 {
233            // First activation ever: call reactor.add_query_and_notify which will populate the resultset
234            // Pass self as pre_notify_hook to mark initialized before notification
235            reactor
236                .add_query_and_notify(
237                    self.0.subscription.id(),
238                    self.0.query_id,
239                    self.0.collection_id.clone(),
240                    selection,
241                    &*self.0.node,
242                    self.0.resultset.clone(),
243                    self.0.gap_fetcher.clone(),
244                    self,
245                )
246                .await?
247        } else {
248            // Subsequent activation (including cached re-initialization or selection update): use update_query_and_notify
249            // This handles both: (1) cached queries re-activating after remote deltas, and (2) selection updates
250            reactor
251                .update_query_and_notify(
252                    self.0.subscription.id(),
253                    self.0.query_id,
254                    self.0.collection_id.clone(),
255                    selection,
256                    &*self.0.node,
257                    version,
258                    self,
259                )
260                .await?;
261        };
262
263        Ok(())
264    }
265    pub fn error(&self) -> Read<Option<RetrievalError>> { self.0.error.read() }
266    pub fn query_id(&self) -> proto::QueryId { self.0.query_id }
267    pub fn selection(&self) -> Read<(ankql::ast::Selection, u32)> { self.0.selection.read() }
268
269    /// Create a weak reference to this LiveQuery
270    pub fn weak(&self) -> WeakEntityLiveQuery { WeakEntityLiveQuery(Arc::downgrade(&self.0)) }
271
272    /// Mark initialization as complete for a given version
273    pub fn mark_initialized(&self, version: u32) {
274        // TASK: Serialize or coalesce concurrent activations to prevent version regression https://github.com/ankurah/ankurah/issues/146
275        self.0.initialized_version.store(version, std::sync::atomic::Ordering::Relaxed);
276        self.0.initialized.notify_waiters();
277    }
278}
279
280impl Drop for Inner {
281    fn drop(&mut self) { self.node.unsubscribe_remote_predicate(self.query_id); }
282}
283
284// Implement RemoteQuerySubscriber for WeakEntityLiveQuery to break circular dependencies
285#[async_trait::async_trait]
286impl crate::peer_subscription::RemoteQuerySubscriber for WeakEntityLiveQuery {
287    async fn subscription_established(&self, version: u32) {
288        // Try to upgrade the weak reference
289        if let Some(livequery) = self.upgrade() {
290            // Activate the query (fetch entities, call reactor, and mark initialized)
291            // Handle errors internally by setting last_error
292            tracing::debug!("Subscription established for query {}: {}", livequery.0.query_id, version);
293            if let Err(e) = livequery.activate(version).await {
294                tracing::error!("Failed to activate subscription for query {}: {}", livequery.0.query_id, e);
295                livequery.0.error.set(Some(e));
296            }
297        }
298        // If upgrade fails, the LiveQuery was already dropped - nothing to do
299    }
300
301    fn set_last_error(&self, error: RetrievalError) {
302        // Try to upgrade the weak reference
303        if let Some(livequery) = self.upgrade() {
304            tracing::info!("Setting last error for LiveQuery {}: {}", livequery.0.query_id, error);
305            livequery.0.error.set(Some(error));
306        }
307        // If upgrade fails, the LiveQuery was already dropped - nothing to do
308    }
309}
310
311impl<R: View> LiveQuery<R> {
312    /// Wait for the LiveQuery to be fully initialized with initial states
313    pub async fn wait_initialized(&self) { self.0.wait_initialized().await; }
314
315    pub fn resultset(&self) -> ResultSet<R> { self.0 .0.resultset.wrap::<R>() }
316
317    pub fn loaded(&self) -> bool { self.0 .0.resultset.is_loaded() }
318
319    pub fn ids(&self) -> Vec<proto::EntityId> { self.0 .0.resultset.keys().collect() }
320
321    pub fn ids_sorted(&self) -> Vec<proto::EntityId> {
322        use itertools::Itertools;
323        self.0 .0.resultset.keys().sorted().collect()
324    }
325}
326
327// Implement Signal trait - delegate to the subscription (not resultset)
328// This ensures that LiveQuery tracking fires on ALL entity changes, not just membership changes
329impl<R: View> Signal for LiveQuery<R> {
330    fn listen(&self, listener: Listener) -> ListenerGuard { self.0 .0.subscription.listen(listener) }
331
332    fn broadcast_id(&self) -> BroadcastId { self.0 .0.subscription.broadcast_id() }
333}
334
335// Implement Get trait - delegate to ResultSet<R>
336impl<R: View + Clone + 'static> Get<Vec<R>> for LiveQuery<R> {
337    fn get(&self) -> Vec<R> {
338        use ankurah_signals::CurrentObserver;
339        CurrentObserver::track(&self);
340        self.0 .0.resultset.wrap::<R>().peek()
341    }
342}
343
344// Implement Peek trait - delegate to ResultSet<R>
345impl<R: View + Clone + 'static> Peek<Vec<R>> for LiveQuery<R> {
346    fn peek(&self) -> Vec<R> { self.0 .0.resultset.wrap().peek() }
347}
348
349// Implement Subscribe trait - convert ReactorUpdate to ChangeSet<R>
350impl<R: View> Subscribe<ChangeSet<R>> for LiveQuery<R>
351where R: Clone + Send + Sync + 'static
352{
353    fn subscribe<L>(&self, listener: L) -> SubscriptionGuard
354    where L: IntoSubscribeListener<ChangeSet<R>> {
355        let listener = listener.into_subscribe_listener();
356
357        let me = self.clone();
358        // Subscribe to the underlying ReactorUpdate stream and convert to ChangeSet<R>
359
360        self.0 .0.subscription.subscribe(move |reactor_update: ReactorUpdate| {
361            // Convert ReactorUpdate to ChangeSet<R>");
362            let changeset: ChangeSet<R> = livequery_change_set_from(me.0 .0.resultset.wrap::<R>(), reactor_update);
363            listener(changeset);
364        })
365    }
366}
367
368/// Notably, this function does not filter by query_id, because it should only be used by LiveQuery, which entails a single-predicate subscription
369fn livequery_change_set_from<R: View>(resultset: ResultSet<R>, reactor_update: ReactorUpdate) -> ChangeSet<R>
370where R: View {
371    use crate::changes::{ChangeSet, ItemChange};
372
373    let mut changes = Vec::new();
374
375    for item in reactor_update.items {
376        let view = R::from_entity(item.entity);
377
378        // Determine the change type based on predicate relevance
379        // ignore the query_id, because it should only be used by LiveQuery, which entails a single-predicate subscription
380        if let Some((_, membership_change)) = item.predicate_relevance.first() {
381            match membership_change {
382                crate::reactor::MembershipChange::Initial => {
383                    changes.push(ItemChange::Initial { item: view });
384                }
385                crate::reactor::MembershipChange::Add => {
386                    changes.push(ItemChange::Add { item: view, events: item.events });
387                }
388                crate::reactor::MembershipChange::Remove => {
389                    changes.push(ItemChange::Remove { item: view, events: item.events });
390                }
391            }
392        } else {
393            // No membership change, just an update
394            changes.push(ItemChange::Update { item: view, events: item.events });
395        }
396    }
397
398    ChangeSet { changes, resultset }
399}