Skip to main content

arc_malachitebft_sync/
behaviour.rs

1use bytes::Bytes;
2use eyre::Result;
3use libp2p::request_response::{self as rpc, OutboundRequestId, ProtocolSupport};
4use libp2p::swarm::NetworkBehaviour;
5use libp2p::{PeerId, StreamProtocol};
6use thiserror::Error;
7
8use crate::rpc::Codec;
9use crate::types::{RawRequest, RawResponse, ResponseChannel};
10use crate::Config;
11
12#[derive(NetworkBehaviour)]
13#[behaviour(to_swarm = "Event")]
14pub struct Behaviour {
15    rpc: rpc::Behaviour<Codec>,
16}
17
18pub type Event = rpc::Event<RawRequest, RawResponse>;
19
20impl Behaviour {
21    pub fn new(config: Config, sync_protocol: String) -> Result<Self> {
22        let protocol = [(
23            StreamProtocol::try_from_owned(sync_protocol)?,
24            ProtocolSupport::Full,
25        )];
26        let rpc_config = rpc::Config::default().with_request_timeout(config.request_timeout);
27
28        Ok(Self {
29            rpc: rpc::Behaviour::with_codec(Codec::new(config), protocol, rpc_config),
30        })
31    }
32
33    pub fn send_response(&mut self, channel: ResponseChannel, data: Bytes) -> Result<(), Error> {
34        self.rpc
35            .send_response(channel, RawResponse(data))
36            .map_err(|_| Error::SendResponse)
37    }
38
39    pub fn send_request(&mut self, peer: PeerId, data: Bytes) -> OutboundRequestId {
40        self.rpc.send_request(&peer, RawRequest(data))
41    }
42}
43
44#[derive(Clone, Debug, Error)]
45pub enum Error {
46    #[error("Failed to send response")]
47    SendResponse,
48
49    #[error("Failed to send request")]
50    SendRequest,
51}
52
53impl Behaviour {
54    pub fn with_default_protocol(config: Config) -> Self {
55        // Infallible constructor using hardcoded default protocol
56        let protocol = [(
57            StreamProtocol::new("/malachitebft-sync/v1beta1"),
58            ProtocolSupport::Full,
59        )];
60        let rpc_config = rpc::Config::default().with_request_timeout(config.request_timeout);
61
62        Self {
63            rpc: rpc::Behaviour::with_codec(Codec::new(config), protocol, rpc_config),
64        }
65    }
66}
67
68impl Default for Behaviour {
69    fn default() -> Self {
70        Self::with_default_protocol(Config::default())
71    }
72}