use std::{
marker::PhantomData,
sync::{Arc, Weak},
};
use ankurah_proto::{self as proto, CollectionId};
use ankurah_signals::{
broadcast::BroadcastId,
porcelain::subscribe::{IntoSubscribeListener, SubscriptionGuard},
signal::{Listener, ListenerGuard},
Get, Mut, Peek, Read, Signal, Subscribe,
};
use tracing::{debug, warn};
use crate::{
changes::ChangeSet,
entity::Entity,
error::RetrievalError,
model::View,
node::{MatchArgs, TNodeErased},
policy::PolicyAgent,
reactor::{
fetch_gap::{GapFetcher, QueryGapFetcher},
ReactorSubscription, ReactorUpdate,
},
resultset::{EntityResultSet, ResultSet},
storage::StorageEngine,
Node,
};
#[derive(Clone)]
pub struct EntityLiveQuery(Arc<Inner>);
struct Inner {
pub(crate) query_id: proto::QueryId,
pub(crate) node: Box<dyn TNodeErased>,
pub(crate) subscription: ReactorSubscription,
pub(crate) resultset: EntityResultSet,
pub(crate) error: Mut<Option<RetrievalError>>,
pub(crate) initialized: tokio::sync::Notify,
pub(crate) initialized_version: std::sync::atomic::AtomicU32,
pub(crate) current_version: std::sync::atomic::AtomicU32,
pub(crate) selection: Mut<(ankql::ast::Selection, u32)>,
pub(crate) collection_id: CollectionId,
pub(crate) gap_fetcher: std::sync::Arc<dyn GapFetcher<Entity>>,
}
pub struct WeakEntityLiveQuery(Weak<Inner>);
impl WeakEntityLiveQuery {
pub fn upgrade(&self) -> Option<EntityLiveQuery> { self.0.upgrade().map(EntityLiveQuery) }
}
impl Clone for WeakEntityLiveQuery {
fn clone(&self) -> Self { Self(self.0.clone()) }
}
#[derive(Clone)]
pub struct LiveQuery<R: View>(EntityLiveQuery, PhantomData<R>);
impl<R: View> std::ops::Deref for LiveQuery<R> {
type Target = EntityLiveQuery;
fn deref(&self) -> &Self::Target { &self.0 }
}
impl crate::reactor::PreNotifyHook for &EntityLiveQuery {
fn pre_notify(&self, version: u32) {
self.mark_initialized(version);
}
}
impl EntityLiveQuery {
pub fn new<SE, PA>(
node: &Node<SE, PA>,
collection_id: CollectionId,
mut args: MatchArgs,
cdata: PA::ContextData,
) -> Result<Self, RetrievalError>
where
SE: StorageEngine + Send + Sync + 'static,
PA: PolicyAgent + Send + Sync + 'static,
{
node.policy_agent.can_access_collection(&cdata, &collection_id)?;
args.selection.predicate = node.policy_agent.filter_predicate(&cdata, &collection_id, args.selection.predicate)?;
args.selection = node.type_resolver.resolve_selection_types(args.selection);
let subscription = node.reactor.subscribe();
let resultset = EntityResultSet::empty();
let query_id = proto::QueryId::new();
let gap_fetcher: std::sync::Arc<dyn GapFetcher<Entity>> = std::sync::Arc::new(QueryGapFetcher::new(&node, cdata.clone()));
let me = Self(Arc::new(Inner {
query_id,
node: Box::new(node.clone()),
subscription,
resultset: resultset.clone(),
error: Mut::new(None),
initialized: tokio::sync::Notify::new(),
initialized_version: std::sync::atomic::AtomicU32::new(0), current_version: std::sync::atomic::AtomicU32::new(1), selection: Mut::new((args.selection.clone(), 1)), collection_id: collection_id.clone(),
gap_fetcher,
}));
let has_relay = node.subscription_relay.is_some();
if args.cached || !has_relay {
let me2 = me.clone();
debug!("LiveQuery::new() spawning initialization task for durable node predicate {}", query_id);
crate::task::spawn(async move {
debug!("LiveQuery initialization task starting for predicate {}", query_id);
if let Err(e) = me2.activate(1).await {
debug!("LiveQuery initialization failed for predicate {}: {}", query_id, e);
me2.0.error.set(Some(e));
} else {
debug!("LiveQuery initialization completed for predicate {}", query_id);
}
});
}
if has_relay {
node.subscribe_remote_query(query_id, collection_id.clone(), args.selection.clone(), cdata.clone(), 1, me.weak());
}
Ok(me)
}
pub fn map<R: View>(self) -> LiveQuery<R> { LiveQuery(self, PhantomData) }
pub async fn wait_initialized(&self) {
if self.0.initialized_version.load(std::sync::atomic::Ordering::Relaxed)
>= self.0.current_version.load(std::sync::atomic::Ordering::Relaxed)
{
return;
}
self.0.initialized.notified().await;
}
pub fn update_selection(
&self,
new_selection: impl TryInto<ankql::ast::Selection, Error = impl Into<RetrievalError>>,
) -> Result<(), RetrievalError> {
let new_selection = new_selection.try_into().map_err(|e| e.into())?;
let new_version = self.0.current_version.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
self.0.resultset.set_loaded(false);
self.0.selection.set((new_selection.clone(), new_version));
let has_relay = self.0.node.has_subscription_relay();
if has_relay {
self.0.node.update_remote_query(self.0.query_id, new_selection.clone(), new_version)?;
} else {
let me2 = self.clone();
let query_id = self.0.query_id;
crate::task::spawn(async move {
if let Err(e) = me2.activate(new_version).await {
tracing::error!("LiveQuery update failed for predicate {}: {}", query_id, e);
me2.0.error.set(Some(e));
}
});
}
Ok(())
}
pub async fn update_selection_wait(
&self,
new_selection: impl TryInto<ankql::ast::Selection, Error = impl Into<RetrievalError>>,
) -> Result<(), RetrievalError> {
self.update_selection(new_selection)?;
self.wait_initialized().await;
Ok(())
}
async fn activate(&self, version: u32) -> Result<(), RetrievalError> {
let (selection, stored_version) = self.0.selection.value();
if version < stored_version {
warn!("LiveQuery - Dropped stale activation request for version {} (current version is {})", version, stored_version);
return Ok(());
}
debug!("LiveQuery.activate() for predicate {} (version {})", self.0.query_id, version);
let reactor = self.0.node.reactor();
let initialized_version = self.0.initialized_version.load(std::sync::atomic::Ordering::Relaxed);
if initialized_version == 0 {
reactor
.add_query_and_notify(
self.0.subscription.id(),
self.0.query_id,
self.0.collection_id.clone(),
selection,
&*self.0.node,
self.0.resultset.clone(),
self.0.gap_fetcher.clone(),
self,
)
.await?
} else {
reactor
.update_query_and_notify(
self.0.subscription.id(),
self.0.query_id,
self.0.collection_id.clone(),
selection,
&*self.0.node,
version,
self,
)
.await?;
};
Ok(())
}
pub fn error(&self) -> Read<Option<RetrievalError>> { self.0.error.read() }
pub fn query_id(&self) -> proto::QueryId { self.0.query_id }
pub fn selection(&self) -> Read<(ankql::ast::Selection, u32)> { self.0.selection.read() }
pub fn weak(&self) -> WeakEntityLiveQuery { WeakEntityLiveQuery(Arc::downgrade(&self.0)) }
pub fn mark_initialized(&self, version: u32) {
self.0.initialized_version.store(version, std::sync::atomic::Ordering::Relaxed);
self.0.initialized.notify_waiters();
}
}
impl Drop for Inner {
fn drop(&mut self) { self.node.unsubscribe_remote_predicate(self.query_id); }
}
#[async_trait::async_trait]
impl crate::peer_subscription::RemoteQuerySubscriber for WeakEntityLiveQuery {
async fn subscription_established(&self, version: u32) {
if let Some(livequery) = self.upgrade() {
tracing::debug!("Subscription established for query {}: {}", livequery.0.query_id, version);
if let Err(e) = livequery.activate(version).await {
tracing::error!("Failed to activate subscription for query {}: {}", livequery.0.query_id, e);
livequery.0.error.set(Some(e));
}
}
}
fn set_last_error(&self, error: RetrievalError) {
if let Some(livequery) = self.upgrade() {
tracing::info!("Setting last error for LiveQuery {}: {}", livequery.0.query_id, error);
livequery.0.error.set(Some(error));
}
}
}
impl<R: View> LiveQuery<R> {
pub async fn wait_initialized(&self) { self.0.wait_initialized().await; }
pub fn resultset(&self) -> ResultSet<R> { self.0 .0.resultset.wrap::<R>() }
pub fn loaded(&self) -> bool { self.0 .0.resultset.is_loaded() }
pub fn ids(&self) -> Vec<proto::EntityId> { self.0 .0.resultset.keys().collect() }
pub fn ids_sorted(&self) -> Vec<proto::EntityId> {
use itertools::Itertools;
self.0 .0.resultset.keys().sorted().collect()
}
}
impl<R: View> Signal for LiveQuery<R> {
fn listen(&self, listener: Listener) -> ListenerGuard { self.0 .0.subscription.listen(listener) }
fn broadcast_id(&self) -> BroadcastId { self.0 .0.subscription.broadcast_id() }
}
impl<R: View + Clone + 'static> Get<Vec<R>> for LiveQuery<R> {
fn get(&self) -> Vec<R> {
use ankurah_signals::CurrentObserver;
CurrentObserver::track(&self);
self.0 .0.resultset.wrap::<R>().peek()
}
}
impl<R: View + Clone + 'static> Peek<Vec<R>> for LiveQuery<R> {
fn peek(&self) -> Vec<R> { self.0 .0.resultset.wrap().peek() }
}
impl<R: View> Subscribe<ChangeSet<R>> for LiveQuery<R>
where R: Clone + Send + Sync + 'static
{
fn subscribe<L>(&self, listener: L) -> SubscriptionGuard
where L: IntoSubscribeListener<ChangeSet<R>> {
let listener = listener.into_subscribe_listener();
let me = self.clone();
self.0 .0.subscription.subscribe(move |reactor_update: ReactorUpdate| {
let changeset: ChangeSet<R> = livequery_change_set_from(me.0 .0.resultset.wrap::<R>(), reactor_update);
listener(changeset);
})
}
}
fn livequery_change_set_from<R: View>(resultset: ResultSet<R>, reactor_update: ReactorUpdate) -> ChangeSet<R>
where R: View {
use crate::changes::{ChangeSet, ItemChange};
let mut changes = Vec::new();
for item in reactor_update.items {
let view = R::from_entity(item.entity);
if let Some((_, membership_change)) = item.predicate_relevance.first() {
match membership_change {
crate::reactor::MembershipChange::Initial => {
changes.push(ItemChange::Initial { item: view });
}
crate::reactor::MembershipChange::Add => {
changes.push(ItemChange::Add { item: view, events: item.events });
}
crate::reactor::MembershipChange::Remove => {
changes.push(ItemChange::Remove { item: view, events: item.events });
}
}
} else {
changes.push(ItemChange::Update { item: view, events: item.events });
}
}
ChangeSet { changes, resultset }
}