calimero_node_primitives/
client.rs1#![allow(clippy::multiple_inherent_impl, reason = "better readability")]
2
3use std::borrow::Cow;
4use async_stream::stream;
7use calimero_blobstore::BlobManager;
8use calimero_crypto::SharedKey;
9use calimero_network_primitives::client::NetworkClient;
10use calimero_primitives::context::{Context, ContextId};
11use calimero_primitives::events::NodeEvent;
12use calimero_primitives::identity::{PrivateKey, PublicKey};
13use calimero_store::Store;
14use calimero_utils_actix::LazyRecipient;
15use eyre::{OptionExt, WrapErr};
16use futures_util::Stream;
17use libp2p::gossipsub::{IdentTopic, TopicHash};
18use libp2p::PeerId;
19use rand::Rng;
20use tokio::sync::{broadcast, mpsc};
21use tracing::info;
22
23use calimero_network_primitives::specialized_node_invite::SpecializedNodeType;
24
25use crate::messages::{
26 NodeMessage, RegisterPendingSpecializedNodeInvite, RemovePendingSpecializedNodeInvite,
27};
28use crate::sync::BroadcastMessage;
29
30mod alias;
31mod application;
32mod blob;
33
34#[derive(Clone, Debug)]
35pub struct NodeClient {
36 datastore: Store,
37 blobstore: BlobManager,
38 network_client: NetworkClient,
39 node_manager: LazyRecipient<NodeMessage>,
40 event_sender: broadcast::Sender<NodeEvent>,
41 ctx_sync_tx: mpsc::Sender<(Option<ContextId>, Option<PeerId>)>,
42 specialized_node_invite_topic: String,
43}
44
45impl NodeClient {
46 #[must_use]
47 pub fn new(
48 datastore: Store,
49 blobstore: BlobManager,
50 network_client: NetworkClient,
51 node_manager: LazyRecipient<NodeMessage>,
52 event_sender: broadcast::Sender<NodeEvent>,
53 ctx_sync_tx: mpsc::Sender<(Option<ContextId>, Option<PeerId>)>,
54 specialized_node_invite_topic: String,
55 ) -> Self {
56 Self {
57 datastore,
58 blobstore,
59 network_client,
60 node_manager,
61 event_sender,
62 ctx_sync_tx,
63 specialized_node_invite_topic,
64 }
65 }
66
67 pub async fn subscribe(&self, context_id: &ContextId) -> eyre::Result<()> {
68 let topic = IdentTopic::new(context_id);
69
70 let _ignored = self.network_client.subscribe(topic).await?;
71
72 info!(%context_id, "Subscribed to context");
73
74 Ok(())
75 }
76
77 pub async fn unsubscribe(&self, context_id: &ContextId) -> eyre::Result<()> {
78 let topic = IdentTopic::new(context_id);
79
80 let _ignored = self.network_client.unsubscribe(topic).await?;
81
82 info!(%context_id, "Unsubscribed from context");
83
84 Ok(())
85 }
86
87 pub async fn get_peers_count(&self, context: Option<&ContextId>) -> usize {
88 let Some(context) = context else {
89 return self.network_client.peer_count().await;
90 };
91
92 let topic = TopicHash::from_raw(*context);
93
94 self.network_client.mesh_peer_count(topic).await
95 }
96
97 pub async fn broadcast(
98 &self,
99 context: &Context,
100 sender: &PublicKey,
101 sender_key: &PrivateKey,
102 artifact: Vec<u8>,
103 delta_id: [u8; 32],
104 parent_ids: Vec<[u8; 32]>,
105 hlc: calimero_storage::logical_clock::HybridTimestamp,
106 events: Option<Vec<u8>>,
107 ) -> eyre::Result<()> {
108 info!(
109 context_id=%context.id,
110 %sender,
111 root_hash=%context.root_hash,
112 delta_id=?delta_id,
113 parent_count=parent_ids.len(),
114 "Sending state delta"
115 );
116
117 if self.get_peers_count(Some(&context.id)).await == 0 {
118 return Ok(());
119 }
120
121 let shared_key = SharedKey::from_sk(sender_key);
122 let nonce = rand::thread_rng().gen();
123
124 let encrypted = shared_key
125 .encrypt(artifact, nonce)
126 .ok_or_eyre("failed to encrypt artifact")?;
127
128 let payload = BroadcastMessage::StateDelta {
129 context_id: context.id,
130 author_id: *sender,
131 delta_id,
132 parent_ids,
133 hlc,
134 root_hash: context.root_hash,
135 artifact: encrypted.into(),
136 nonce,
137 events: events.map(Cow::from),
138 };
139
140 let payload = borsh::to_vec(&payload)?;
141
142 let topic = TopicHash::from_raw(context.id);
143
144 let _ignored = self.network_client.publish(topic, payload).await?;
145
146 Ok(())
147 }
148
149 pub async fn broadcast_heartbeat(
150 &self,
151 context_id: &ContextId,
152 root_hash: calimero_primitives::hash::Hash,
153 dag_heads: Vec<[u8; 32]>,
154 ) -> eyre::Result<()> {
155 if self.get_peers_count(Some(context_id)).await == 0 {
156 return Ok(());
157 }
158
159 let payload = BroadcastMessage::HashHeartbeat {
160 context_id: *context_id,
161 root_hash,
162 dag_heads,
163 };
164
165 let payload = borsh::to_vec(&payload)?;
166 let topic = TopicHash::from_raw(*context_id);
167
168 let _ignored = self.network_client.publish(topic, payload).await?;
169
170 Ok(())
171 }
172
173 pub async fn broadcast_specialized_node_invite(
186 &self,
187 context_id: ContextId,
188 inviter_id: PublicKey,
189 ) -> eyre::Result<[u8; 32]> {
190 let nonce: [u8; 32] = rand::thread_rng().gen();
191 let node_type = SpecializedNodeType::ReadOnly;
193
194 info!(
195 %context_id,
196 %inviter_id,
197 ?node_type,
198 topic = %self.specialized_node_invite_topic,
199 nonce = %hex::encode(nonce),
200 "Broadcasting specialized node invite discovery"
201 );
202
203 self.node_manager
207 .send(NodeMessage::RegisterPendingSpecializedNodeInvite {
208 request: RegisterPendingSpecializedNodeInvite {
209 nonce,
210 context_id,
211 inviter_id,
212 },
213 })
214 .await
215 .expect("Mailbox not to be dropped");
216
217 let payload = BroadcastMessage::SpecializedNodeDiscovery { nonce, node_type };
219 let payload = borsh::to_vec(&payload)?;
220 let topic = IdentTopic::new(self.specialized_node_invite_topic.to_owned());
221 let result = self.network_client.publish(topic.hash(), payload).await;
222
223 if result.is_err() {
225 self.node_manager
226 .send(NodeMessage::RemovePendingSpecializedNodeInvite {
227 request: RemovePendingSpecializedNodeInvite { nonce },
228 })
229 .await
230 .expect("Mailbox not to be dropped");
231 }
232
233 let _ignored = result?;
234
235 Ok(nonce)
236 }
237
238 pub fn send_event(&self, event: NodeEvent) -> eyre::Result<()> {
239 let _ignored = self.event_sender.subscribe();
242
243 let _ignored = self
244 .event_sender
245 .send(event)
246 .wrap_err("failed to send event")?;
248
249 Ok(())
250 }
251
252 pub fn receive_events(&self) -> impl Stream<Item = NodeEvent> {
253 let mut receiver = self.event_sender.subscribe();
254
255 stream! {
256 loop {
257 match receiver.recv().await {
258 Ok(event) => yield event,
259 Err(broadcast::error::RecvError::Closed) => break,
260 Err(broadcast::error::RecvError::Lagged(_)) => {},
262 }
263 }
264 }
265 }
266
267 pub async fn sync(
268 &self,
269 context_id: Option<&ContextId>,
270 peer_id: Option<&PeerId>,
271 ) -> eyre::Result<()> {
272 self.ctx_sync_tx
273 .send((context_id.copied(), peer_id.copied()))
274 .await?;
275
276 Ok(())
277 }
278}