1use crate::selection::filter::Filterable;
2use ankurah_proto::{self as proto, Attested, CollectionId, EntityState};
3use anyhow::anyhow;
4
5use rand::prelude::*;
6use std::{
7 fmt,
8 hash::Hash,
9 ops::Deref,
10 sync::{Arc, Weak},
11};
12use tokio::sync::oneshot;
13
14use crate::{
15 action_error, action_info,
16 changes::EntityChange,
17 collectionset::CollectionSet,
18 connector::{PeerSender, SendError},
19 context::Context,
20 entity::{Entity, WeakEntitySet},
21 error::{MutationError, RequestError, RetrievalError},
22 notice_info,
23 peer_subscription::{SubscriptionHandler, SubscriptionRelay},
24 policy::{AccessDenied, PolicyAgent},
25 reactor::{AbstractEntity, Reactor},
26 retrieval::{LocalEventGetter, LocalStateGetter, SuspenseEvents},
27 storage::StorageEngine,
28 system::SystemManager,
29 util::{safemap::SafeMap, safeset::SafeSet, Iterable},
30};
31use itertools::Itertools;
32#[cfg(feature = "instrument")]
33use tracing::instrument;
34
35use tracing::{debug, error, warn};
36
37pub struct PeerState {
38 sender: Box<dyn PeerSender>,
39 _durable: bool,
40 subscription_handler: SubscriptionHandler,
41 pending_requests: SafeMap<proto::RequestId, oneshot::Sender<Result<proto::NodeResponseBody, RequestError>>>,
42 pending_updates: SafeMap<proto::UpdateId, oneshot::Sender<Result<proto::NodeResponseBody, RequestError>>>,
43}
44
45impl PeerState {
46 pub fn send_message(&self, message: proto::NodeMessage) -> Result<(), SendError> { self.sender.send_message(message) }
47}
48
49pub struct MatchArgs {
50 pub selection: ankql::ast::Selection,
51 pub cached: bool,
52}
53
54impl TryInto<MatchArgs> for &str {
55 type Error = ankql::error::ParseError;
56 fn try_into(self) -> Result<MatchArgs, Self::Error> { Ok(MatchArgs { selection: ankql::parser::parse_selection(self)?, cached: true }) }
57}
58impl TryInto<MatchArgs> for String {
59 type Error = ankql::error::ParseError;
60 fn try_into(self) -> Result<MatchArgs, Self::Error> {
61 Ok(MatchArgs { selection: ankql::parser::parse_selection(&self)?, cached: true })
62 }
63}
64
65impl From<ankql::ast::Predicate> for MatchArgs {
66 fn from(val: ankql::ast::Predicate) -> Self {
67 MatchArgs { selection: ankql::ast::Selection { predicate: val, order_by: None, limit: None }, cached: true }
68 }
69}
70
71impl From<ankql::ast::Selection> for MatchArgs {
72 fn from(val: ankql::ast::Selection) -> Self { MatchArgs { selection: val, cached: true } }
73}
74
75impl From<ankql::error::ParseError> for RetrievalError {
76 fn from(e: ankql::error::ParseError) -> Self { RetrievalError::ParseError(e) }
77}
78
79pub fn nocache<T: TryInto<ankql::ast::Selection, Error = ankql::error::ParseError>>(s: T) -> Result<MatchArgs, ankql::error::ParseError> {
80 MatchArgs::nocache(s)
81}
82impl MatchArgs {
83 pub fn nocache<T>(s: T) -> Result<Self, ankql::error::ParseError>
84 where T: TryInto<ankql::ast::Selection, Error = ankql::error::ParseError> {
85 Ok(Self { selection: s.try_into()?, cached: false })
86 }
87}
88
89pub struct Node<SE, PA>(pub(crate) Arc<NodeInner<SE, PA>>)
92where PA: PolicyAgent;
93impl<SE, PA> Clone for Node<SE, PA>
94where PA: PolicyAgent
95{
96 fn clone(&self) -> Self { Self(self.0.clone()) }
97}
98
99pub struct WeakNode<SE, PA>(Weak<NodeInner<SE, PA>>)
100where PA: PolicyAgent;
101impl<SE, PA> Clone for WeakNode<SE, PA>
102where PA: PolicyAgent
103{
104 fn clone(&self) -> Self { Self(self.0.clone()) }
105}
106
107impl<SE, PA> WeakNode<SE, PA>
108where PA: PolicyAgent
109{
110 pub fn upgrade(&self) -> Option<Node<SE, PA>> { self.0.upgrade().map(Node) }
111}
112
113impl<SE, PA> Deref for Node<SE, PA>
114where PA: PolicyAgent
115{
116 type Target = Arc<NodeInner<SE, PA>>;
117 fn deref(&self) -> &Self::Target { &self.0 }
118}
119
120pub trait ContextData: Send + Sync + Clone + Hash + Eq + 'static {}
123
124pub struct NodeInner<SE, PA>
125where PA: PolicyAgent
126{
127 pub id: proto::EntityId,
128 pub durable: bool,
129 pub collections: CollectionSet<SE>,
130
131 pub(crate) entities: WeakEntitySet,
132 peer_connections: SafeMap<proto::EntityId, Arc<PeerState>>,
133 durable_peers: SafeSet<proto::EntityId>,
134
135 pub(crate) predicate_context: SafeMap<proto::QueryId, PA::ContextData>,
136
137 pub(crate) reactor: Reactor,
139 pub(crate) policy_agent: PA,
140 pub system: SystemManager<SE, PA>,
141
142 pub(crate) subscription_relay: Option<SubscriptionRelay<PA::ContextData, crate::livequery::WeakEntityLiveQuery>>,
143
144 pub(crate) type_resolver: crate::TypeResolver,
146}
147
148impl<SE, PA> Node<SE, PA>
149where
150 SE: StorageEngine + Send + Sync + 'static,
151 PA: PolicyAgent + Send + Sync + 'static,
152{
153 pub fn new(engine: Arc<SE>, policy_agent: PA) -> Self {
154 let collections = CollectionSet::new(engine.clone());
155 let entityset: WeakEntitySet = Default::default();
156 let id = proto::EntityId::new();
157 let reactor = Reactor::new();
158 notice_info!("Node {id:#} created as ephemeral");
159
160 let system_manager = SystemManager::new(collections.clone(), entityset.clone(), reactor.clone(), false);
161
162 let subscription_relay = Some(SubscriptionRelay::new());
164
165 let node = Node(Arc::new(NodeInner {
166 id,
167 collections,
168 entities: entityset,
169 peer_connections: SafeMap::new(),
170 durable_peers: SafeSet::new(),
171 reactor,
172 durable: false,
173 policy_agent,
174 system: system_manager,
175 predicate_context: SafeMap::new(),
176 subscription_relay,
177 type_resolver: crate::TypeResolver::new(),
178 }));
179
180 if let Some(ref relay) = node.subscription_relay {
182 let weak_node = node.weak();
183 if relay.set_node(Arc::new(weak_node)).is_err() {
184 warn!("Failed to set message sender for subscription relay");
185 }
186 }
187
188 node.policy_agent.on_node_ready(node.weak());
189
190 node
191 }
192 pub fn new_durable(engine: Arc<SE>, policy_agent: PA) -> Self {
193 let collections = CollectionSet::new(engine);
194 let entityset: WeakEntitySet = Default::default();
195 let id = proto::EntityId::new();
196 let reactor = Reactor::new();
197 notice_info!("Node {id:#} created as durable");
198
199 let system_manager = SystemManager::new(collections.clone(), entityset.clone(), reactor.clone(), true);
200
201 let node = Node(Arc::new(NodeInner {
202 id,
203 collections,
204 entities: entityset,
205 peer_connections: SafeMap::new(),
206 durable_peers: SafeSet::new(),
207 reactor,
208 durable: true,
209 policy_agent,
210 system: system_manager,
211 predicate_context: SafeMap::new(),
212 subscription_relay: None,
213 type_resolver: crate::TypeResolver::new(),
214 }));
215
216 node.policy_agent.on_node_ready(node.weak());
217
218 node
219 }
220 pub fn weak(&self) -> WeakNode<SE, PA> { WeakNode(Arc::downgrade(&self.0)) }
221
222 #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(node_id = %presence.node_id.to_base64_short(), durable = %presence.durable)))]
223 pub fn register_peer(&self, presence: proto::Presence, sender: Box<dyn PeerSender>) {
224 action_info!(self, "register_peer", "{}", &presence);
225
226 let subscription_handler = SubscriptionHandler::new(presence.node_id, self);
227 self.peer_connections.insert(
228 presence.node_id,
229 Arc::new(PeerState {
230 sender,
231 _durable: presence.durable,
232 subscription_handler,
233 pending_requests: SafeMap::new(),
234 pending_updates: SafeMap::new(),
235 }),
236 );
237 if presence.durable {
238 self.durable_peers.insert(presence.node_id);
239
240 if let Some(ref relay) = self.subscription_relay {
242 relay.notify_peer_connected(presence.node_id);
243 }
244
245 if !self.durable {
246 if let Some(system_root) = presence.system_root {
247 action_info!(self, "received system root", "{}", &system_root.payload);
248 let me = self.clone();
249 crate::task::spawn(async move {
250 if let Err(e) = me.system.join_system(system_root).await {
251 action_error!(me, "failed to join system", "{}", &e);
252 } else {
253 action_info!(me, "successfully joined system");
254 }
255 });
256 } else {
257 error!("Node({}) durable peer {} has no system root", self.id, presence.node_id);
258 }
259 }
260 }
261 }
263 #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(node_id = %node_id.to_base64_short())))]
264 pub fn deregister_peer(&self, node_id: proto::EntityId) {
265 notice_info!("Node({:#}) deregister_peer {:#}", self.id, node_id);
266
267 self.durable_peers.remove(&node_id);
268 if let Some(peer_state) = self.peer_connections.remove(&node_id) {
270 action_info!(self, "unsubscribing", "subscription {} for peer {}", peer_state.subscription_handler.subscription_id(), node_id);
271 }
273
274 if let Some(ref relay) = self.subscription_relay {
276 relay.notify_peer_disconnected(node_id);
277 }
278 }
279 #[cfg_attr(feature = "instrument", instrument(skip_all, fields(node_id = %node_id, request_body = %request_body)))]
280 pub async fn request<'a, C>(
281 &self,
282 node_id: proto::EntityId,
283 cdata: &C,
284 request_body: proto::NodeRequestBody,
285 ) -> Result<proto::NodeResponseBody, RequestError>
286 where
287 C: Iterable<PA::ContextData>,
288 {
289 let (response_tx, response_rx) = oneshot::channel::<Result<proto::NodeResponseBody, RequestError>>();
290 let request_id = proto::RequestId::new();
291
292 let request = proto::NodeRequest { id: request_id.clone(), to: node_id, from: self.id, body: request_body };
293 let auth = self.policy_agent.sign_request(self, cdata, &request)?;
294
295 let connection = self.peer_connections.get(&node_id).ok_or(RequestError::PeerNotConnected)?;
297
298 connection.pending_requests.insert(request_id, response_tx);
299 connection.send_message(proto::NodeMessage::Request { auth, request })?;
300
301 response_rx.await.map_err(|_| RequestError::InternalChannelClosed)?
303 }
304
305 pub fn send_update(&self, node_id: proto::EntityId, notification: proto::NodeUpdateBody) {
307 debug!("{self}.send_update({node_id:#}, {notification})");
309 let (response_tx, _response_rx) = oneshot::channel::<Result<proto::NodeResponseBody, RequestError>>();
310 let id = proto::UpdateId::new();
311
312 let Some(connection) = self.peer_connections.get(&node_id) else {
314 warn!("Failed to send update to peer {}: {}", node_id, RequestError::PeerNotConnected);
315 return;
316 };
317
318 connection.pending_updates.insert(id.clone(), response_tx);
320
321 let notification = proto::NodeMessage::Update(proto::NodeUpdate { id, from: self.id, to: node_id, body: notification });
322
323 match connection.send_message(notification) {
324 Ok(_) => {}
325 Err(e) => {
326 warn!("Failed to send update to peer {}: {}", node_id, e);
327 }
328 };
329
330 }
332
333 #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(message = %message)))]
337 pub async fn handle_message(&self, message: proto::NodeMessage) -> anyhow::Result<()> {
338 match message {
339 proto::NodeMessage::Update(update) => {
340 debug!("Node({}) received update {}", self.id, update);
341
342 if let Some(sender) = { self.peer_connections.get(&update.from).map(|c| c.sender.cloned()) } {
343 let _from = update.from;
344 let _id = update.id.clone();
345 if update.to != self.id {
346 warn!("{} received message from {} but is not the intended recipient", self.id, update.from);
347 return Ok(());
348 }
349
350 let id = update.id.clone();
352 let to = update.from;
353 let from = self.id;
354
355 let body = match self.handle_update(update).await {
357 Ok(_) => proto::NodeUpdateAckBody::Success,
358 Err(e) => proto::NodeUpdateAckBody::Error(e.to_string()),
359 };
360
361 sender.send_message(proto::NodeMessage::UpdateAck(proto::NodeUpdateAck { id, from, to, body }))?;
362 }
363 }
364 proto::NodeMessage::UpdateAck(ack) => {
365 debug!("Node({}) received ack notification {} {}", self.id, ack.id, ack.body);
366 }
371 proto::NodeMessage::Request { auth, request } => {
372 debug!("Node({}) received request {}", self.id, request);
373 if let Some(sender) = { self.peer_connections.get(&request.from).map(|c| c.sender.cloned()) } {
380 let from = request.from;
381 let request_id = request.id.clone();
382 if request.to != self.id {
383 warn!("{} received message from {} but is not the intended recipient", self.id, request.from);
384 return Ok(());
385 }
386
387 let body = match self.policy_agent.check_request(self, &auth, &request).await {
389 Ok(cdata) => match self.handle_request(&cdata, request).await {
390 Ok(result) => result,
391 Err(e) => proto::NodeResponseBody::Error(e.to_string()),
392 },
393 Err(e) => proto::NodeResponseBody::Error(e.to_string()),
394 };
395 let _result = sender.send_message(proto::NodeMessage::Response(proto::NodeResponse {
396 request_id,
397 from: self.id,
398 to: from,
399 body,
400 }));
401 }
402 }
403 proto::NodeMessage::Response(response) => {
404 debug!("Node {} received response {}", self.id, response);
405 let connection = self.peer_connections.get(&response.from).ok_or(RequestError::PeerNotConnected)?;
406 if let Some(tx) = connection.pending_requests.remove(&response.request_id) {
407 tx.send(Ok(response.body)).map_err(|e| anyhow!("Failed to send response: {:?}", e))?;
408 }
409 }
410 proto::NodeMessage::UnsubscribeQuery { from, query_id } => {
411 if let Some(peer_state) = self.peer_connections.get(&from) {
413 peer_state.subscription_handler.remove_predicate(query_id)?;
414 }
415 }
416 }
417 Ok(())
418 }
419
420 #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(request = %request)))]
421 async fn handle_request<C>(&self, cdata: &C, request: proto::NodeRequest) -> anyhow::Result<proto::NodeResponseBody>
422 where C: Iterable<PA::ContextData> {
423 match request.body {
424 proto::NodeRequestBody::CommitTransaction { id, events } => {
425 let cdata = cdata.iterable().exactly_one().map_err(|_| anyhow!("Only one cdata is permitted for CommitTransaction"))?;
429 match self.commit_remote_transaction(cdata, id.clone(), events).await {
430 Ok(_) => Ok(proto::NodeResponseBody::CommitComplete { id }),
431 Err(e) => Ok(proto::NodeResponseBody::Error(e.to_string())),
432 }
433 }
434 proto::NodeRequestBody::Fetch { collection, mut selection, known_matches } => {
435 self.policy_agent.can_access_collection(cdata, &collection)?;
436 let storage_collection = self.collections.get(&collection).await?;
437 selection.predicate = self.policy_agent.filter_predicate(cdata, &collection, selection.predicate)?;
438
439 let expanded_states = crate::util::expand_states::expand_states(
441 storage_collection.fetch_states(&selection).await?,
442 known_matches.iter().map(|k| k.entity_id).collect::<Vec<_>>(),
443 &storage_collection,
444 )
445 .await?;
446
447 let known_map: std::collections::HashMap<_, _> = known_matches.into_iter().map(|k| (k.entity_id, k.head)).collect();
448
449 let mut deltas = Vec::new();
450 for state in expanded_states {
451 if self.policy_agent.check_read(cdata, &state.payload.entity_id, &collection, &state.payload.state).is_err() {
452 continue;
453 }
454
455 if let Some(delta) = self.generate_entity_delta(&known_map, state, &storage_collection, cdata).await? {
458 deltas.push(delta);
459 }
460 }
461 Ok(proto::NodeResponseBody::Fetch(deltas))
462 }
463 proto::NodeRequestBody::Get { collection, ids } => {
464 self.policy_agent.can_access_collection(cdata, &collection)?;
465 let storage_collection = self.collections.get(&collection).await?;
466
467 let mut states = Vec::new();
469 for state in storage_collection.get_states(ids).await? {
470 match self.policy_agent.check_read(cdata, &state.payload.entity_id, &collection, &state.payload.state) {
471 Ok(_) => states.push(state),
472 Err(AccessDenied::ByPolicy(_)) => {}
473 Err(e) => return Err(anyhow!("Error from peer get: {}", e)),
475 }
476 }
477
478 Ok(proto::NodeResponseBody::Get(states))
479 }
480 proto::NodeRequestBody::GetEvents { collection, event_ids } => {
481 self.policy_agent.can_access_collection(cdata, &collection)?;
482 let storage_collection = self.collections.get(&collection).await?;
483
484 let mut events = Vec::new();
486 for event in storage_collection.get_events(event_ids).await? {
487 match self.policy_agent.check_read_event(cdata, &event) {
488 Ok(_) => events.push(event),
489 Err(AccessDenied::ByPolicy(_)) => {}
490 Err(e) => return Err(anyhow!("Error from peer subscription: {}", e)),
492 }
493 }
494
495 Ok(proto::NodeResponseBody::GetEvents(events))
496 }
497 proto::NodeRequestBody::SubscribeQuery { query_id, collection, selection, version, known_matches } => {
498 let peer_state = self.peer_connections.get(&request.from).ok_or_else(|| anyhow!("Peer {} not connected", request.from))?;
499 use itertools::Itertools;
501 let cdata = cdata.iterable().exactly_one().map_err(|_| anyhow!("Only one cdata is permitted for SubscribePredicate"))?;
502 peer_state.subscription_handler.subscribe_query(self, query_id, collection, selection, cdata, version, known_matches).await
503 }
504 }
505 }
506
507 async fn handle_update(&self, notification: proto::NodeUpdate) -> anyhow::Result<()> {
508 let Some(_connection) = self.peer_connections.get(¬ification.from) else {
509 return Err(anyhow!("Rejected notification from unknown node {}", notification.from));
510 };
511
512 match notification.body {
513 proto::NodeUpdateBody::SubscriptionUpdate { items } => {
514 tracing::debug!("Node({}) received subscription update from peer {}", self.id, notification.from);
515 crate::node_applier::NodeApplier::apply_updates(self, ¬ification.from, items).await?;
516 Ok(())
517 }
518 }
519 }
520
521 pub(crate) async fn relay_to_required_peers(
522 &self,
523 cdata: &PA::ContextData,
524 id: proto::TransactionId,
525 events: &[Attested<proto::Event>],
526 ) -> Result<(), MutationError> {
527 for peer_id in self.get_durable_peers() {
530 match self.request(peer_id, cdata, proto::NodeRequestBody::CommitTransaction { id: id.clone(), events: events.to_vec() }).await
531 {
532 Ok(proto::NodeResponseBody::CommitComplete { .. }) => (),
533 Ok(proto::NodeResponseBody::Error(e)) => {
534 return Err(MutationError::General(Box::new(std::io::Error::other(format!("Peer {} rejected: {}", peer_id, e)))));
535 }
536 _ => {
537 return Err(MutationError::General(Box::new(std::io::Error::other(format!(
538 "Peer {} returned unexpected response",
539 peer_id
540 )))));
541 }
542 }
543 }
544 Ok(())
545 }
546
547 pub async fn commit_remote_transaction(
549 &self,
550 cdata: &PA::ContextData,
551 id: proto::TransactionId,
552 mut events: Vec<Attested<proto::Event>>,
553 ) -> Result<(), MutationError> {
554 debug!("{self} commiting transaction {id} with {} events", events.len());
555 let mut changes = Vec::new();
556
557 for event in events.iter_mut() {
558 let collection = self.collections.get(&event.payload.collection).await?;
559
560 let event_getter = LocalEventGetter::new(collection.clone(), self.durable);
562 let state_getter = LocalStateGetter::new(collection.clone());
563 let entity = self
564 .entities
565 .get_retrieve_or_create(&state_getter, &event_getter, &event.payload.collection, &event.payload.entity_id)
566 .await?;
567
568 event_getter.stage_event(event.payload.clone());
570
571 let (entity_before, entity_after, already_applied) = if event.payload.is_entity_create() && entity.head().is_empty() {
573 entity.apply_event(&event_getter, &event.payload).await?;
575 (entity.clone(), entity.clone(), true)
576 } else {
577 use std::sync::atomic::AtomicBool;
579 let trx_alive = Arc::new(AtomicBool::new(true));
580 let forked = entity.snapshot(trx_alive);
581 forked.apply_event(&event_getter, &event.payload).await?;
582 (entity.clone(), forked, false)
583 };
584
585 if let Some(attestation) = self.policy_agent.check_event(self, cdata, &entity_before, &entity_after, &event.payload)? {
587 event.attestations.push(attestation);
588 }
589
590 event_getter.commit_event(event).await?;
592
593 let applied = if already_applied { true } else { entity.apply_event(&event_getter, &event.payload).await? };
596
597 if applied {
598 let state = entity.to_state()?;
599 let entity_state = EntityState { entity_id: entity.id(), collection: entity.collection().clone(), state };
600 let attestation = self.policy_agent.attest_state(self, &entity_state);
601 let attested = Attested::opt(entity_state, attestation);
602 collection.set_state(attested).await?;
603 changes.push(EntityChange::new(entity.clone(), vec![event.clone()])?);
604 }
605 }
606
607 self.reactor.notify_change(changes).await;
608
609 Ok(())
610 }
611
612 pub(crate) async fn generate_entity_delta<C>(
615 &self,
616 known_map: &std::collections::HashMap<proto::EntityId, proto::Clock>,
617 entity_state: proto::Attested<proto::EntityState>,
618 storage_collection: &crate::storage::StorageCollectionWrapper,
619 cdata: &C,
620 ) -> anyhow::Result<Option<proto::EntityDelta>>
621 where
622 SE: StorageEngine + Send + Sync + 'static,
623 PA: PolicyAgent + Send + Sync + 'static,
624 C: crate::util::Iterable<PA::ContextData>,
625 {
626 let proto::Attested { payload: proto::EntityState { entity_id, collection, state }, attestations } = entity_state;
628 let current_head = &state.head;
629
630 if let Some(known_head) = known_map.get(&entity_id) {
632 if known_head == current_head {
634 return Ok(None);
635 }
636
637 match self.collect_event_bridge(storage_collection, known_head, current_head, cdata).await {
639 Ok(attested_events) if !attested_events.is_empty() => {
640 let event_fragments: Vec<proto::EventFragment> = attested_events.into_iter().map(|e| e.into()).collect();
642
643 return Ok(Some(proto::EntityDelta {
644 entity_id,
645 collection,
646 content: proto::DeltaContent::EventBridge { events: event_fragments },
647 }));
648 }
649 _ => {
650 }
652 }
653 }
654
655 let state_fragment = proto::StateFragment { state, attestations };
657 Ok(Some(proto::EntityDelta { entity_id, collection, content: proto::DeltaContent::StateSnapshot { state: state_fragment } }))
658 }
659
660 pub(crate) async fn collect_event_bridge<C>(
663 &self,
664 storage_collection: &crate::storage::StorageCollectionWrapper,
665 known_head: &proto::Clock,
666 current_head: &proto::Clock,
667 cdata: &C,
668 ) -> anyhow::Result<Vec<proto::Attested<proto::Event>>>
669 where
670 SE: StorageEngine + Send + Sync + 'static,
671 PA: PolicyAgent + Send + Sync + 'static,
672 C: crate::util::Iterable<PA::ContextData>,
673 {
674 use crate::event_dag::{compare, AbstractCausalRelation};
675 use crate::retrieval::LocalEventGetter;
676 use std::collections::HashSet;
677
678 let event_getter = LocalEventGetter::new(storage_collection.clone(), self.durable);
679
680 let comparison_result = compare(&event_getter, current_head, known_head, 100000).await?;
682
683 match comparison_result.relation {
684 AbstractCausalRelation::Equal => {
685 Ok(vec![])
687 }
688 AbstractCausalRelation::StrictDescends { chain: _ } => {
689 let known_set: HashSet<_> = known_head.as_slice().iter().collect();
692 let mut events = Vec::new();
693 let mut frontier: Vec<proto::EventId> = current_head.as_slice().to_vec();
694 let mut visited: HashSet<proto::EventId> = HashSet::new();
695
696 while !frontier.is_empty() {
697 let batch = std::mem::take(&mut frontier);
698 let fetched = storage_collection.get_events(batch).await?;
699
700 for event in fetched {
701 let id = event.payload.id();
702 if visited.insert(id.clone()) && !known_set.contains(&id) {
703 for parent in event.payload.parent.as_slice() {
705 if !visited.contains(parent) && !known_set.contains(parent) {
706 frontier.push(parent.clone());
707 }
708 }
709 events.push(event);
710 }
711 }
712 }
713
714 for event in &events {
720 match self.policy_agent.check_read_event(cdata, event) {
721 Ok(()) => {}
722 Err(AccessDenied::ByPolicy(_)) => return Ok(vec![]),
723 Err(e) => return Err(anyhow!("check_read_event failed while building event bridge: {}", e)),
724 }
725 }
726
727 let events = crate::event_dag::ordering::topo_sort_events(events)?;
732 Ok(events)
733 }
734 _ => {
735 Ok(vec![])
737 }
738 }
739 }
740
741 pub fn next_entity_id(&self) -> proto::EntityId { proto::EntityId::new() }
742
743 pub fn context(&self, data: PA::ContextData) -> Result<Context, anyhow::Error> {
744 if !self.system.is_system_ready() {
745 return Err(anyhow!("System is not ready"));
746 }
747 Ok(Context::new(Node::clone(self), data))
748 }
749
750 pub async fn context_async(&self, data: PA::ContextData) -> Context {
751 self.system.wait_system_ready().await;
752 Context::new(Node::clone(self), data)
753 }
754
755 pub(crate) async fn get_from_peer(
756 &self,
757 collection_id: &CollectionId,
758 ids: Vec<proto::EntityId>,
759 cdata: &PA::ContextData,
760 ) -> Result<(), RetrievalError> {
761 let peer_id = self.get_durable_peer_random().ok_or(RetrievalError::NoDurablePeers)?;
762
763 match self
764 .request(peer_id, cdata, proto::NodeRequestBody::Get { collection: collection_id.clone(), ids })
765 .await
766 .map_err(|e| RetrievalError::Other(format!("{:?}", e)))?
767 {
768 proto::NodeResponseBody::Get(states) => {
769 let collection = self.collections.get(collection_id).await?;
770
771 for state in states {
774 self.policy_agent.validate_received_state(self, &peer_id, &state)?;
775 collection.set_state(state).await.map_err(|e| RetrievalError::Other(format!("{:?}", e)))?;
776 }
777 Ok(())
778 }
779 proto::NodeResponseBody::Error(e) => {
780 debug!("Error from peer fetch: {}", e);
781 Err(RetrievalError::Other(format!("{:?}", e)))
782 }
783 _ => {
784 debug!("Unexpected response type from peer get");
785 Err(RetrievalError::Other("Unexpected response type".to_string()))
786 }
787 }
788 }
789
790 pub fn get_durable_peer_random(&self) -> Option<proto::EntityId> {
792 let mut rng = rand::thread_rng();
793 let peers: Vec<_> = self.durable_peers.to_vec();
795 peers.choose(&mut rng).copied()
796 }
797
798 pub fn get_durable_peers(&self) -> Vec<proto::EntityId> { self.durable_peers.to_vec() }
800
801 #[cfg(feature = "test-helpers")]
811 pub fn conjure_evil_phantom(&self, id: proto::EntityId, collection: proto::CollectionId) -> crate::entity::Entity {
812 self.entities.conjure_evil_phantom(id, collection)
813 }
814}
815
816impl<SE, PA> NodeInner<SE, PA>
817where
818 SE: StorageEngine + Send + Sync + 'static,
819 PA: PolicyAgent + Send + Sync + 'static,
820{
821 pub async fn request_remote_unsubscribe(&self, query_id: proto::QueryId, peers: Vec<proto::EntityId>) -> anyhow::Result<()> {
822 for (peer_id, item) in self.peer_connections.get_list(peers) {
823 if let Some(connection) = item {
824 connection.send_message(proto::NodeMessage::UnsubscribeQuery { from: peer_id, query_id })?;
825 } else {
826 warn!("Peer {} not connected", peer_id);
827 }
828 }
829
830 Ok(())
831 }
832}
833
834impl<SE, PA> Drop for NodeInner<SE, PA>
835where PA: PolicyAgent
836{
837 fn drop(&mut self) {
838 notice_info!("Node({}) dropped", self.id);
839 }
840}
841
842impl<SE, PA> Node<SE, PA>
843where
844 SE: StorageEngine + Send + Sync + 'static,
845 PA: PolicyAgent + Send + Sync + 'static,
846{
847 pub(crate) fn subscribe_remote_query(
848 &self,
849 query_id: proto::QueryId,
850 collection_id: CollectionId,
851 selection: ankql::ast::Selection,
852 cdata: PA::ContextData,
853 version: u32,
854 livequery: crate::livequery::WeakEntityLiveQuery,
855 ) {
856 if let Some(ref relay) = self.subscription_relay {
857 let selection = self.type_resolver.resolve_selection_types(selection);
859 self.predicate_context.insert(query_id, cdata.clone());
860 relay.subscribe_query(query_id, collection_id, selection, cdata, version, livequery);
861 }
862 }
863
864 pub async fn fetch_entities_from_local(
865 &self,
866 collection_id: &CollectionId,
867 selection: &ankql::ast::Selection,
868 ) -> Result<Vec<Entity>, RetrievalError> {
869 let storage_collection = self.collections.get(collection_id).await?;
870 let initial_states = storage_collection.fetch_states(selection).await?;
871 let state_getter = LocalStateGetter::new(storage_collection.clone());
872 let event_getter = LocalEventGetter::new(storage_collection, self.durable);
873 let mut entities = Vec::with_capacity(initial_states.len());
874 for state in initial_states {
875 let (_, entity) = self
876 .entities
877 .with_state(&state_getter, &event_getter, state.payload.entity_id, collection_id.clone(), state.payload.state)
878 .await?;
879 entities.push(entity);
880 }
881 Ok(entities)
882 }
883}
884#[async_trait::async_trait]
885pub trait TNodeErased<E: AbstractEntity + Filterable + Send + 'static = Entity>: Send + Sync + 'static {
886 fn unsubscribe_remote_predicate(&self, query_id: proto::QueryId);
887 fn update_remote_query(&self, query_id: proto::QueryId, selection: ankql::ast::Selection, version: u32) -> Result<(), anyhow::Error>;
888 async fn fetch_entities_from_local(
889 &self,
890 collection_id: &CollectionId,
891 selection: &ankql::ast::Selection,
892 ) -> Result<Vec<E>, RetrievalError>;
893 fn reactor(&self) -> &Reactor<E>;
894 fn has_subscription_relay(&self) -> bool;
895}
896
897#[async_trait::async_trait]
898impl<SE, PA> TNodeErased<Entity> for Node<SE, PA>
899where
900 SE: StorageEngine + Send + Sync + 'static,
901 PA: PolicyAgent + Send + Sync + 'static,
902{
903 fn unsubscribe_remote_predicate(&self, query_id: proto::QueryId) {
904 self.predicate_context.remove(&query_id);
906
907 if let Some(ref relay) = self.subscription_relay {
909 relay.unsubscribe_predicate(query_id);
910 }
911 }
912
913 fn update_remote_query(&self, query_id: proto::QueryId, selection: ankql::ast::Selection, version: u32) -> Result<(), anyhow::Error> {
914 if let Some(ref relay) = self.subscription_relay {
915 let selection = self.type_resolver.resolve_selection_types(selection);
917 relay.update_query(query_id, selection, version)?;
918 }
919 Ok(())
920 }
921
922 async fn fetch_entities_from_local(
923 &self,
924 collection_id: &CollectionId,
925 selection: &ankql::ast::Selection,
926 ) -> Result<Vec<Entity>, RetrievalError> {
927 Node::fetch_entities_from_local(self, collection_id, selection).await
928 }
929
930 fn reactor(&self) -> &Reactor<Entity> { &self.0.reactor }
931
932 fn has_subscription_relay(&self) -> bool { self.subscription_relay.is_some() }
933}
934
935impl<SE, PA> fmt::Display for Node<SE, PA>
936where PA: PolicyAgent
937{
938 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
939 write!(f, "\x1b[1;34mnode\x1b[2m[\x1b[1;34m{}\x1b[2m]\x1b[0m", self.id.to_base64_short())
941 }
942}