arc_malachitebft_sync/
behaviour.rs1use bytes::Bytes;
2use eyre::Result;
3use libp2p::request_response::{self as rpc, OutboundRequestId, ProtocolSupport};
4use libp2p::swarm::NetworkBehaviour;
5use libp2p::{PeerId, StreamProtocol};
6use thiserror::Error;
7
8use crate::rpc::Codec;
9use crate::types::{RawRequest, RawResponse, ResponseChannel};
10use crate::Config;
11
12#[derive(NetworkBehaviour)]
13#[behaviour(to_swarm = "Event")]
14pub struct Behaviour {
15 rpc: rpc::Behaviour<Codec>,
16}
17
18pub type Event = rpc::Event<RawRequest, RawResponse>;
19
20impl Behaviour {
21 pub fn new(config: Config, sync_protocol: String) -> Result<Self> {
22 let protocol = [(
23 StreamProtocol::try_from_owned(sync_protocol)?,
24 ProtocolSupport::Full,
25 )];
26 let rpc_config = rpc::Config::default().with_request_timeout(config.request_timeout);
27
28 Ok(Self {
29 rpc: rpc::Behaviour::with_codec(Codec::new(config), protocol, rpc_config),
30 })
31 }
32
33 pub fn send_response(&mut self, channel: ResponseChannel, data: Bytes) -> Result<(), Error> {
34 self.rpc
35 .send_response(channel, RawResponse(data))
36 .map_err(|_| Error::SendResponse)
37 }
38
39 pub fn send_request(&mut self, peer: PeerId, data: Bytes) -> OutboundRequestId {
40 self.rpc.send_request(&peer, RawRequest(data))
41 }
42}
43
44#[derive(Clone, Debug, Error)]
45pub enum Error {
46 #[error("Failed to send response")]
47 SendResponse,
48
49 #[error("Failed to send request")]
50 SendRequest,
51}
52
53impl Behaviour {
54 pub fn with_default_protocol(config: Config) -> Self {
55 let protocol = [(
57 StreamProtocol::new("/malachitebft-sync/v1beta1"),
58 ProtocolSupport::Full,
59 )];
60 let rpc_config = rpc::Config::default().with_request_timeout(config.request_timeout);
61
62 Self {
63 rpc: rpc::Behaviour::with_codec(Codec::new(config), protocol, rpc_config),
64 }
65 }
66}
67
68impl Default for Behaviour {
69 fn default() -> Self {
70 Self::with_default_protocol(Config::default())
71 }
72}