calimero_network_primitives/
messages.rs

1use calimero_primitives::blobs::BlobId;
2use calimero_primitives::context::ContextId;
3use libp2p::core::transport::ListenerId;
4pub use libp2p::gossipsub::{IdentTopic, Message, MessageId, TopicHash};
5pub use libp2p::PeerId;
6use libp2p::{Multiaddr, StreamProtocol};
7use tokio::sync::oneshot;
8
9use crate::stream::Stream;
10
11#[derive(Debug, actix::Message)]
12#[rtype("()")]
13pub enum NetworkMessage {
14    Dial {
15        request: Dial,
16        outcome: oneshot::Sender<<Dial as actix::Message>::Result>,
17    },
18    ListenOn {
19        request: ListenOn,
20        outcome: oneshot::Sender<<ListenOn as actix::Message>::Result>,
21    },
22    Bootstrap {
23        request: Bootstrap,
24        outcome: oneshot::Sender<<Bootstrap as actix::Message>::Result>,
25    },
26    Subscribe {
27        request: Subscribe,
28        outcome: oneshot::Sender<<Subscribe as actix::Message>::Result>,
29    },
30    Unsubscribe {
31        request: Unsubscribe,
32        outcome: oneshot::Sender<<Unsubscribe as actix::Message>::Result>,
33    },
34    Publish {
35        request: Publish,
36        outcome: oneshot::Sender<<Publish as actix::Message>::Result>,
37    },
38    OpenStream {
39        request: OpenStream,
40        outcome: oneshot::Sender<<OpenStream as actix::Message>::Result>,
41    },
42    PeerCount {
43        request: PeerCount,
44        outcome: oneshot::Sender<<PeerCount as actix::Message>::Result>,
45    },
46    MeshPeers {
47        request: MeshPeers,
48        outcome: oneshot::Sender<<MeshPeers as actix::Message>::Result>,
49    },
50    MeshPeerCount {
51        request: MeshPeerCount,
52        outcome: oneshot::Sender<<MeshPeerCount as actix::Message>::Result>,
53    },
54    // Blob discovery messages
55    AnnounceBlob {
56        request: AnnounceBlob,
57        outcome: oneshot::Sender<<AnnounceBlob as actix::Message>::Result>,
58    },
59    QueryBlob {
60        request: QueryBlob,
61        outcome: oneshot::Sender<<QueryBlob as actix::Message>::Result>,
62    },
63    RequestBlob {
64        request: RequestBlob,
65        outcome: oneshot::Sender<<RequestBlob as actix::Message>::Result>,
66    },
67}
68
69#[derive(Clone, Copy, Debug)]
70pub struct Bootstrap;
71
72impl actix::Message for Bootstrap {
73    type Result = eyre::Result<()>;
74}
75
76#[derive(Clone, Debug)]
77pub struct Dial(pub Multiaddr);
78
79impl actix::Message for Dial {
80    type Result = eyre::Result<()>;
81}
82
83#[derive(Clone, Debug)]
84pub struct ListenOn(pub Multiaddr);
85
86impl actix::Message for ListenOn {
87    type Result = eyre::Result<()>;
88}
89
90#[derive(Clone, Debug)]
91pub struct MeshPeerCount(pub TopicHash);
92
93impl actix::Message for MeshPeerCount {
94    type Result = usize;
95}
96
97#[derive(Clone, Debug)]
98pub struct MeshPeers(pub TopicHash);
99
100impl actix::Message for MeshPeers {
101    type Result = Vec<PeerId>;
102}
103
104#[derive(Clone, Copy, Debug)]
105pub struct OpenStream(pub PeerId);
106
107impl actix::Message for OpenStream {
108    type Result = eyre::Result<Stream>;
109}
110
111#[derive(Clone, Copy, Debug)]
112pub struct PeerCount;
113
114impl actix::Message for PeerCount {
115    type Result = usize;
116}
117
118#[derive(Clone, Debug)]
119pub struct Publish {
120    pub topic: TopicHash,
121    pub data: Vec<u8>,
122}
123
124impl actix::Message for Publish {
125    type Result = eyre::Result<MessageId>;
126}
127
128#[derive(Clone, Debug)]
129pub struct Subscribe(pub IdentTopic);
130
131impl actix::Message for Subscribe {
132    type Result = eyre::Result<IdentTopic>;
133}
134
135#[derive(Clone, Debug)]
136pub struct Unsubscribe(pub IdentTopic);
137
138impl actix::Message for Unsubscribe {
139    type Result = eyre::Result<IdentTopic>;
140}
141
142// Blob discovery messages
143
144/// Announce a blob to the DHT for a specific context
145#[derive(Clone, Copy, Debug)]
146pub struct AnnounceBlob {
147    pub blob_id: BlobId,
148    pub context_id: ContextId,
149    pub size: u64,
150}
151
152impl actix::Message for AnnounceBlob {
153    type Result = eyre::Result<()>;
154}
155
156/// Query for blob availability in the DHT
157#[derive(Clone, Copy, Debug)]
158pub struct QueryBlob {
159    pub blob_id: BlobId,
160    pub context_id: Option<ContextId>, // None for global queries
161}
162
163impl actix::Message for QueryBlob {
164    type Result = eyre::Result<Vec<PeerId>>;
165}
166
167/// Request a blob from a specific peer
168#[derive(Clone, Copy, Debug)]
169pub struct RequestBlob {
170    pub blob_id: BlobId,
171    pub context_id: ContextId,
172    pub peer_id: PeerId,
173}
174
175impl actix::Message for RequestBlob {
176    type Result = eyre::Result<Option<Vec<u8>>>;
177}
178
179#[derive(Debug)]
180pub enum NetworkEvent {
181    ListeningOn {
182        listener_id: ListenerId,
183        address: Multiaddr,
184    },
185    Subscribed {
186        peer_id: PeerId,
187        topic: TopicHash,
188    },
189    Unsubscribed {
190        peer_id: PeerId,
191        topic: TopicHash,
192    },
193    Message {
194        id: MessageId,
195        message: Message,
196    },
197    StreamOpened {
198        peer_id: PeerId,
199        stream: Box<Stream>,
200        protocol: StreamProtocol,
201    },
202    // Blob discovery events
203    BlobRequested {
204        blob_id: BlobId,
205        context_id: ContextId,
206        requesting_peer: PeerId,
207    },
208    BlobProvidersFound {
209        blob_id: BlobId,
210        context_id: Option<ContextId>,
211        providers: Vec<PeerId>,
212    },
213    BlobDownloaded {
214        blob_id: BlobId,
215        context_id: ContextId,
216        data: Vec<u8>,
217        from_peer: PeerId,
218    },
219    BlobDownloadFailed {
220        blob_id: BlobId,
221        context_id: ContextId,
222        from_peer: PeerId,
223        error: String,
224    },
225}
226
227impl actix::Message for NetworkEvent {
228    type Result = ();
229}