1use std::{
2 marker::PhantomData,
3 sync::{atomic::AtomicBool, Arc, Weak},
4};
5
6use ankurah_proto::{self as proto, CollectionId};
7
8use ankurah_signals::{
9 broadcast::{BroadcastId, Listener, ListenerGuard},
10 porcelain::subscribe::{IntoSubscribeListener, SubscriptionGuard},
11 Get, Peek, Signal, Subscribe,
12};
13use tracing::{debug, warn};
14
15use crate::{
16 changes::ChangeSet,
17 entity::Entity,
18 error::RetrievalError,
19 model::View,
20 node::{MatchArgs, TNodeErased},
21 policy::PolicyAgent,
22 reactor::{
23 fetch_gap::{GapFetcher, QueryGapFetcher},
24 ReactorSubscription, ReactorUpdate,
25 },
26 resultset::{EntityResultSet, ResultSet},
27 storage::StorageEngine,
28 Node,
29};
30
/// Untyped live query handle.
///
/// This is a thin, clonable wrapper around an `Arc` of the shared query state
/// (`Inner`); every clone refers to the same underlying query.
#[derive(Clone)]
pub struct EntityLiveQuery(Arc<Inner>);
/// Shared state behind every [`EntityLiveQuery`] handle.
struct Inner {
    /// Identifier of this query, generated when the query is created.
    pub(crate) query_id: proto::QueryId,
    /// Type-erased clone of the node the query was created on.
    pub(crate) node: Box<dyn TNodeErased>,
    /// Reactor subscription obtained from the node's reactor at construction time.
    pub(crate) subscription: ReactorSubscription,
    /// Result set for this query; created empty and handed to the reactor on first activation.
    pub(crate) resultset: EntityResultSet,
    /// Set to `true` whenever an error is stored in `error`. Nothing in this file resets it.
    pub(crate) has_error: AtomicBool,
    /// Most recently recorded error, if any; `EntityLiveQuery::error` takes it out of the slot.
    pub(crate) error: std::sync::Mutex<Option<RetrievalError>>,
    /// Wakes tasks parked in `wait_initialized`; signalled by `mark_initialized`.
    pub(crate) initialized: tokio::sync::Notify,
    /// Last version passed to `mark_initialized`; starts at 0 (nothing initialized yet).
    pub(crate) initialized_version: std::sync::atomic::AtomicU32,
    /// Latest selection version handed out; starts at 1 and is bumped by `update_selection`.
    pub(crate) current_version: std::sync::atomic::AtomicU32,
    /// Current selection together with the version it was stored under.
    pub(crate) selection: std::sync::Mutex<(ankql::ast::Selection, u32)>,
    /// Collection this query runs against.
    pub(crate) collection_id: CollectionId,
    /// Gap fetcher passed to the reactor when the query is first added.
    pub(crate) gap_fetcher: std::sync::Arc<dyn GapFetcher<Entity>>,
}
55
/// Non-owning handle to an [`EntityLiveQuery`]: holds a `Weak` reference to the shared
/// state, so it does not keep the query alive on its own.
pub struct WeakEntityLiveQuery(Weak<Inner>);
58
59impl WeakEntityLiveQuery {
60 pub fn upgrade(&self) -> Option<EntityLiveQuery> { self.0.upgrade().map(EntityLiveQuery) }
61}
62
63impl Clone for WeakEntityLiveQuery {
64 fn clone(&self) -> Self { Self(self.0.clone()) }
65}
66
/// Typed view over an [`EntityLiveQuery`].
///
/// `R` is the `View` type results are presented as; it is carried only through
/// `PhantomData`, so no value of `R` is stored in the handle itself.
#[derive(Clone)]
pub struct LiveQuery<R: View>(EntityLiveQuery, PhantomData<R>);
69
70impl<R: View> std::ops::Deref for LiveQuery<R> {
71 type Target = EntityLiveQuery;
72 fn deref(&self) -> &Self::Target { &self.0 }
73}
74
75impl crate::reactor::PreNotifyHook for &EntityLiveQuery {
76 fn pre_notify(&self, version: u32) {
77 self.mark_initialized(version);
79 }
80}
81
82impl EntityLiveQuery {
83 pub fn new<SE, PA>(
84 node: &Node<SE, PA>,
85 collection_id: CollectionId,
86 mut args: MatchArgs,
87 cdata: PA::ContextData,
88 ) -> Result<Self, RetrievalError>
89 where
90 SE: StorageEngine + Send + Sync + 'static,
91 PA: PolicyAgent + Send + Sync + 'static,
92 {
93 node.policy_agent.can_access_collection(&cdata, &collection_id)?;
94 args.selection.predicate = node.policy_agent.filter_predicate(&cdata, &collection_id, args.selection.predicate)?;
95
96 let subscription = node.reactor.subscribe();
97
98 let resultset = EntityResultSet::empty();
99 let query_id = proto::QueryId::new();
100 let gap_fetcher: std::sync::Arc<dyn GapFetcher<Entity>> = std::sync::Arc::new(QueryGapFetcher::new(&node, cdata.clone()));
101
102 let me = Self(Arc::new(Inner {
103 query_id,
104 node: Box::new(node.clone()),
105 subscription,
106 resultset: resultset.clone(),
107 has_error: AtomicBool::new(false),
108 error: std::sync::Mutex::new(None),
109 initialized: tokio::sync::Notify::new(),
110 initialized_version: std::sync::atomic::AtomicU32::new(0), current_version: std::sync::atomic::AtomicU32::new(1), selection: std::sync::Mutex::new((args.selection.clone(), 1)), collection_id: collection_id.clone(),
114 gap_fetcher,
115 }));
116
117 let has_relay = node.subscription_relay.is_some();
119
120 if args.cached || !has_relay {
121 let me2 = me.clone();
123
124 debug!("LiveQuery::new() spawning initialization task for durable node predicate {}", query_id);
125 crate::task::spawn(async move {
126 debug!("LiveQuery initialization task starting for predicate {}", query_id);
127 if let Err(e) = me2.activate(1).await {
128 debug!("LiveQuery initialization failed for predicate {}: {}", query_id, e);
129 me2.0.has_error.store(true, std::sync::atomic::Ordering::Relaxed);
130 *me2.0.error.lock().unwrap() = Some(e);
131 } else {
132 debug!("LiveQuery initialization completed for predicate {}", query_id);
133 }
134 });
135 }
136
137 if has_relay {
140 node.subscribe_remote_query(query_id, collection_id.clone(), args.selection.clone(), cdata.clone(), 1, me.weak());
141 }
142
143 Ok(me)
144 }
145 pub fn map<R: View>(self) -> LiveQuery<R> { LiveQuery(self, PhantomData) }
146
147 pub async fn wait_initialized(&self) {
149 if self.0.initialized_version.load(std::sync::atomic::Ordering::Relaxed)
151 >= self.0.current_version.load(std::sync::atomic::Ordering::Relaxed)
152 {
153 return;
154 }
155
156 self.0.initialized.notified().await;
158 }
159
160 pub fn update_selection(
161 &self,
162 new_selection: impl TryInto<ankql::ast::Selection, Error = impl Into<RetrievalError>>,
163 ) -> Result<(), RetrievalError> {
164 let new_selection = new_selection.try_into().map_err(|e| e.into())?;
165
166 let new_version = self.0.current_version.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
168
169 *self.0.selection.lock().unwrap() = (new_selection.clone(), new_version);
171
172 let has_relay = self.0.node.has_subscription_relay();
174
175 if has_relay {
176 self.0.node.update_remote_query(self.0.query_id, new_selection.clone(), new_version)?;
178 } else {
179 let me2 = self.clone();
181 let query_id = self.0.query_id;
182
183 crate::task::spawn(async move {
184 if let Err(e) = me2.activate(new_version).await {
185 tracing::error!("LiveQuery update failed for predicate {}: {}", query_id, e);
186 me2.0.has_error.store(true, std::sync::atomic::Ordering::Relaxed);
187 *me2.0.error.lock().unwrap() = Some(e);
188 }
189 });
190 }
191
192 Ok(())
193 }
194
    /// Updates the selection via `update_selection`, then waits via `wait_initialized`.
    ///
    /// # Errors
    /// Propagates any error from `update_selection`; in that case no waiting happens.
    pub async fn update_selection_wait(
        &self,
        new_selection: impl TryInto<ankql::ast::Selection, Error = impl Into<RetrievalError>>,
    ) -> Result<(), RetrievalError> {
        self.update_selection(new_selection)?;
        self.wait_initialized().await;
        Ok(())
    }
203
204 async fn activate(&self, version: u32) -> Result<(), RetrievalError> {
210 let (selection, stored_version) = self.0.selection.lock().unwrap().clone();
212
213 if version < stored_version {
216 warn!("LiveQuery - Dropped stale activation request for version {} (current version is {})", version, stored_version);
217 return Ok(());
218 }
219
220 debug!("LiveQuery.activate() for predicate {} (version {})", self.0.query_id, version);
221
222 let reactor = self.0.node.reactor();
223 let initialized_version = self.0.initialized_version.load(std::sync::atomic::Ordering::Relaxed);
224
225 if initialized_version == 0 {
227 reactor
230 .add_query_and_notify(
231 self.0.subscription.id(),
232 self.0.query_id,
233 self.0.collection_id.clone(),
234 selection,
235 &*self.0.node,
236 self.0.resultset.clone(),
237 self.0.gap_fetcher.clone(),
238 self,
239 )
240 .await?
241 } else {
242 reactor
245 .update_query_and_notify(
246 self.0.subscription.id(),
247 self.0.query_id,
248 self.0.collection_id.clone(),
249 selection,
250 &*self.0.node,
251 version,
252 self,
253 )
254 .await?;
255 };
256
257 Ok(())
258 }
259 pub fn error(&self) -> Option<RetrievalError> { self.0.error.lock().unwrap().take() }
260 pub fn query_id(&self) -> proto::QueryId { self.0.query_id }
261
262 pub fn weak(&self) -> WeakEntityLiveQuery { WeakEntityLiveQuery(Arc::downgrade(&self.0)) }
264
265 pub fn mark_initialized(&self, version: u32) {
267 self.0.initialized_version.store(version, std::sync::atomic::Ordering::Relaxed);
269 self.0.initialized.notify_waiters();
270 }
271}
272
273impl Drop for Inner {
274 fn drop(&mut self) { self.node.unsubscribe_remote_predicate(self.query_id); }
275}
276
277#[async_trait::async_trait]
279impl crate::peer_subscription::RemoteQuerySubscriber for WeakEntityLiveQuery {
280 async fn subscription_established(&self, version: u32) {
281 if let Some(livequery) = self.upgrade() {
283 if let Err(e) = livequery.activate(version).await {
286 tracing::error!("Failed to activate subscription for query {}: {}", livequery.0.query_id, e);
287 livequery.0.has_error.store(true, std::sync::atomic::Ordering::Relaxed);
288 *livequery.0.error.lock().unwrap() = Some(e);
289 }
290 }
291 }
293
294 fn set_last_error(&self, error: RetrievalError) {
295 if let Some(livequery) = self.upgrade() {
297 livequery.0.has_error.store(true, std::sync::atomic::Ordering::Relaxed);
298 *livequery.0.error.lock().unwrap() = Some(error);
299 }
300 }
302}
303
304impl<R: View> LiveQuery<R> {
305 pub async fn wait_initialized(&self) { self.0.wait_initialized().await; }
307
308 pub fn resultset(&self) -> ResultSet<R> { self.0 .0.resultset.wrap::<R>() }
309
310 pub fn loaded(&self) -> bool { self.0 .0.resultset.is_loaded() }
311
312 pub fn ids(&self) -> Vec<proto::EntityId> { self.0 .0.resultset.keys().collect() }
313
314 pub fn ids_sorted(&self) -> Vec<proto::EntityId> {
315 use itertools::Itertools;
316 self.0 .0.resultset.keys().sorted().collect()
317 }
318}
319
320impl<R: View> Signal for LiveQuery<R> {
322 fn listen(&self, listener: Listener) -> ListenerGuard { self.0 .0.resultset.listen(listener) }
323
324 fn broadcast_id(&self) -> BroadcastId { self.0 .0.resultset.broadcast_id() }
325}
326
327impl<R: View + Clone + 'static> Get<Vec<R>> for LiveQuery<R> {
329 fn get(&self) -> Vec<R> { self.0 .0.resultset.wrap::<R>().get() }
330}
331
332impl<R: View + Clone + 'static> Peek<Vec<R>> for LiveQuery<R> {
334 fn peek(&self) -> Vec<R> { self.0 .0.resultset.wrap().peek() }
335}
336
337impl<R: View> Subscribe<ChangeSet<R>> for LiveQuery<R>
339where R: Clone + Send + Sync + 'static
340{
341 fn subscribe<L>(&self, listener: L) -> SubscriptionGuard
342 where L: IntoSubscribeListener<ChangeSet<R>> {
343 let listener = listener.into_subscribe_listener();
344
345 let me = self.clone();
346 self.0 .0.subscription.subscribe(move |reactor_update: ReactorUpdate| {
349 let changeset: ChangeSet<R> = livequery_change_set_from(me.0 .0.resultset.wrap::<R>(), reactor_update);
351 listener(changeset);
352 })
353 }
354}
355
356fn livequery_change_set_from<R: View>(resultset: ResultSet<R>, reactor_update: ReactorUpdate) -> ChangeSet<R>
358where R: View {
359 use crate::changes::{ChangeSet, ItemChange};
360
361 let mut changes = Vec::new();
362
363 for item in reactor_update.items {
364 let view = R::from_entity(item.entity);
365
366 if let Some((_, membership_change)) = item.predicate_relevance.first() {
369 match membership_change {
370 crate::reactor::MembershipChange::Initial => {
371 changes.push(ItemChange::Initial { item: view });
372 }
373 crate::reactor::MembershipChange::Add => {
374 changes.push(ItemChange::Add { item: view, events: item.events });
375 }
376 crate::reactor::MembershipChange::Remove => {
377 changes.push(ItemChange::Remove { item: view, events: item.events });
378 }
379 }
380 } else {
381 changes.push(ItemChange::Update { item: view, events: item.events });
383 }
384 }
385
386 ChangeSet { changes, resultset }
387}