amaru_protocols/keepalive/
mod.rs1mod initiator;
16mod messages;
17mod responder;
18#[cfg(test)]
19mod tests;
20
21pub use messages::{Cookie, Message};
22use pure_stage::{Effects, StageRef, Void};
23
24use crate::{
25 connection::ConnectionMessage,
26 mux,
27 protocol::{Inputs, PROTO_N2N_KEEP_ALIVE, ProtocolState},
28};
29
30pub fn register_deserializers() -> pure_stage::DeserializerGuards {
31 vec![initiator::register_deserializers(), responder::register_deserializers()].into_iter().flatten().collect()
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
35pub enum State {
36 Idle,
37 Waiting,
38}
39
40pub fn spec<R: crate::protocol::RoleT>() -> crate::protocol::ProtoSpec<State, Message, R>
41where
42 State: ProtocolState<R, WireMsg = Message>,
43{
44 let mut spec = crate::protocol::ProtoSpec::default();
45 let keep_alive = || Message::KeepAlive(Cookie::new());
46 let response_keep_alive = || Message::ResponseKeepAlive(Cookie::new());
47
48 spec.init(State::Idle, keep_alive(), State::Waiting);
50 spec.resp(State::Waiting, response_keep_alive(), State::Idle);
52
53 spec
54}
55
56pub async fn register_keepalive(
57 role: crate::protocol::Role,
58 muxer: StageRef<crate::mux::MuxMessage>,
59 eff: &Effects<ConnectionMessage>,
60) -> StageRef<crate::mux::HandlerMessage> {
61 let keepalive = if role == crate::protocol::Role::Initiator {
62 let (state, stage) = initiator::KeepAliveInitiator::new(muxer.clone());
63 let keepalive = eff.wire_up(eff.stage("keepalive", initiator::initiator()).await, (state, stage)).await;
64 eff.contramap(&keepalive, "keepalive_handler", Inputs::<initiator::InitiatorMessage>::Network).await
65 } else {
66 let (state, stage) = responder::KeepAliveResponder::new(muxer.clone());
67 let keepalive = eff.wire_up(eff.stage("keepalive", responder::responder()).await, (state, stage)).await;
68 eff.contramap(&keepalive, "keepalive_handler", Inputs::<Void>::Network).await
69 };
70
71 eff.send(
72 &muxer,
73 crate::mux::MuxMessage::Register {
74 protocol: PROTO_N2N_KEEP_ALIVE.for_role(role).erase(),
75 frame: mux::Frame::OneCborItem,
76 handler: keepalive.clone(),
77 max_buffer: 65535,
78 },
79 )
80 .await;
81
82 keepalive
83}