calimero_node_primitives/
client.rs

1#![allow(clippy::multiple_inherent_impl, reason = "better readability")]
2
3use std::num::NonZeroUsize;
4
5use async_stream::stream;
6use calimero_blobstore::BlobManager;
7use calimero_crypto::SharedKey;
8use calimero_network_primitives::client::NetworkClient;
9use calimero_primitives::context::{Context, ContextId};
10use calimero_primitives::events::NodeEvent;
11use calimero_primitives::identity::{PrivateKey, PublicKey};
12use calimero_store::Store;
13use calimero_utils_actix::LazyRecipient;
14use eyre::{OptionExt, WrapErr};
15use futures_util::Stream;
16use libp2p::gossipsub::{IdentTopic, TopicHash};
17use libp2p::PeerId;
18use rand::Rng;
19use tokio::sync::{broadcast, mpsc};
20use tracing::{debug, info};
21
22use crate::messages::NodeMessage;
23use crate::sync::BroadcastMessage;
24
25mod alias;
26mod application;
27mod blob;
28
29#[derive(Clone, Debug)]
30pub struct NodeClient {
31    datastore: Store,
32    blobstore: BlobManager,
33    network_client: NetworkClient,
34    node_manager: LazyRecipient<NodeMessage>,
35    event_sender: broadcast::Sender<NodeEvent>,
36    ctx_sync_tx: mpsc::Sender<(Option<ContextId>, Option<PeerId>)>,
37}
38
39impl NodeClient {
40    pub fn new(
41        datastore: Store,
42        blobstore: BlobManager,
43        network_client: NetworkClient,
44        node_manager: LazyRecipient<NodeMessage>,
45        event_sender: broadcast::Sender<NodeEvent>,
46        ctx_sync_tx: mpsc::Sender<(Option<ContextId>, Option<PeerId>)>,
47    ) -> Self {
48        Self {
49            datastore,
50            blobstore,
51            network_client,
52            node_manager,
53            event_sender,
54            ctx_sync_tx,
55        }
56    }
57
58    pub async fn subscribe(&self, context_id: &ContextId) -> eyre::Result<()> {
59        let topic = IdentTopic::new(context_id);
60
61        let _ignored = self.network_client.subscribe(topic).await?;
62
63        info!(%context_id, "Subscribed to context");
64
65        Ok(())
66    }
67
68    pub async fn unsubscribe(&self, context_id: &ContextId) -> eyre::Result<()> {
69        let topic = IdentTopic::new(context_id);
70
71        let _ignored = self.network_client.unsubscribe(topic).await?;
72
73        info!(%context_id, "Unsubscribed from context");
74
75        Ok(())
76    }
77
78    pub async fn get_peers_count(&self, context: Option<&ContextId>) -> usize {
79        let Some(context) = context else {
80            return self.network_client.peer_count().await;
81        };
82
83        let topic = TopicHash::from_raw(*context);
84
85        self.network_client.mesh_peer_count(topic).await
86    }
87
88    pub async fn broadcast(
89        &self,
90        context: &Context,
91        sender: &PublicKey,
92        sender_key: &PrivateKey,
93        artifact: Vec<u8>,
94        height: NonZeroUsize,
95    ) -> eyre::Result<()> {
96        debug!(
97            context_id=%context.id,
98            %sender,
99            root_hash=%context.root_hash,
100            "Sending state delta"
101        );
102
103        if self.get_peers_count(Some(&context.id)).await == 0 {
104            return Ok(());
105        }
106
107        let shared_key = SharedKey::from_sk(sender_key);
108        let nonce = rand::thread_rng().gen();
109
110        let encrypted = shared_key
111            .encrypt(artifact, nonce)
112            .ok_or_eyre("failed to encrypt artifact")?;
113
114        let payload = BroadcastMessage::StateDelta {
115            context_id: context.id,
116            author_id: *sender,
117            root_hash: context.root_hash,
118            artifact: encrypted.into(),
119            height,
120            nonce,
121        };
122
123        let payload = borsh::to_vec(&payload)?;
124
125        let topic = TopicHash::from_raw(context.id);
126
127        let _ignored = self.network_client.publish(topic, payload).await?;
128
129        Ok(())
130    }
131
132    pub fn send_event(&self, event: NodeEvent) -> eyre::Result<()> {
133        // the caller doesn't care if there are no receivers
134        // so we create a temporary receiver
135        let _ignored = self.event_sender.subscribe();
136
137        let _ignored = self
138            .event_sender
139            .send(event)
140            // this should in-theory never happen, but just in case
141            .wrap_err("failed to send event")?;
142
143        Ok(())
144    }
145
146    pub fn receive_events(&self) -> impl Stream<Item = NodeEvent> {
147        let mut receiver = self.event_sender.subscribe();
148
149        stream! {
150            loop {
151                match receiver.recv().await {
152                    Ok(event) => yield event,
153                    Err(broadcast::error::RecvError::Closed) => break,
154                    // oh, we missed a message? let's.. just ignore it
155                    Err(broadcast::error::RecvError::Lagged(_)) => {},
156                }
157            }
158        }
159    }
160
161    pub async fn sync(
162        &self,
163        context_id: Option<&ContextId>,
164        peer_id: Option<&PeerId>,
165    ) -> eyre::Result<()> {
166        self.ctx_sync_tx
167            .send((context_id.copied(), peer_id.copied()))
168            .await?;
169
170        Ok(())
171    }
172}