Skip to main content

ankurah_core/peer_subscription/
server.rs

1use ankurah_proto::{self as proto, Attested};
2use tracing::warn;
3
4use crate::{
5    error::SubscriptionError,
6    node::Node,
7    policy::PolicyAgent,
8    reactor::{ReactorSubscription, ReactorUpdate},
9    storage::StorageEngine,
10};
11use ankurah_signals::{Subscribe, SubscriptionGuard};
12
13/// Manages a peer's subscription to this node's reactor.
14///
15/// This handler owns both the ReactorSubscription and the SubscriptionGuard
16/// for listening to changes on that subscription.
17pub struct SubscriptionHandler {
18    _peer_id: proto::EntityId,
19    subscription: ReactorSubscription,
20    _guard: SubscriptionGuard,
21}
22
23impl SubscriptionHandler {
24    pub fn new<SE, PA>(peer_id: proto::EntityId, node: &Node<SE, PA>) -> Self
25    where
26        SE: StorageEngine + Send + Sync + 'static,
27        PA: PolicyAgent + Send + Sync + 'static,
28    {
29        let subscription = node.reactor.subscribe();
30        let weak_node = node.weak();
31
32        // Subscribe to changes on this subscription
33        let guard = subscription.subscribe(move |update: ReactorUpdate| {
34            tracing::info!("SubscriptionHandler[{}] received reactor update with {} items", peer_id, update.items.len());
35
36            if let Some(node) = weak_node.upgrade() {
37                tracing::debug!("SubscriptionHandler[{}] sending update to peer {}", peer_id, peer_id);
38                node.send_update(
39                    peer_id,
40                    proto::NodeUpdateBody::SubscriptionUpdate {
41                        items: update.items.into_iter().filter_map(|item| convert_item(&node, peer_id, item)).collect(),
42                    },
43                );
44            }
45        });
46
47        Self { _peer_id: peer_id, subscription, _guard: guard }
48    }
49
50    /// Get the subscription ID for this peer.
51    pub fn subscription_id(&self) -> crate::reactor::ReactorSubscriptionId { self.subscription.id() }
52
53    /// Get a reference to the subscription for adding/removing predicates.
54    pub fn subscription(&self) -> &ReactorSubscription { &self.subscription }
55
56    /// Remove a predicate from this peer's subscription.
57    pub fn remove_predicate(&self, query_id: proto::QueryId) -> Result<(), SubscriptionError> {
58        self.subscription.remove_predicate(query_id)?;
59        Ok(())
60    }
61
62    /// Handle a subscription request for this peer.
63    pub async fn subscribe_query<SE, PA>(
64        &self,
65        node: &Node<SE, PA>,
66        query_id: proto::QueryId,
67        collection_id: proto::CollectionId,
68        mut selection: ankql::ast::Selection,
69        cdata: &PA::ContextData,
70        version: u32,
71        known_matches: Vec<proto::KnownEntity>,
72    ) -> anyhow::Result<proto::NodeResponseBody>
73    where
74        SE: StorageEngine + Send + Sync + 'static,
75        PA: PolicyAgent + Send + Sync + 'static,
76    {
77        if version == 0 {
78            return Err(anyhow::anyhow!("Invalid version 0 for subscription"));
79        }
80        node.policy_agent.can_access_collection(cdata, &collection_id)?;
81        selection.predicate = node.policy_agent.filter_predicate(cdata, &collection_id, selection.predicate)?;
82        let storage_collection = node.collections.get(&collection_id).await?;
83
84        // Add or update the query - idempotent, works whether query exists or not
85        let matching_entities = node
86            .reactor
87            .upsert_query(self.subscription.id(), query_id, collection_id.clone(), selection.clone(), node, cdata, version)
88            .await?;
89
90        // TASK: Audit SubscriptionUpdate vs QuerySubscribed sequencing https://github.com/ankurah/ankurah/issues/147
91
92        // TASK: Optimize to avoid re-attesting entities fetched from storage https://github.com/ankurah/ankurah/issues/148
93        // Convert matching entities to Attested<EntityState>
94        let initial_states: Vec<_> = matching_entities
95            .into_iter()
96            .filter_map(|e| {
97                let entity_state = e.to_entity_state().ok()?;
98                let attestation = node.policy_agent.attest_state(node, &entity_state);
99                Some(Attested::opt(entity_state, attestation))
100            })
101            .collect();
102
103        // Expand initial_states to include entities from known_matches that weren't in the predicate results
104        let expanded_states = crate::util::expand_states::expand_states(
105            initial_states,
106            known_matches.iter().map(|k| k.entity_id).collect::<Vec<_>>(),
107            &storage_collection,
108        )
109        .await?;
110
111        let known_map: std::collections::HashMap<_, _> = known_matches.into_iter().map(|k| (k.entity_id, k.head)).collect();
112
113        // Generate deltas based on known_matches - use expanded states
114        let mut deltas = Vec::with_capacity(expanded_states.len());
115        for state in expanded_states {
116            // Only include delta if heads differ (None means heads are equal)
117            if let Some(delta) = node.generate_entity_delta(&known_map, state, &storage_collection).await? {
118                deltas.push(delta);
119            }
120        }
121
122        Ok(proto::NodeResponseBody::QuerySubscribed { query_id, deltas })
123    }
124}
125
126/// Convert a single ReactorUpdateItem to a SubscriptionUpdateItem.
127fn convert_item<SE, PA>(
128    node: &Node<SE, PA>,
129    peer_id: proto::EntityId,
130    item: crate::reactor::ReactorUpdateItem,
131) -> Option<proto::SubscriptionUpdateItem>
132where
133    SE: StorageEngine + Send + Sync + 'static,
134    PA: PolicyAgent + Send + Sync + 'static,
135{
136    // Convert entity to EntityState and attest it
137    let entity_state = match item.entity.to_entity_state() {
138        Ok(entity_state) => entity_state,
139        Err(e) => {
140            warn!("Failed to convert entity {} to EntityState for peer {}: {}", item.entity.id(), peer_id, e);
141            return None;
142        }
143    };
144
145    let attestation = node.policy_agent.attest_state(node, &entity_state);
146    let attested_state = Attested::opt(entity_state, attestation);
147
148    // Events should already be attested
149    let attested_events = item.events;
150
151    // Determine content based on whether we have events
152    let content = proto::UpdateContent::StateAndEvent(attested_state.into(), attested_events.into_iter().map(|e| e.into()).collect());
153
154    // Convert predicate relevance from reactor types to proto types
155    let predicate_relevance = item
156        .predicate_relevance
157        .into_iter()
158        .map(|(pred_id, membership)| {
159            let proto_membership = match membership {
160                crate::reactor::MembershipChange::Initial => proto::MembershipChange::Initial,
161                crate::reactor::MembershipChange::Add => proto::MembershipChange::Add,
162                crate::reactor::MembershipChange::Remove => proto::MembershipChange::Remove,
163            };
164            (pred_id, proto_membership)
165        })
166        .collect();
167
168    // Create subscription update item
169    Some(proto::SubscriptionUpdateItem {
170        entity_id: item.entity.id(),
171        collection: item.entity.collection().clone(),
172        content,
173        predicate_relevance,
174    })
175}