1use std::{
2 marker::PhantomData,
3 sync::{Arc, Weak},
4};
5
6use ankurah_proto::{self as proto, CollectionId};
7
8use ankurah_signals::{
9 broadcast::BroadcastId,
10 porcelain::subscribe::{IntoSubscribeListener, SubscriptionGuard},
11 signal::{Listener, ListenerGuard},
12 Get, Mut, Peek, Read, Signal, Subscribe,
13};
14use tracing::{debug, warn};
15
16use crate::{
17 changes::ChangeSet,
18 entity::Entity,
19 error::RetrievalError,
20 model::View,
21 node::{MatchArgs, TNodeErased},
22 policy::PolicyAgent,
23 reactor::{
24 fetch_gap::{GapFetcher, QueryGapFetcher},
25 ReactorSubscription, ReactorUpdate,
26 },
27 resultset::{EntityResultSet, ResultSet},
28 storage::StorageEngine,
29 Node,
30};
31
32#[derive(Clone)]
35pub struct EntityLiveQuery(Arc<Inner>);
36struct Inner {
37 pub(crate) query_id: proto::QueryId,
38 pub(crate) node: Box<dyn TNodeErased>,
39 pub(crate) subscription: ReactorSubscription,
41 pub(crate) resultset: EntityResultSet,
42 pub(crate) error: Mut<Option<RetrievalError>>,
43 pub(crate) initialized: tokio::sync::Notify,
44 pub(crate) initialized_version: std::sync::atomic::AtomicU32,
45 pub(crate) current_version: std::sync::atomic::AtomicU32,
47 pub(crate) selection: Mut<(ankql::ast::Selection, u32)>,
51 pub(crate) collection_id: CollectionId,
53 pub(crate) gap_fetcher: std::sync::Arc<dyn GapFetcher<Entity>>,
55}
56
57pub struct WeakEntityLiveQuery(Weak<Inner>);
59
60impl WeakEntityLiveQuery {
61 pub fn upgrade(&self) -> Option<EntityLiveQuery> { self.0.upgrade().map(EntityLiveQuery) }
62}
63
64impl Clone for WeakEntityLiveQuery {
65 fn clone(&self) -> Self { Self(self.0.clone()) }
66}
67
68#[derive(Clone)]
69pub struct LiveQuery<R: View>(EntityLiveQuery, PhantomData<R>);
70
71impl<R: View> std::ops::Deref for LiveQuery<R> {
72 type Target = EntityLiveQuery;
73 fn deref(&self) -> &Self::Target { &self.0 }
74}
75
76impl crate::reactor::PreNotifyHook for &EntityLiveQuery {
77 fn pre_notify(&self, version: u32) {
78 self.mark_initialized(version);
80 }
81}
82
83impl EntityLiveQuery {
84 pub fn new<SE, PA>(
85 node: &Node<SE, PA>,
86 collection_id: CollectionId,
87 mut args: MatchArgs,
88 cdata: PA::ContextData,
89 ) -> Result<Self, RetrievalError>
90 where
91 SE: StorageEngine + Send + Sync + 'static,
92 PA: PolicyAgent + Send + Sync + 'static,
93 {
94 node.policy_agent.can_access_collection(&cdata, &collection_id)?;
95 args.selection.predicate = node.policy_agent.filter_predicate(&cdata, &collection_id, args.selection.predicate)?;
96
97 args.selection = node.type_resolver.resolve_selection_types(args.selection);
99
100 let subscription = node.reactor.subscribe();
101
102 let resultset = EntityResultSet::empty();
103 let query_id = proto::QueryId::new();
104 let gap_fetcher: std::sync::Arc<dyn GapFetcher<Entity>> = std::sync::Arc::new(QueryGapFetcher::new(&node, cdata.clone()));
105
106 let me = Self(Arc::new(Inner {
107 query_id,
108 node: Box::new(node.clone()),
109 subscription,
110 resultset: resultset.clone(),
111 error: Mut::new(None),
112 initialized: tokio::sync::Notify::new(),
113 initialized_version: std::sync::atomic::AtomicU32::new(0), current_version: std::sync::atomic::AtomicU32::new(1), selection: Mut::new((args.selection.clone(), 1)), collection_id: collection_id.clone(),
117 gap_fetcher,
118 }));
119
120 let has_relay = node.subscription_relay.is_some();
122
123 if args.cached || !has_relay {
124 let me2 = me.clone();
126
127 debug!("LiveQuery::new() spawning initialization task for durable node predicate {}", query_id);
128 crate::task::spawn(async move {
129 debug!("LiveQuery initialization task starting for predicate {}", query_id);
130 if let Err(e) = me2.activate(1).await {
131 debug!("LiveQuery initialization failed for predicate {}: {}", query_id, e);
132
133 me2.0.error.set(Some(e));
134 } else {
135 debug!("LiveQuery initialization completed for predicate {}", query_id);
136 }
137 });
138 }
139
140 if has_relay {
143 node.subscribe_remote_query(query_id, collection_id.clone(), args.selection.clone(), cdata.clone(), 1, me.weak());
144 }
145
146 Ok(me)
147 }
148 pub fn map<R: View>(self) -> LiveQuery<R> { LiveQuery(self, PhantomData) }
149
150 pub async fn wait_initialized(&self) {
152 if self.0.initialized_version.load(std::sync::atomic::Ordering::Relaxed)
154 >= self.0.current_version.load(std::sync::atomic::Ordering::Relaxed)
155 {
156 return;
157 }
158
159 self.0.initialized.notified().await;
162 }
163
164 pub fn update_selection(
165 &self,
166 new_selection: impl TryInto<ankql::ast::Selection, Error = impl Into<RetrievalError>>,
167 ) -> Result<(), RetrievalError> {
168 let new_selection = new_selection.try_into().map_err(|e| e.into())?;
169
170 let new_version = self.0.current_version.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
172
173 self.0.resultset.set_loaded(false);
175
176 self.0.selection.set((new_selection.clone(), new_version));
178
179 let has_relay = self.0.node.has_subscription_relay();
181
182 if has_relay {
183 self.0.node.update_remote_query(self.0.query_id, new_selection.clone(), new_version)?;
185 } else {
186 let me2 = self.clone();
188 let query_id = self.0.query_id;
189
190 crate::task::spawn(async move {
191 if let Err(e) = me2.activate(new_version).await {
192 tracing::error!("LiveQuery update failed for predicate {}: {}", query_id, e);
193 me2.0.error.set(Some(e));
194 }
195 });
196 }
197
198 Ok(())
199 }
200
201 pub async fn update_selection_wait(
202 &self,
203 new_selection: impl TryInto<ankql::ast::Selection, Error = impl Into<RetrievalError>>,
204 ) -> Result<(), RetrievalError> {
205 self.update_selection(new_selection)?;
206 self.wait_initialized().await;
207 Ok(())
208 }
209
210 async fn activate(&self, version: u32) -> Result<(), RetrievalError> {
216 let (selection, stored_version) = self.0.selection.value();
218
219 if version < stored_version {
222 warn!("LiveQuery - Dropped stale activation request for version {} (current version is {})", version, stored_version);
223 return Ok(());
224 }
225
226 debug!("LiveQuery.activate() for predicate {} (version {})", self.0.query_id, version);
227
228 let reactor = self.0.node.reactor();
229 let initialized_version = self.0.initialized_version.load(std::sync::atomic::Ordering::Relaxed);
230
231 if initialized_version == 0 {
233 reactor
236 .add_query_and_notify(
237 self.0.subscription.id(),
238 self.0.query_id,
239 self.0.collection_id.clone(),
240 selection,
241 &*self.0.node,
242 self.0.resultset.clone(),
243 self.0.gap_fetcher.clone(),
244 self,
245 )
246 .await?
247 } else {
248 reactor
251 .update_query_and_notify(
252 self.0.subscription.id(),
253 self.0.query_id,
254 self.0.collection_id.clone(),
255 selection,
256 &*self.0.node,
257 version,
258 self,
259 )
260 .await?;
261 };
262
263 Ok(())
264 }
265 pub fn error(&self) -> Read<Option<RetrievalError>> { self.0.error.read() }
266 pub fn query_id(&self) -> proto::QueryId { self.0.query_id }
267 pub fn selection(&self) -> Read<(ankql::ast::Selection, u32)> { self.0.selection.read() }
268
269 pub fn weak(&self) -> WeakEntityLiveQuery { WeakEntityLiveQuery(Arc::downgrade(&self.0)) }
271
272 pub fn mark_initialized(&self, version: u32) {
274 self.0.initialized_version.store(version, std::sync::atomic::Ordering::Relaxed);
276 self.0.initialized.notify_waiters();
277 }
278}
279
280impl Drop for Inner {
281 fn drop(&mut self) { self.node.unsubscribe_remote_predicate(self.query_id); }
282}
283
284#[async_trait::async_trait]
286impl crate::peer_subscription::RemoteQuerySubscriber for WeakEntityLiveQuery {
287 async fn subscription_established(&self, version: u32) {
288 if let Some(livequery) = self.upgrade() {
290 tracing::debug!("Subscription established for query {}: {}", livequery.0.query_id, version);
293 if let Err(e) = livequery.activate(version).await {
294 tracing::error!("Failed to activate subscription for query {}: {}", livequery.0.query_id, e);
295 livequery.0.error.set(Some(e));
296 }
297 }
298 }
300
301 fn set_last_error(&self, error: RetrievalError) {
302 if let Some(livequery) = self.upgrade() {
304 tracing::info!("Setting last error for LiveQuery {}: {}", livequery.0.query_id, error);
305 livequery.0.error.set(Some(error));
306 }
307 }
309}
310
311impl<R: View> LiveQuery<R> {
312 pub async fn wait_initialized(&self) { self.0.wait_initialized().await; }
314
315 pub fn resultset(&self) -> ResultSet<R> { self.0 .0.resultset.wrap::<R>() }
316
317 pub fn loaded(&self) -> bool { self.0 .0.resultset.is_loaded() }
318
319 pub fn ids(&self) -> Vec<proto::EntityId> { self.0 .0.resultset.keys().collect() }
320
321 pub fn ids_sorted(&self) -> Vec<proto::EntityId> {
322 use itertools::Itertools;
323 self.0 .0.resultset.keys().sorted().collect()
324 }
325}
326
327impl<R: View> Signal for LiveQuery<R> {
330 fn listen(&self, listener: Listener) -> ListenerGuard { self.0 .0.subscription.listen(listener) }
331
332 fn broadcast_id(&self) -> BroadcastId { self.0 .0.subscription.broadcast_id() }
333}
334
335impl<R: View + Clone + 'static> Get<Vec<R>> for LiveQuery<R> {
337 fn get(&self) -> Vec<R> {
338 use ankurah_signals::CurrentObserver;
339 CurrentObserver::track(&self);
340 self.0 .0.resultset.wrap::<R>().peek()
341 }
342}
343
344impl<R: View + Clone + 'static> Peek<Vec<R>> for LiveQuery<R> {
346 fn peek(&self) -> Vec<R> { self.0 .0.resultset.wrap().peek() }
347}
348
349impl<R: View> Subscribe<ChangeSet<R>> for LiveQuery<R>
351where R: Clone + Send + Sync + 'static
352{
353 fn subscribe<L>(&self, listener: L) -> SubscriptionGuard
354 where L: IntoSubscribeListener<ChangeSet<R>> {
355 let listener = listener.into_subscribe_listener();
356
357 let me = self.clone();
358 self.0 .0.subscription.subscribe(move |reactor_update: ReactorUpdate| {
361 let changeset: ChangeSet<R> = livequery_change_set_from(me.0 .0.resultset.wrap::<R>(), reactor_update);
363 listener(changeset);
364 })
365 }
366}
367
368fn livequery_change_set_from<R: View>(resultset: ResultSet<R>, reactor_update: ReactorUpdate) -> ChangeSet<R>
370where R: View {
371 use crate::changes::{ChangeSet, ItemChange};
372
373 let mut changes = Vec::new();
374
375 for item in reactor_update.items {
376 let view = R::from_entity(item.entity);
377
378 if let Some((_, membership_change)) = item.predicate_relevance.first() {
381 match membership_change {
382 crate::reactor::MembershipChange::Initial => {
383 changes.push(ItemChange::Initial { item: view });
384 }
385 crate::reactor::MembershipChange::Add => {
386 changes.push(ItemChange::Add { item: view, events: item.events });
387 }
388 crate::reactor::MembershipChange::Remove => {
389 changes.push(ItemChange::Remove { item: view, events: item.events });
390 }
391 }
392 } else {
393 changes.push(ItemChange::Update { item: view, events: item.events });
395 }
396 }
397
398 ChangeSet { changes, resultset }
399}