amaru_protocols/
accept.rs1use std::net::SocketAddr;
16
17use pure_stage::{Effects, StageRef};
18
19use crate::{
20 manager::{ManagerConfig, ManagerMessage},
21 network_effects::{AcceptError, Network, NetworkOps},
22};
23
24pub async fn stage(state: AcceptState, _msg: PullAccept, eff: Effects<PullAccept>) -> AcceptState {
26 match Network::new(&eff).accept(state.listener_addr).await {
27 Ok((peer, connection_id)) => {
28 eff.send(&state.manager_stage, ManagerMessage::Accepted(peer, connection_id)).await;
29 }
30 Err(AcceptError::ConnectionAborted) => {
31 tracing::debug!("failed to accept a connection: connection aborted");
32 }
33 Err(AcceptError::Other(err)) => {
34 tracing::error!(?err, "failed to accept a connection");
35 return eff.terminate().await;
36 }
37 }
38 eff.schedule_after(PullAccept, state.manager_config.accept_interval).await;
39 state
40}
41
42#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
43pub struct PullAccept;
44
45#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
46pub struct AcceptState {
47 manager_stage: StageRef<ManagerMessage>,
48 manager_config: ManagerConfig,
49 listener_addr: SocketAddr,
50}
51
52impl AcceptState {
53 pub fn new(
54 manager_stage: StageRef<ManagerMessage>,
55 manager_config: ManagerConfig,
56 listener_addr: SocketAddr,
57 ) -> Self {
58 Self { manager_stage, manager_config, listener_addr }
59 }
60}
61
62pub fn register_deserializers() -> pure_stage::DeserializerGuards {
63 vec![pure_stage::register_data_deserializer::<AcceptState>().boxed()]
64}