Skip to main content

amaru_protocols/tx_submission/
mod.rs

1// Copyright 2025 PRAGMA
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod initiator;
16mod responder;
17
18pub mod responder_params;
19pub use responder_params::*;
20
21pub mod messages;
22pub use messages::*;
23
24pub mod outcome;
25pub use outcome::*;
26
27#[cfg(test)]
28mod tests;
29
30use amaru_ouroboros::TxOrigin;
31pub use initiator::initiator;
32use pure_stage::{Effects, StageRef, Void};
33pub use responder::{ResponderResult, responder};
34#[cfg(test)]
35pub use tests::*;
36
37use crate::{
38    connection::ConnectionMessage,
39    mux,
40    protocol::{Inputs, PROTO_N2N_TX_SUB, ProtocolState, Role, RoleT},
41};
42
43pub fn register_deserializers() -> pure_stage::DeserializerGuards {
44    vec![initiator::register_deserializers(), responder::register_deserializers()].into_iter().flatten().collect()
45}
46
47pub fn spec<R: RoleT>() -> crate::protocol::ProtoSpec<State, Message, R>
48where
49    State: ProtocolState<R, WireMsg = Message>,
50{
51    use Message::*;
52    use State::*;
53    let mut spec = crate::protocol::ProtoSpec::default();
54    let request_tx_ids_blocking = || RequestTxIdsBlocking(0, 0);
55    let request_tx_ids_non_blocking = || RequestTxIdsNonBlocking(0, 0);
56    let request_txs = || RequestTxs(vec![]);
57    let reply_tx_ids = || ReplyTxIds(vec![]);
58    let reply_txs = || ReplyTxs(vec![]);
59
60    spec.init(State::Init, Message::Init, Idle);
61    spec.init(TxIdsBlocking, reply_tx_ids(), Idle);
62    spec.init(TxIdsNonBlocking, reply_tx_ids(), Idle);
63    spec.init(Txs, reply_txs(), Idle);
64    spec.init(TxIdsBlocking, Message::Done, State::Done);
65    spec.resp(Idle, request_tx_ids_blocking(), TxIdsBlocking);
66    spec.resp(Idle, request_tx_ids_non_blocking(), TxIdsNonBlocking);
67    spec.resp(Idle, request_txs(), Txs);
68    spec
69}
70
71pub async fn register_tx_submission(
72    role: Role,
73    muxer: StageRef<mux::MuxMessage>,
74    eff: &Effects<ConnectionMessage>,
75    origin: TxOrigin,
76) -> StageRef<mux::HandlerMessage> {
77    let tx_submission = if role == Role::Initiator {
78        let (state, stage) = initiator::TxSubmissionInitiator::new(muxer.clone());
79        let tx_submission = eff.wire_up(eff.stage("tx_submission", initiator::initiator()).await, (state, stage)).await;
80        eff.contramap(&tx_submission, "tx_submission_handler", Inputs::<Void>::Network).await
81    } else {
82        let (state, stage) = responder::TxSubmissionResponder::new(muxer.clone(), ResponderParams::new(2, 3), origin);
83        let tx_submission = eff.wire_up(eff.stage("tx_submission", responder::responder()).await, (state, stage)).await;
84        eff.contramap(&tx_submission, "tx_submission_handler", Inputs::<Void>::Network).await
85    };
86
87    eff.send(
88        &muxer,
89        mux::MuxMessage::Register {
90            protocol: PROTO_N2N_TX_SUB.for_role(role).erase(),
91            frame: mux::Frame::OneCborItem,
92            handler: tx_submission.clone(),
93            max_buffer: 2_500_000,
94        },
95    )
96    .await;
97
98    tx_submission
99}
100
101/// The state of the tx submission protocol as a whole.
102#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, serde::Serialize, serde::Deserialize)]
103pub enum State {
104    Init,
105    Idle,
106    Done,
107    Txs,
108    TxIdsBlocking,
109    TxIdsNonBlocking,
110}