ankurah_core/node.rs

use ankql::selection::filter::Filterable;
use ankurah_proto::{self as proto, Attested, CollectionId, EntityState};
use anyhow::anyhow;

use rand::prelude::*;
use std::{
    fmt,
    hash::Hash,
    ops::Deref,
    sync::{Arc, Weak},
};
use tokio::sync::oneshot;

use crate::{
    action_error, action_info,
    changes::EntityChange,
    collectionset::CollectionSet,
    connector::{PeerSender, SendError},
    context::Context,
    entity::{Entity, WeakEntitySet},
    error::{MutationError, RequestError, RetrievalError},
    notice_info,
    peer_subscription::{SubscriptionHandler, SubscriptionRelay},
    policy::{AccessDenied, PolicyAgent},
    reactor::{AbstractEntity, Reactor},
    retrieval::LocalRetriever,
    storage::StorageEngine,
    system::SystemManager,
    util::{safemap::SafeMap, safeset::SafeSet, Iterable},
};
use itertools::Itertools;
#[cfg(feature = "instrument")]
use tracing::instrument;

use tracing::{debug, error, warn};

pub struct PeerState {
    sender: Box<dyn PeerSender>,
    _durable: bool,
    subscription_handler: SubscriptionHandler,
    pending_requests: SafeMap<proto::RequestId, oneshot::Sender<Result<proto::NodeResponseBody, RequestError>>>,
    pending_updates: SafeMap<proto::UpdateId, oneshot::Sender<Result<proto::NodeResponseBody, RequestError>>>,
}

impl PeerState {
    pub fn send_message(&self, message: proto::NodeMessage) -> Result<(), SendError> { self.sender.send_message(message) }
}

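/// A parsed `selection` together with a `cached` flag.
///
/// A minimal usage sketch (the selection string is illustrative; the `&str` and
/// `String` conversions below parse it via `ankql::parser::parse_selection`):
///
/// ```ignore
/// let args: MatchArgs = "name = 'Alice'".try_into()?;
/// assert!(args.cached); // string conversions default to cached = true
/// ```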
pub struct MatchArgs {
    pub selection: ankql::ast::Selection,
    pub cached: bool,
}

impl TryInto<MatchArgs> for &str {
    type Error = ankql::error::ParseError;
    fn try_into(self) -> Result<MatchArgs, Self::Error> { Ok(MatchArgs { selection: ankql::parser::parse_selection(self)?, cached: true }) }
}
impl TryInto<MatchArgs> for String {
    type Error = ankql::error::ParseError;
    fn try_into(self) -> Result<MatchArgs, Self::Error> {
        Ok(MatchArgs { selection: ankql::parser::parse_selection(&self)?, cached: true })
    }
}

impl From<ankql::ast::Predicate> for MatchArgs {
    fn from(val: ankql::ast::Predicate) -> Self {
        MatchArgs { selection: ankql::ast::Selection { predicate: val, order_by: None, limit: None }, cached: true }
    }
}

impl From<ankql::ast::Selection> for MatchArgs {
    fn from(val: ankql::ast::Selection) -> Self { MatchArgs { selection: val, cached: true } }
}

impl From<ankql::error::ParseError> for RetrievalError {
    fn from(e: ankql::error::ParseError) -> Self { RetrievalError::ParseError(e) }
}

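/// Convenience wrapper around [`MatchArgs::nocache`] for callers that want `cached: false`.
///
/// A minimal usage sketch (the selection string is illustrative):
///
/// ```ignore
/// let args = nocache("status = 'active'")?;
/// assert!(!args.cached);
/// ```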
pub fn nocache<T: TryInto<ankql::ast::Selection, Error = ankql::error::ParseError>>(s: T) -> Result<MatchArgs, ankql::error::ParseError> {
    MatchArgs::nocache(s)
}
impl MatchArgs {
    pub fn nocache<T>(s: T) -> Result<Self, ankql::error::ParseError>
    where T: TryInto<ankql::ast::Selection, Error = ankql::error::ParseError> {
        Ok(Self { selection: s.try_into()?, cached: false })
    }
}

/// A participant in the Ankurah network, and the primary place where queries are initiated.
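///
/// # Example
///
/// A minimal construction sketch, assuming hypothetical `MyEngine` and `MyAgent`
/// types that implement `StorageEngine` and `PolicyAgent` respectively:
///
/// ```ignore
/// let node = Node::new(Arc::new(MyEngine::new()), MyAgent::new());
/// ```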
pub struct Node<SE, PA>(pub(crate) Arc<NodeInner<SE, PA>>)
where PA: PolicyAgent;
impl<SE, PA> Clone for Node<SE, PA>
where PA: PolicyAgent
{
    fn clone(&self) -> Self { Self(self.0.clone()) }
}

pub struct WeakNode<SE, PA>(Weak<NodeInner<SE, PA>>)
where PA: PolicyAgent;
impl<SE, PA> Clone for WeakNode<SE, PA>
where PA: PolicyAgent
{
    fn clone(&self) -> Self { Self(self.0.clone()) }
}

impl<SE, PA> WeakNode<SE, PA>
where PA: PolicyAgent
{
    pub fn upgrade(&self) -> Option<Node<SE, PA>> { self.0.upgrade().map(Node) }
}

impl<SE, PA> Deref for Node<SE, PA>
where PA: PolicyAgent
{
    type Target = Arc<NodeInner<SE, PA>>;
    fn deref(&self) -> &Self::Target { &self.0 }
}

/// Represents the user session, or whatever other context the PolicyAgent
/// needs to perform its evaluation.
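///
/// A minimal sketch of a context type (names are illustrative; `Send + Sync` are
/// satisfied automatically for a type like this):
///
/// ```ignore
/// #[derive(Clone, Hash, PartialEq, Eq)]
/// struct UserSession { user_id: String }
/// impl ContextData for UserSession {}
/// ```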
pub trait ContextData: Send + Sync + Clone + Hash + Eq + 'static {}

pub struct NodeInner<SE, PA>
where PA: PolicyAgent
{
    pub id: proto::EntityId,
    pub durable: bool,
    pub collections: CollectionSet<SE>,

    pub(crate) entities: WeakEntitySet,
    peer_connections: SafeMap<proto::EntityId, Arc<PeerState>>,
    durable_peers: SafeSet<proto::EntityId>,

    pub(crate) predicate_context: SafeMap<proto::QueryId, PA::ContextData>,

    /// The reactor for handling subscriptions
    pub(crate) reactor: Reactor,
    pub(crate) policy_agent: PA,
    pub system: SystemManager<SE, PA>,

    pub(crate) subscription_relay: Option<SubscriptionRelay<PA::ContextData, crate::livequery::WeakEntityLiveQuery>>,
}

impl<SE, PA> Node<SE, PA>
where
    SE: StorageEngine + Send + Sync + 'static,
    PA: PolicyAgent + Send + Sync + 'static,
{
    pub fn new(engine: Arc<SE>, policy_agent: PA) -> Self {
        let collections = CollectionSet::new(engine.clone());
        let entityset: WeakEntitySet = Default::default();
        let id = proto::EntityId::new();
        let reactor = Reactor::new();
        notice_info!("Node {id:#} created as ephemeral");

        let system_manager = SystemManager::new(collections.clone(), entityset.clone(), reactor.clone(), false);

        // Create subscription relay for ephemeral nodes
        let subscription_relay = Some(SubscriptionRelay::new());

        let node = Node(Arc::new(NodeInner {
            id,
            collections,
            entities: entityset,
            peer_connections: SafeMap::new(),
            durable_peers: SafeSet::new(),
            reactor,
            durable: false,
            policy_agent,
            system: system_manager,
            predicate_context: SafeMap::new(),
            subscription_relay,
        }));

        // Set up the message sender for the subscription relay
        if let Some(ref relay) = node.subscription_relay {
            let weak_node = node.weak();
            if relay.set_node(Arc::new(weak_node)).is_err() {
                warn!("Failed to set message sender for subscription relay");
            }
        }

        node
    }
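    /// Create a durable node. Unlike [`Node::new`], the node is marked `durable`
    /// and no subscription relay is created.
    ///
    /// A minimal sketch, assuming hypothetical `MyEngine` / `MyAgent` types:
    ///
    /// ```ignore
    /// let server = Node::new_durable(Arc::new(MyEngine::new()), MyAgent::new());
    /// assert!(server.durable);
    /// ```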
    pub fn new_durable(engine: Arc<SE>, policy_agent: PA) -> Self {
        let collections = CollectionSet::new(engine);
        let entityset: WeakEntitySet = Default::default();
        let id = proto::EntityId::new();
        let reactor = Reactor::new();
        notice_info!("Node {id:#} created as durable");

        let system_manager = SystemManager::new(collections.clone(), entityset.clone(), reactor.clone(), true);

        Node(Arc::new(NodeInner {
            id,
            collections,
            entities: entityset,
            peer_connections: SafeMap::new(),
            durable_peers: SafeSet::new(),
            reactor,
            durable: true,
            policy_agent,
            system: system_manager,
            predicate_context: SafeMap::new(),
            subscription_relay: None,
        }))
    }
    pub fn weak(&self) -> WeakNode<SE, PA> { WeakNode(Arc::downgrade(&self.0)) }

    #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(node_id = %presence.node_id.to_base64_short(), durable = %presence.durable)))]
    pub fn register_peer(&self, presence: proto::Presence, sender: Box<dyn PeerSender>) {
        action_info!(self, "register_peer", "{}", &presence);

        let subscription_handler = SubscriptionHandler::new(presence.node_id, self);
        self.peer_connections.insert(
            presence.node_id,
            Arc::new(PeerState {
                sender,
                _durable: presence.durable,
                subscription_handler,
                pending_requests: SafeMap::new(),
                pending_updates: SafeMap::new(),
            }),
        );
        if presence.durable {
            self.durable_peers.insert(presence.node_id);

            // Notify subscription relay of new durable peer connection
            if let Some(ref relay) = self.subscription_relay {
                relay.notify_peer_connected(presence.node_id);
            }

            if !self.durable {
                if let Some(system_root) = presence.system_root {
                    action_info!(self, "received system root", "{}", &system_root.payload);
                    let me = self.clone();
                    crate::task::spawn(async move {
                        if let Err(e) = me.system.join_system(system_root).await {
                            action_error!(me, "failed to join system", "{}", &e);
                        } else {
                            action_info!(me, "successfully joined system");
                        }
                    });
                } else {
                    error!("Node({}) durable peer {} has no system root", self.id, presence.node_id);
                }
            }
        }
        // TODO send hello message to the peer, including present head state for all relevant collections
    }
    #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(node_id = %node_id.to_base64_short())))]
    pub fn deregister_peer(&self, node_id: proto::EntityId) {
        notice_info!("Node({:#}) deregister_peer {:#}", self.id, node_id);

        self.durable_peers.remove(&node_id);
        // Get and clean up subscriptions before removing the peer
        if let Some(peer_state) = self.peer_connections.remove(&node_id) {
            action_info!(self, "unsubscribing", "subscription {} for peer {}", peer_state.subscription_handler.subscription_id(), node_id);
            // ReactorSubscription is automatically unsubscribed on drop
        }

        // Notify subscription relay of peer disconnection (unconditional - relay handles filtering)
        if let Some(ref relay) = self.subscription_relay {
            relay.notify_peer_disconnected(node_id);
        }
    }
    #[cfg_attr(feature = "instrument", instrument(skip_all, fields(node_id = %node_id, request_body = %request_body)))]
    pub async fn request<'a, C>(
        &self,
        node_id: proto::EntityId,
        cdata: &C,
        request_body: proto::NodeRequestBody,
    ) -> Result<proto::NodeResponseBody, RequestError>
    where
        C: Iterable<PA::ContextData>,
    {
        let (response_tx, response_rx) = oneshot::channel::<Result<proto::NodeResponseBody, RequestError>>();
        let request_id = proto::RequestId::new();

        let request = proto::NodeRequest { id: request_id.clone(), to: node_id, from: self.id, body: request_body };
        let auth = self.policy_agent.sign_request(self, cdata, &request);

        // Get the peer connection
        let connection = self.peer_connections.get(&node_id).ok_or(RequestError::PeerNotConnected)?;

        connection.pending_requests.insert(request_id, response_tx);
        connection.send_message(proto::NodeMessage::Request { auth, request })?;

        // Wait for response
        response_rx.await.map_err(|_| RequestError::InternalChannelClosed)?
    }

    // TODO LATER: rework this to be retried in the background some number of times
    pub fn send_update(&self, node_id: proto::EntityId, notification: proto::NodeUpdateBody) {
        // same as request, minus cdata and the sign_request step
        debug!("{self}.send_update({node_id:#}, {notification})");
        let (response_tx, _response_rx) = oneshot::channel::<Result<proto::NodeResponseBody, RequestError>>();
        let id = proto::UpdateId::new();

        // Get the peer connection
        let Some(connection) = self.peer_connections.get(&node_id) else {
            warn!("Failed to send update to peer {}: {}", node_id, RequestError::PeerNotConnected);
            return;
        };

        // Store the response channel
        connection.pending_updates.insert(id.clone(), response_tx);

        let notification = proto::NodeMessage::Update(proto::NodeUpdate { id, from: self.id, to: node_id, body: notification });

        match connection.send_message(notification) {
            Ok(_) => {}
            Err(e) => {
                warn!("Failed to send update to peer {}: {}", node_id, e);
            }
        };

        // response_rx.await.map_err(|_| RequestError::InternalChannelClosed)??;
    }

    // TODO add a node id argument to this function rather than getting it from the message
    // (does this actually make it more secure? or just move the place they could lie to us to the handshake?)
    // Not if it's signed by a node key.
    #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(message = %message)))]
    pub async fn handle_message(&self, message: proto::NodeMessage) -> anyhow::Result<()> {
        match message {
            proto::NodeMessage::Update(update) => {
                debug!("Node({}) received update {}", self.id, update);

                if let Some(sender) = { self.peer_connections.get(&update.from).map(|c| c.sender.cloned()) } {
                    let _from = update.from;
                    let _id = update.id.clone();
                    if update.to != self.id {
                        warn!("{} received message from {} but is not the intended recipient", self.id, update.from);
                        return Ok(());
                    }

                    // take down the return address
                    let id = update.id.clone();
                    let to = update.from;
                    let from = self.id;

                    // TODO - validate the from node id is the one we're connected to
                    let body = match self.handle_update(update).await {
                        Ok(_) => proto::NodeUpdateAckBody::Success,
                        Err(e) => proto::NodeUpdateAckBody::Error(e.to_string()),
                    };

                    sender.send_message(proto::NodeMessage::UpdateAck(proto::NodeUpdateAck { id, from, to, body }))?;
                }
            }
            proto::NodeMessage::UpdateAck(ack) => {
                debug!("Node({}) received ack notification {} {}", self.id, ack.id, ack.body);
                // let connection = self.peer_connections.get(&ack.from).ok_or(RequestError::PeerNotConnected)?;
                // if let Some(tx) = connection.pending_updates.remove(&ack.id) {
                //     tx.send(Ok(proto::NodeResponseBody::Success)).unwrap();
                // }
            }
            proto::NodeMessage::Request { auth, request } => {
                debug!("Node({}) received request {}", self.id, request);
                // TODO: Should we spawn a task here and make handle_message synchronous?
                // I think this depends on how we want to handle timeouts.
                // I think we want timeouts to be handled by the node, not the connector,
                // which would lend itself to spawning a task here and making this function synchronous.

                let cdata = self.policy_agent.check_request(self, &auth, &request).await?;

                // double check to make sure we have a connection to the peer based on the node id
                if let Some(sender) = { self.peer_connections.get(&request.from).map(|c| c.sender.cloned()) } {
                    let from = request.from;
                    let request_id = request.id.clone();
                    if request.to != self.id {
                        warn!("{} received message from {} but is not the intended recipient", self.id, request.from);
                        return Ok(());
                    }

                    let body = match self.handle_request(&cdata, request).await {
                        Ok(result) => result,
                        Err(e) => proto::NodeResponseBody::Error(e.to_string()),
                    };
                    let _result = sender.send_message(proto::NodeMessage::Response(proto::NodeResponse {
                        request_id,
                        from: self.id,
                        to: from,
                        body,
                    }));
                }
            }
            proto::NodeMessage::Response(response) => {
                debug!("Node {} received response {}", self.id, response);
                let connection = self.peer_connections.get(&response.from).ok_or(RequestError::PeerNotConnected)?;
                if let Some(tx) = connection.pending_requests.remove(&response.request_id) {
                    tx.send(Ok(response.body)).map_err(|e| anyhow!("Failed to send response: {:?}", e))?;
                }
            }
            proto::NodeMessage::UnsubscribeQuery { from, query_id } => {
                // Remove predicate from the peer's subscription
                if let Some(peer_state) = self.peer_connections.get(&from) {
                    peer_state.subscription_handler.remove_predicate(query_id)?;
                }
            }
        }
        Ok(())
    }

    #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(request = %request)))]
    async fn handle_request<C>(&self, cdata: &C, request: proto::NodeRequest) -> anyhow::Result<proto::NodeResponseBody>
    where C: Iterable<PA::ContextData> {
        match request.body {
            proto::NodeRequestBody::CommitTransaction { id, events } => {
                // TODO - relay to peers in a gossipy/resource-available manner, so as to improve propagation
                // With moderate potential for duplication, while not creating message loops
                // Doing so would be a secondary/tertiary/etc hop for this message
                let cdata = cdata.iterable().exactly_one().map_err(|_| anyhow!("Only one cdata is permitted for CommitTransaction"))?;
                match self.commit_remote_transaction(cdata, id.clone(), events).await {
                    Ok(_) => Ok(proto::NodeResponseBody::CommitComplete { id }),
                    Err(e) => Ok(proto::NodeResponseBody::Error(e.to_string())),
                }
            }
            proto::NodeRequestBody::Fetch { collection, mut selection, known_matches } => {
                self.policy_agent.can_access_collection(cdata, &collection)?;
                let storage_collection = self.collections.get(&collection).await?;
                selection.predicate = self.policy_agent.filter_predicate(cdata, &collection, selection.predicate)?;

                // Expand initial_states to include entities from known_matches that weren't in the predicate results
                let expanded_states = crate::util::expand_states::expand_states(
                    storage_collection.fetch_states(&selection).await?,
                    known_matches.iter().map(|k| k.entity_id).collect::<Vec<_>>(),
                    &storage_collection,
                )
                .await?;

                let known_map: std::collections::HashMap<_, _> = known_matches.into_iter().map(|k| (k.entity_id, k.head)).collect();

                let mut deltas = Vec::new();
                for state in expanded_states {
                    if self.policy_agent.check_read(cdata, &state.payload.entity_id, &collection, &state.payload.state).is_err() {
                        continue;
                    }

                    // Generate delta based on known_matches (returns None if heads are equal)
                    // No need to reconstruct Entity - work directly with EntityState
                    if let Some(delta) = self.generate_entity_delta(&known_map, state, &storage_collection).await? {
                        deltas.push(delta);
                    }
                }
                Ok(proto::NodeResponseBody::Fetch(deltas))
            }
            proto::NodeRequestBody::Get { collection, ids } => {
                self.policy_agent.can_access_collection(cdata, &collection)?;
                let storage_collection = self.collections.get(&collection).await?;

                // filter out any that the policy agent says we don't have access to
                let mut states = Vec::new();
                for state in storage_collection.get_states(ids).await? {
                    match self.policy_agent.check_read(cdata, &state.payload.entity_id, &collection, &state.payload.state) {
                        Ok(_) => states.push(state),
                        Err(AccessDenied::ByPolicy(_)) => {}
                        // TODO: we need to have a cleaner delineation between actual access denied versus processing errors
                        Err(e) => return Err(anyhow!("Error from peer get: {}", e)),
                    }
                }

                Ok(proto::NodeResponseBody::Get(states))
            }
            proto::NodeRequestBody::GetEvents { collection, event_ids } => {
                self.policy_agent.can_access_collection(cdata, &collection)?;
                let storage_collection = self.collections.get(&collection).await?;

                // filter out any that the policy agent says we don't have access to
                let mut events = Vec::new();
                for event in storage_collection.get_events(event_ids).await? {
                    match self.policy_agent.check_read_event(cdata, &event) {
                        Ok(_) => events.push(event),
                        Err(AccessDenied::ByPolicy(_)) => {}
                        // TODO: we need to have a cleaner delineation between actual access denied versus processing errors
                        Err(e) => return Err(anyhow!("Error from peer subscription: {}", e)),
                    }
                }

                Ok(proto::NodeResponseBody::GetEvents(events))
            }
            proto::NodeRequestBody::SubscribeQuery { query_id, collection, selection, version, known_matches } => {
                let peer_state = self.peer_connections.get(&request.from).ok_or_else(|| anyhow!("Peer {} not connected", request.from))?;
                // only one cdata is permitted for SubscribePredicate
                use itertools::Itertools;
                let cdata = cdata.iterable().exactly_one().map_err(|_| anyhow!("Only one cdata is permitted for SubscribePredicate"))?;
                peer_state.subscription_handler.subscribe_query(self, query_id, collection, selection, cdata, version, known_matches).await
            }
        }
    }

    async fn handle_update(&self, notification: proto::NodeUpdate) -> anyhow::Result<()> {
        let Some(_connection) = self.peer_connections.get(&notification.from) else {
            return Err(anyhow!("Rejected notification from unknown node {}", notification.from));
        };

        match notification.body {
            proto::NodeUpdateBody::SubscriptionUpdate { items } => {
                tracing::info!("Node({}) received subscription update from peer {}", self.id, notification.from);
                crate::node_applier::NodeApplier::apply_updates(self, &notification.from, items).await?;
                Ok(())
            }
        }
    }

    pub(crate) async fn relay_to_required_peers(
        &self,
        cdata: &PA::ContextData,
        id: proto::TransactionId,
        events: &[Attested<proto::Event>],
    ) -> Result<(), MutationError> {
        // TODO determine how many durable peers need to respond before we can proceed. The others should continue in the background.
        // as of this writing, we only have one durable peer, so we can just await the response from "all" of them
        for peer_id in self.get_durable_peers() {
            match self.request(peer_id, cdata, proto::NodeRequestBody::CommitTransaction { id: id.clone(), events: events.to_vec() }).await
            {
                Ok(proto::NodeResponseBody::CommitComplete { .. }) => (),
                Ok(proto::NodeResponseBody::Error(e)) => {
                    return Err(MutationError::General(Box::new(std::io::Error::other(format!("Peer {} rejected: {}", peer_id, e)))));
                }
                _ => {
                    return Err(MutationError::General(Box::new(std::io::Error::other(format!(
                        "Peer {} returned unexpected response",
                        peer_id
                    )))));
                }
            }
        }
        Ok(())
    }

    /// Does all the things necessary to commit a remote transaction
    pub async fn commit_remote_transaction(
        &self,
        cdata: &PA::ContextData,
        id: proto::TransactionId,
        mut events: Vec<Attested<proto::Event>>,
    ) -> Result<(), MutationError> {
        debug!("{self} committing transaction {id} with {} events", events.len());
        let mut changes = Vec::new();

        for event in events.iter_mut() {
            let collection = self.collections.get(&event.payload.collection).await?;

            // When applying an event, we should only look at the local storage for the lineage
            let retriever = LocalRetriever::new(collection.clone());
            let entity = self.entities.get_retrieve_or_create(&retriever, &event.payload.collection, &event.payload.entity_id).await?;

            // we have the entity, so we can check access, optionally attest, and apply/save the event
            if let Some(attestation) = self.policy_agent.check_event(self, cdata, &entity, &event.payload)? {
                event.attestations.push(attestation);
            }

            if entity.apply_event(&retriever, &event.payload).await? {
                let state = entity.to_state()?;
                let entity_state = EntityState { entity_id: entity.id(), collection: entity.collection().clone(), state };
                let attestation = self.policy_agent.attest_state(self, &entity_state);
                let attested = Attested::opt(entity_state, attestation);
                collection.add_event(event).await?;
                collection.set_state(attested).await?;
                changes.push(EntityChange::new(entity.clone(), vec![event.clone()])?);
            }
        }

        self.reactor.notify_change(changes).await;

        Ok(())
    }

    /// Generate EntityDelta for an entity state, using known_matches to decide between StateSnapshot and EventBridge.
    /// Returns None if the entity is in known_matches with equal heads (client already has current state).
    pub(crate) async fn generate_entity_delta(
        &self,
        known_map: &std::collections::HashMap<proto::EntityId, proto::Clock>,
        entity_state: proto::Attested<proto::EntityState>,
        storage_collection: &crate::storage::StorageCollectionWrapper,
    ) -> anyhow::Result<Option<proto::EntityDelta>>
    where
        SE: StorageEngine + Send + Sync + 'static,
        PA: PolicyAgent + Send + Sync + 'static,
    {
        // Destructure to take ownership and avoid clones
        let proto::Attested { payload: proto::EntityState { entity_id, collection, state }, attestations } = entity_state;
        let current_head = &state.head;

        // Entity is in known_matches - try to optimize the response
        if let Some(known_head) = known_map.get(&entity_id) {
            // Case 1: Heads equal → return None (omit entity, client already has current state) ✓
            if known_head == current_head {
                return Ok(None);
            }

            // Case 2: Heads differ → try to build EventBridge (cheaper than full state) ✓
            match self.collect_event_bridge(storage_collection, known_head, current_head).await {
                Ok(attested_events) if !attested_events.is_empty() => {
                    // Convert Attested<Event> to EventFragments (strips entity_id and collection)
                    let event_fragments: Vec<proto::EventFragment> = attested_events.into_iter().map(|e| e.into()).collect();

                    return Ok(Some(proto::EntityDelta {
                        entity_id,
                        collection,
                        content: proto::DeltaContent::EventBridge { events: event_fragments },
                    }));
                }
                _ => {
                    // Fall through to StateSnapshot if bridge building failed or returned empty
                }
            }
        }

        // Case 3: Entity not in known_matches OR bridge building failed → send full StateSnapshot ✓
        let state_fragment = proto::StateFragment { state, attestations };
        Ok(Some(proto::EntityDelta { entity_id, collection, content: proto::DeltaContent::StateSnapshot { state: state_fragment } }))
    }

    /// Collect events between known_head and current_head using lineage comparison
    pub(crate) async fn collect_event_bridge(
        &self,
        storage_collection: &crate::storage::StorageCollectionWrapper,
        known_head: &proto::Clock,
        current_head: &proto::Clock,
    ) -> anyhow::Result<Vec<proto::Attested<proto::Event>>>
    where
        SE: StorageEngine + Send + Sync + 'static,
        PA: PolicyAgent + Send + Sync + 'static,
    {
        use crate::lineage::{EventAccumulator, Ordering};
        use crate::retrieval::LocalRetriever;

        let retriever = LocalRetriever::new(storage_collection.clone());
        let accumulator = EventAccumulator::new(None); // No limit for Phase 1
        let mut comparison = crate::lineage::Comparison::new_with_accumulator(
            &retriever,
            current_head,
            known_head,
            100000, // TODO: make budget configurable
            Some(accumulator),
        );

        // Run comparison
        loop {
            match comparison.step().await? {
                Some(Ordering::Descends) => {
                    // Current descends from known - perfect for event bridge
                    break;
                }
                Some(Ordering::Equal) => {
                    // Heads are equal - no events needed
                    break;
                }
                Some(_) => {
                    // Other relationships (NotDescends, Incomparable, etc.) - can't build bridge
                    return Ok(vec![]);
                }
                None => {
                    // Continue stepping
                }
            }
        }

        // Extract accumulated events
        Ok(comparison.take_accumulated_events().unwrap_or_default())
    }

    pub fn next_entity_id(&self) -> proto::EntityId { proto::EntityId::new() }

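    /// Build a [`Context`] for the given policy context data, failing if the system is not ready.
    ///
    /// A minimal usage sketch (`session` stands in for whatever `PA::ContextData` type is in use):
    ///
    /// ```ignore
    /// let ctx = node.context(session)?;
    /// ```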
    pub fn context(&self, data: PA::ContextData) -> Result<Context, anyhow::Error> {
        if !self.system.is_system_ready() {
            return Err(anyhow!("System is not ready"));
        }
        Ok(Context::new(Node::clone(self), data))
    }

    pub async fn context_async(&self, data: PA::ContextData) -> Context {
        self.system.wait_system_ready().await;
        Context::new(Node::clone(self), data)
    }

    pub(crate) async fn get_from_peer(
        &self,
        collection_id: &CollectionId,
        ids: Vec<proto::EntityId>,
        cdata: &PA::ContextData,
    ) -> Result<(), RetrievalError> {
        let peer_id = self.get_durable_peer_random().ok_or(RetrievalError::NoDurablePeers)?;

        match self
            .request(peer_id, cdata, proto::NodeRequestBody::Get { collection: collection_id.clone(), ids })
            .await
            .map_err(|e| RetrievalError::Other(format!("{:?}", e)))?
        {
            proto::NodeResponseBody::Get(states) => {
                let collection = self.collections.get(collection_id).await?;

                // do we have the ability to merge states?
                // because that's what we have to do I think
                for state in states {
                    self.policy_agent.validate_received_state(self, &peer_id, &state)?;
                    collection.set_state(state).await.map_err(|e| RetrievalError::Other(format!("{:?}", e)))?;
                }
                Ok(())
            }
            proto::NodeResponseBody::Error(e) => {
                debug!("Error from peer fetch: {}", e);
                Err(RetrievalError::Other(format!("{:?}", e)))
            }
            _ => {
                debug!("Unexpected response type from peer get");
                Err(RetrievalError::Other("Unexpected response type".to_string()))
            }
        }
    }

    /// Get a random durable peer node ID
    pub fn get_durable_peer_random(&self) -> Option<proto::EntityId> {
        let mut rng = rand::thread_rng();
        // Convert to Vec since DashSet iterator doesn't support random selection
        let peers: Vec<_> = self.durable_peers.to_vec();
        peers.choose(&mut rng).copied()
    }

    /// Get all durable peer node IDs
    pub fn get_durable_peers(&self) -> Vec<proto::EntityId> { self.durable_peers.to_vec() }
}

impl<SE, PA> NodeInner<SE, PA>
where
    SE: StorageEngine + Send + Sync + 'static,
    PA: PolicyAgent + Send + Sync + 'static,
{
    pub async fn request_remote_unsubscribe(&self, query_id: proto::QueryId, peers: Vec<proto::EntityId>) -> anyhow::Result<()> {
        for (peer_id, item) in self.peer_connections.get_list(peers) {
            if let Some(connection) = item {
                connection.send_message(proto::NodeMessage::UnsubscribeQuery { from: peer_id, query_id })?;
            } else {
                warn!("Peer {} not connected", peer_id);
            }
        }

        Ok(())
    }
}

impl<SE, PA> Drop for NodeInner<SE, PA>
where PA: PolicyAgent
{
    fn drop(&mut self) {
        notice_info!("Node({}) dropped", self.id);
    }
}

impl<SE, PA> Node<SE, PA>
where
    SE: StorageEngine + Send + Sync + 'static,
    PA: PolicyAgent + Send + Sync + 'static,
{
    pub(crate) fn subscribe_remote_query(
        &self,
        query_id: proto::QueryId,
        collection_id: CollectionId,
        selection: ankql::ast::Selection,
        cdata: PA::ContextData,
        version: u32,
        livequery: crate::livequery::WeakEntityLiveQuery,
    ) {
        if let Some(ref relay) = self.subscription_relay {
            self.predicate_context.insert(query_id, cdata.clone());
            relay.subscribe_query(query_id, collection_id, selection, cdata, version, livequery);
        }
    }

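    /// Fetch entities matching `selection` from local storage only, without contacting peers.
    ///
    /// A minimal usage sketch (`collection_id: CollectionId` and `selection: ankql::ast::Selection`
    /// are assumed to already be in scope):
    ///
    /// ```ignore
    /// let entities = node.fetch_entities_from_local(&collection_id, &selection).await?;
    /// ```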
    pub async fn fetch_entities_from_local(
        &self,
        collection_id: &CollectionId,
        selection: &ankql::ast::Selection,
    ) -> Result<Vec<Entity>, RetrievalError> {
        let storage_collection = self.collections.get(collection_id).await?;
        let initial_states = storage_collection.fetch_states(selection).await?;
        let retriever = crate::retrieval::LocalRetriever::new(storage_collection);
        let mut entities = Vec::with_capacity(initial_states.len());
        for state in initial_states {
            let (_, entity) =
                self.entities.with_state(&retriever, state.payload.entity_id, collection_id.clone(), state.payload.state).await?;
            entities.push(entity);
        }
        Ok(entities)
    }
}
#[async_trait::async_trait]
pub trait TNodeErased<E: AbstractEntity + Filterable + Send + 'static = Entity>: Send + Sync + 'static {
    fn unsubscribe_remote_predicate(&self, query_id: proto::QueryId);
    fn update_remote_query(&self, query_id: proto::QueryId, selection: ankql::ast::Selection, version: u32) -> Result<(), anyhow::Error>;
    async fn fetch_entities_from_local(
        &self,
        collection_id: &CollectionId,
        selection: &ankql::ast::Selection,
    ) -> Result<Vec<E>, RetrievalError>;
    fn reactor(&self) -> &Reactor<E>;
    fn has_subscription_relay(&self) -> bool;
}

#[async_trait::async_trait]
impl<SE, PA> TNodeErased<Entity> for Node<SE, PA>
where
    SE: StorageEngine + Send + Sync + 'static,
    PA: PolicyAgent + Send + Sync + 'static,
{
    fn unsubscribe_remote_predicate(&self, query_id: proto::QueryId) {
        // Clean up subscription context
        self.predicate_context.remove(&query_id);

        // Notify subscription relay for remote cleanup
        if let Some(ref relay) = self.subscription_relay {
            relay.unsubscribe_predicate(query_id);
        }
    }

    fn update_remote_query(&self, query_id: proto::QueryId, selection: ankql::ast::Selection, version: u32) -> Result<(), anyhow::Error> {
        if let Some(ref relay) = self.subscription_relay {
            relay.update_query(query_id, selection, version)?;
        }
        Ok(())
    }

    async fn fetch_entities_from_local(
        &self,
        collection_id: &CollectionId,
        selection: &ankql::ast::Selection,
    ) -> Result<Vec<Entity>, RetrievalError> {
        Node::fetch_entities_from_local(self, collection_id, selection).await
    }

    fn reactor(&self) -> &Reactor<Entity> { &self.0.reactor }

    fn has_subscription_relay(&self) -> bool { self.subscription_relay.is_some() }
}

impl<SE, PA> fmt::Display for Node<SE, PA>
where PA: PolicyAgent
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // bold blue, dimmed brackets
        write!(f, "\x1b[1;34mnode\x1b[2m[\x1b[1;34m{}\x1b[2m]\x1b[0m", self.id.to_base64_short())
    }
}