ankurah_core/
livequery.rs

1use std::{
2    marker::PhantomData,
3    sync::{atomic::AtomicBool, Arc, Weak},
4};
5
6use ankurah_proto::{self as proto, CollectionId};
7
8use ankurah_signals::{
9    broadcast::{BroadcastId, Listener, ListenerGuard},
10    porcelain::subscribe::{IntoSubscribeListener, SubscriptionGuard},
11    Get, Peek, Signal, Subscribe,
12};
13use tracing::{debug, warn};
14
15use crate::{
16    changes::ChangeSet,
17    entity::Entity,
18    error::RetrievalError,
19    model::View,
20    node::{MatchArgs, TNodeErased},
21    policy::PolicyAgent,
22    reactor::{
23        fetch_gap::{GapFetcher, QueryGapFetcher},
24        ReactorSubscription, ReactorUpdate,
25    },
26    resultset::{EntityResultSet, ResultSet},
27    storage::StorageEngine,
28    Node,
29};
30
/// A local subscription that handles both reactor subscription and remote cleanup.
/// This is a type-erased version that can be used in the TContext trait.
///
/// The handle is a thin wrapper around an `Arc` of the shared query state, so cloning it
/// is cheap and every clone refers to the same query. Use `map` to obtain a typed
/// `LiveQuery<R>` view over the same state.
#[derive(Clone)]
pub struct EntityLiveQuery(Arc<Inner>);
/// Shared state behind every clone of an `EntityLiveQuery`.
///
/// Dropping the last strong reference runs `Drop for Inner`, which asks the node to
/// unsubscribe the remote predicate for `query_id`.
struct Inner {
    /// Identifier of this query; passed to the reactor and to the remote subscribe/unsubscribe calls.
    pub(crate) query_id: proto::QueryId,
    /// Type-erased handle to the node this query was created on.
    pub(crate) node: Box<dyn TNodeErased>,
    // Store the actual subscription - now non-generic!
    pub(crate) subscription: ReactorSubscription,
    /// Type-erased result set; handed to the reactor on first activation and wrapped per view type on read.
    pub(crate) resultset: EntityResultSet,
    /// Set to `true` whenever an activation error is recorded. Never reset within this file.
    pub(crate) has_error: AtomicBool,
    /// Most recently recorded error. `EntityLiveQuery::error` takes it out, leaving `None`.
    pub(crate) error: std::sync::Mutex<Option<RetrievalError>>,
    /// Woken (via `notify_waiters`) each time `mark_initialized` is called.
    pub(crate) initialized: tokio::sync::Notify,
    /// Last version passed to `mark_initialized`; 0 means uninitialized.
    pub(crate) initialized_version: std::sync::atomic::AtomicU32,
    // Version tracking for predicate updates
    // (starts at 1 and is incremented by every `update_selection` call)
    pub(crate) current_version: std::sync::atomic::AtomicU32,
    // Store selection with its version (starts with version 1, updated on selection changes)
    // This represents user intent (client-side state), separate from reactor's QueryState.selection (reactor-side state)
    pub(crate) selection: std::sync::Mutex<(ankql::ast::Selection, u32)>,
    // Store collection_id for selection updates
    pub(crate) collection_id: CollectionId,
    // Gap fetcher for reactor.add_query (type-erased)
    pub(crate) gap_fetcher: std::sync::Arc<dyn GapFetcher<Entity>>,
}
55
56/// Weak reference to EntityLiveQuery for breaking circular dependencies
57pub struct WeakEntityLiveQuery(Weak<Inner>);
58
59impl WeakEntityLiveQuery {
60    pub fn upgrade(&self) -> Option<EntityLiveQuery> { self.0.upgrade().map(EntityLiveQuery) }
61}
62
63impl Clone for WeakEntityLiveQuery {
64    fn clone(&self) -> Self { Self(self.0.clone()) }
65}
66
/// Typed view over an [`EntityLiveQuery`].
///
/// `R` only selects the view type that results are presented as; the `PhantomData` marker
/// carries it without storing any `R` value. All state lives in the wrapped
/// `EntityLiveQuery`, which is also reachable through `Deref`.
#[derive(Clone)]
pub struct LiveQuery<R: View>(EntityLiveQuery, PhantomData<R>);
69
70impl<R: View> std::ops::Deref for LiveQuery<R> {
71    type Target = EntityLiveQuery;
72    fn deref(&self) -> &Self::Target { &self.0 }
73}
74
75impl crate::reactor::PreNotifyHook for &EntityLiveQuery {
76    fn pre_notify(&self, version: u32) {
77        // Mark as initialized before notification is sent
78        self.mark_initialized(version);
79    }
80}
81
82impl EntityLiveQuery {
83    pub fn new<SE, PA>(
84        node: &Node<SE, PA>,
85        collection_id: CollectionId,
86        mut args: MatchArgs,
87        cdata: PA::ContextData,
88    ) -> Result<Self, RetrievalError>
89    where
90        SE: StorageEngine + Send + Sync + 'static,
91        PA: PolicyAgent + Send + Sync + 'static,
92    {
93        node.policy_agent.can_access_collection(&cdata, &collection_id)?;
94        args.selection.predicate = node.policy_agent.filter_predicate(&cdata, &collection_id, args.selection.predicate)?;
95
96        let subscription = node.reactor.subscribe();
97
98        let resultset = EntityResultSet::empty();
99        let query_id = proto::QueryId::new();
100        let gap_fetcher: std::sync::Arc<dyn GapFetcher<Entity>> = std::sync::Arc::new(QueryGapFetcher::new(&node, cdata.clone()));
101
102        let me = Self(Arc::new(Inner {
103            query_id,
104            node: Box::new(node.clone()),
105            subscription,
106            resultset: resultset.clone(),
107            has_error: AtomicBool::new(false),
108            error: std::sync::Mutex::new(None),
109            initialized: tokio::sync::Notify::new(),
110            initialized_version: std::sync::atomic::AtomicU32::new(0), // 0 means uninitialized
111            current_version: std::sync::atomic::AtomicU32::new(1),     // Start at version 1
112            selection: std::sync::Mutex::new((args.selection.clone(), 1)), // Start with version 1
113            collection_id: collection_id.clone(),
114            gap_fetcher,
115        }));
116
117        // Check if this is a durable node (no relay) or ephemeral node (has relay)
118        let has_relay = node.subscription_relay.is_some();
119
120        if args.cached || !has_relay {
121            // Durable node: spawn initialization task directly (no remote subscription needed)
122            let me2 = me.clone();
123
124            debug!("LiveQuery::new() spawning initialization task for durable node predicate {}", query_id);
125            crate::task::spawn(async move {
126                debug!("LiveQuery initialization task starting for predicate {}", query_id);
127                if let Err(e) = me2.activate(1).await {
128                    debug!("LiveQuery initialization failed for predicate {}: {}", query_id, e);
129                    me2.0.has_error.store(true, std::sync::atomic::Ordering::Relaxed);
130                    *me2.0.error.lock().unwrap() = Some(e);
131                } else {
132                    debug!("LiveQuery initialization completed for predicate {}", query_id);
133                }
134            });
135        }
136
137        // Ephemeral node: register with relay for remote subscription
138        // Remote will call activate() after applying deltas via subscription_established
139        if has_relay {
140            node.subscribe_remote_query(query_id, collection_id.clone(), args.selection.clone(), cdata.clone(), 1, me.weak());
141        }
142
143        Ok(me)
144    }
145    pub fn map<R: View>(self) -> LiveQuery<R> { LiveQuery(self, PhantomData) }
146
147    /// Wait for the LiveQuery to be fully initialized with initial states
148    pub async fn wait_initialized(&self) {
149        // If already initialized, return immediately
150        if self.0.initialized_version.load(std::sync::atomic::Ordering::Relaxed)
151            >= self.0.current_version.load(std::sync::atomic::Ordering::Relaxed)
152        {
153            return;
154        }
155
156        // Otherwise wait for the notification
157        self.0.initialized.notified().await;
158    }
159
160    pub fn update_selection(
161        &self,
162        new_selection: impl TryInto<ankql::ast::Selection, Error = impl Into<RetrievalError>>,
163    ) -> Result<(), RetrievalError> {
164        let new_selection = new_selection.try_into().map_err(|e| e.into())?;
165
166        // Increment current_version atomically and get the new version number
167        let new_version = self.0.current_version.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
168
169        // Store new selection and version in pending_selection mutex
170        *self.0.selection.lock().unwrap() = (new_selection.clone(), new_version);
171
172        // Check if this node has a relay (ephemeral) or not (durable)
173        let has_relay = self.0.node.has_subscription_relay();
174
175        if has_relay {
176            // Ephemeral node: delegate to relay, which will call update_selection_init after applying deltas
177            self.0.node.update_remote_query(self.0.query_id, new_selection.clone(), new_version)?;
178        } else {
179            // Durable node: spawn task to call update_selection_init directly
180            let me2 = self.clone();
181            let query_id = self.0.query_id;
182
183            crate::task::spawn(async move {
184                if let Err(e) = me2.activate(new_version).await {
185                    tracing::error!("LiveQuery update failed for predicate {}: {}", query_id, e);
186                    me2.0.has_error.store(true, std::sync::atomic::Ordering::Relaxed);
187                    *me2.0.error.lock().unwrap() = Some(e);
188                }
189            });
190        }
191
192        Ok(())
193    }
194
195    pub async fn update_selection_wait(
196        &self,
197        new_selection: impl TryInto<ankql::ast::Selection, Error = impl Into<RetrievalError>>,
198    ) -> Result<(), RetrievalError> {
199        self.update_selection(new_selection)?;
200        self.wait_initialized().await;
201        Ok(())
202    }
203
204    /// Activate the LiveQuery by fetching entities and calling reactor.add_query or reactor.update_query
205    /// Called after deltas have been applied for both initial subscription and selection updates
206    /// Gets all parameters from self.0 (collection_id, query_id, selection)
207    /// Marks initialization as complete regardless of success/failure
208    /// Rejects activation if the version is older than the current selection to prevent regression
209    async fn activate(&self, version: u32) -> Result<(), RetrievalError> {
210        // Get the current selection and its version
211        let (selection, stored_version) = self.0.selection.lock().unwrap().clone();
212
213        // Reject activation if this is an older version than what's currently stored
214        // This prevents out-of-order activations from regressing the state
215        if version < stored_version {
216            warn!("LiveQuery - Dropped stale activation request for version {} (current version is {})", version, stored_version);
217            return Ok(());
218        }
219
220        debug!("LiveQuery.activate() for predicate {} (version {})", self.0.query_id, version);
221
222        let reactor = self.0.node.reactor();
223        let initialized_version = self.0.initialized_version.load(std::sync::atomic::Ordering::Relaxed);
224
225        // Determine if this is the first activation (query not yet in reactor)
226        if initialized_version == 0 {
227            // First activation ever: call reactor.add_query_and_notify which will populate the resultset
228            // Pass self as pre_notify_hook to mark initialized before notification
229            reactor
230                .add_query_and_notify(
231                    self.0.subscription.id(),
232                    self.0.query_id,
233                    self.0.collection_id.clone(),
234                    selection,
235                    &*self.0.node,
236                    self.0.resultset.clone(),
237                    self.0.gap_fetcher.clone(),
238                    self,
239                )
240                .await?
241        } else {
242            // Subsequent activation (including cached re-initialization or selection update): use update_query_and_notify
243            // This handles both: (1) cached queries re-activating after remote deltas, and (2) selection updates
244            reactor
245                .update_query_and_notify(
246                    self.0.subscription.id(),
247                    self.0.query_id,
248                    self.0.collection_id.clone(),
249                    selection,
250                    &*self.0.node,
251                    version,
252                    self,
253                )
254                .await?;
255        };
256
257        Ok(())
258    }
259    pub fn error(&self) -> Option<RetrievalError> { self.0.error.lock().unwrap().take() }
260    pub fn query_id(&self) -> proto::QueryId { self.0.query_id }
261
262    /// Create a weak reference to this LiveQuery
263    pub fn weak(&self) -> WeakEntityLiveQuery { WeakEntityLiveQuery(Arc::downgrade(&self.0)) }
264
265    /// Mark initialization as complete for a given version
266    pub fn mark_initialized(&self, version: u32) {
267        // TASK: Serialize or coalesce concurrent activations to prevent version regression https://github.com/ankurah/ankurah/issues/146
268        self.0.initialized_version.store(version, std::sync::atomic::Ordering::Relaxed);
269        self.0.initialized.notify_waiters();
270    }
271}
272
273impl Drop for Inner {
274    fn drop(&mut self) { self.node.unsubscribe_remote_predicate(self.query_id); }
275}
276
277// Implement RemoteQuerySubscriber for WeakEntityLiveQuery to break circular dependencies
278#[async_trait::async_trait]
279impl crate::peer_subscription::RemoteQuerySubscriber for WeakEntityLiveQuery {
280    async fn subscription_established(&self, version: u32) {
281        // Try to upgrade the weak reference
282        if let Some(livequery) = self.upgrade() {
283            // Activate the query (fetch entities, call reactor, and mark initialized)
284            // Handle errors internally by setting last_error
285            if let Err(e) = livequery.activate(version).await {
286                tracing::error!("Failed to activate subscription for query {}: {}", livequery.0.query_id, e);
287                livequery.0.has_error.store(true, std::sync::atomic::Ordering::Relaxed);
288                *livequery.0.error.lock().unwrap() = Some(e);
289            }
290        }
291        // If upgrade fails, the LiveQuery was already dropped - nothing to do
292    }
293
294    fn set_last_error(&self, error: RetrievalError) {
295        // Try to upgrade the weak reference
296        if let Some(livequery) = self.upgrade() {
297            livequery.0.has_error.store(true, std::sync::atomic::Ordering::Relaxed);
298            *livequery.0.error.lock().unwrap() = Some(error);
299        }
300        // If upgrade fails, the LiveQuery was already dropped - nothing to do
301    }
302}
303
304impl<R: View> LiveQuery<R> {
305    /// Wait for the LiveQuery to be fully initialized with initial states
306    pub async fn wait_initialized(&self) { self.0.wait_initialized().await; }
307
308    pub fn resultset(&self) -> ResultSet<R> { self.0 .0.resultset.wrap::<R>() }
309
310    pub fn loaded(&self) -> bool { self.0 .0.resultset.is_loaded() }
311
312    pub fn ids(&self) -> Vec<proto::EntityId> { self.0 .0.resultset.keys().collect() }
313
314    pub fn ids_sorted(&self) -> Vec<proto::EntityId> {
315        use itertools::Itertools;
316        self.0 .0.resultset.keys().sorted().collect()
317    }
318}
319
320// Implement Signal trait - delegate to the resultset
321impl<R: View> Signal for LiveQuery<R> {
322    fn listen(&self, listener: Listener) -> ListenerGuard { self.0 .0.resultset.listen(listener) }
323
324    fn broadcast_id(&self) -> BroadcastId { self.0 .0.resultset.broadcast_id() }
325}
326
327// Implement Get trait - delegate to ResultSet<R>
328impl<R: View + Clone + 'static> Get<Vec<R>> for LiveQuery<R> {
329    fn get(&self) -> Vec<R> { self.0 .0.resultset.wrap::<R>().get() }
330}
331
332// Implement Peek trait - delegate to ResultSet<R>
333impl<R: View + Clone + 'static> Peek<Vec<R>> for LiveQuery<R> {
334    fn peek(&self) -> Vec<R> { self.0 .0.resultset.wrap().peek() }
335}
336
337// Implement Subscribe trait - convert ReactorUpdate to ChangeSet<R>
338impl<R: View> Subscribe<ChangeSet<R>> for LiveQuery<R>
339where R: Clone + Send + Sync + 'static
340{
341    fn subscribe<L>(&self, listener: L) -> SubscriptionGuard
342    where L: IntoSubscribeListener<ChangeSet<R>> {
343        let listener = listener.into_subscribe_listener();
344
345        let me = self.clone();
346        // Subscribe to the underlying ReactorUpdate stream and convert to ChangeSet<R>
347
348        self.0 .0.subscription.subscribe(move |reactor_update: ReactorUpdate| {
349            // Convert ReactorUpdate to ChangeSet<R>");
350            let changeset: ChangeSet<R> = livequery_change_set_from(me.0 .0.resultset.wrap::<R>(), reactor_update);
351            listener(changeset);
352        })
353    }
354}
355
356/// Notably, this function does not filter by query_id, because it should only be used by LiveQuery, which entails a single-predicate subscription
357fn livequery_change_set_from<R: View>(resultset: ResultSet<R>, reactor_update: ReactorUpdate) -> ChangeSet<R>
358where R: View {
359    use crate::changes::{ChangeSet, ItemChange};
360
361    let mut changes = Vec::new();
362
363    for item in reactor_update.items {
364        let view = R::from_entity(item.entity);
365
366        // Determine the change type based on predicate relevance
367        // ignore the query_id, because it should only be used by LiveQuery, which entails a single-predicate subscription
368        if let Some((_, membership_change)) = item.predicate_relevance.first() {
369            match membership_change {
370                crate::reactor::MembershipChange::Initial => {
371                    changes.push(ItemChange::Initial { item: view });
372                }
373                crate::reactor::MembershipChange::Add => {
374                    changes.push(ItemChange::Add { item: view, events: item.events });
375                }
376                crate::reactor::MembershipChange::Remove => {
377                    changes.push(ItemChange::Remove { item: view, events: item.events });
378                }
379            }
380        } else {
381            // No membership change, just an update
382            changes.push(ItemChange::Update { item: view, events: item.events });
383        }
384    }
385
386    ChangeSet { changes, resultset }
387}