calimero_network_primitives/
client.rs1use calimero_primitives::blobs::BlobId;
2use calimero_primitives::context::ContextId;
3use calimero_utils_actix::LazyRecipient;
4use libp2p::gossipsub::{IdentTopic, MessageId, TopicHash};
5use libp2p::Multiaddr;
6use tokio::sync::oneshot;
7
8use crate::messages::{
9 AnnounceBlob, Bootstrap, Dial, ListenOn, MeshPeerCount, MeshPeers, NetworkMessage, OpenStream,
10 PeerCount, Publish, QueryBlob, RequestBlob, Subscribe, Unsubscribe,
11};
12use crate::stream::Stream;
13
14#[derive(Clone, Debug)]
15pub struct NetworkClient {
16 network_manager: LazyRecipient<NetworkMessage>,
17}
18
19impl NetworkClient {
20 pub const fn new(network_manager: LazyRecipient<NetworkMessage>) -> Self {
21 Self { network_manager }
22 }
23
24 pub async fn dial(&self, peer_addr: Multiaddr) -> eyre::Result<()> {
25 let (tx, rx) = oneshot::channel();
26
27 self.network_manager
28 .send(NetworkMessage::Dial {
29 request: Dial(peer_addr),
30 outcome: tx,
31 })
32 .await
33 .expect("Mailbox not to be dropped");
34
35 rx.await.expect("Mailbox not to be dropped")
36 }
37
38 pub async fn listen_on(&self, addr: Multiaddr) -> eyre::Result<()> {
39 let (tx, rx) = oneshot::channel();
40
41 self.network_manager
42 .send(NetworkMessage::ListenOn {
43 request: ListenOn(addr),
44 outcome: tx,
45 })
46 .await
47 .expect("Mailbox not to be dropped");
48
49 rx.await.expect("Mailbox not to be dropped")
50 }
51
52 pub async fn bootstrap(&self) -> eyre::Result<()> {
53 let (tx, rx) = oneshot::channel();
54
55 let _result = self
56 .network_manager
57 .send(NetworkMessage::Bootstrap {
58 request: Bootstrap,
59 outcome: tx,
60 })
61 .await
62 .expect("Mailbox not to be dropped");
63
64 rx.await.expect("Mailbox not to be dropped")
65 }
66
67 pub async fn subscribe(&self, topic: IdentTopic) -> eyre::Result<IdentTopic> {
68 let (tx, rx) = oneshot::channel();
69
70 self.network_manager
71 .send(NetworkMessage::Subscribe {
72 request: Subscribe(topic),
73 outcome: tx,
74 })
75 .await
76 .expect("Mailbox not to be dropped");
77
78 rx.await.expect("Mailbox not to be dropped")
79 }
80
81 pub async fn unsubscribe(&self, topic: IdentTopic) -> eyre::Result<IdentTopic> {
82 let (tx, rx) = oneshot::channel();
83
84 self.network_manager
85 .send(NetworkMessage::Unsubscribe {
86 request: Unsubscribe(topic),
87 outcome: tx,
88 })
89 .await
90 .expect("Mailbox not to be dropped");
91
92 rx.await.expect("Mailbox not to be dropped")
93 }
94
95 pub async fn publish(&self, topic: TopicHash, data: Vec<u8>) -> eyre::Result<MessageId> {
96 let (tx, rx) = oneshot::channel();
97
98 self.network_manager
99 .send(NetworkMessage::Publish {
100 request: Publish { topic, data },
101 outcome: tx,
102 })
103 .await
104 .expect("Mailbox not to be dropped");
105
106 rx.await.expect("Mailbox not to be dropped")
107 }
108
109 pub async fn open_stream(&self, peer_id: libp2p::PeerId) -> eyre::Result<Stream> {
110 let (tx, rx) = oneshot::channel();
111
112 self.network_manager
113 .send(NetworkMessage::OpenStream {
114 request: OpenStream(peer_id),
115 outcome: tx,
116 })
117 .await
118 .expect("Mailbox not to be dropped");
119
120 rx.await.expect("Mailbox not to be dropped")
121 }
122
123 pub async fn peer_count(&self) -> usize {
124 let (tx, rx) = oneshot::channel();
125
126 self.network_manager
127 .send(NetworkMessage::PeerCount {
128 request: PeerCount,
129 outcome: tx,
130 })
131 .await
132 .expect("Mailbox not to be dropped");
133
134 rx.await.expect("Mailbox not to be dropped")
135 }
136
137 pub async fn mesh_peer_count(&self, topic: TopicHash) -> usize {
138 let (tx, rx) = oneshot::channel();
139
140 self.network_manager
141 .send(NetworkMessage::MeshPeerCount {
142 request: MeshPeerCount(topic),
143 outcome: tx,
144 })
145 .await
146 .expect("Mailbox not to be dropped");
147
148 rx.await.expect("Mailbox not to be dropped")
149 }
150
151 pub async fn mesh_peers(&self, topic: TopicHash) -> Vec<libp2p::PeerId> {
152 let (tx, rx) = oneshot::channel();
153
154 self.network_manager
155 .send(NetworkMessage::MeshPeers {
156 request: MeshPeers(topic),
157 outcome: tx,
158 })
159 .await
160 .expect("Mailbox not to be dropped");
161
162 rx.await.expect("Mailbox not to be dropped")
163 }
164
165 pub async fn announce_blob(
169 &self,
170 blob_id: BlobId,
171 context_id: ContextId,
172 size: u64,
173 ) -> eyre::Result<()> {
174 let (tx, rx) = oneshot::channel();
175
176 self.network_manager
177 .send(NetworkMessage::AnnounceBlob {
178 request: AnnounceBlob {
179 blob_id,
180 context_id,
181 size,
182 },
183 outcome: tx,
184 })
185 .await
186 .expect("Mailbox not to be dropped");
187
188 rx.await.expect("Mailbox not to be dropped")
189 }
190
191 pub async fn query_blob(
193 &self,
194 blob_id: BlobId,
195 context_id: Option<ContextId>,
196 ) -> eyre::Result<Vec<libp2p::PeerId>> {
197 let (tx, rx) = oneshot::channel();
198
199 self.network_manager
200 .send(NetworkMessage::QueryBlob {
201 request: QueryBlob {
202 blob_id,
203 context_id,
204 },
205 outcome: tx,
206 })
207 .await
208 .expect("Mailbox not to be dropped");
209
210 rx.await.expect("Mailbox not to be dropped")
211 }
212
213 pub async fn request_blob(
215 &self,
216 blob_id: BlobId,
217 context_id: ContextId,
218 peer_id: libp2p::PeerId,
219 ) -> eyre::Result<Option<Vec<u8>>> {
220 let (tx, rx) = oneshot::channel();
221
222 self.network_manager
223 .send(NetworkMessage::RequestBlob {
224 request: RequestBlob {
225 blob_id,
226 context_id,
227 peer_id,
228 },
229 outcome: tx,
230 })
231 .await
232 .expect("Mailbox not to be dropped");
233
234 rx.await.expect("Mailbox not to be dropped")
235 }
236}