amaru_protocols/tx_submission/
mod.rs1mod initiator;
16mod responder;
17
18pub mod responder_params;
19pub use responder_params::*;
20
21pub mod messages;
22pub use messages::*;
23
24pub mod outcome;
25pub use outcome::*;
26
27#[cfg(test)]
28mod tests;
29
30use amaru_ouroboros::TxOrigin;
31pub use initiator::initiator;
32use pure_stage::{Effects, StageRef, Void};
33pub use responder::{ResponderResult, responder};
34#[cfg(test)]
35pub use tests::*;
36
37use crate::{
38 connection::ConnectionMessage,
39 mux,
40 protocol::{Inputs, PROTO_N2N_TX_SUB, ProtocolState, Role, RoleT},
41};
42
43pub fn register_deserializers() -> pure_stage::DeserializerGuards {
44 vec![initiator::register_deserializers(), responder::register_deserializers()].into_iter().flatten().collect()
45}
46
47pub fn spec<R: RoleT>() -> crate::protocol::ProtoSpec<State, Message, R>
48where
49 State: ProtocolState<R, WireMsg = Message>,
50{
51 use Message::*;
52 use State::*;
53 let mut spec = crate::protocol::ProtoSpec::default();
54 let request_tx_ids_blocking = || RequestTxIdsBlocking(0, 0);
55 let request_tx_ids_non_blocking = || RequestTxIdsNonBlocking(0, 0);
56 let request_txs = || RequestTxs(vec![]);
57 let reply_tx_ids = || ReplyTxIds(vec![]);
58 let reply_txs = || ReplyTxs(vec![]);
59
60 spec.init(State::Init, Message::Init, Idle);
61 spec.init(TxIdsBlocking, reply_tx_ids(), Idle);
62 spec.init(TxIdsNonBlocking, reply_tx_ids(), Idle);
63 spec.init(Txs, reply_txs(), Idle);
64 spec.init(TxIdsBlocking, Message::Done, State::Done);
65 spec.resp(Idle, request_tx_ids_blocking(), TxIdsBlocking);
66 spec.resp(Idle, request_tx_ids_non_blocking(), TxIdsNonBlocking);
67 spec.resp(Idle, request_txs(), Txs);
68 spec
69}
70
71pub async fn register_tx_submission(
72 role: Role,
73 muxer: StageRef<mux::MuxMessage>,
74 eff: &Effects<ConnectionMessage>,
75 origin: TxOrigin,
76) -> StageRef<mux::HandlerMessage> {
77 let tx_submission = if role == Role::Initiator {
78 let (state, stage) = initiator::TxSubmissionInitiator::new(muxer.clone());
79 let tx_submission = eff.wire_up(eff.stage("tx_submission", initiator::initiator()).await, (state, stage)).await;
80 eff.contramap(&tx_submission, "tx_submission_handler", Inputs::<Void>::Network).await
81 } else {
82 let (state, stage) = responder::TxSubmissionResponder::new(muxer.clone(), ResponderParams::new(2, 3), origin);
83 let tx_submission = eff.wire_up(eff.stage("tx_submission", responder::responder()).await, (state, stage)).await;
84 eff.contramap(&tx_submission, "tx_submission_handler", Inputs::<Void>::Network).await
85 };
86
87 eff.send(
88 &muxer,
89 mux::MuxMessage::Register {
90 protocol: PROTO_N2N_TX_SUB.for_role(role).erase(),
91 frame: mux::Frame::OneCborItem,
92 handler: tx_submission.clone(),
93 max_buffer: 2_500_000,
94 },
95 )
96 .await;
97
98 tx_submission
99}
100
101#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, serde::Serialize, serde::Deserialize)]
103pub enum State {
104 Init,
105 Idle,
106 Done,
107 Txs,
108 TxIdsBlocking,
109 TxIdsNonBlocking,
110}