ankurah_core/peer_subscription/
server.rs1use ankurah_proto::{self as proto, Attested};
2use tracing::warn;
3
4use crate::{
5 error::SubscriptionError,
6 node::Node,
7 policy::PolicyAgent,
8 reactor::{ReactorSubscription, ReactorUpdate},
9 storage::StorageEngine,
10};
11use ankurah_signals::{Subscribe, SubscriptionGuard};
12
13pub struct SubscriptionHandler {
18 _peer_id: proto::EntityId,
19 subscription: ReactorSubscription,
20 _guard: SubscriptionGuard,
21}
22
23impl SubscriptionHandler {
24 pub fn new<SE, PA>(peer_id: proto::EntityId, node: &Node<SE, PA>) -> Self
25 where
26 SE: StorageEngine + Send + Sync + 'static,
27 PA: PolicyAgent + Send + Sync + 'static,
28 {
29 let subscription = node.reactor.subscribe();
30 let weak_node = node.weak();
31
32 let guard = subscription.subscribe(move |update: ReactorUpdate| {
34 tracing::info!("SubscriptionHandler[{}] received reactor update with {} items", peer_id, update.items.len());
35
36 if let Some(node) = weak_node.upgrade() {
37 tracing::debug!("SubscriptionHandler[{}] sending update to peer {}", peer_id, peer_id);
38 node.send_update(
39 peer_id,
40 proto::NodeUpdateBody::SubscriptionUpdate {
41 items: update.items.into_iter().filter_map(|item| convert_item(&node, peer_id, item)).collect(),
42 },
43 );
44 }
45 });
46
47 Self { _peer_id: peer_id, subscription, _guard: guard }
48 }
49
50 pub fn subscription_id(&self) -> crate::reactor::ReactorSubscriptionId { self.subscription.id() }
52
53 pub fn subscription(&self) -> &ReactorSubscription { &self.subscription }
55
56 pub fn remove_predicate(&self, query_id: proto::QueryId) -> Result<(), SubscriptionError> {
58 self.subscription.remove_predicate(query_id)?;
59 Ok(())
60 }
61
62 pub async fn subscribe_query<SE, PA>(
64 &self,
65 node: &Node<SE, PA>,
66 query_id: proto::QueryId,
67 collection_id: proto::CollectionId,
68 mut selection: ankql::ast::Selection,
69 cdata: &PA::ContextData,
70 version: u32,
71 known_matches: Vec<proto::KnownEntity>,
72 ) -> anyhow::Result<proto::NodeResponseBody>
73 where
74 SE: StorageEngine + Send + Sync + 'static,
75 PA: PolicyAgent + Send + Sync + 'static,
76 {
77 if version == 0 {
78 return Err(anyhow::anyhow!("Invalid version 0 for subscription"));
79 }
80 node.policy_agent.can_access_collection(cdata, &collection_id)?;
81 selection.predicate = node.policy_agent.filter_predicate(cdata, &collection_id, selection.predicate)?;
82 let storage_collection = node.collections.get(&collection_id).await?;
83
84 let matching_entities = node
86 .reactor
87 .upsert_query(self.subscription.id(), query_id, collection_id.clone(), selection.clone(), node, cdata, version)
88 .await?;
89
90 let initial_states: Vec<_> = matching_entities
95 .into_iter()
96 .filter_map(|e| {
97 let entity_state = e.to_entity_state().ok()?;
98 let attestation = node.policy_agent.attest_state(node, &entity_state);
99 Some(Attested::opt(entity_state, attestation))
100 })
101 .collect();
102
103 let expanded_states = crate::util::expand_states::expand_states(
105 initial_states,
106 known_matches.iter().map(|k| k.entity_id).collect::<Vec<_>>(),
107 &storage_collection,
108 )
109 .await?;
110
111 let known_map: std::collections::HashMap<_, _> = known_matches.into_iter().map(|k| (k.entity_id, k.head)).collect();
112
113 let mut deltas = Vec::with_capacity(expanded_states.len());
115 for state in expanded_states {
116 if let Some(delta) = node.generate_entity_delta(&known_map, state, &storage_collection).await? {
118 deltas.push(delta);
119 }
120 }
121
122 Ok(proto::NodeResponseBody::QuerySubscribed { query_id, deltas })
123 }
124}
125
126fn convert_item<SE, PA>(
128 node: &Node<SE, PA>,
129 peer_id: proto::EntityId,
130 item: crate::reactor::ReactorUpdateItem,
131) -> Option<proto::SubscriptionUpdateItem>
132where
133 SE: StorageEngine + Send + Sync + 'static,
134 PA: PolicyAgent + Send + Sync + 'static,
135{
136 let entity_state = match item.entity.to_entity_state() {
138 Ok(entity_state) => entity_state,
139 Err(e) => {
140 warn!("Failed to convert entity {} to EntityState for peer {}: {}", item.entity.id(), peer_id, e);
141 return None;
142 }
143 };
144
145 let attestation = node.policy_agent.attest_state(node, &entity_state);
146 let attested_state = Attested::opt(entity_state, attestation);
147
148 let attested_events = item.events;
150
151 let content = proto::UpdateContent::StateAndEvent(attested_state.into(), attested_events.into_iter().map(|e| e.into()).collect());
153
154 let predicate_relevance = item
156 .predicate_relevance
157 .into_iter()
158 .map(|(pred_id, membership)| {
159 let proto_membership = match membership {
160 crate::reactor::MembershipChange::Initial => proto::MembershipChange::Initial,
161 crate::reactor::MembershipChange::Add => proto::MembershipChange::Add,
162 crate::reactor::MembershipChange::Remove => proto::MembershipChange::Remove,
163 };
164 (pred_id, proto_membership)
165 })
166 .collect();
167
168 Some(proto::SubscriptionUpdateItem {
170 entity_id: item.entity.id(),
171 collection: item.entity.collection().clone(),
172 content,
173 predicate_relevance,
174 })
175}