Skip to main content

amaru_protocols/blockfetch/
mod.rs

1// Copyright 2025 PRAGMA
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod initiator;
16pub(crate) mod messages;
17mod responder;
18
19use std::sync::Arc;
20
21use amaru_kernel::{EraHistory, Peer, Point};
22use amaru_ouroboros::ConnectionId;
23// Re-export types
24pub use initiator::{BlockFetchInitiator, BlockFetchMessage, Blocks, initiator};
25pub use messages::Message;
26use pure_stage::{DeserializerGuards, Effects, StageRef};
27pub use responder::{BlockFetchResponder, StreamBlocks, responder};
28
29use crate::{
30    mux::{Frame, MuxMessage},
31    protocol::{Inputs, ProtoSpec, ProtocolState, RoleT},
32};
33
34pub fn spec<R: RoleT>() -> ProtoSpec<State, Message, R>
35where
36    State: ProtocolState<R, WireMsg = Message>,
37{
38    use State::*;
39
40    let mut spec = ProtoSpec::default();
41    let request_range = || Message::RequestRange { from: Point::Origin, through: Point::Origin };
42    let no_blocks = || Message::NoBlocks;
43    let client_done = || Message::ClientDone;
44    let batch_done = || Message::BatchDone;
45    let start_batch = || Message::StartBatch;
46    let block = || Message::Block { body: vec![1] };
47
48    spec.init(Idle, client_done(), Done);
49    spec.init(Idle, request_range(), Busy);
50    spec.resp(Busy, no_blocks(), Idle);
51    spec.resp(Busy, start_batch(), Streaming);
52    spec.resp(Streaming, block(), Streaming);
53    spec.resp(Streaming, batch_done(), Idle);
54    spec
55}
56
57pub fn register_deserializers() -> DeserializerGuards {
58    vec![initiator::register_deserializers(), responder::register_deserializers()].into_iter().flatten().collect()
59}
60
61#[derive(Debug, PartialEq, Eq, Clone, Copy, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
62pub enum State {
63    Idle,
64    Busy,
65    Streaming,
66    Done,
67}
68
69pub async fn register_blockfetch_initiator<M>(
70    muxer: &StageRef<MuxMessage>,
71    peer: Peer,
72    conn_id: ConnectionId,
73    era_history: Arc<EraHistory>,
74    eff: &Effects<M>,
75) -> StageRef<BlockFetchMessage> {
76    use crate::protocol::PROTO_N2N_BLOCK_FETCH;
77    let blockfetch = eff
78        .wire_up(
79            eff.stage("blockfetch", initiator()).await,
80            BlockFetchInitiator::new(muxer.clone(), peer, conn_id, era_history),
81        )
82        .await;
83    eff.send(
84        muxer,
85        MuxMessage::Register {
86            protocol: PROTO_N2N_BLOCK_FETCH.erase(),
87            frame: Frame::OneCborItem,
88            handler: eff.contramap(&blockfetch, "blockfetch_bytes", Inputs::Network).await,
89            max_buffer: 2_500_000,
90        },
91    )
92    .await;
93    eff.contramap(&blockfetch, "blockfetch_bytes", Inputs::Local).await
94}
95
96pub async fn register_blockfetch_responder<M>(
97    muxer: &StageRef<MuxMessage>,
98    eff: &Effects<M>,
99) -> StageRef<StreamBlocks> {
100    use crate::protocol::PROTO_N2N_BLOCK_FETCH;
101    let blockfetch =
102        eff.wire_up(eff.stage("blockfetch", responder()).await, BlockFetchResponder::new(muxer.clone())).await;
103    eff.send(
104        muxer,
105        MuxMessage::Register {
106            protocol: PROTO_N2N_BLOCK_FETCH.responder().erase(),
107            frame: Frame::OneCborItem,
108            handler: eff.contramap(&blockfetch, "blockfetch_bytes", Inputs::Network).await,
109            max_buffer: 2_500_000,
110        },
111    )
112    .await;
113    eff.contramap(&blockfetch, "blockfetch_handler", Inputs::Local).await
114}