amaru_protocols/blockfetch/
mod.rs1mod initiator;
16pub(crate) mod messages;
17mod responder;
18
19use std::sync::Arc;
20
21use amaru_kernel::{EraHistory, Peer, Point};
22use amaru_ouroboros::ConnectionId;
23pub use initiator::{BlockFetchInitiator, BlockFetchMessage, Blocks, initiator};
25pub use messages::Message;
26use pure_stage::{DeserializerGuards, Effects, StageRef};
27pub use responder::{BlockFetchResponder, StreamBlocks, responder};
28
29use crate::{
30 mux::{Frame, MuxMessage},
31 protocol::{Inputs, ProtoSpec, ProtocolState, RoleT},
32};
33
34pub fn spec<R: RoleT>() -> ProtoSpec<State, Message, R>
35where
36 State: ProtocolState<R, WireMsg = Message>,
37{
38 use State::*;
39
40 let mut spec = ProtoSpec::default();
41 let request_range = || Message::RequestRange { from: Point::Origin, through: Point::Origin };
42 let no_blocks = || Message::NoBlocks;
43 let client_done = || Message::ClientDone;
44 let batch_done = || Message::BatchDone;
45 let start_batch = || Message::StartBatch;
46 let block = || Message::Block { body: vec![1] };
47
48 spec.init(Idle, client_done(), Done);
49 spec.init(Idle, request_range(), Busy);
50 spec.resp(Busy, no_blocks(), Idle);
51 spec.resp(Busy, start_batch(), Streaming);
52 spec.resp(Streaming, block(), Streaming);
53 spec.resp(Streaming, batch_done(), Idle);
54 spec
55}
56
57pub fn register_deserializers() -> DeserializerGuards {
58 vec![initiator::register_deserializers(), responder::register_deserializers()].into_iter().flatten().collect()
59}
60
61#[derive(Debug, PartialEq, Eq, Clone, Copy, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
62pub enum State {
63 Idle,
64 Busy,
65 Streaming,
66 Done,
67}
68
69pub async fn register_blockfetch_initiator<M>(
70 muxer: &StageRef<MuxMessage>,
71 peer: Peer,
72 conn_id: ConnectionId,
73 era_history: Arc<EraHistory>,
74 eff: &Effects<M>,
75) -> StageRef<BlockFetchMessage> {
76 use crate::protocol::PROTO_N2N_BLOCK_FETCH;
77 let blockfetch = eff
78 .wire_up(
79 eff.stage("blockfetch", initiator()).await,
80 BlockFetchInitiator::new(muxer.clone(), peer, conn_id, era_history),
81 )
82 .await;
83 eff.send(
84 muxer,
85 MuxMessage::Register {
86 protocol: PROTO_N2N_BLOCK_FETCH.erase(),
87 frame: Frame::OneCborItem,
88 handler: eff.contramap(&blockfetch, "blockfetch_bytes", Inputs::Network).await,
89 max_buffer: 2_500_000,
90 },
91 )
92 .await;
93 eff.contramap(&blockfetch, "blockfetch_bytes", Inputs::Local).await
94}
95
96pub async fn register_blockfetch_responder<M>(
97 muxer: &StageRef<MuxMessage>,
98 eff: &Effects<M>,
99) -> StageRef<StreamBlocks> {
100 use crate::protocol::PROTO_N2N_BLOCK_FETCH;
101 let blockfetch =
102 eff.wire_up(eff.stage("blockfetch", responder()).await, BlockFetchResponder::new(muxer.clone())).await;
103 eff.send(
104 muxer,
105 MuxMessage::Register {
106 protocol: PROTO_N2N_BLOCK_FETCH.responder().erase(),
107 frame: Frame::OneCborItem,
108 handler: eff.contramap(&blockfetch, "blockfetch_bytes", Inputs::Network).await,
109 max_buffer: 2_500_000,
110 },
111 )
112 .await;
113 eff.contramap(&blockfetch, "blockfetch_handler", Inputs::Local).await
114}