calimero_node_primitives/
client.rs1#![allow(clippy::multiple_inherent_impl, reason = "better readability")]
2
3use std::num::NonZeroUsize;
4
5use async_stream::stream;
6use calimero_blobstore::BlobManager;
7use calimero_crypto::SharedKey;
8use calimero_network_primitives::client::NetworkClient;
9use calimero_primitives::context::{Context, ContextId};
10use calimero_primitives::events::NodeEvent;
11use calimero_primitives::identity::{PrivateKey, PublicKey};
12use calimero_store::Store;
13use calimero_utils_actix::LazyRecipient;
14use eyre::{OptionExt, WrapErr};
15use futures_util::Stream;
16use libp2p::gossipsub::{IdentTopic, TopicHash};
17use libp2p::PeerId;
18use rand::Rng;
19use tokio::sync::{broadcast, mpsc};
20use tracing::{debug, info};
21
22use crate::messages::NodeMessage;
23use crate::sync::BroadcastMessage;
24
25mod alias;
26mod application;
27mod blob;
28
29#[derive(Clone, Debug)]
30pub struct NodeClient {
31 datastore: Store,
32 blobstore: BlobManager,
33 network_client: NetworkClient,
34 node_manager: LazyRecipient<NodeMessage>,
35 event_sender: broadcast::Sender<NodeEvent>,
36 ctx_sync_tx: mpsc::Sender<(Option<ContextId>, Option<PeerId>)>,
37}
38
39impl NodeClient {
40 pub fn new(
41 datastore: Store,
42 blobstore: BlobManager,
43 network_client: NetworkClient,
44 node_manager: LazyRecipient<NodeMessage>,
45 event_sender: broadcast::Sender<NodeEvent>,
46 ctx_sync_tx: mpsc::Sender<(Option<ContextId>, Option<PeerId>)>,
47 ) -> Self {
48 Self {
49 datastore,
50 blobstore,
51 network_client,
52 node_manager,
53 event_sender,
54 ctx_sync_tx,
55 }
56 }
57
58 pub async fn subscribe(&self, context_id: &ContextId) -> eyre::Result<()> {
59 let topic = IdentTopic::new(context_id);
60
61 let _ignored = self.network_client.subscribe(topic).await?;
62
63 info!(%context_id, "Subscribed to context");
64
65 Ok(())
66 }
67
68 pub async fn unsubscribe(&self, context_id: &ContextId) -> eyre::Result<()> {
69 let topic = IdentTopic::new(context_id);
70
71 let _ignored = self.network_client.unsubscribe(topic).await?;
72
73 info!(%context_id, "Unsubscribed from context");
74
75 Ok(())
76 }
77
78 pub async fn get_peers_count(&self, context: Option<&ContextId>) -> usize {
79 let Some(context) = context else {
80 return self.network_client.peer_count().await;
81 };
82
83 let topic = TopicHash::from_raw(*context);
84
85 self.network_client.mesh_peer_count(topic).await
86 }
87
88 pub async fn broadcast(
89 &self,
90 context: &Context,
91 sender: &PublicKey,
92 sender_key: &PrivateKey,
93 artifact: Vec<u8>,
94 height: NonZeroUsize,
95 ) -> eyre::Result<()> {
96 debug!(
97 context_id=%context.id,
98 %sender,
99 root_hash=%context.root_hash,
100 "Sending state delta"
101 );
102
103 if self.get_peers_count(Some(&context.id)).await == 0 {
104 return Ok(());
105 }
106
107 let shared_key = SharedKey::from_sk(sender_key);
108 let nonce = rand::thread_rng().gen();
109
110 let encrypted = shared_key
111 .encrypt(artifact, nonce)
112 .ok_or_eyre("failed to encrypt artifact")?;
113
114 let payload = BroadcastMessage::StateDelta {
115 context_id: context.id,
116 author_id: *sender,
117 root_hash: context.root_hash,
118 artifact: encrypted.into(),
119 height,
120 nonce,
121 };
122
123 let payload = borsh::to_vec(&payload)?;
124
125 let topic = TopicHash::from_raw(context.id);
126
127 let _ignored = self.network_client.publish(topic, payload).await?;
128
129 Ok(())
130 }
131
132 pub fn send_event(&self, event: NodeEvent) -> eyre::Result<()> {
133 let _ignored = self.event_sender.subscribe();
136
137 let _ignored = self
138 .event_sender
139 .send(event)
140 .wrap_err("failed to send event")?;
142
143 Ok(())
144 }
145
146 pub fn receive_events(&self) -> impl Stream<Item = NodeEvent> {
147 let mut receiver = self.event_sender.subscribe();
148
149 stream! {
150 loop {
151 match receiver.recv().await {
152 Ok(event) => yield event,
153 Err(broadcast::error::RecvError::Closed) => break,
154 Err(broadcast::error::RecvError::Lagged(_)) => {},
156 }
157 }
158 }
159 }
160
161 pub async fn sync(
162 &self,
163 context_id: Option<&ContextId>,
164 peer_id: Option<&PeerId>,
165 ) -> eyre::Result<()> {
166 self.ctx_sync_tx
167 .send((context_id.copied(), peer_id.copied()))
168 .await?;
169
170 Ok(())
171 }
172}