amaru_protocols/chainsync/
mod.rs1mod initiator;
16mod messages;
17mod responder;
18
19pub use initiator::{ChainSyncInitiator, ChainSyncInitiatorMsg, InitiatorMessage, InitiatorResult, initiator};
20pub use messages::HeaderContent;
21pub use responder::{ChainSyncResponder, ResponderMessage, responder};
22
23pub fn register_deserializers() -> pure_stage::DeserializerGuards {
24 vec![messages::register_deserializers(), initiator::register_deserializers(), responder::register_deserializers()]
25 .into_iter()
26 .flatten()
27 .collect()
28}
29
30pub fn to_traverse(header: &messages::HeaderContent) -> Result<MultiEraHeader<'_>, String> {
31 let out = match header.byron_prefix {
32 Some((subtag, _)) => MultiEraHeader::decode(header.variant.header_variant(), Some(subtag), &header.cbor),
33 None => MultiEraHeader::decode(header.variant.header_variant(), None, &header.cbor),
34 };
35
36 out.map_err(|e| e.to_string())
37}
38
39use pallas_traverse::MultiEraHeader;
40pub use register::{register_chainsync_initiator, register_chainsync_responder};
41
42mod register {
43 use amaru_kernel::{Peer, Tip};
44 use amaru_ouroboros::ConnectionId;
45 use pure_stage::{Effects, StageRef};
46
47 use super::*;
48 use crate::{
49 connection::ConnectionMessage,
50 mux::{Frame, MuxMessage},
51 protocol::{Inputs, PROTO_N2N_CHAIN_SYNC},
52 };
53
54 pub async fn register_chainsync_initiator(
55 muxer: &StageRef<MuxMessage>,
56 peer: Peer,
57 conn_id: ConnectionId,
58 pipeline: StageRef<ChainSyncInitiatorMsg>,
59 eff: &Effects<ConnectionMessage>,
60 ) -> StageRef<InitiatorMessage> {
61 let chainsync = eff
62 .wire_up(
63 eff.stage("chainsync", initiator()).await,
64 ChainSyncInitiator::new(peer, conn_id, muxer.clone(), pipeline),
65 )
66 .await;
67 eff.send(
68 muxer,
69 MuxMessage::Register {
70 protocol: PROTO_N2N_CHAIN_SYNC.erase(),
71 frame: Frame::OneCborItem,
72 handler: eff.contramap(&chainsync, "chainsync_bytes", Inputs::Network).await,
73 max_buffer: 5760,
74 },
75 )
76 .await;
77 eff.contramap(&chainsync, "chainsync_handler", Inputs::Local).await
78 }
79
80 pub async fn register_chainsync_responder(
81 muxer: &StageRef<MuxMessage>,
82 upstream: Tip,
83 peer: Peer,
84 conn_id: ConnectionId,
85 eff: &Effects<ConnectionMessage>,
86 ) -> StageRef<ResponderMessage> {
87 let chainsync = eff
88 .wire_up(
89 eff.stage("chainsync", responder()).await,
90 ChainSyncResponder::new(upstream, peer, conn_id, muxer.clone()),
91 )
92 .await;
93 eff.send(
94 muxer,
95 MuxMessage::Register {
96 protocol: PROTO_N2N_CHAIN_SYNC.responder().erase(),
97 frame: Frame::OneCborItem,
98 handler: eff.contramap(&chainsync, "chainsync_bytes", Inputs::Network).await,
99 max_buffer: 5760,
100 },
101 )
102 .await;
103 eff.contramap(&chainsync, "chainsync_bytes", Inputs::Local).await
104 }
105}