calimero_node_primitives/
client.rs

1#![allow(clippy::multiple_inherent_impl, reason = "better readability")]
2
3use std::borrow::Cow;
4use std::num::NonZeroUsize;
5
6use async_stream::stream;
7use calimero_blobstore::BlobManager;
8use calimero_crypto::SharedKey;
9use calimero_network_primitives::client::NetworkClient;
10use calimero_primitives::context::{Context, ContextId};
11use calimero_primitives::events::NodeEvent;
12use calimero_primitives::identity::{PrivateKey, PublicKey};
13use calimero_store::Store;
14use calimero_utils_actix::LazyRecipient;
15use eyre::{OptionExt, WrapErr};
16use futures_util::Stream;
17use libp2p::gossipsub::{IdentTopic, TopicHash};
18use libp2p::PeerId;
19use rand::Rng;
20use tokio::sync::{broadcast, mpsc};
21use tracing::info;
22
23use crate::messages::NodeMessage;
24use crate::sync::BroadcastMessage;
25
26mod alias;
27mod application;
28mod blob;
29
30#[derive(Clone, Debug)]
31pub struct NodeClient {
32    datastore: Store,
33    blobstore: BlobManager,
34    network_client: NetworkClient,
35    node_manager: LazyRecipient<NodeMessage>,
36    event_sender: broadcast::Sender<NodeEvent>,
37    ctx_sync_tx: mpsc::Sender<(Option<ContextId>, Option<PeerId>)>,
38}
39
40impl NodeClient {
41    pub fn new(
42        datastore: Store,
43        blobstore: BlobManager,
44        network_client: NetworkClient,
45        node_manager: LazyRecipient<NodeMessage>,
46        event_sender: broadcast::Sender<NodeEvent>,
47        ctx_sync_tx: mpsc::Sender<(Option<ContextId>, Option<PeerId>)>,
48    ) -> Self {
49        Self {
50            datastore,
51            blobstore,
52            network_client,
53            node_manager,
54            event_sender,
55            ctx_sync_tx,
56        }
57    }
58
59    pub async fn subscribe(&self, context_id: &ContextId) -> eyre::Result<()> {
60        let topic = IdentTopic::new(context_id);
61
62        let _ignored = self.network_client.subscribe(topic).await?;
63
64        info!(%context_id, "Subscribed to context");
65
66        Ok(())
67    }
68
69    pub async fn unsubscribe(&self, context_id: &ContextId) -> eyre::Result<()> {
70        let topic = IdentTopic::new(context_id);
71
72        let _ignored = self.network_client.unsubscribe(topic).await?;
73
74        info!(%context_id, "Unsubscribed from context");
75
76        Ok(())
77    }
78
79    pub async fn get_peers_count(&self, context: Option<&ContextId>) -> usize {
80        let Some(context) = context else {
81            return self.network_client.peer_count().await;
82        };
83
84        let topic = TopicHash::from_raw(*context);
85
86        self.network_client.mesh_peer_count(topic).await
87    }
88
89    pub async fn broadcast(
90        &self,
91        context: &Context,
92        sender: &PublicKey,
93        sender_key: &PrivateKey,
94        artifact: Vec<u8>,
95        height: NonZeroUsize,
96        events: Option<Vec<u8>>,
97    ) -> eyre::Result<()> {
98        info!(
99            context_id=%context.id,
100            %sender,
101            root_hash=%context.root_hash,
102            "Sending state delta"
103        );
104
105        if self.get_peers_count(Some(&context.id)).await == 0 {
106            return Ok(());
107        }
108
109        let shared_key = SharedKey::from_sk(sender_key);
110        let nonce = rand::thread_rng().gen();
111
112        let encrypted = shared_key
113            .encrypt(artifact, nonce)
114            .ok_or_eyre("failed to encrypt artifact")?;
115
116        let payload = BroadcastMessage::StateDelta {
117            context_id: context.id,
118            author_id: *sender,
119            root_hash: context.root_hash,
120            artifact: encrypted.into(),
121            height,
122            nonce,
123            events: events.map(Cow::from),
124        };
125
126        let payload = borsh::to_vec(&payload)?;
127
128        let topic = TopicHash::from_raw(context.id);
129
130        let _ignored = self.network_client.publish(topic, payload).await?;
131
132        Ok(())
133    }
134
135    pub fn send_event(&self, event: NodeEvent) -> eyre::Result<()> {
136        // the caller doesn't care if there are no receivers
137        // so we create a temporary receiver
138        let _ignored = self.event_sender.subscribe();
139
140        let _ignored = self
141            .event_sender
142            .send(event)
143            // this should in-theory never happen, but just in case
144            .wrap_err("failed to send event")?;
145
146        Ok(())
147    }
148
149    pub fn receive_events(&self) -> impl Stream<Item = NodeEvent> {
150        let mut receiver = self.event_sender.subscribe();
151
152        stream! {
153            loop {
154                match receiver.recv().await {
155                    Ok(event) => yield event,
156                    Err(broadcast::error::RecvError::Closed) => break,
157                    // oh, we missed a message? let's.. just ignore it
158                    Err(broadcast::error::RecvError::Lagged(_)) => {},
159                }
160            }
161        }
162    }
163
164    pub async fn sync(
165        &self,
166        context_id: Option<&ContextId>,
167        peer_id: Option<&PeerId>,
168    ) -> eyre::Result<()> {
169        self.ctx_sync_tx
170            .send((context_id.copied(), peer_id.copied()))
171            .await?;
172
173        Ok(())
174    }
175}