Skip to main content

amaru_protocols/chainsync/
mod.rs

1// Copyright 2025 PRAGMA
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod initiator;
16mod messages;
17mod responder;
18
19pub use initiator::{ChainSyncInitiator, ChainSyncInitiatorMsg, InitiatorMessage, InitiatorResult, initiator};
20pub use messages::HeaderContent;
21pub use responder::{ChainSyncResponder, ResponderMessage, responder};
22
23pub fn register_deserializers() -> pure_stage::DeserializerGuards {
24    vec![messages::register_deserializers(), initiator::register_deserializers(), responder::register_deserializers()]
25        .into_iter()
26        .flatten()
27        .collect()
28}
29
30pub fn to_traverse(header: &messages::HeaderContent) -> Result<MultiEraHeader<'_>, String> {
31    let out = match header.byron_prefix {
32        Some((subtag, _)) => MultiEraHeader::decode(header.variant.header_variant(), Some(subtag), &header.cbor),
33        None => MultiEraHeader::decode(header.variant.header_variant(), None, &header.cbor),
34    };
35
36    out.map_err(|e| e.to_string())
37}
38
39use pallas_traverse::MultiEraHeader;
40pub use register::{register_chainsync_initiator, register_chainsync_responder};
41
42mod register {
43    use amaru_kernel::{Peer, Tip};
44    use amaru_ouroboros::ConnectionId;
45    use pure_stage::{Effects, StageRef};
46
47    use super::*;
48    use crate::{
49        connection::ConnectionMessage,
50        mux::{Frame, MuxMessage},
51        protocol::{Inputs, PROTO_N2N_CHAIN_SYNC},
52    };
53
54    pub async fn register_chainsync_initiator(
55        muxer: &StageRef<MuxMessage>,
56        peer: Peer,
57        conn_id: ConnectionId,
58        pipeline: StageRef<ChainSyncInitiatorMsg>,
59        eff: &Effects<ConnectionMessage>,
60    ) -> StageRef<InitiatorMessage> {
61        let chainsync = eff
62            .wire_up(
63                eff.stage("chainsync", initiator()).await,
64                ChainSyncInitiator::new(peer, conn_id, muxer.clone(), pipeline),
65            )
66            .await;
67        eff.send(
68            muxer,
69            MuxMessage::Register {
70                protocol: PROTO_N2N_CHAIN_SYNC.erase(),
71                frame: Frame::OneCborItem,
72                handler: eff.contramap(&chainsync, "chainsync_bytes", Inputs::Network).await,
73                max_buffer: 5760,
74            },
75        )
76        .await;
77        eff.contramap(&chainsync, "chainsync_handler", Inputs::Local).await
78    }
79
80    pub async fn register_chainsync_responder(
81        muxer: &StageRef<MuxMessage>,
82        upstream: Tip,
83        peer: Peer,
84        conn_id: ConnectionId,
85        eff: &Effects<ConnectionMessage>,
86    ) -> StageRef<ResponderMessage> {
87        let chainsync = eff
88            .wire_up(
89                eff.stage("chainsync", responder()).await,
90                ChainSyncResponder::new(upstream, peer, conn_id, muxer.clone()),
91            )
92            .await;
93        eff.send(
94            muxer,
95            MuxMessage::Register {
96                protocol: PROTO_N2N_CHAIN_SYNC.responder().erase(),
97                frame: Frame::OneCborItem,
98                handler: eff.contramap(&chainsync, "chainsync_bytes", Inputs::Network).await,
99                max_buffer: 5760,
100            },
101        )
102        .await;
103        eff.contramap(&chainsync, "chainsync_bytes", Inputs::Local).await
104    }
105}