1use ankql::selection::filter::Filterable;
2use ankurah_proto::{self as proto, Attested, CollectionId, EntityState};
3use anyhow::anyhow;
4
5use rand::prelude::*;
6use std::{
7 fmt,
8 hash::Hash,
9 ops::Deref,
10 sync::{Arc, Weak},
11};
12use tokio::sync::oneshot;
13
14use crate::{
15 action_error, action_info,
16 changes::EntityChange,
17 collectionset::CollectionSet,
18 connector::{PeerSender, SendError},
19 context::Context,
20 entity::{Entity, WeakEntitySet},
21 error::{MutationError, RequestError, RetrievalError},
22 notice_info,
23 peer_subscription::{SubscriptionHandler, SubscriptionRelay},
24 policy::{AccessDenied, PolicyAgent},
25 reactor::{AbstractEntity, Reactor},
26 retrieval::LocalRetriever,
27 storage::StorageEngine,
28 system::SystemManager,
29 util::{safemap::SafeMap, safeset::SafeSet, Iterable},
30};
31use itertools::Itertools;
32#[cfg(feature = "instrument")]
33use tracing::instrument;
34
35use tracing::{debug, error, warn};
36
/// Per-peer connection state kept by a node for every registered peer.
pub struct PeerState {
    /// Transport used to deliver [`proto::NodeMessage`]s to the peer.
    sender: Box<dyn PeerSender>,
    /// Durability flag the peer announced in its presence at registration time.
    _durable: bool,
    /// Handler for the subscriptions this peer holds on the local node.
    subscription_handler: SubscriptionHandler,
    /// Requests sent to this peer that are still awaiting a response, keyed by request id.
    pending_requests: SafeMap<proto::RequestId, oneshot::Sender<Result<proto::NodeResponseBody, RequestError>>>,
    /// Updates sent to this peer, keyed by update id.
    pending_updates: SafeMap<proto::UpdateId, oneshot::Sender<Result<proto::NodeResponseBody, RequestError>>>,
}
44
45impl PeerState {
46 pub fn send_message(&self, message: proto::NodeMessage) -> Result<(), SendError> { self.sender.send_message(message) }
47}
48
/// Arguments for a query: the parsed selection plus a caching flag.
pub struct MatchArgs {
    /// Parsed selection (predicate, ordering, limit) to match against.
    pub selection: ankql::ast::Selection,
    /// `true` for every conversion in this file; `false` only when built via `nocache`.
    pub cached: bool,
}
53
54impl TryInto<MatchArgs> for &str {
55 type Error = ankql::error::ParseError;
56 fn try_into(self) -> Result<MatchArgs, Self::Error> { Ok(MatchArgs { selection: ankql::parser::parse_selection(self)?, cached: true }) }
57}
58impl TryInto<MatchArgs> for String {
59 type Error = ankql::error::ParseError;
60 fn try_into(self) -> Result<MatchArgs, Self::Error> {
61 Ok(MatchArgs { selection: ankql::parser::parse_selection(&self)?, cached: true })
62 }
63}
64
65impl From<ankql::ast::Predicate> for MatchArgs {
66 fn from(val: ankql::ast::Predicate) -> Self {
67 MatchArgs { selection: ankql::ast::Selection { predicate: val, order_by: None, limit: None }, cached: true }
68 }
69}
70
71impl From<ankql::ast::Selection> for MatchArgs {
72 fn from(val: ankql::ast::Selection) -> Self { MatchArgs { selection: val, cached: true } }
73}
74
75impl From<ankql::error::ParseError> for RetrievalError {
76 fn from(e: ankql::error::ParseError) -> Self { RetrievalError::ParseError(e) }
77}
78
79pub fn nocache<T: TryInto<ankql::ast::Selection, Error = ankql::error::ParseError>>(s: T) -> Result<MatchArgs, ankql::error::ParseError> {
80 MatchArgs::nocache(s)
81}
82impl MatchArgs {
83 pub fn nocache<T>(s: T) -> Result<Self, ankql::error::ParseError>
84 where T: TryInto<ankql::ast::Selection, Error = ankql::error::ParseError> {
85 Ok(Self { selection: s.try_into()?, cached: false })
86 }
87}
88
/// Cheaply clonable handle to a node; a newtype around `Arc<NodeInner>`.
pub struct Node<SE, PA>(pub(crate) Arc<NodeInner<SE, PA>>)
where PA: PolicyAgent;
93impl<SE, PA> Clone for Node<SE, PA>
94where PA: PolicyAgent
95{
96 fn clone(&self) -> Self { Self(self.0.clone()) }
97}
98
/// Non-owning counterpart of [`Node`]; wraps a `Weak<NodeInner>`.
pub struct WeakNode<SE, PA>(Weak<NodeInner<SE, PA>>)
where PA: PolicyAgent;
101impl<SE, PA> Clone for WeakNode<SE, PA>
102where PA: PolicyAgent
103{
104 fn clone(&self) -> Self { Self(self.0.clone()) }
105}
106
107impl<SE, PA> WeakNode<SE, PA>
108where PA: PolicyAgent
109{
110 pub fn upgrade(&self) -> Option<Node<SE, PA>> { self.0.upgrade().map(Node) }
111}
112
113impl<SE, PA> Deref for Node<SE, PA>
114where PA: PolicyAgent
115{
116 type Target = Arc<NodeInner<SE, PA>>;
117 fn deref(&self) -> &Self::Target { &self.0 }
118}
119
/// Marker trait (no methods) bundling the bounds required of context data:
/// thread-safe, clonable, hashable, comparable for equality, and `'static`.
pub trait ContextData: Send + Sync + Clone + Hash + Eq + 'static {}
123
/// Shared state behind a [`Node`] handle.
pub struct NodeInner<SE, PA>
where PA: PolicyAgent
{
    /// Identifier of this node, freshly generated by the constructors.
    pub id: proto::EntityId,
    /// `true` for nodes built with `new_durable`, `false` for nodes built with `new`.
    pub durable: bool,
    /// Storage collections backed by the storage engine.
    pub collections: CollectionSet<SE>,

    /// Weak set of entities used to retrieve or create entity instances.
    pub(crate) entities: WeakEntitySet,
    /// State of every currently registered peer, keyed by peer node id.
    peer_connections: SafeMap<proto::EntityId, Arc<PeerState>>,
    /// Ids of the registered peers that announced themselves as durable.
    durable_peers: SafeSet<proto::EntityId>,

    /// Context data recorded per remote query id while the query is subscribed.
    pub(crate) predicate_context: SafeMap<proto::QueryId, PA::ContextData>,

    /// Reactor that is notified of entity changes.
    pub(crate) reactor: Reactor,
    /// Policy agent consulted for signing, checking, and attesting.
    pub(crate) policy_agent: PA,
    /// System manager; its readiness gates creation of a `Context`.
    pub system: SystemManager<SE, PA>,

    /// Relay for remote subscriptions; `Some` on nodes built with `new`, `None` on durable nodes.
    pub(crate) subscription_relay: Option<SubscriptionRelay<PA::ContextData, crate::livequery::WeakEntityLiveQuery>>,
}
144
145impl<SE, PA> Node<SE, PA>
146where
147 SE: StorageEngine + Send + Sync + 'static,
148 PA: PolicyAgent + Send + Sync + 'static,
149{
150 pub fn new(engine: Arc<SE>, policy_agent: PA) -> Self {
151 let collections = CollectionSet::new(engine.clone());
152 let entityset: WeakEntitySet = Default::default();
153 let id = proto::EntityId::new();
154 let reactor = Reactor::new();
155 notice_info!("Node {id:#} created as ephemeral");
156
157 let system_manager = SystemManager::new(collections.clone(), entityset.clone(), reactor.clone(), false);
158
159 let subscription_relay = Some(SubscriptionRelay::new());
161
162 let node = Node(Arc::new(NodeInner {
163 id,
164 collections,
165 entities: entityset,
166 peer_connections: SafeMap::new(),
167 durable_peers: SafeSet::new(),
168 reactor,
169 durable: false,
170 policy_agent,
171 system: system_manager,
172 predicate_context: SafeMap::new(),
173 subscription_relay,
174 }));
175
176 if let Some(ref relay) = node.subscription_relay {
178 let weak_node = node.weak();
179 if relay.set_node(Arc::new(weak_node)).is_err() {
180 warn!("Failed to set message sender for subscription relay");
181 }
182 }
183
184 node
185 }
186 pub fn new_durable(engine: Arc<SE>, policy_agent: PA) -> Self {
187 let collections = CollectionSet::new(engine);
188 let entityset: WeakEntitySet = Default::default();
189 let id = proto::EntityId::new();
190 let reactor = Reactor::new();
191 notice_info!("Node {id:#} created as durable");
192
193 let system_manager = SystemManager::new(collections.clone(), entityset.clone(), reactor.clone(), true);
194
195 Node(Arc::new(NodeInner {
196 id,
197 collections,
198 entities: entityset,
199 peer_connections: SafeMap::new(),
200 durable_peers: SafeSet::new(),
201 reactor,
202 durable: true,
203 policy_agent,
204 system: system_manager,
205 predicate_context: SafeMap::new(),
206 subscription_relay: None,
207 }))
208 }
209 pub fn weak(&self) -> WeakNode<SE, PA> { WeakNode(Arc::downgrade(&self.0)) }
210
211 #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(node_id = %presence.node_id.to_base64_short(), durable = %presence.durable)))]
212 pub fn register_peer(&self, presence: proto::Presence, sender: Box<dyn PeerSender>) {
213 action_info!(self, "register_peer", "{}", &presence);
214
215 let subscription_handler = SubscriptionHandler::new(presence.node_id, self);
216 self.peer_connections.insert(
217 presence.node_id,
218 Arc::new(PeerState {
219 sender,
220 _durable: presence.durable,
221 subscription_handler,
222 pending_requests: SafeMap::new(),
223 pending_updates: SafeMap::new(),
224 }),
225 );
226 if presence.durable {
227 self.durable_peers.insert(presence.node_id);
228
229 if let Some(ref relay) = self.subscription_relay {
231 relay.notify_peer_connected(presence.node_id);
232 }
233
234 if !self.durable {
235 if let Some(system_root) = presence.system_root {
236 action_info!(self, "received system root", "{}", &system_root.payload);
237 let me = self.clone();
238 crate::task::spawn(async move {
239 if let Err(e) = me.system.join_system(system_root).await {
240 action_error!(me, "failed to join system", "{}", &e);
241 } else {
242 action_info!(me, "successfully joined system");
243 }
244 });
245 } else {
246 error!("Node({}) durable peer {} has no system root", self.id, presence.node_id);
247 }
248 }
249 }
250 }
252 #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(node_id = %node_id.to_base64_short())))]
253 pub fn deregister_peer(&self, node_id: proto::EntityId) {
254 notice_info!("Node({:#}) deregister_peer {:#}", self.id, node_id);
255
256 self.durable_peers.remove(&node_id);
257 if let Some(peer_state) = self.peer_connections.remove(&node_id) {
259 action_info!(self, "unsubscribing", "subscription {} for peer {}", peer_state.subscription_handler.subscription_id(), node_id);
260 }
262
263 if let Some(ref relay) = self.subscription_relay {
265 relay.notify_peer_disconnected(node_id);
266 }
267 }
268 #[cfg_attr(feature = "instrument", instrument(skip_all, fields(node_id = %node_id, request_body = %request_body)))]
269 pub async fn request<'a, C>(
270 &self,
271 node_id: proto::EntityId,
272 cdata: &C,
273 request_body: proto::NodeRequestBody,
274 ) -> Result<proto::NodeResponseBody, RequestError>
275 where
276 C: Iterable<PA::ContextData>,
277 {
278 let (response_tx, response_rx) = oneshot::channel::<Result<proto::NodeResponseBody, RequestError>>();
279 let request_id = proto::RequestId::new();
280
281 let request = proto::NodeRequest { id: request_id.clone(), to: node_id, from: self.id, body: request_body };
282 let auth = self.policy_agent.sign_request(self, cdata, &request);
283
284 let connection = self.peer_connections.get(&node_id).ok_or(RequestError::PeerNotConnected)?;
286
287 connection.pending_requests.insert(request_id, response_tx);
288 connection.send_message(proto::NodeMessage::Request { auth, request })?;
289
290 response_rx.await.map_err(|_| RequestError::InternalChannelClosed)?
292 }
293
294 pub fn send_update(&self, node_id: proto::EntityId, notification: proto::NodeUpdateBody) {
296 debug!("{self}.send_update({node_id:#}, {notification})");
298 let (response_tx, _response_rx) = oneshot::channel::<Result<proto::NodeResponseBody, RequestError>>();
299 let id = proto::UpdateId::new();
300
301 let Some(connection) = self.peer_connections.get(&node_id) else {
303 warn!("Failed to send update to peer {}: {}", node_id, RequestError::PeerNotConnected);
304 return;
305 };
306
307 connection.pending_updates.insert(id.clone(), response_tx);
309
310 let notification = proto::NodeMessage::Update(proto::NodeUpdate { id, from: self.id, to: node_id, body: notification });
311
312 match connection.send_message(notification) {
313 Ok(_) => {}
314 Err(e) => {
315 warn!("Failed to send update to peer {}: {}", node_id, e);
316 }
317 };
318
319 }
321
322 #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(message = %message)))]
326 pub async fn handle_message(&self, message: proto::NodeMessage) -> anyhow::Result<()> {
327 match message {
328 proto::NodeMessage::Update(update) => {
329 debug!("Node({}) received update {}", self.id, update);
330
331 if let Some(sender) = { self.peer_connections.get(&update.from).map(|c| c.sender.cloned()) } {
332 let _from = update.from;
333 let _id = update.id.clone();
334 if update.to != self.id {
335 warn!("{} received message from {} but is not the intended recipient", self.id, update.from);
336 return Ok(());
337 }
338
339 let id = update.id.clone();
341 let to = update.from;
342 let from = self.id;
343
344 let body = match self.handle_update(update).await {
346 Ok(_) => proto::NodeUpdateAckBody::Success,
347 Err(e) => proto::NodeUpdateAckBody::Error(e.to_string()),
348 };
349
350 sender.send_message(proto::NodeMessage::UpdateAck(proto::NodeUpdateAck { id, from, to, body }))?;
351 }
352 }
353 proto::NodeMessage::UpdateAck(ack) => {
354 debug!("Node({}) received ack notification {} {}", self.id, ack.id, ack.body);
355 }
360 proto::NodeMessage::Request { auth, request } => {
361 debug!("Node({}) received request {}", self.id, request);
362 let cdata = self.policy_agent.check_request(self, &auth, &request).await?;
368
369 if let Some(sender) = { self.peer_connections.get(&request.from).map(|c| c.sender.cloned()) } {
371 let from = request.from;
372 let request_id = request.id.clone();
373 if request.to != self.id {
374 warn!("{} received message from {} but is not the intended recipient", self.id, request.from);
375 return Ok(());
376 }
377
378 let body = match self.handle_request(&cdata, request).await {
379 Ok(result) => result,
380 Err(e) => proto::NodeResponseBody::Error(e.to_string()),
381 };
382 let _result = sender.send_message(proto::NodeMessage::Response(proto::NodeResponse {
383 request_id,
384 from: self.id,
385 to: from,
386 body,
387 }));
388 }
389 }
390 proto::NodeMessage::Response(response) => {
391 debug!("Node {} received response {}", self.id, response);
392 let connection = self.peer_connections.get(&response.from).ok_or(RequestError::PeerNotConnected)?;
393 if let Some(tx) = connection.pending_requests.remove(&response.request_id) {
394 tx.send(Ok(response.body)).map_err(|e| anyhow!("Failed to send response: {:?}", e))?;
395 }
396 }
397 proto::NodeMessage::UnsubscribeQuery { from, query_id } => {
398 if let Some(peer_state) = self.peer_connections.get(&from) {
400 peer_state.subscription_handler.remove_predicate(query_id)?;
401 }
402 }
403 }
404 Ok(())
405 }
406
407 #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(request = %request)))]
408 async fn handle_request<C>(&self, cdata: &C, request: proto::NodeRequest) -> anyhow::Result<proto::NodeResponseBody>
409 where C: Iterable<PA::ContextData> {
410 match request.body {
411 proto::NodeRequestBody::CommitTransaction { id, events } => {
412 let cdata = cdata.iterable().exactly_one().map_err(|_| anyhow!("Only one cdata is permitted for CommitTransaction"))?;
416 match self.commit_remote_transaction(cdata, id.clone(), events).await {
417 Ok(_) => Ok(proto::NodeResponseBody::CommitComplete { id }),
418 Err(e) => Ok(proto::NodeResponseBody::Error(e.to_string())),
419 }
420 }
421 proto::NodeRequestBody::Fetch { collection, mut selection, known_matches } => {
422 self.policy_agent.can_access_collection(cdata, &collection)?;
423 let storage_collection = self.collections.get(&collection).await?;
424 selection.predicate = self.policy_agent.filter_predicate(cdata, &collection, selection.predicate)?;
425
426 let expanded_states = crate::util::expand_states::expand_states(
428 storage_collection.fetch_states(&selection).await?,
429 known_matches.iter().map(|k| k.entity_id).collect::<Vec<_>>(),
430 &storage_collection,
431 )
432 .await?;
433
434 let known_map: std::collections::HashMap<_, _> = known_matches.into_iter().map(|k| (k.entity_id, k.head)).collect();
435
436 let mut deltas = Vec::new();
437 for state in expanded_states {
438 if self.policy_agent.check_read(cdata, &state.payload.entity_id, &collection, &state.payload.state).is_err() {
439 continue;
440 }
441
442 if let Some(delta) = self.generate_entity_delta(&known_map, state, &storage_collection).await? {
445 deltas.push(delta);
446 }
447 }
448 Ok(proto::NodeResponseBody::Fetch(deltas))
449 }
450 proto::NodeRequestBody::Get { collection, ids } => {
451 self.policy_agent.can_access_collection(cdata, &collection)?;
452 let storage_collection = self.collections.get(&collection).await?;
453
454 let mut states = Vec::new();
456 for state in storage_collection.get_states(ids).await? {
457 match self.policy_agent.check_read(cdata, &state.payload.entity_id, &collection, &state.payload.state) {
458 Ok(_) => states.push(state),
459 Err(AccessDenied::ByPolicy(_)) => {}
460 Err(e) => return Err(anyhow!("Error from peer get: {}", e)),
462 }
463 }
464
465 Ok(proto::NodeResponseBody::Get(states))
466 }
467 proto::NodeRequestBody::GetEvents { collection, event_ids } => {
468 self.policy_agent.can_access_collection(cdata, &collection)?;
469 let storage_collection = self.collections.get(&collection).await?;
470
471 let mut events = Vec::new();
473 for event in storage_collection.get_events(event_ids).await? {
474 match self.policy_agent.check_read_event(cdata, &event) {
475 Ok(_) => events.push(event),
476 Err(AccessDenied::ByPolicy(_)) => {}
477 Err(e) => return Err(anyhow!("Error from peer subscription: {}", e)),
479 }
480 }
481
482 Ok(proto::NodeResponseBody::GetEvents(events))
483 }
484 proto::NodeRequestBody::SubscribeQuery { query_id, collection, selection, version, known_matches } => {
485 let peer_state = self.peer_connections.get(&request.from).ok_or_else(|| anyhow!("Peer {} not connected", request.from))?;
486 use itertools::Itertools;
488 let cdata = cdata.iterable().exactly_one().map_err(|_| anyhow!("Only one cdata is permitted for SubscribePredicate"))?;
489 peer_state.subscription_handler.subscribe_query(self, query_id, collection, selection, cdata, version, known_matches).await
490 }
491 }
492 }
493
494 async fn handle_update(&self, notification: proto::NodeUpdate) -> anyhow::Result<()> {
495 let Some(_connection) = self.peer_connections.get(¬ification.from) else {
496 return Err(anyhow!("Rejected notification from unknown node {}", notification.from));
497 };
498
499 match notification.body {
500 proto::NodeUpdateBody::SubscriptionUpdate { items } => {
501 tracing::info!("Node({}) received subscription update from peer {}", self.id, notification.from);
502 crate::node_applier::NodeApplier::apply_updates(self, ¬ification.from, items).await?;
503 Ok(())
504 }
505 }
506 }
507
508 pub(crate) async fn relay_to_required_peers(
509 &self,
510 cdata: &PA::ContextData,
511 id: proto::TransactionId,
512 events: &[Attested<proto::Event>],
513 ) -> Result<(), MutationError> {
514 for peer_id in self.get_durable_peers() {
517 match self.request(peer_id, cdata, proto::NodeRequestBody::CommitTransaction { id: id.clone(), events: events.to_vec() }).await
518 {
519 Ok(proto::NodeResponseBody::CommitComplete { .. }) => (),
520 Ok(proto::NodeResponseBody::Error(e)) => {
521 return Err(MutationError::General(Box::new(std::io::Error::other(format!("Peer {} rejected: {}", peer_id, e)))));
522 }
523 _ => {
524 return Err(MutationError::General(Box::new(std::io::Error::other(format!(
525 "Peer {} returned unexpected response",
526 peer_id
527 )))));
528 }
529 }
530 }
531 Ok(())
532 }
533
534 pub async fn commit_remote_transaction(
536 &self,
537 cdata: &PA::ContextData,
538 id: proto::TransactionId,
539 mut events: Vec<Attested<proto::Event>>,
540 ) -> Result<(), MutationError> {
541 debug!("{self} commiting transaction {id} with {} events", events.len());
542 let mut changes = Vec::new();
543
544 for event in events.iter_mut() {
545 let collection = self.collections.get(&event.payload.collection).await?;
546
547 let retriever = LocalRetriever::new(collection.clone());
549 let entity = self.entities.get_retrieve_or_create(&retriever, &event.payload.collection, &event.payload.entity_id).await?;
550
551 if let Some(attestation) = self.policy_agent.check_event(self, cdata, &entity, &event.payload)? {
553 event.attestations.push(attestation);
554 }
555
556 if entity.apply_event(&retriever, &event.payload).await? {
557 let state = entity.to_state()?;
558 let entity_state = EntityState { entity_id: entity.id(), collection: entity.collection().clone(), state };
559 let attestation = self.policy_agent.attest_state(self, &entity_state);
560 let attested = Attested::opt(entity_state, attestation);
561 collection.add_event(event).await?;
562 collection.set_state(attested).await?;
563 changes.push(EntityChange::new(entity.clone(), vec![event.clone()])?);
564 }
565 }
566
567 self.reactor.notify_change(changes).await;
568
569 Ok(())
570 }
571
572 pub(crate) async fn generate_entity_delta(
575 &self,
576 known_map: &std::collections::HashMap<proto::EntityId, proto::Clock>,
577 entity_state: proto::Attested<proto::EntityState>,
578 storage_collection: &crate::storage::StorageCollectionWrapper,
579 ) -> anyhow::Result<Option<proto::EntityDelta>>
580 where
581 SE: StorageEngine + Send + Sync + 'static,
582 PA: PolicyAgent + Send + Sync + 'static,
583 {
584 let proto::Attested { payload: proto::EntityState { entity_id, collection, state }, attestations } = entity_state;
586 let current_head = &state.head;
587
588 if let Some(known_head) = known_map.get(&entity_id) {
590 if known_head == current_head {
592 return Ok(None);
593 }
594
595 match self.collect_event_bridge(storage_collection, known_head, current_head).await {
597 Ok(attested_events) if !attested_events.is_empty() => {
598 let event_fragments: Vec<proto::EventFragment> = attested_events.into_iter().map(|e| e.into()).collect();
600
601 return Ok(Some(proto::EntityDelta {
602 entity_id,
603 collection,
604 content: proto::DeltaContent::EventBridge { events: event_fragments },
605 }));
606 }
607 _ => {
608 }
610 }
611 }
612
613 let state_fragment = proto::StateFragment { state, attestations };
615 Ok(Some(proto::EntityDelta { entity_id, collection, content: proto::DeltaContent::StateSnapshot { state: state_fragment } }))
616 }
617
618 pub(crate) async fn collect_event_bridge(
620 &self,
621 storage_collection: &crate::storage::StorageCollectionWrapper,
622 known_head: &proto::Clock,
623 current_head: &proto::Clock,
624 ) -> anyhow::Result<Vec<proto::Attested<proto::Event>>>
625 where
626 SE: StorageEngine + Send + Sync + 'static,
627 PA: PolicyAgent + Send + Sync + 'static,
628 {
629 use crate::lineage::{EventAccumulator, Ordering};
630 use crate::retrieval::LocalRetriever;
631
632 let retriever = LocalRetriever::new(storage_collection.clone());
633 let accumulator = EventAccumulator::new(None); let mut comparison = crate::lineage::Comparison::new_with_accumulator(
635 &retriever,
636 current_head,
637 known_head,
638 100000, Some(accumulator),
640 );
641
642 loop {
644 match comparison.step().await? {
645 Some(Ordering::Descends) => {
646 break;
648 }
649 Some(Ordering::Equal) => {
650 break;
652 }
653 Some(_) => {
654 return Ok(vec![]);
656 }
657 None => {
658 }
660 }
661 }
662
663 Ok(comparison.take_accumulated_events().unwrap_or_default())
665 }
666
667 pub fn next_entity_id(&self) -> proto::EntityId { proto::EntityId::new() }
668
669 pub fn context(&self, data: PA::ContextData) -> Result<Context, anyhow::Error> {
670 if !self.system.is_system_ready() {
671 return Err(anyhow!("System is not ready"));
672 }
673 Ok(Context::new(Node::clone(self), data))
674 }
675
676 pub async fn context_async(&self, data: PA::ContextData) -> Context {
677 self.system.wait_system_ready().await;
678 Context::new(Node::clone(self), data)
679 }
680
681 pub(crate) async fn get_from_peer(
682 &self,
683 collection_id: &CollectionId,
684 ids: Vec<proto::EntityId>,
685 cdata: &PA::ContextData,
686 ) -> Result<(), RetrievalError> {
687 let peer_id = self.get_durable_peer_random().ok_or(RetrievalError::NoDurablePeers)?;
688
689 match self
690 .request(peer_id, cdata, proto::NodeRequestBody::Get { collection: collection_id.clone(), ids })
691 .await
692 .map_err(|e| RetrievalError::Other(format!("{:?}", e)))?
693 {
694 proto::NodeResponseBody::Get(states) => {
695 let collection = self.collections.get(collection_id).await?;
696
697 for state in states {
700 self.policy_agent.validate_received_state(self, &peer_id, &state)?;
701 collection.set_state(state).await.map_err(|e| RetrievalError::Other(format!("{:?}", e)))?;
702 }
703 Ok(())
704 }
705 proto::NodeResponseBody::Error(e) => {
706 debug!("Error from peer fetch: {}", e);
707 Err(RetrievalError::Other(format!("{:?}", e)))
708 }
709 _ => {
710 debug!("Unexpected response type from peer get");
711 Err(RetrievalError::Other("Unexpected response type".to_string()))
712 }
713 }
714 }
715
716 pub fn get_durable_peer_random(&self) -> Option<proto::EntityId> {
718 let mut rng = rand::thread_rng();
719 let peers: Vec<_> = self.durable_peers.to_vec();
721 peers.choose(&mut rng).copied()
722 }
723
724 pub fn get_durable_peers(&self) -> Vec<proto::EntityId> { self.durable_peers.to_vec() }
726}
727
728impl<SE, PA> NodeInner<SE, PA>
729where
730 SE: StorageEngine + Send + Sync + 'static,
731 PA: PolicyAgent + Send + Sync + 'static,
732{
733 pub async fn request_remote_unsubscribe(&self, query_id: proto::QueryId, peers: Vec<proto::EntityId>) -> anyhow::Result<()> {
734 for (peer_id, item) in self.peer_connections.get_list(peers) {
735 if let Some(connection) = item {
736 connection.send_message(proto::NodeMessage::UnsubscribeQuery { from: peer_id, query_id })?;
737 } else {
738 warn!("Peer {} not connected", peer_id);
739 }
740 }
741
742 Ok(())
743 }
744}
745
746impl<SE, PA> Drop for NodeInner<SE, PA>
747where PA: PolicyAgent
748{
749 fn drop(&mut self) {
750 notice_info!("Node({}) dropped", self.id);
751 }
752}
753
754impl<SE, PA> Node<SE, PA>
755where
756 SE: StorageEngine + Send + Sync + 'static,
757 PA: PolicyAgent + Send + Sync + 'static,
758{
759 pub(crate) fn subscribe_remote_query(
760 &self,
761 query_id: proto::QueryId,
762 collection_id: CollectionId,
763 selection: ankql::ast::Selection,
764 cdata: PA::ContextData,
765 version: u32,
766 livequery: crate::livequery::WeakEntityLiveQuery,
767 ) {
768 if let Some(ref relay) = self.subscription_relay {
769 self.predicate_context.insert(query_id, cdata.clone());
770 relay.subscribe_query(query_id, collection_id, selection, cdata, version, livequery);
771 }
772 }
773
774 pub async fn fetch_entities_from_local(
775 &self,
776 collection_id: &CollectionId,
777 selection: &ankql::ast::Selection,
778 ) -> Result<Vec<Entity>, RetrievalError> {
779 let storage_collection = self.collections.get(collection_id).await?;
780 let initial_states = storage_collection.fetch_states(selection).await?;
781 let retriever = crate::retrieval::LocalRetriever::new(storage_collection);
782 let mut entities = Vec::with_capacity(initial_states.len());
783 for state in initial_states {
784 let (_, entity) =
785 self.entities.with_state(&retriever, state.payload.entity_id, collection_id.clone(), state.payload.state).await?;
786 entities.push(entity);
787 }
788 Ok(entities)
789 }
790}
/// View of a node that does not mention its storage-engine or policy-agent type
/// parameters; it is generic only over the entity type `E` (defaulting to [`Entity`]).
#[async_trait::async_trait]
pub trait TNodeErased<E: AbstractEntity + Filterable + Send + 'static = Entity>: Send + Sync + 'static {
    /// Stops the remote subscription for `query_id`.
    fn unsubscribe_remote_predicate(&self, query_id: proto::QueryId);
    /// Replaces the selection of the remote query `query_id` with `selection` at `version`.
    fn update_remote_query(&self, query_id: proto::QueryId, selection: ankql::ast::Selection, version: u32) -> Result<(), anyhow::Error>;
    /// Loads the entities matching `selection` from local storage.
    async fn fetch_entities_from_local(
        &self,
        collection_id: &CollectionId,
        selection: &ankql::ast::Selection,
    ) -> Result<Vec<E>, RetrievalError>;
    /// Returns the node's reactor.
    fn reactor(&self) -> &Reactor<E>;
    /// Reports whether the node has a subscription relay.
    fn has_subscription_relay(&self) -> bool;
}
803
804#[async_trait::async_trait]
805impl<SE, PA> TNodeErased<Entity> for Node<SE, PA>
806where
807 SE: StorageEngine + Send + Sync + 'static,
808 PA: PolicyAgent + Send + Sync + 'static,
809{
810 fn unsubscribe_remote_predicate(&self, query_id: proto::QueryId) {
811 self.predicate_context.remove(&query_id);
813
814 if let Some(ref relay) = self.subscription_relay {
816 relay.unsubscribe_predicate(query_id);
817 }
818 }
819
820 fn update_remote_query(&self, query_id: proto::QueryId, selection: ankql::ast::Selection, version: u32) -> Result<(), anyhow::Error> {
821 if let Some(ref relay) = self.subscription_relay {
822 relay.update_query(query_id, selection, version)?;
823 }
824 Ok(())
825 }
826
827 async fn fetch_entities_from_local(
828 &self,
829 collection_id: &CollectionId,
830 selection: &ankql::ast::Selection,
831 ) -> Result<Vec<Entity>, RetrievalError> {
832 Node::fetch_entities_from_local(self, collection_id, selection).await
833 }
834
835 fn reactor(&self) -> &Reactor<Entity> { &self.0.reactor }
836
837 fn has_subscription_relay(&self) -> bool { self.subscription_relay.is_some() }
838}
839
840impl<SE, PA> fmt::Display for Node<SE, PA>
841where PA: PolicyAgent
842{
843 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
844 write!(f, "\x1b[1;34mnode\x1b[2m[\x1b[1;34m{}\x1b[2m]\x1b[0m", self.id.to_base64_short())
846 }
847}