Skip to main content

asteroid_mq/protocol/node/edge/
connection.rs

1use std::{ops::Deref, sync::Arc};
2
3pub use asteroid_mq_model::connection::{
4    EdgeConnectionError, EdgeConnectionErrorKind, EdgeNodeConnection,
5};
6use asteroid_mq_model::EndpointAddr;
7use tracing::warn;
8
9use crate::protocol::{
10    message::*,
11    node::edge::{middleware::EdgeConnectionHandler, EdgePayload, EdgeResponse},
12};
13
14use super::{
15    super::{Auth, NodeId, NodeRef},
16    EdgePush,
17};
18
19// todo
20pub mod tokio_channel;
21pub mod tokio_tcp;
22// pub mod tokio_ws;
23
24use futures_util::{FutureExt, SinkExt, StreamExt};
25
26#[derive(Clone, Debug)]
27pub struct ConnectionConfig {
28    pub attached_node: NodeRef,
29    pub peer_id: NodeId,
30    pub auth: Auth,
31}
32
33#[derive(Debug)]
34pub struct EdgeConnectionInstance {
35    pub config: ConnectionConfig,
36    pub outbound: tokio::sync::mpsc::UnboundedSender<EdgePayload>,
37    pub alive: Arc<std::sync::atomic::AtomicBool>,
38    pub peer_id: NodeId,
39    pub finish_signal: Arc<tokio::sync::Notify>,
40}
41
42pub struct EdgeConnectionRef {
43    inner: Arc<EdgeConnectionInstance>,
44}
45
46impl Deref for EdgeConnectionRef {
47    type Target = EdgeConnectionInstance;
48    fn deref(&self) -> &Self::Target {
49        &self.inner
50    }
51}
52impl EdgeConnectionInstance {
53    pub fn is_alive(&self) -> bool {
54        self.alive.load(std::sync::atomic::Ordering::Relaxed)
55    }
56    pub fn get_connection_ref(self: &Arc<Self>) -> EdgeConnectionRef {
57        EdgeConnectionRef {
58            inner: Arc::clone(self),
59        }
60    }
61    pub fn send_payload(&self, packet: EdgePayload) -> Result<(), EdgeConnectionError> {
62        self.outbound
63            .send(packet)
64            .map_err(|_e| EdgeConnectionError::new(EdgeConnectionErrorKind::Closed, "send message"))
65    }
66
67    pub fn push_message(
68        &self,
69        endpoint: &EndpointAddr,
70        message: Message,
71    ) -> Result<(), EdgeConnectionError> {
72        self.send_payload(EdgePayload::Push(EdgePush::Message {
73            endpoints: vec![*endpoint],
74            message,
75        }))
76    }
77    pub async fn init<C: EdgeNodeConnection>(
78        config: ConnectionConfig,
79        connection: C,
80    ) -> Result<Self, EdgeConnectionError> {
81        tracing::debug!(?config, "init edge connection");
82        let config_clone = config.clone();
83        let (mut sink, mut stream) = connection.split();
84        let peer_id = config.peer_id;
85
86        let node = config.attached_node.upgrade().ok_or_else(|| {
87            EdgeConnectionError::new(EdgeConnectionErrorKind::Closed, "node is already dropped")
88        })?;
89        let (outbound_tx, mut outbound_rx) = tokio::sync::mpsc::unbounded_channel::<EdgePayload>();
90        let task = if true {
91            enum PollEvent {
92                PacketIn(EdgePayload),
93                PacketOut(EdgePayload),
94            }
95            let internal_outbound_tx = outbound_tx.clone();
96
97            async move {
98                loop {
99                    let event = futures_util::select! {
100                        next_pack = stream.next().fuse() => {
101                            let Some(next_event) = next_pack else {
102                                break;
103                            };
104                            PollEvent::PacketIn(next_event?)
105                        }
106                        packet = outbound_rx.recv().fuse() => {
107                            let Some(packet) = packet else {
108                                break;
109                            };
110                            PollEvent::PacketOut(packet)
111                        }
112                    };
113                    match event {
114                        PollEvent::PacketIn(payload) => {
115                            let outbound = internal_outbound_tx.clone();
116                            match payload {
117                                EdgePayload::Request(request) => {
118                                    let node = node.clone();
119                                    tokio::spawn(async move {
120                                        let seq_id = request.seq_id;
121                                        let handler = node.edge_handler.read().await.clone();
122                                        let resp =
123                                            handler.handle(node, peer_id, request.request).await;
124                                        let resp = EdgeResponse::from_result(seq_id, resp);
125                                        // tracing::warn!(?resp,"[debug] edge response going to send");
126                                        // let seq_id = resp.seq_id;
127                                        let payload = EdgePayload::Response(resp);
128                                        outbound.send(payload).unwrap_or_else(|_e| {
129                                            warn!("failed to send edge response, tokio mpsc channel closed");
130                                        });
131                                        // tracing::warn!(?seq_id, "[debug] edge response send out");
132                                    });
133                                }
134                                _ => {
135                                    // invalid event, ignore
136                                }
137                            }
138                        }
139                        PollEvent::PacketOut(packet) => {
140                            let _packet = packet.clone();
141                            sink.send(packet).await.inspect_err(|e| {
142                                warn!(?e, "failed to send edge packet");
143                            })?;
144                            // tracing::warn!(?_packet, "[debug] ws-level edge response send out");
145                        }
146                    }
147                }
148
149                Result::<(), EdgeConnectionError>::Ok(())
150            }
151        } else {
152            return Err(EdgeConnectionError::new(
153                EdgeConnectionErrorKind::Protocol,
154                "peer is not an edge node",
155            ));
156        };
157
158        let alive_flag = Arc::new(std::sync::atomic::AtomicBool::new(true));
159        let finish_signal = Arc::new(tokio::sync::Notify::new());
160        let finish_notify = finish_signal.clone();
161        let _handle = {
162            let alive_flag = Arc::clone(&alive_flag);
163            let attached_node = config.attached_node.clone();
164
165            tokio::spawn(async move {
166                let result = task.await;
167                let node = attached_node.upgrade();
168                if let Err(e) = result {
169                    if node.is_some() {
170                        warn!(?e, "connection task failed");
171                    }
172                }
173                alive_flag.store(false, std::sync::atomic::Ordering::Relaxed);
174                finish_notify.notify_one();
175                if let Some(node) = node {
176                    node.remove_edge_connection(peer_id).await;
177                }
178            })
179        };
180        Ok(Self {
181            config: config_clone,
182            alive: alive_flag,
183            outbound: outbound_tx,
184            peer_id,
185            finish_signal,
186        })
187    }
188}