Skip to main content

amaru_protocols/keepalive/
mod.rs

// Copyright 2025 PRAGMA
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod initiator;
16mod messages;
17mod responder;
18#[cfg(test)]
19mod tests;
20
21pub use messages::{Cookie, Message};
22use pure_stage::{Effects, StageRef, Void};
23
24use crate::{
25    connection::ConnectionMessage,
26    mux,
27    protocol::{Inputs, PROTO_N2N_KEEP_ALIVE, ProtocolState},
28};
29
30pub fn register_deserializers() -> pure_stage::DeserializerGuards {
31    vec![initiator::register_deserializers(), responder::register_deserializers()].into_iter().flatten().collect()
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
35pub enum State {
36    Idle,
37    Waiting,
38}
39
40pub fn spec<R: crate::protocol::RoleT>() -> crate::protocol::ProtoSpec<State, Message, R>
41where
42    State: ProtocolState<R, WireMsg = Message>,
43{
44    let mut spec = crate::protocol::ProtoSpec::default();
45    let keep_alive = || Message::KeepAlive(Cookie::new());
46    let response_keep_alive = || Message::ResponseKeepAlive(Cookie::new());
47
48    // Initiator sends KeepAlive from Idle, transitions to Waiting
49    spec.init(State::Idle, keep_alive(), State::Waiting);
50    // Initiator receives ResponseKeepAlive in Waiting, transitions to Idle
51    spec.resp(State::Waiting, response_keep_alive(), State::Idle);
52
53    spec
54}
55
56pub async fn register_keepalive(
57    role: crate::protocol::Role,
58    muxer: StageRef<crate::mux::MuxMessage>,
59    eff: &Effects<ConnectionMessage>,
60) -> StageRef<crate::mux::HandlerMessage> {
61    let keepalive = if role == crate::protocol::Role::Initiator {
62        let (state, stage) = initiator::KeepAliveInitiator::new(muxer.clone());
63        let keepalive = eff.wire_up(eff.stage("keepalive", initiator::initiator()).await, (state, stage)).await;
64        eff.contramap(&keepalive, "keepalive_handler", Inputs::<initiator::InitiatorMessage>::Network).await
65    } else {
66        let (state, stage) = responder::KeepAliveResponder::new(muxer.clone());
67        let keepalive = eff.wire_up(eff.stage("keepalive", responder::responder()).await, (state, stage)).await;
68        eff.contramap(&keepalive, "keepalive_handler", Inputs::<Void>::Network).await
69    };
70
71    eff.send(
72        &muxer,
73        crate::mux::MuxMessage::Register {
74            protocol: PROTO_N2N_KEEP_ALIVE.for_role(role).erase(),
75            frame: mux::Frame::OneCborItem,
76            handler: keepalive.clone(),
77            max_buffer: 65535,
78        },
79    )
80    .await;
81
82    keepalive
83}