Skip to main content

calimero_node_primitives/
client.rs

1#![allow(clippy::multiple_inherent_impl, reason = "better readability")]
2
3use std::borrow::Cow;
4// Removed: NonZeroUsize (no longer using height)
5
6use async_stream::stream;
7use calimero_blobstore::BlobManager;
8use calimero_crypto::SharedKey;
9use calimero_network_primitives::client::NetworkClient;
10use calimero_primitives::context::{Context, ContextId};
11use calimero_primitives::events::NodeEvent;
12use calimero_primitives::identity::{PrivateKey, PublicKey};
13use calimero_store::Store;
14use calimero_utils_actix::LazyRecipient;
15use eyre::{OptionExt, WrapErr};
16use futures_util::Stream;
17use libp2p::gossipsub::{IdentTopic, TopicHash};
18use libp2p::PeerId;
19use rand::Rng;
20use tokio::sync::{broadcast, mpsc};
21use tracing::info;
22
23use calimero_network_primitives::specialized_node_invite::SpecializedNodeType;
24
25use crate::messages::{
26    NodeMessage, RegisterPendingSpecializedNodeInvite, RemovePendingSpecializedNodeInvite,
27};
28use crate::sync::BroadcastMessage;
29
30mod alias;
31mod application;
32mod blob;
33
34#[derive(Clone, Debug)]
35pub struct NodeClient {
36    datastore: Store,
37    blobstore: BlobManager,
38    network_client: NetworkClient,
39    node_manager: LazyRecipient<NodeMessage>,
40    event_sender: broadcast::Sender<NodeEvent>,
41    ctx_sync_tx: mpsc::Sender<(Option<ContextId>, Option<PeerId>)>,
42    specialized_node_invite_topic: String,
43}
44
45impl NodeClient {
46    #[must_use]
47    pub fn new(
48        datastore: Store,
49        blobstore: BlobManager,
50        network_client: NetworkClient,
51        node_manager: LazyRecipient<NodeMessage>,
52        event_sender: broadcast::Sender<NodeEvent>,
53        ctx_sync_tx: mpsc::Sender<(Option<ContextId>, Option<PeerId>)>,
54        specialized_node_invite_topic: String,
55    ) -> Self {
56        Self {
57            datastore,
58            blobstore,
59            network_client,
60            node_manager,
61            event_sender,
62            ctx_sync_tx,
63            specialized_node_invite_topic,
64        }
65    }
66
67    pub async fn subscribe(&self, context_id: &ContextId) -> eyre::Result<()> {
68        let topic = IdentTopic::new(context_id);
69
70        let _ignored = self.network_client.subscribe(topic).await?;
71
72        info!(%context_id, "Subscribed to context");
73
74        Ok(())
75    }
76
77    pub async fn unsubscribe(&self, context_id: &ContextId) -> eyre::Result<()> {
78        let topic = IdentTopic::new(context_id);
79
80        let _ignored = self.network_client.unsubscribe(topic).await?;
81
82        info!(%context_id, "Unsubscribed from context");
83
84        Ok(())
85    }
86
87    pub async fn get_peers_count(&self, context: Option<&ContextId>) -> usize {
88        let Some(context) = context else {
89            return self.network_client.peer_count().await;
90        };
91
92        let topic = TopicHash::from_raw(*context);
93
94        self.network_client.mesh_peer_count(topic).await
95    }
96
97    pub async fn broadcast(
98        &self,
99        context: &Context,
100        sender: &PublicKey,
101        sender_key: &PrivateKey,
102        artifact: Vec<u8>,
103        delta_id: [u8; 32],
104        parent_ids: Vec<[u8; 32]>,
105        hlc: calimero_storage::logical_clock::HybridTimestamp,
106        events: Option<Vec<u8>>,
107    ) -> eyre::Result<()> {
108        info!(
109            context_id=%context.id,
110            %sender,
111            root_hash=%context.root_hash,
112            delta_id=?delta_id,
113            parent_count=parent_ids.len(),
114            "Sending state delta"
115        );
116
117        if self.get_peers_count(Some(&context.id)).await == 0 {
118            return Ok(());
119        }
120
121        let shared_key = SharedKey::from_sk(sender_key);
122        let nonce = rand::thread_rng().gen();
123
124        let encrypted = shared_key
125            .encrypt(artifact, nonce)
126            .ok_or_eyre("failed to encrypt artifact")?;
127
128        let payload = BroadcastMessage::StateDelta {
129            context_id: context.id,
130            author_id: *sender,
131            delta_id,
132            parent_ids,
133            hlc,
134            root_hash: context.root_hash,
135            artifact: encrypted.into(),
136            nonce,
137            events: events.map(Cow::from),
138        };
139
140        let payload = borsh::to_vec(&payload)?;
141
142        let topic = TopicHash::from_raw(context.id);
143
144        let _ignored = self.network_client.publish(topic, payload).await?;
145
146        Ok(())
147    }
148
149    pub async fn broadcast_heartbeat(
150        &self,
151        context_id: &ContextId,
152        root_hash: calimero_primitives::hash::Hash,
153        dag_heads: Vec<[u8; 32]>,
154    ) -> eyre::Result<()> {
155        if self.get_peers_count(Some(context_id)).await == 0 {
156            return Ok(());
157        }
158
159        let payload = BroadcastMessage::HashHeartbeat {
160            context_id: *context_id,
161            root_hash,
162            dag_heads,
163        };
164
165        let payload = borsh::to_vec(&payload)?;
166        let topic = TopicHash::from_raw(*context_id);
167
168        let _ignored = self.network_client.publish(topic, payload).await?;
169
170        Ok(())
171    }
172
173    /// Broadcast a specialized node invite discovery to the global invite topic.
174    ///
175    /// This broadcasts a discovery message and registers a pending invite so that
176    /// when a specialized node responds with verification, the node can create an invitation.
177    ///
178    /// # Arguments
179    /// * `context_id` - The context to invite specialized nodes to
180    /// * `inviter_id` - The identity performing the invitation
181    /// * `invite_topic` - The global topic name for specialized node invite discovery
182    ///
183    /// # Returns
184    /// The nonce used in the request
185    pub async fn broadcast_specialized_node_invite(
186        &self,
187        context_id: ContextId,
188        inviter_id: PublicKey,
189    ) -> eyre::Result<[u8; 32]> {
190        let nonce: [u8; 32] = rand::thread_rng().gen();
191        // Currently only ReadOnly node type is supported
192        let node_type = SpecializedNodeType::ReadOnly;
193
194        info!(
195            %context_id,
196            %inviter_id,
197            ?node_type,
198            topic = %self.specialized_node_invite_topic,
199            nonce = %hex::encode(nonce),
200            "Broadcasting specialized node invite discovery"
201        );
202
203        // Register the pending invite FIRST to avoid race condition
204        // A fast-responding specialized node could send verification request
205        // before registration completes if we broadcast first
206        self.node_manager
207            .send(NodeMessage::RegisterPendingSpecializedNodeInvite {
208                request: RegisterPendingSpecializedNodeInvite {
209                    nonce,
210                    context_id,
211                    inviter_id,
212                },
213            })
214            .await
215            .expect("Mailbox not to be dropped");
216
217        // Now broadcast the discovery message
218        let payload = BroadcastMessage::SpecializedNodeDiscovery { nonce, node_type };
219        let payload = borsh::to_vec(&payload)?;
220        let topic = IdentTopic::new(self.specialized_node_invite_topic.to_owned());
221        let result = self.network_client.publish(topic.hash(), payload).await;
222
223        // If broadcast failed, clean up the pending invite before returning error
224        if result.is_err() {
225            self.node_manager
226                .send(NodeMessage::RemovePendingSpecializedNodeInvite {
227                    request: RemovePendingSpecializedNodeInvite { nonce },
228                })
229                .await
230                .expect("Mailbox not to be dropped");
231        }
232
233        let _ignored = result?;
234
235        Ok(nonce)
236    }
237
238    pub fn send_event(&self, event: NodeEvent) -> eyre::Result<()> {
239        // the caller doesn't care if there are no receivers
240        // so we create a temporary receiver
241        let _ignored = self.event_sender.subscribe();
242
243        let _ignored = self
244            .event_sender
245            .send(event)
246            // this should in-theory never happen, but just in case
247            .wrap_err("failed to send event")?;
248
249        Ok(())
250    }
251
252    pub fn receive_events(&self) -> impl Stream<Item = NodeEvent> {
253        let mut receiver = self.event_sender.subscribe();
254
255        stream! {
256            loop {
257                match receiver.recv().await {
258                    Ok(event) => yield event,
259                    Err(broadcast::error::RecvError::Closed) => break,
260                    // oh, we missed a message? let's.. just ignore it
261                    Err(broadcast::error::RecvError::Lagged(_)) => {},
262                }
263            }
264        }
265    }
266
267    pub async fn sync(
268        &self,
269        context_id: Option<&ContextId>,
270        peer_id: Option<&PeerId>,
271    ) -> eyre::Result<()> {
272        self.ctx_sync_tx
273            .send((context_id.copied(), peer_id.copied()))
274            .await?;
275
276        Ok(())
277    }
278}