calimero_node_primitives/
client.rs1#![allow(clippy::multiple_inherent_impl, reason = "better readability")]
2
3use std::borrow::Cow;
4use std::num::NonZeroUsize;
5
6use async_stream::stream;
7use calimero_blobstore::BlobManager;
8use calimero_crypto::SharedKey;
9use calimero_network_primitives::client::NetworkClient;
10use calimero_primitives::context::{Context, ContextId};
11use calimero_primitives::events::NodeEvent;
12use calimero_primitives::identity::{PrivateKey, PublicKey};
13use calimero_store::Store;
14use calimero_utils_actix::LazyRecipient;
15use eyre::{OptionExt, WrapErr};
16use futures_util::Stream;
17use libp2p::gossipsub::{IdentTopic, TopicHash};
18use libp2p::PeerId;
19use rand::Rng;
20use tokio::sync::{broadcast, mpsc};
21use tracing::info;
22
23use crate::messages::NodeMessage;
24use crate::sync::BroadcastMessage;
25
26mod alias;
27mod application;
28mod blob;
29
30#[derive(Clone, Debug)]
31pub struct NodeClient {
32 datastore: Store,
33 blobstore: BlobManager,
34 network_client: NetworkClient,
35 node_manager: LazyRecipient<NodeMessage>,
36 event_sender: broadcast::Sender<NodeEvent>,
37 ctx_sync_tx: mpsc::Sender<(Option<ContextId>, Option<PeerId>)>,
38}
39
40impl NodeClient {
41 pub fn new(
42 datastore: Store,
43 blobstore: BlobManager,
44 network_client: NetworkClient,
45 node_manager: LazyRecipient<NodeMessage>,
46 event_sender: broadcast::Sender<NodeEvent>,
47 ctx_sync_tx: mpsc::Sender<(Option<ContextId>, Option<PeerId>)>,
48 ) -> Self {
49 Self {
50 datastore,
51 blobstore,
52 network_client,
53 node_manager,
54 event_sender,
55 ctx_sync_tx,
56 }
57 }
58
59 pub async fn subscribe(&self, context_id: &ContextId) -> eyre::Result<()> {
60 let topic = IdentTopic::new(context_id);
61
62 let _ignored = self.network_client.subscribe(topic).await?;
63
64 info!(%context_id, "Subscribed to context");
65
66 Ok(())
67 }
68
69 pub async fn unsubscribe(&self, context_id: &ContextId) -> eyre::Result<()> {
70 let topic = IdentTopic::new(context_id);
71
72 let _ignored = self.network_client.unsubscribe(topic).await?;
73
74 info!(%context_id, "Unsubscribed from context");
75
76 Ok(())
77 }
78
79 pub async fn get_peers_count(&self, context: Option<&ContextId>) -> usize {
80 let Some(context) = context else {
81 return self.network_client.peer_count().await;
82 };
83
84 let topic = TopicHash::from_raw(*context);
85
86 self.network_client.mesh_peer_count(topic).await
87 }
88
89 pub async fn broadcast(
90 &self,
91 context: &Context,
92 sender: &PublicKey,
93 sender_key: &PrivateKey,
94 artifact: Vec<u8>,
95 height: NonZeroUsize,
96 events: Option<Vec<u8>>,
97 ) -> eyre::Result<()> {
98 info!(
99 context_id=%context.id,
100 %sender,
101 root_hash=%context.root_hash,
102 "Sending state delta"
103 );
104
105 if self.get_peers_count(Some(&context.id)).await == 0 {
106 return Ok(());
107 }
108
109 let shared_key = SharedKey::from_sk(sender_key);
110 let nonce = rand::thread_rng().gen();
111
112 let encrypted = shared_key
113 .encrypt(artifact, nonce)
114 .ok_or_eyre("failed to encrypt artifact")?;
115
116 let payload = BroadcastMessage::StateDelta {
117 context_id: context.id,
118 author_id: *sender,
119 root_hash: context.root_hash,
120 artifact: encrypted.into(),
121 height,
122 nonce,
123 events: events.map(Cow::from),
124 };
125
126 let payload = borsh::to_vec(&payload)?;
127
128 let topic = TopicHash::from_raw(context.id);
129
130 let _ignored = self.network_client.publish(topic, payload).await?;
131
132 Ok(())
133 }
134
135 pub fn send_event(&self, event: NodeEvent) -> eyre::Result<()> {
136 let _ignored = self.event_sender.subscribe();
139
140 let _ignored = self
141 .event_sender
142 .send(event)
143 .wrap_err("failed to send event")?;
145
146 Ok(())
147 }
148
149 pub fn receive_events(&self) -> impl Stream<Item = NodeEvent> {
150 let mut receiver = self.event_sender.subscribe();
151
152 stream! {
153 loop {
154 match receiver.recv().await {
155 Ok(event) => yield event,
156 Err(broadcast::error::RecvError::Closed) => break,
157 Err(broadcast::error::RecvError::Lagged(_)) => {},
159 }
160 }
161 }
162 }
163
164 pub async fn sync(
165 &self,
166 context_id: Option<&ContextId>,
167 peer_id: Option<&PeerId>,
168 ) -> eyre::Result<()> {
169 self.ctx_sync_tx
170 .send((context_id.copied(), peer_id.copied()))
171 .await?;
172
173 Ok(())
174 }
175}