blueprint_networking/
service_handle.rs1use crate::types::MessageRouting;
2use crate::{
3 blueprint_protocol::InstanceMessageRequest,
4 discovery::{PeerInfo, PeerManager, peers::VerificationIdentifierKey},
5 service::NetworkCommandMessage,
6 types::ProtocolMessage,
7};
8use blueprint_core::debug;
9use blueprint_crypto::KeyType;
10use crossbeam_channel::{self, Receiver, Sender};
11use libp2p::{Multiaddr, PeerId};
12use std::sync::Arc;
13use tokio::task::JoinHandle;
14
15#[derive(Clone)]
17pub struct NetworkSender<K: KeyType> {
18 network_message_sender: Sender<NetworkCommandMessage<K>>,
19}
20
21impl<K: KeyType> NetworkSender<K> {
22 #[must_use]
23 pub fn new(network_message_sender: Sender<NetworkCommandMessage<K>>) -> Self {
24 Self {
25 network_message_sender,
26 }
27 }
28
29 pub fn send_message(&self, message: NetworkCommandMessage<K>) -> Result<(), String> {
35 self.network_message_sender
36 .send(message)
37 .map_err(|e| e.to_string())
38 }
39}
40
41pub struct NetworkReceiver {
43 protocol_message_receiver: Receiver<ProtocolMessage>,
44}
45
46impl NetworkReceiver {
47 #[must_use]
48 pub fn new(protocol_message_receiver: Receiver<ProtocolMessage>) -> Self {
49 Self {
50 protocol_message_receiver,
51 }
52 }
53
54 pub fn try_recv(&self) -> Result<ProtocolMessage, crossbeam_channel::TryRecvError> {
60 self.protocol_message_receiver.try_recv()
61 }
62}
63
64pub struct NetworkServiceHandle<K: KeyType> {
66 pub local_peer_id: PeerId,
67 pub blueprint_protocol_name: Arc<str>,
68 pub local_signing_key: K::Secret,
69 pub sender: NetworkSender<K>,
70 pub receiver: NetworkReceiver,
71 pub peer_manager: Arc<PeerManager<K>>,
72 pub local_verification_key: Option<VerificationIdentifierKey<K>>,
74}
75
76impl<K: KeyType> Clone for NetworkServiceHandle<K> {
77 fn clone(&self) -> Self {
78 Self {
79 local_peer_id: self.local_peer_id,
80 blueprint_protocol_name: self.blueprint_protocol_name.clone(),
81 local_signing_key: self.local_signing_key.clone(),
82 sender: self.sender.clone(),
83 receiver: NetworkReceiver::new(self.receiver.protocol_message_receiver.clone()),
84 peer_manager: self.peer_manager.clone(),
85 local_verification_key: self.local_verification_key.clone(),
86 }
87 }
88}
89
90impl<K: KeyType> NetworkServiceHandle<K> {
91 #[must_use]
92 pub fn new(
93 local_peer_id: PeerId,
94 blueprint_protocol_name: String,
95 local_signing_key: K::Secret,
96 peer_manager: Arc<PeerManager<K>>,
97 network_message_sender: Sender<NetworkCommandMessage<K>>,
98 protocol_message_receiver: Receiver<ProtocolMessage>,
99 ) -> Self {
100 Self {
101 local_peer_id,
102 blueprint_protocol_name: Arc::from(blueprint_protocol_name),
103 local_signing_key,
104 sender: NetworkSender::new(network_message_sender),
105 receiver: NetworkReceiver::new(protocol_message_receiver),
106 peer_manager,
107 local_verification_key: None,
108 }
109 }
110
111 pub fn next_protocol_message(&mut self) -> Option<ProtocolMessage> {
112 self.receiver.try_recv().ok()
113 }
114
115 #[must_use]
116 pub fn peers(&self) -> Vec<PeerId> {
117 self.peer_manager
118 .get_peers()
119 .clone()
120 .into_read_only()
121 .iter()
122 .map(|(peer_id, _)| *peer_id)
123 .collect()
124 }
125
126 #[must_use]
127 pub fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
128 self.peer_manager.get_peer_info(peer_id)
129 }
130
131 pub fn send(&self, routing: MessageRouting, message: impl Into<Vec<u8>>) -> Result<(), String> {
137 let protocol_message = ProtocolMessage {
138 protocol: self.blueprint_protocol_name.clone().to_string(),
139 routing,
140 payload: message.into(),
141 };
142
143 let raw_payload = bincode::serialize(&protocol_message).map_err(|err| err.to_string())?;
144 match protocol_message.routing.recipient {
145 Some(recipient) => {
146 let instance_message_request = InstanceMessageRequest::Protocol {
147 protocol: self.blueprint_protocol_name.clone().to_string(),
148 payload: raw_payload,
149 metadata: None,
150 };
151
152 self.send_network_message(NetworkCommandMessage::InstanceRequest {
153 peer: recipient,
154 request: instance_message_request,
155 })?;
156 debug!(
157 "Sent outbound p2p `NetworkCommandMessage` to {:?}",
158 recipient
159 );
160 }
161 None => {
162 let gossip_message = NetworkCommandMessage::GossipMessage {
163 source: self.local_peer_id,
164 topic: self.blueprint_protocol_name.clone().to_string(),
165 message: raw_payload,
166 };
167 self.send_network_message(gossip_message)?;
168 debug!("Sent outbound gossip `NetworkCommandMessage`");
169 }
170 }
171
172 Ok(())
173 }
174
175 pub fn send_network_message(&self, message: NetworkCommandMessage<K>) -> Result<(), String> {
181 self.sender.send_message(message)
182 }
183
184 #[must_use]
185 pub fn get_listen_addr(&self) -> Option<Multiaddr> {
186 if let Some(peer_info) = self.peer_manager.get_peer_info(&self.local_peer_id) {
188 peer_info.addresses.iter().next().cloned()
190 } else {
191 None
192 }
193 }
194
195 #[must_use]
203 pub fn get_participant_id(&self) -> Option<usize> {
204 if let Some(verification_key) = &self.local_verification_key {
205 self.peer_manager
206 .get_key_position_in_whitelist(verification_key)
207 } else {
208 None
209 }
210 }
211
212 #[must_use]
214 pub fn split(self) -> (NetworkSender<K>, NetworkReceiver) {
215 (self.sender, self.receiver)
216 }
217}
218
219pub struct NetworkServiceTaskHandle {
221 pub service_task: JoinHandle<()>,
223}