Skip to main content

ankurah_core/
node.rs

1use crate::selection::filter::Filterable;
2use ankurah_proto::{self as proto, Attested, CollectionId, EntityState};
3use anyhow::anyhow;
4
5use rand::prelude::*;
6use std::{
7    fmt,
8    hash::Hash,
9    ops::Deref,
10    sync::{Arc, Weak},
11};
12use tokio::sync::oneshot;
13
14use crate::{
15    action_error, action_info,
16    changes::EntityChange,
17    collectionset::CollectionSet,
18    connector::{PeerSender, SendError},
19    context::Context,
20    entity::{Entity, WeakEntitySet},
21    error::{MutationError, RequestError, RetrievalError},
22    notice_info,
23    peer_subscription::{SubscriptionHandler, SubscriptionRelay},
24    policy::{AccessDenied, PolicyAgent},
25    reactor::{AbstractEntity, Reactor},
26    retrieval::{LocalEventGetter, LocalStateGetter, SuspenseEvents},
27    storage::StorageEngine,
28    system::SystemManager,
29    util::{safemap::SafeMap, safeset::SafeSet, Iterable},
30};
31use itertools::Itertools;
32#[cfg(feature = "instrument")]
33use tracing::instrument;
34
35use tracing::{debug, error, warn};
36
37pub struct PeerState {
38    sender: Box<dyn PeerSender>,
39    _durable: bool,
40    subscription_handler: SubscriptionHandler,
41    pending_requests: SafeMap<proto::RequestId, oneshot::Sender<Result<proto::NodeResponseBody, RequestError>>>,
42    pending_updates: SafeMap<proto::UpdateId, oneshot::Sender<Result<proto::NodeResponseBody, RequestError>>>,
43}
44
45impl PeerState {
46    pub fn send_message(&self, message: proto::NodeMessage) -> Result<(), SendError> { self.sender.send_message(message) }
47}
48
49pub struct MatchArgs {
50    pub selection: ankql::ast::Selection,
51    pub cached: bool,
52}
53
54impl TryInto<MatchArgs> for &str {
55    type Error = ankql::error::ParseError;
56    fn try_into(self) -> Result<MatchArgs, Self::Error> { Ok(MatchArgs { selection: ankql::parser::parse_selection(self)?, cached: true }) }
57}
58impl TryInto<MatchArgs> for String {
59    type Error = ankql::error::ParseError;
60    fn try_into(self) -> Result<MatchArgs, Self::Error> {
61        Ok(MatchArgs { selection: ankql::parser::parse_selection(&self)?, cached: true })
62    }
63}
64
65impl From<ankql::ast::Predicate> for MatchArgs {
66    fn from(val: ankql::ast::Predicate) -> Self {
67        MatchArgs { selection: ankql::ast::Selection { predicate: val, order_by: None, limit: None }, cached: true }
68    }
69}
70
71impl From<ankql::ast::Selection> for MatchArgs {
72    fn from(val: ankql::ast::Selection) -> Self { MatchArgs { selection: val, cached: true } }
73}
74
75impl From<ankql::error::ParseError> for RetrievalError {
76    fn from(e: ankql::error::ParseError) -> Self { RetrievalError::ParseError(e) }
77}
78
79pub fn nocache<T: TryInto<ankql::ast::Selection, Error = ankql::error::ParseError>>(s: T) -> Result<MatchArgs, ankql::error::ParseError> {
80    MatchArgs::nocache(s)
81}
82impl MatchArgs {
83    pub fn nocache<T>(s: T) -> Result<Self, ankql::error::ParseError>
84    where T: TryInto<ankql::ast::Selection, Error = ankql::error::ParseError> {
85        Ok(Self { selection: s.try_into()?, cached: false })
86    }
87}
88
89/// A participant in the Ankurah network, and primary place where queries are initiated
90
91pub struct Node<SE, PA>(pub(crate) Arc<NodeInner<SE, PA>>)
92where PA: PolicyAgent;
93impl<SE, PA> Clone for Node<SE, PA>
94where PA: PolicyAgent
95{
96    fn clone(&self) -> Self { Self(self.0.clone()) }
97}
98
99pub struct WeakNode<SE, PA>(Weak<NodeInner<SE, PA>>)
100where PA: PolicyAgent;
101impl<SE, PA> Clone for WeakNode<SE, PA>
102where PA: PolicyAgent
103{
104    fn clone(&self) -> Self { Self(self.0.clone()) }
105}
106
107impl<SE, PA> WeakNode<SE, PA>
108where PA: PolicyAgent
109{
110    pub fn upgrade(&self) -> Option<Node<SE, PA>> { self.0.upgrade().map(Node) }
111}
112
113impl<SE, PA> Deref for Node<SE, PA>
114where PA: PolicyAgent
115{
116    type Target = Arc<NodeInner<SE, PA>>;
117    fn deref(&self) -> &Self::Target { &self.0 }
118}
119
/// Represents the user session, or whatever other context the `PolicyAgent`
/// needs to perform its evaluation.
pub trait ContextData:
    Send + Sync + Clone + Hash + Eq + 'static
{
}
123
124pub struct NodeInner<SE, PA>
125where PA: PolicyAgent
126{
127    pub id: proto::EntityId,
128    pub durable: bool,
129    pub collections: CollectionSet<SE>,
130
131    pub(crate) entities: WeakEntitySet,
132    peer_connections: SafeMap<proto::EntityId, Arc<PeerState>>,
133    durable_peers: SafeSet<proto::EntityId>,
134
135    pub(crate) predicate_context: SafeMap<proto::QueryId, PA::ContextData>,
136
137    /// The reactor for handling subscriptions
138    pub(crate) reactor: Reactor,
139    pub(crate) policy_agent: PA,
140    pub system: SystemManager<SE, PA>,
141
142    pub(crate) subscription_relay: Option<SubscriptionRelay<PA::ContextData, crate::livequery::WeakEntityLiveQuery>>,
143
144    /// Type resolver for AST preparation (temporary heuristic until Phase 3 schema)
145    pub(crate) type_resolver: crate::TypeResolver,
146}
147
148impl<SE, PA> Node<SE, PA>
149where
150    SE: StorageEngine + Send + Sync + 'static,
151    PA: PolicyAgent + Send + Sync + 'static,
152{
153    pub fn new(engine: Arc<SE>, policy_agent: PA) -> Self {
154        let collections = CollectionSet::new(engine.clone());
155        let entityset: WeakEntitySet = Default::default();
156        let id = proto::EntityId::new();
157        let reactor = Reactor::new();
158        notice_info!("Node {id:#} created as ephemeral");
159
160        let system_manager = SystemManager::new(collections.clone(), entityset.clone(), reactor.clone(), false);
161
162        // Create subscription relay for ephemeral nodes
163        let subscription_relay = Some(SubscriptionRelay::new());
164
165        let node = Node(Arc::new(NodeInner {
166            id,
167            collections,
168            entities: entityset,
169            peer_connections: SafeMap::new(),
170            durable_peers: SafeSet::new(),
171            reactor,
172            durable: false,
173            policy_agent,
174            system: system_manager,
175            predicate_context: SafeMap::new(),
176            subscription_relay,
177            type_resolver: crate::TypeResolver::new(),
178        }));
179
180        // Set up the message sender for the subscription relay
181        if let Some(ref relay) = node.subscription_relay {
182            let weak_node = node.weak();
183            if relay.set_node(Arc::new(weak_node)).is_err() {
184                warn!("Failed to set message sender for subscription relay");
185            }
186        }
187
188        node.policy_agent.on_node_ready(node.weak());
189
190        node
191    }
192    pub fn new_durable(engine: Arc<SE>, policy_agent: PA) -> Self {
193        let collections = CollectionSet::new(engine);
194        let entityset: WeakEntitySet = Default::default();
195        let id = proto::EntityId::new();
196        let reactor = Reactor::new();
197        notice_info!("Node {id:#} created as durable");
198
199        let system_manager = SystemManager::new(collections.clone(), entityset.clone(), reactor.clone(), true);
200
201        let node = Node(Arc::new(NodeInner {
202            id,
203            collections,
204            entities: entityset,
205            peer_connections: SafeMap::new(),
206            durable_peers: SafeSet::new(),
207            reactor,
208            durable: true,
209            policy_agent,
210            system: system_manager,
211            predicate_context: SafeMap::new(),
212            subscription_relay: None,
213            type_resolver: crate::TypeResolver::new(),
214        }));
215
216        node.policy_agent.on_node_ready(node.weak());
217
218        node
219    }
220    pub fn weak(&self) -> WeakNode<SE, PA> { WeakNode(Arc::downgrade(&self.0)) }
221
222    #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(node_id = %presence.node_id.to_base64_short(), durable = %presence.durable)))]
223    pub fn register_peer(&self, presence: proto::Presence, sender: Box<dyn PeerSender>) {
224        action_info!(self, "register_peer", "{}", &presence);
225
226        let subscription_handler = SubscriptionHandler::new(presence.node_id, self);
227        self.peer_connections.insert(
228            presence.node_id,
229            Arc::new(PeerState {
230                sender,
231                _durable: presence.durable,
232                subscription_handler,
233                pending_requests: SafeMap::new(),
234                pending_updates: SafeMap::new(),
235            }),
236        );
237        if presence.durable {
238            self.durable_peers.insert(presence.node_id);
239
240            // Notify subscription relay of new durable peer connection
241            if let Some(ref relay) = self.subscription_relay {
242                relay.notify_peer_connected(presence.node_id);
243            }
244
245            if !self.durable {
246                if let Some(system_root) = presence.system_root {
247                    action_info!(self, "received system root", "{}", &system_root.payload);
248                    let me = self.clone();
249                    crate::task::spawn(async move {
250                        if let Err(e) = me.system.join_system(system_root).await {
251                            action_error!(me, "failed to join system", "{}", &e);
252                        } else {
253                            action_info!(me, "successfully joined system");
254                        }
255                    });
256                } else {
257                    error!("Node({}) durable peer {} has no system root", self.id, presence.node_id);
258                }
259            }
260        }
261        // TODO send hello message to the peer, including present head state for all relevant collections
262    }
263    #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(node_id = %node_id.to_base64_short())))]
264    pub fn deregister_peer(&self, node_id: proto::EntityId) {
265        notice_info!("Node({:#}) deregister_peer {:#}", self.id, node_id);
266
267        self.durable_peers.remove(&node_id);
268        // Get and cleanup subscriptions before removing the peer
269        if let Some(peer_state) = self.peer_connections.remove(&node_id) {
270            action_info!(self, "unsubscribing", "subscription {} for peer {}", peer_state.subscription_handler.subscription_id(), node_id);
271            // ReactorSubscription is automatically unsubscribed on drop
272        }
273
274        // Notify subscription relay of peer disconnection (unconditional - relay handles filtering)
275        if let Some(ref relay) = self.subscription_relay {
276            relay.notify_peer_disconnected(node_id);
277        }
278    }
279    #[cfg_attr(feature = "instrument", instrument(skip_all, fields(node_id = %node_id, request_body = %request_body)))]
280    pub async fn request<'a, C>(
281        &self,
282        node_id: proto::EntityId,
283        cdata: &C,
284        request_body: proto::NodeRequestBody,
285    ) -> Result<proto::NodeResponseBody, RequestError>
286    where
287        C: Iterable<PA::ContextData>,
288    {
289        let (response_tx, response_rx) = oneshot::channel::<Result<proto::NodeResponseBody, RequestError>>();
290        let request_id = proto::RequestId::new();
291
292        let request = proto::NodeRequest { id: request_id.clone(), to: node_id, from: self.id, body: request_body };
293        let auth = self.policy_agent.sign_request(self, cdata, &request)?;
294
295        // Get the peer connection
296        let connection = self.peer_connections.get(&node_id).ok_or(RequestError::PeerNotConnected)?;
297
298        connection.pending_requests.insert(request_id, response_tx);
299        connection.send_message(proto::NodeMessage::Request { auth, request })?;
300
301        // Wait for response
302        response_rx.await.map_err(|_| RequestError::InternalChannelClosed)?
303    }
304
305    // TODO LATER: rework this to be retried in the background some number of times
306    pub fn send_update(&self, node_id: proto::EntityId, notification: proto::NodeUpdateBody) {
307        // same as request, minus cdata and the sign_request step
308        debug!("{self}.send_update({node_id:#}, {notification})");
309        let (response_tx, _response_rx) = oneshot::channel::<Result<proto::NodeResponseBody, RequestError>>();
310        let id = proto::UpdateId::new();
311
312        // Get the peer connection
313        let Some(connection) = self.peer_connections.get(&node_id) else {
314            warn!("Failed to send update to peer {}: {}", node_id, RequestError::PeerNotConnected);
315            return;
316        };
317
318        // Store the response channel
319        connection.pending_updates.insert(id.clone(), response_tx);
320
321        let notification = proto::NodeMessage::Update(proto::NodeUpdate { id, from: self.id, to: node_id, body: notification });
322
323        match connection.send_message(notification) {
324            Ok(_) => {}
325            Err(e) => {
326                warn!("Failed to send update to peer {}: {}", node_id, e);
327            }
328        };
329
330        // response_rx.await.map_err(|_| RequestError::InternalChannelClosed)??;
331    }
332
333    // TODO add a node id argument to this function rather than getting it from the message
334    // (does this actually make it more secure? or just move the place they could lie to us to the handshake?)
335    // Not if its signed by a node key.
336    #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(message = %message)))]
337    pub async fn handle_message(&self, message: proto::NodeMessage) -> anyhow::Result<()> {
338        match message {
339            proto::NodeMessage::Update(update) => {
340                debug!("Node({}) received update {}", self.id, update);
341
342                if let Some(sender) = { self.peer_connections.get(&update.from).map(|c| c.sender.cloned()) } {
343                    let _from = update.from;
344                    let _id = update.id.clone();
345                    if update.to != self.id {
346                        warn!("{} received message from {} but is not the intended recipient", self.id, update.from);
347                        return Ok(());
348                    }
349
350                    // take down the return address
351                    let id = update.id.clone();
352                    let to = update.from;
353                    let from = self.id;
354
355                    // TODO - validate the from node id is the one we're connected to
356                    let body = match self.handle_update(update).await {
357                        Ok(_) => proto::NodeUpdateAckBody::Success,
358                        Err(e) => proto::NodeUpdateAckBody::Error(e.to_string()),
359                    };
360
361                    sender.send_message(proto::NodeMessage::UpdateAck(proto::NodeUpdateAck { id, from, to, body }))?;
362                }
363            }
364            proto::NodeMessage::UpdateAck(ack) => {
365                debug!("Node({}) received ack notification {} {}", self.id, ack.id, ack.body);
366                // let connection = self.peer_connections.get(&ack.from).ok_or(RequestError::PeerNotConnected)?;
367                // if let Some(tx) = connection.pending_updates.remove(&ack.id) {
368                //     tx.send(Ok(proto::NodeResponseBody::Success)).unwrap();
369                // }
370            }
371            proto::NodeMessage::Request { auth, request } => {
372                debug!("Node({}) received request {}", self.id, request);
373                // TODO: Should we spawn a task here and make handle_message synchronous?
374                // I think this depends on how we want to handle timeouts.
375                // I think we want timeouts to be handled by the node, not the connector,
376                // which would lend itself to spawning a task here and making this function synchronous.
377
378                // double check to make sure we have a connection to the peer based on the node id
379                if let Some(sender) = { self.peer_connections.get(&request.from).map(|c| c.sender.cloned()) } {
380                    let from = request.from;
381                    let request_id = request.id.clone();
382                    if request.to != self.id {
383                        warn!("{} received message from {} but is not the intended recipient", self.id, request.from);
384                        return Ok(());
385                    }
386
387                    // Validate the request auth first, converting errors to error responses
388                    let body = match self.policy_agent.check_request(self, &auth, &request).await {
389                        Ok(cdata) => match self.handle_request(&cdata, request).await {
390                            Ok(result) => result,
391                            Err(e) => proto::NodeResponseBody::Error(e.to_string()),
392                        },
393                        Err(e) => proto::NodeResponseBody::Error(e.to_string()),
394                    };
395                    let _result = sender.send_message(proto::NodeMessage::Response(proto::NodeResponse {
396                        request_id,
397                        from: self.id,
398                        to: from,
399                        body,
400                    }));
401                }
402            }
403            proto::NodeMessage::Response(response) => {
404                debug!("Node {} received response {}", self.id, response);
405                let connection = self.peer_connections.get(&response.from).ok_or(RequestError::PeerNotConnected)?;
406                if let Some(tx) = connection.pending_requests.remove(&response.request_id) {
407                    tx.send(Ok(response.body)).map_err(|e| anyhow!("Failed to send response: {:?}", e))?;
408                }
409            }
410            proto::NodeMessage::UnsubscribeQuery { from, query_id } => {
411                // Remove predicate from the peer's subscription
412                if let Some(peer_state) = self.peer_connections.get(&from) {
413                    peer_state.subscription_handler.remove_predicate(query_id)?;
414                }
415            }
416        }
417        Ok(())
418    }
419
420    #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(request = %request)))]
421    async fn handle_request<C>(&self, cdata: &C, request: proto::NodeRequest) -> anyhow::Result<proto::NodeResponseBody>
422    where C: Iterable<PA::ContextData> {
423        match request.body {
424            proto::NodeRequestBody::CommitTransaction { id, events } => {
425                // TODO - relay to peers in a gossipy/resource-available manner, so as to improve propagation
426                // With moderate potential for duplication, while not creating message loops
427                // Doing so would be a secondary/tertiary/etc hop for this message
428                let cdata = cdata.iterable().exactly_one().map_err(|_| anyhow!("Only one cdata is permitted for CommitTransaction"))?;
429                match self.commit_remote_transaction(cdata, id.clone(), events).await {
430                    Ok(_) => Ok(proto::NodeResponseBody::CommitComplete { id }),
431                    Err(e) => Ok(proto::NodeResponseBody::Error(e.to_string())),
432                }
433            }
434            proto::NodeRequestBody::Fetch { collection, mut selection, known_matches } => {
435                self.policy_agent.can_access_collection(cdata, &collection)?;
436                let storage_collection = self.collections.get(&collection).await?;
437                selection.predicate = self.policy_agent.filter_predicate(cdata, &collection, selection.predicate)?;
438
439                // Expand initial_states to include entities from known_matches that weren't in the predicate results
440                let expanded_states = crate::util::expand_states::expand_states(
441                    storage_collection.fetch_states(&selection).await?,
442                    known_matches.iter().map(|k| k.entity_id).collect::<Vec<_>>(),
443                    &storage_collection,
444                )
445                .await?;
446
447                let known_map: std::collections::HashMap<_, _> = known_matches.into_iter().map(|k| (k.entity_id, k.head)).collect();
448
449                let mut deltas = Vec::new();
450                for state in expanded_states {
451                    if self.policy_agent.check_read(cdata, &state.payload.entity_id, &collection, &state.payload.state).is_err() {
452                        continue;
453                    }
454
455                    // Generate delta based on known_matches (returns None if heads are equal)
456                    // No need to reconstruct Entity - work directly with EntityState
457                    if let Some(delta) = self.generate_entity_delta(&known_map, state, &storage_collection, cdata).await? {
458                        deltas.push(delta);
459                    }
460                }
461                Ok(proto::NodeResponseBody::Fetch(deltas))
462            }
463            proto::NodeRequestBody::Get { collection, ids } => {
464                self.policy_agent.can_access_collection(cdata, &collection)?;
465                let storage_collection = self.collections.get(&collection).await?;
466
467                // filter out any that the policy agent says we don't have access to
468                let mut states = Vec::new();
469                for state in storage_collection.get_states(ids).await? {
470                    match self.policy_agent.check_read(cdata, &state.payload.entity_id, &collection, &state.payload.state) {
471                        Ok(_) => states.push(state),
472                        Err(AccessDenied::ByPolicy(_)) => {}
473                        // TODO: we need to have a cleaner delineation between actual access denied versus processing errors
474                        Err(e) => return Err(anyhow!("Error from peer get: {}", e)),
475                    }
476                }
477
478                Ok(proto::NodeResponseBody::Get(states))
479            }
480            proto::NodeRequestBody::GetEvents { collection, event_ids } => {
481                self.policy_agent.can_access_collection(cdata, &collection)?;
482                let storage_collection = self.collections.get(&collection).await?;
483
484                // filter out any that the policy agent says we don't have access to
485                let mut events = Vec::new();
486                for event in storage_collection.get_events(event_ids).await? {
487                    match self.policy_agent.check_read_event(cdata, &event) {
488                        Ok(_) => events.push(event),
489                        Err(AccessDenied::ByPolicy(_)) => {}
490                        // TODO: we need to have a cleaner delineation between actual access denied versus processing errors
491                        Err(e) => return Err(anyhow!("Error from peer subscription: {}", e)),
492                    }
493                }
494
495                Ok(proto::NodeResponseBody::GetEvents(events))
496            }
497            proto::NodeRequestBody::SubscribeQuery { query_id, collection, selection, version, known_matches } => {
498                let peer_state = self.peer_connections.get(&request.from).ok_or_else(|| anyhow!("Peer {} not connected", request.from))?;
499                // only one cdata is permitted for SubscribePredicate
500                use itertools::Itertools;
501                let cdata = cdata.iterable().exactly_one().map_err(|_| anyhow!("Only one cdata is permitted for SubscribePredicate"))?;
502                peer_state.subscription_handler.subscribe_query(self, query_id, collection, selection, cdata, version, known_matches).await
503            }
504        }
505    }
506
507    async fn handle_update(&self, notification: proto::NodeUpdate) -> anyhow::Result<()> {
508        let Some(_connection) = self.peer_connections.get(&notification.from) else {
509            return Err(anyhow!("Rejected notification from unknown node {}", notification.from));
510        };
511
512        match notification.body {
513            proto::NodeUpdateBody::SubscriptionUpdate { items } => {
514                tracing::debug!("Node({}) received subscription update from peer {}", self.id, notification.from);
515                crate::node_applier::NodeApplier::apply_updates(self, &notification.from, items).await?;
516                Ok(())
517            }
518        }
519    }
520
521    pub(crate) async fn relay_to_required_peers(
522        &self,
523        cdata: &PA::ContextData,
524        id: proto::TransactionId,
525        events: &[Attested<proto::Event>],
526    ) -> Result<(), MutationError> {
527        // TODO determine how many durable peers need to respond before we can proceed. The others should continue in the background.
528        // as of this writing, we only have one durable peer, so we can just await the response from "all" of them
529        for peer_id in self.get_durable_peers() {
530            match self.request(peer_id, cdata, proto::NodeRequestBody::CommitTransaction { id: id.clone(), events: events.to_vec() }).await
531            {
532                Ok(proto::NodeResponseBody::CommitComplete { .. }) => (),
533                Ok(proto::NodeResponseBody::Error(e)) => {
534                    return Err(MutationError::General(Box::new(std::io::Error::other(format!("Peer {} rejected: {}", peer_id, e)))));
535                }
536                _ => {
537                    return Err(MutationError::General(Box::new(std::io::Error::other(format!(
538                        "Peer {} returned unexpected response",
539                        peer_id
540                    )))));
541                }
542            }
543        }
544        Ok(())
545    }
546
547    /// Does all the things necessary to commit a remote transaction
548    pub async fn commit_remote_transaction(
549        &self,
550        cdata: &PA::ContextData,
551        id: proto::TransactionId,
552        mut events: Vec<Attested<proto::Event>>,
553    ) -> Result<(), MutationError> {
554        debug!("{self} commiting transaction {id} with {} events", events.len());
555        let mut changes = Vec::new();
556
557        for event in events.iter_mut() {
558            let collection = self.collections.get(&event.payload.collection).await?;
559
560            // When applying an event, we should only look at the local storage for the lineage
561            let event_getter = LocalEventGetter::new(collection.clone(), self.durable);
562            let state_getter = LocalStateGetter::new(collection.clone());
563            let entity = self
564                .entities
565                .get_retrieve_or_create(&state_getter, &event_getter, &event.payload.collection, &event.payload.entity_id)
566                .await?;
567
568            // Stage the event so BFS can discover it
569            event_getter.stage_event(event.payload.clone());
570
571            // Handle creates vs updates differently for policy validation
572            let (entity_before, entity_after, already_applied) = if event.payload.is_entity_create() && entity.head().is_empty() {
573                // Create: apply to entity directly, use as both before/after
574                entity.apply_event(&event_getter, &event.payload).await?;
575                (entity.clone(), entity.clone(), true)
576            } else {
577                // Update: snapshot, apply to fork for validation
578                use std::sync::atomic::AtomicBool;
579                let trx_alive = Arc::new(AtomicBool::new(true));
580                let forked = entity.snapshot(trx_alive);
581                forked.apply_event(&event_getter, &event.payload).await?;
582                (entity.clone(), forked, false)
583            };
584
585            // Check policy with before/after states
586            if let Some(attestation) = self.policy_agent.check_event(self, cdata, &entity_before, &entity_after, &event.payload)? {
587                event.attestations.push(attestation);
588            }
589
590            // Commit the staged event to permanent storage
591            event_getter.commit_event(event).await?;
592
593            // For updates only: apply event to real entity (creates already applied above)
594            // Event is now in storage, so BFS will find it
595            let applied = if already_applied { true } else { entity.apply_event(&event_getter, &event.payload).await? };
596
597            if applied {
598                let state = entity.to_state()?;
599                let entity_state = EntityState { entity_id: entity.id(), collection: entity.collection().clone(), state };
600                let attestation = self.policy_agent.attest_state(self, &entity_state);
601                let attested = Attested::opt(entity_state, attestation);
602                collection.set_state(attested).await?;
603                changes.push(EntityChange::new(entity.clone(), vec![event.clone()])?);
604            }
605        }
606
607        self.reactor.notify_change(changes).await;
608
609        Ok(())
610    }
611
612    /// Generate EntityDelta for an entity state, using known_matches to decide between StateSnapshot and EventBridge
613    /// Returns None if the entity is in known_matches with equal heads (client already has current state)
614    pub(crate) async fn generate_entity_delta<C>(
615        &self,
616        known_map: &std::collections::HashMap<proto::EntityId, proto::Clock>,
617        entity_state: proto::Attested<proto::EntityState>,
618        storage_collection: &crate::storage::StorageCollectionWrapper,
619        cdata: &C,
620    ) -> anyhow::Result<Option<proto::EntityDelta>>
621    where
622        SE: StorageEngine + Send + Sync + 'static,
623        PA: PolicyAgent + Send + Sync + 'static,
624        C: crate::util::Iterable<PA::ContextData>,
625    {
626        // Destructure to take ownership and avoid clones
627        let proto::Attested { payload: proto::EntityState { entity_id, collection, state }, attestations } = entity_state;
628        let current_head = &state.head;
629
630        // Entity is in known_matches - try to optimize the response
631        if let Some(known_head) = known_map.get(&entity_id) {
632            // Case 1: Heads equal → return None (omit entity, client already has current state) ✓
633            if known_head == current_head {
634                return Ok(None);
635            }
636
637            // Case 2: Heads differ → try to build EventBridge (cheaper than full state) ✓
638            match self.collect_event_bridge(storage_collection, known_head, current_head, cdata).await {
639                Ok(attested_events) if !attested_events.is_empty() => {
640                    // Convert Attested<Event> to EventFragments (strips entity_id and collection)
641                    let event_fragments: Vec<proto::EventFragment> = attested_events.into_iter().map(|e| e.into()).collect();
642
643                    return Ok(Some(proto::EntityDelta {
644                        entity_id,
645                        collection,
646                        content: proto::DeltaContent::EventBridge { events: event_fragments },
647                    }));
648                }
649                _ => {
650                    // Fall through to StateSnapshot if bridge building failed or returned empty
651                }
652            }
653        }
654
655        // Case 3: Entity not in known_matches OR bridge building failed → send full StateSnapshot ✓
656        let state_fragment = proto::StateFragment { state, attestations };
657        Ok(Some(proto::EntityDelta { entity_id, collection, content: proto::DeltaContent::StateSnapshot { state: state_fragment } }))
658    }
659
660    /// Collect events between known_head and current_head using event_dag comparison.
661    /// Returns events needed to advance from known_head to current_head.
662    pub(crate) async fn collect_event_bridge<C>(
663        &self,
664        storage_collection: &crate::storage::StorageCollectionWrapper,
665        known_head: &proto::Clock,
666        current_head: &proto::Clock,
667        cdata: &C,
668    ) -> anyhow::Result<Vec<proto::Attested<proto::Event>>>
669    where
670        SE: StorageEngine + Send + Sync + 'static,
671        PA: PolicyAgent + Send + Sync + 'static,
672        C: crate::util::Iterable<PA::ContextData>,
673    {
674        use crate::event_dag::{compare, AbstractCausalRelation};
675        use crate::retrieval::LocalEventGetter;
676        use std::collections::HashSet;
677
678        let event_getter = LocalEventGetter::new(storage_collection.clone(), self.durable);
679
680        // First check the causal relationship
681        let comparison_result = compare(&event_getter, current_head, known_head, 100000).await?;
682
683        match comparison_result.relation {
684            AbstractCausalRelation::Equal => {
685                // Heads are equal - no events needed
686                Ok(vec![])
687            }
688            AbstractCausalRelation::StrictDescends { chain: _ } => {
689                // Current descends from known - collect events by walking backward
690                // TODO: Optimize with forward chain from relation (GitHub #200)
691                let known_set: HashSet<_> = known_head.as_slice().iter().collect();
692                let mut events = Vec::new();
693                let mut frontier: Vec<proto::EventId> = current_head.as_slice().to_vec();
694                let mut visited: HashSet<proto::EventId> = HashSet::new();
695
696                while !frontier.is_empty() {
697                    let batch = std::mem::take(&mut frontier);
698                    let fetched = storage_collection.get_events(batch).await?;
699
700                    for event in fetched {
701                        let id = event.payload.id();
702                        if visited.insert(id.clone()) && !known_set.contains(&id) {
703                            // Add parents to frontier (walking backward)
704                            for parent in event.payload.parent.as_slice() {
705                                if !visited.contains(parent) && !known_set.contains(parent) {
706                                    frontier.push(parent.clone());
707                                }
708                            }
709                            events.push(event);
710                        }
711                    }
712                }
713
714                // Receivers must not learn events the read policy hides.
715                // One unreadable event makes the whole bridge unusable (a
716                // chain with a hole loses operations downstream), so give up
717                // entirely and let the caller fall back to a state snapshot,
718                // which passes its own read check.
719                for event in &events {
720                    match self.policy_agent.check_read_event(cdata, event) {
721                        Ok(()) => {}
722                        Err(AccessDenied::ByPolicy(_)) => return Ok(vec![]),
723                        Err(e) => return Err(anyhow!("check_read_event failed while building event bridge: {}", e)),
724                    }
725                }
726
727                // Backward BFS discovery order is not a causal order (uneven
728                // branch lengths interleave); sort parents-first so the wire
729                // carries a sane order. Receivers sort again and must not
730                // trust this.
731                let events = crate::event_dag::ordering::topo_sort_events(events)?;
732                Ok(events)
733            }
734            _ => {
735                // Other relationships (NotDescends, Incomparable, etc.) - can't build simple bridge
736                Ok(vec![])
737            }
738        }
739    }
740
741    pub fn next_entity_id(&self) -> proto::EntityId { proto::EntityId::new() }
742
743    pub fn context(&self, data: PA::ContextData) -> Result<Context, anyhow::Error> {
744        if !self.system.is_system_ready() {
745            return Err(anyhow!("System is not ready"));
746        }
747        Ok(Context::new(Node::clone(self), data))
748    }
749
750    pub async fn context_async(&self, data: PA::ContextData) -> Context {
751        self.system.wait_system_ready().await;
752        Context::new(Node::clone(self), data)
753    }
754
755    pub(crate) async fn get_from_peer(
756        &self,
757        collection_id: &CollectionId,
758        ids: Vec<proto::EntityId>,
759        cdata: &PA::ContextData,
760    ) -> Result<(), RetrievalError> {
761        let peer_id = self.get_durable_peer_random().ok_or(RetrievalError::NoDurablePeers)?;
762
763        match self
764            .request(peer_id, cdata, proto::NodeRequestBody::Get { collection: collection_id.clone(), ids })
765            .await
766            .map_err(|e| RetrievalError::Other(format!("{:?}", e)))?
767        {
768            proto::NodeResponseBody::Get(states) => {
769                let collection = self.collections.get(collection_id).await?;
770
771                // do we have the ability to merge states?
772                // because that's what we have to do I think
773                for state in states {
774                    self.policy_agent.validate_received_state(self, &peer_id, &state)?;
775                    collection.set_state(state).await.map_err(|e| RetrievalError::Other(format!("{:?}", e)))?;
776                }
777                Ok(())
778            }
779            proto::NodeResponseBody::Error(e) => {
780                debug!("Error from peer fetch: {}", e);
781                Err(RetrievalError::Other(format!("{:?}", e)))
782            }
783            _ => {
784                debug!("Unexpected response type from peer get");
785                Err(RetrievalError::Other("Unexpected response type".to_string()))
786            }
787        }
788    }
789
790    /// Get a random durable peer node ID
791    pub fn get_durable_peer_random(&self) -> Option<proto::EntityId> {
792        let mut rng = rand::thread_rng();
793        // Convert to Vec since DashSet iterator doesn't support random selection
794        let peers: Vec<_> = self.durable_peers.to_vec();
795        peers.choose(&mut rng).copied()
796    }
797
798    /// Get all durable peer node IDs
799    pub fn get_durable_peers(&self) -> Vec<proto::EntityId> { self.durable_peers.to_vec() }
800
801    /// TEST ONLY: Create a phantom entity with a specific ID.
802    ///
803    /// This creates an entity that was never properly created via Transaction::create(),
804    /// has no creation event, and has an empty state. Used for adversarial testing to
805    /// verify that commit paths properly reject such phantom entities.
806    ///
807    /// WARNING: This bypasses all normal entity creation validation. Only use in tests.
808    ///
809    /// Requires the `test-helpers` feature to be enabled.
810    #[cfg(feature = "test-helpers")]
811    pub fn conjure_evil_phantom(&self, id: proto::EntityId, collection: proto::CollectionId) -> crate::entity::Entity {
812        self.entities.conjure_evil_phantom(id, collection)
813    }
814}
815
816impl<SE, PA> NodeInner<SE, PA>
817where
818    SE: StorageEngine + Send + Sync + 'static,
819    PA: PolicyAgent + Send + Sync + 'static,
820{
821    pub async fn request_remote_unsubscribe(&self, query_id: proto::QueryId, peers: Vec<proto::EntityId>) -> anyhow::Result<()> {
822        for (peer_id, item) in self.peer_connections.get_list(peers) {
823            if let Some(connection) = item {
824                connection.send_message(proto::NodeMessage::UnsubscribeQuery { from: peer_id, query_id })?;
825            } else {
826                warn!("Peer {} not connected", peer_id);
827            }
828        }
829
830        Ok(())
831    }
832}
833
834impl<SE, PA> Drop for NodeInner<SE, PA>
835where PA: PolicyAgent
836{
837    fn drop(&mut self) {
838        notice_info!("Node({}) dropped", self.id);
839    }
840}
841
842impl<SE, PA> Node<SE, PA>
843where
844    SE: StorageEngine + Send + Sync + 'static,
845    PA: PolicyAgent + Send + Sync + 'static,
846{
847    pub(crate) fn subscribe_remote_query(
848        &self,
849        query_id: proto::QueryId,
850        collection_id: CollectionId,
851        selection: ankql::ast::Selection,
852        cdata: PA::ContextData,
853        version: u32,
854        livequery: crate::livequery::WeakEntityLiveQuery,
855    ) {
856        if let Some(ref relay) = self.subscription_relay {
857            // Resolve types in the AST (converts literals for JSON path comparisons)
858            let selection = self.type_resolver.resolve_selection_types(selection);
859            self.predicate_context.insert(query_id, cdata.clone());
860            relay.subscribe_query(query_id, collection_id, selection, cdata, version, livequery);
861        }
862    }
863
864    pub async fn fetch_entities_from_local(
865        &self,
866        collection_id: &CollectionId,
867        selection: &ankql::ast::Selection,
868    ) -> Result<Vec<Entity>, RetrievalError> {
869        let storage_collection = self.collections.get(collection_id).await?;
870        let initial_states = storage_collection.fetch_states(selection).await?;
871        let state_getter = LocalStateGetter::new(storage_collection.clone());
872        let event_getter = LocalEventGetter::new(storage_collection, self.durable);
873        let mut entities = Vec::with_capacity(initial_states.len());
874        for state in initial_states {
875            let (_, entity) = self
876                .entities
877                .with_state(&state_getter, &event_getter, state.payload.entity_id, collection_id.clone(), state.payload.state)
878                .await?;
879            entities.push(entity);
880        }
881        Ok(entities)
882    }
883}
884#[async_trait::async_trait]
885pub trait TNodeErased<E: AbstractEntity + Filterable + Send + 'static = Entity>: Send + Sync + 'static {
886    fn unsubscribe_remote_predicate(&self, query_id: proto::QueryId);
887    fn update_remote_query(&self, query_id: proto::QueryId, selection: ankql::ast::Selection, version: u32) -> Result<(), anyhow::Error>;
888    async fn fetch_entities_from_local(
889        &self,
890        collection_id: &CollectionId,
891        selection: &ankql::ast::Selection,
892    ) -> Result<Vec<E>, RetrievalError>;
893    fn reactor(&self) -> &Reactor<E>;
894    fn has_subscription_relay(&self) -> bool;
895}
896
897#[async_trait::async_trait]
898impl<SE, PA> TNodeErased<Entity> for Node<SE, PA>
899where
900    SE: StorageEngine + Send + Sync + 'static,
901    PA: PolicyAgent + Send + Sync + 'static,
902{
903    fn unsubscribe_remote_predicate(&self, query_id: proto::QueryId) {
904        // Clean up subscription context
905        self.predicate_context.remove(&query_id);
906
907        // Notify subscription relay for remote cleanup
908        if let Some(ref relay) = self.subscription_relay {
909            relay.unsubscribe_predicate(query_id);
910        }
911    }
912
913    fn update_remote_query(&self, query_id: proto::QueryId, selection: ankql::ast::Selection, version: u32) -> Result<(), anyhow::Error> {
914        if let Some(ref relay) = self.subscription_relay {
915            // Resolve types in the AST (converts literals for JSON path comparisons)
916            let selection = self.type_resolver.resolve_selection_types(selection);
917            relay.update_query(query_id, selection, version)?;
918        }
919        Ok(())
920    }
921
922    async fn fetch_entities_from_local(
923        &self,
924        collection_id: &CollectionId,
925        selection: &ankql::ast::Selection,
926    ) -> Result<Vec<Entity>, RetrievalError> {
927        Node::fetch_entities_from_local(self, collection_id, selection).await
928    }
929
930    fn reactor(&self) -> &Reactor<Entity> { &self.0.reactor }
931
932    fn has_subscription_relay(&self) -> bool { self.subscription_relay.is_some() }
933}
934
935impl<SE, PA> fmt::Display for Node<SE, PA>
936where PA: PolicyAgent
937{
938    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
939        // bold blue, dimmed brackets
940        write!(f, "\x1b[1;34mnode\x1b[2m[\x1b[1;34m{}\x1b[2m]\x1b[0m", self.id.to_base64_short())
941    }
942}