blueprint_networking/
service_handle.rs

1use crate::types::MessageRouting;
2use crate::{
3    blueprint_protocol::InstanceMessageRequest,
4    discovery::{PeerInfo, PeerManager, peers::VerificationIdentifierKey},
5    service::NetworkCommandMessage,
6    types::ProtocolMessage,
7};
8use blueprint_core::debug;
9use blueprint_crypto::KeyType;
10use crossbeam_channel::{self, Receiver, Sender};
11use libp2p::{Multiaddr, PeerId};
12use std::sync::Arc;
13use tokio::task::JoinHandle;
14
15/// Handle for sending outgoing messages to the network
16#[derive(Clone)]
17pub struct NetworkSender<K: KeyType> {
18    network_message_sender: Sender<NetworkCommandMessage<K>>,
19}
20
21impl<K: KeyType> NetworkSender<K> {
22    #[must_use]
23    pub fn new(network_message_sender: Sender<NetworkCommandMessage<K>>) -> Self {
24        Self {
25            network_message_sender,
26        }
27    }
28
29    /// Send a protocol message over the network
30    ///
31    /// # Errors
32    ///
33    /// See [`crossbeam_channel::Sender::send`]
34    pub fn send_message(&self, message: NetworkCommandMessage<K>) -> Result<(), String> {
35        self.network_message_sender
36            .send(message)
37            .map_err(|e| e.to_string())
38    }
39}
40
41/// Handle for receiving incoming messages from the network
42pub struct NetworkReceiver {
43    protocol_message_receiver: Receiver<ProtocolMessage>,
44}
45
46impl NetworkReceiver {
47    #[must_use]
48    pub fn new(protocol_message_receiver: Receiver<ProtocolMessage>) -> Self {
49        Self {
50            protocol_message_receiver,
51        }
52    }
53
54    /// Get the next protocol message
55    ///
56    /// # Errors
57    ///
58    /// See [`crossbeam_channel::Receiver::try_recv()`]
59    pub fn try_recv(&self) -> Result<ProtocolMessage, crossbeam_channel::TryRecvError> {
60        self.protocol_message_receiver.try_recv()
61    }
62}
63
64/// Combined handle for the network service
65pub struct NetworkServiceHandle<K: KeyType> {
66    pub local_peer_id: PeerId,
67    pub blueprint_protocol_name: Arc<str>,
68    pub local_signing_key: K::Secret,
69    pub sender: NetworkSender<K>,
70    pub receiver: NetworkReceiver,
71    pub peer_manager: Arc<PeerManager<K>>,
72    /// The local verification key used to identify this node in the whitelist
73    pub local_verification_key: Option<VerificationIdentifierKey<K>>,
74}
75
76impl<K: KeyType> Clone for NetworkServiceHandle<K> {
77    fn clone(&self) -> Self {
78        Self {
79            local_peer_id: self.local_peer_id,
80            blueprint_protocol_name: self.blueprint_protocol_name.clone(),
81            local_signing_key: self.local_signing_key.clone(),
82            sender: self.sender.clone(),
83            receiver: NetworkReceiver::new(self.receiver.protocol_message_receiver.clone()),
84            peer_manager: self.peer_manager.clone(),
85            local_verification_key: self.local_verification_key.clone(),
86        }
87    }
88}
89
90impl<K: KeyType> NetworkServiceHandle<K> {
91    #[must_use]
92    pub fn new(
93        local_peer_id: PeerId,
94        blueprint_protocol_name: String,
95        local_signing_key: K::Secret,
96        peer_manager: Arc<PeerManager<K>>,
97        network_message_sender: Sender<NetworkCommandMessage<K>>,
98        protocol_message_receiver: Receiver<ProtocolMessage>,
99    ) -> Self {
100        Self {
101            local_peer_id,
102            blueprint_protocol_name: Arc::from(blueprint_protocol_name),
103            local_signing_key,
104            sender: NetworkSender::new(network_message_sender),
105            receiver: NetworkReceiver::new(protocol_message_receiver),
106            peer_manager,
107            local_verification_key: None,
108        }
109    }
110
111    pub fn next_protocol_message(&mut self) -> Option<ProtocolMessage> {
112        self.receiver.try_recv().ok()
113    }
114
115    #[must_use]
116    pub fn peers(&self) -> Vec<PeerId> {
117        self.peer_manager
118            .get_peers()
119            .clone()
120            .into_read_only()
121            .iter()
122            .map(|(peer_id, _)| *peer_id)
123            .collect()
124    }
125
126    #[must_use]
127    pub fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
128        self.peer_manager.get_peer_info(peer_id)
129    }
130
131    /// Send a message
132    ///
133    /// # Errors
134    ///
135    /// See [`crossbeam_channel::Sender::send`]
136    pub fn send(&self, routing: MessageRouting, message: impl Into<Vec<u8>>) -> Result<(), String> {
137        let protocol_message = ProtocolMessage {
138            protocol: self.blueprint_protocol_name.clone().to_string(),
139            routing,
140            payload: message.into(),
141        };
142
143        let raw_payload = bincode::serialize(&protocol_message).map_err(|err| err.to_string())?;
144        match protocol_message.routing.recipient {
145            Some(recipient) => {
146                let instance_message_request = InstanceMessageRequest::Protocol {
147                    protocol: self.blueprint_protocol_name.clone().to_string(),
148                    payload: raw_payload,
149                    metadata: None,
150                };
151
152                self.send_network_message(NetworkCommandMessage::InstanceRequest {
153                    peer: recipient,
154                    request: instance_message_request,
155                })?;
156                debug!(
157                    "Sent outbound p2p `NetworkCommandMessage` to {:?}",
158                    recipient
159                );
160            }
161            None => {
162                let gossip_message = NetworkCommandMessage::GossipMessage {
163                    source: self.local_peer_id,
164                    topic: self.blueprint_protocol_name.clone().to_string(),
165                    message: raw_payload,
166                };
167                self.send_network_message(gossip_message)?;
168                debug!("Sent outbound gossip `NetworkCommandMessage`");
169            }
170        }
171
172        Ok(())
173    }
174
175    /// Send a network message
176    ///
177    /// # Errors
178    ///
179    /// See [`crossbeam_channel::Sender::send`]
180    pub fn send_network_message(&self, message: NetworkCommandMessage<K>) -> Result<(), String> {
181        self.sender.send_message(message)
182    }
183
184    #[must_use]
185    pub fn get_listen_addr(&self) -> Option<Multiaddr> {
186        // Get the first peer info for our local peer ID
187        if let Some(peer_info) = self.peer_manager.get_peer_info(&self.local_peer_id) {
188            // Return the first address from our peer info
189            peer_info.addresses.iter().next().cloned()
190        } else {
191            None
192        }
193    }
194
195    /// Get the participant ID (index) of this node in the whitelisted keys
196    ///
197    /// This returns the position of this node's verification key in the
198    /// whitelist, which can be used as a participant identifier.
199    ///
200    /// # Returns
201    /// Returns the index in the whitelist if found, None otherwise
202    #[must_use]
203    pub fn get_participant_id(&self) -> Option<usize> {
204        if let Some(verification_key) = &self.local_verification_key {
205            self.peer_manager
206                .get_key_position_in_whitelist(verification_key)
207        } else {
208            None
209        }
210    }
211
212    /// Split the handle into separate sender and receiver
213    #[must_use]
214    pub fn split(self) -> (NetworkSender<K>, NetworkReceiver) {
215        (self.sender, self.receiver)
216    }
217}
218
219/// We might also bundle a `JoinHandle` so the user can await its completion if needed.
220pub struct NetworkServiceTaskHandle {
221    /// The join handle for the background service task.
222    pub service_task: JoinHandle<()>,
223}