calimero_network_primitives/
client.rs

1use calimero_primitives::blobs::BlobId;
2use calimero_primitives::context::ContextId;
3use calimero_utils_actix::LazyRecipient;
4use libp2p::gossipsub::{IdentTopic, MessageId, TopicHash};
5use libp2p::Multiaddr;
6use tokio::sync::oneshot;
7
8use crate::messages::{
9    AnnounceBlob, Bootstrap, Dial, ListenOn, MeshPeerCount, MeshPeers, NetworkMessage, OpenStream,
10    PeerCount, Publish, QueryBlob, RequestBlob, Subscribe, Unsubscribe,
11};
12use crate::stream::Stream;
13
14#[derive(Clone, Debug)]
15pub struct NetworkClient {
16    network_manager: LazyRecipient<NetworkMessage>,
17}
18
19impl NetworkClient {
20    pub const fn new(network_manager: LazyRecipient<NetworkMessage>) -> Self {
21        Self { network_manager }
22    }
23
24    pub async fn dial(&self, peer_addr: Multiaddr) -> eyre::Result<()> {
25        let (tx, rx) = oneshot::channel();
26
27        self.network_manager
28            .send(NetworkMessage::Dial {
29                request: Dial(peer_addr),
30                outcome: tx,
31            })
32            .await
33            .expect("Mailbox not to be dropped");
34
35        rx.await.expect("Mailbox not to be dropped")
36    }
37
38    pub async fn listen_on(&self, addr: Multiaddr) -> eyre::Result<()> {
39        let (tx, rx) = oneshot::channel();
40
41        self.network_manager
42            .send(NetworkMessage::ListenOn {
43                request: ListenOn(addr),
44                outcome: tx,
45            })
46            .await
47            .expect("Mailbox not to be dropped");
48
49        rx.await.expect("Mailbox not to be dropped")
50    }
51
52    pub async fn bootstrap(&self) -> eyre::Result<()> {
53        let (tx, rx) = oneshot::channel();
54
55        let _result = self
56            .network_manager
57            .send(NetworkMessage::Bootstrap {
58                request: Bootstrap,
59                outcome: tx,
60            })
61            .await
62            .expect("Mailbox not to be dropped");
63
64        rx.await.expect("Mailbox not to be dropped")
65    }
66
67    pub async fn subscribe(&self, topic: IdentTopic) -> eyre::Result<IdentTopic> {
68        let (tx, rx) = oneshot::channel();
69
70        self.network_manager
71            .send(NetworkMessage::Subscribe {
72                request: Subscribe(topic),
73                outcome: tx,
74            })
75            .await
76            .expect("Mailbox not to be dropped");
77
78        rx.await.expect("Mailbox not to be dropped")
79    }
80
81    pub async fn unsubscribe(&self, topic: IdentTopic) -> eyre::Result<IdentTopic> {
82        let (tx, rx) = oneshot::channel();
83
84        self.network_manager
85            .send(NetworkMessage::Unsubscribe {
86                request: Unsubscribe(topic),
87                outcome: tx,
88            })
89            .await
90            .expect("Mailbox not to be dropped");
91
92        rx.await.expect("Mailbox not to be dropped")
93    }
94
95    pub async fn publish(&self, topic: TopicHash, data: Vec<u8>) -> eyre::Result<MessageId> {
96        let (tx, rx) = oneshot::channel();
97
98        self.network_manager
99            .send(NetworkMessage::Publish {
100                request: Publish { topic, data },
101                outcome: tx,
102            })
103            .await
104            .expect("Mailbox not to be dropped");
105
106        rx.await.expect("Mailbox not to be dropped")
107    }
108
109    pub async fn open_stream(&self, peer_id: libp2p::PeerId) -> eyre::Result<Stream> {
110        let (tx, rx) = oneshot::channel();
111
112        self.network_manager
113            .send(NetworkMessage::OpenStream {
114                request: OpenStream(peer_id),
115                outcome: tx,
116            })
117            .await
118            .expect("Mailbox not to be dropped");
119
120        rx.await.expect("Mailbox not to be dropped")
121    }
122
123    pub async fn peer_count(&self) -> usize {
124        let (tx, rx) = oneshot::channel();
125
126        self.network_manager
127            .send(NetworkMessage::PeerCount {
128                request: PeerCount,
129                outcome: tx,
130            })
131            .await
132            .expect("Mailbox not to be dropped");
133
134        rx.await.expect("Mailbox not to be dropped")
135    }
136
137    pub async fn mesh_peer_count(&self, topic: TopicHash) -> usize {
138        let (tx, rx) = oneshot::channel();
139
140        self.network_manager
141            .send(NetworkMessage::MeshPeerCount {
142                request: MeshPeerCount(topic),
143                outcome: tx,
144            })
145            .await
146            .expect("Mailbox not to be dropped");
147
148        rx.await.expect("Mailbox not to be dropped")
149    }
150
151    pub async fn mesh_peers(&self, topic: TopicHash) -> Vec<libp2p::PeerId> {
152        let (tx, rx) = oneshot::channel();
153
154        self.network_manager
155            .send(NetworkMessage::MeshPeers {
156                request: MeshPeers(topic),
157                outcome: tx,
158            })
159            .await
160            .expect("Mailbox not to be dropped");
161
162        rx.await.expect("Mailbox not to be dropped")
163    }
164
165    // Blob discovery methods
166
167    /// Announce a blob to the DHT for a specific context
168    pub async fn announce_blob(
169        &self,
170        blob_id: BlobId,
171        context_id: ContextId,
172        size: u64,
173    ) -> eyre::Result<()> {
174        let (tx, rx) = oneshot::channel();
175
176        self.network_manager
177            .send(NetworkMessage::AnnounceBlob {
178                request: AnnounceBlob {
179                    blob_id,
180                    context_id,
181                    size,
182                },
183                outcome: tx,
184            })
185            .await
186            .expect("Mailbox not to be dropped");
187
188        rx.await.expect("Mailbox not to be dropped")
189    }
190
191    /// Query the DHT for peers that have a specific blob
192    pub async fn query_blob(
193        &self,
194        blob_id: BlobId,
195        context_id: Option<ContextId>,
196    ) -> eyre::Result<Vec<libp2p::PeerId>> {
197        let (tx, rx) = oneshot::channel();
198
199        self.network_manager
200            .send(NetworkMessage::QueryBlob {
201                request: QueryBlob {
202                    blob_id,
203                    context_id,
204                },
205                outcome: tx,
206            })
207            .await
208            .expect("Mailbox not to be dropped");
209
210        rx.await.expect("Mailbox not to be dropped")
211    }
212
213    /// Request a blob from a specific peer
214    pub async fn request_blob(
215        &self,
216        blob_id: BlobId,
217        context_id: ContextId,
218        peer_id: libp2p::PeerId,
219    ) -> eyre::Result<Option<Vec<u8>>> {
220        let (tx, rx) = oneshot::channel();
221
222        self.network_manager
223            .send(NetworkMessage::RequestBlob {
224                request: RequestBlob {
225                    blob_id,
226                    context_id,
227                    peer_id,
228                },
229                outcome: tx,
230            })
231            .await
232            .expect("Mailbox not to be dropped");
233
234        rx.await.expect("Mailbox not to be dropped")
235    }
236}