Skip to main content

amaru_protocols/
accept.rs

1// Copyright 2025 PRAGMA
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16
17use pure_stage::{Effects, StageRef};
18
19use crate::{
20    manager::{ManagerConfig, ManagerMessage},
21    network_effects::{AcceptError, Network, NetworkOps},
22};
23
24/// Create a stage that repeatedly accepts incoming connections and notifies the manager about them.
25pub async fn stage(state: AcceptState, _msg: PullAccept, eff: Effects<PullAccept>) -> AcceptState {
26    match Network::new(&eff).accept(state.listener_addr).await {
27        Ok((peer, connection_id)) => {
28            eff.send(&state.manager_stage, ManagerMessage::Accepted(peer, connection_id)).await;
29        }
30        Err(AcceptError::ConnectionAborted) => {
31            tracing::debug!("failed to accept a connection: connection aborted");
32        }
33        Err(AcceptError::Other(err)) => {
34            tracing::error!(?err, "failed to accept a connection");
35            return eff.terminate().await;
36        }
37    }
38    eff.schedule_after(PullAccept, state.manager_config.accept_interval).await;
39    state
40}
41
42#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
43pub struct PullAccept;
44
45#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
46pub struct AcceptState {
47    manager_stage: StageRef<ManagerMessage>,
48    manager_config: ManagerConfig,
49    listener_addr: SocketAddr,
50}
51
52impl AcceptState {
53    pub fn new(
54        manager_stage: StageRef<ManagerMessage>,
55        manager_config: ManagerConfig,
56        listener_addr: SocketAddr,
57    ) -> Self {
58        Self { manager_stage, manager_config, listener_addr }
59    }
60}
61
62pub fn register_deserializers() -> pure_stage::DeserializerGuards {
63    vec![pure_stage::register_data_deserializer::<AcceptState>().boxed()]
64}