asteroid_mq/protocol/node/edge/
connection.rs1use std::{ops::Deref, sync::Arc};
2
3pub use asteroid_mq_model::connection::{
4 EdgeConnectionError, EdgeConnectionErrorKind, EdgeNodeConnection,
5};
6use asteroid_mq_model::EndpointAddr;
7use tracing::warn;
8
9use crate::protocol::{
10 message::*,
11 node::edge::{middleware::EdgeConnectionHandler, EdgePayload, EdgeResponse},
12};
13
14use super::{
15 super::{Auth, NodeId, NodeRef},
16 EdgePush,
17};
18
19pub mod tokio_channel;
21pub mod tokio_tcp;
22use futures_util::{FutureExt, SinkExt, StreamExt};
25
26#[derive(Clone, Debug)]
27pub struct ConnectionConfig {
28 pub attached_node: NodeRef,
29 pub peer_id: NodeId,
30 pub auth: Auth,
31}
32
33#[derive(Debug)]
34pub struct EdgeConnectionInstance {
35 pub config: ConnectionConfig,
36 pub outbound: tokio::sync::mpsc::UnboundedSender<EdgePayload>,
37 pub alive: Arc<std::sync::atomic::AtomicBool>,
38 pub peer_id: NodeId,
39 pub finish_signal: Arc<tokio::sync::Notify>,
40}
41
42pub struct EdgeConnectionRef {
43 inner: Arc<EdgeConnectionInstance>,
44}
45
46impl Deref for EdgeConnectionRef {
47 type Target = EdgeConnectionInstance;
48 fn deref(&self) -> &Self::Target {
49 &self.inner
50 }
51}
52impl EdgeConnectionInstance {
53 pub fn is_alive(&self) -> bool {
54 self.alive.load(std::sync::atomic::Ordering::Relaxed)
55 }
56 pub fn get_connection_ref(self: &Arc<Self>) -> EdgeConnectionRef {
57 EdgeConnectionRef {
58 inner: Arc::clone(self),
59 }
60 }
61 pub fn send_payload(&self, packet: EdgePayload) -> Result<(), EdgeConnectionError> {
62 self.outbound
63 .send(packet)
64 .map_err(|_e| EdgeConnectionError::new(EdgeConnectionErrorKind::Closed, "send message"))
65 }
66
67 pub fn push_message(
68 &self,
69 endpoint: &EndpointAddr,
70 message: Message,
71 ) -> Result<(), EdgeConnectionError> {
72 self.send_payload(EdgePayload::Push(EdgePush::Message {
73 endpoints: vec![*endpoint],
74 message,
75 }))
76 }
77 pub async fn init<C: EdgeNodeConnection>(
78 config: ConnectionConfig,
79 connection: C,
80 ) -> Result<Self, EdgeConnectionError> {
81 tracing::debug!(?config, "init edge connection");
82 let config_clone = config.clone();
83 let (mut sink, mut stream) = connection.split();
84 let peer_id = config.peer_id;
85
86 let node = config.attached_node.upgrade().ok_or_else(|| {
87 EdgeConnectionError::new(EdgeConnectionErrorKind::Closed, "node is already dropped")
88 })?;
89 let (outbound_tx, mut outbound_rx) = tokio::sync::mpsc::unbounded_channel::<EdgePayload>();
90 let task = if true {
91 enum PollEvent {
92 PacketIn(EdgePayload),
93 PacketOut(EdgePayload),
94 }
95 let internal_outbound_tx = outbound_tx.clone();
96
97 async move {
98 loop {
99 let event = futures_util::select! {
100 next_pack = stream.next().fuse() => {
101 let Some(next_event) = next_pack else {
102 break;
103 };
104 PollEvent::PacketIn(next_event?)
105 }
106 packet = outbound_rx.recv().fuse() => {
107 let Some(packet) = packet else {
108 break;
109 };
110 PollEvent::PacketOut(packet)
111 }
112 };
113 match event {
114 PollEvent::PacketIn(payload) => {
115 let outbound = internal_outbound_tx.clone();
116 match payload {
117 EdgePayload::Request(request) => {
118 let node = node.clone();
119 tokio::spawn(async move {
120 let seq_id = request.seq_id;
121 let handler = node.edge_handler.read().await.clone();
122 let resp =
123 handler.handle(node, peer_id, request.request).await;
124 let resp = EdgeResponse::from_result(seq_id, resp);
125 let payload = EdgePayload::Response(resp);
128 outbound.send(payload).unwrap_or_else(|_e| {
129 warn!("failed to send edge response, tokio mpsc channel closed");
130 });
131 });
133 }
134 _ => {
135 }
137 }
138 }
139 PollEvent::PacketOut(packet) => {
140 let _packet = packet.clone();
141 sink.send(packet).await.inspect_err(|e| {
142 warn!(?e, "failed to send edge packet");
143 })?;
144 }
146 }
147 }
148
149 Result::<(), EdgeConnectionError>::Ok(())
150 }
151 } else {
152 return Err(EdgeConnectionError::new(
153 EdgeConnectionErrorKind::Protocol,
154 "peer is not an edge node",
155 ));
156 };
157
158 let alive_flag = Arc::new(std::sync::atomic::AtomicBool::new(true));
159 let finish_signal = Arc::new(tokio::sync::Notify::new());
160 let finish_notify = finish_signal.clone();
161 let _handle = {
162 let alive_flag = Arc::clone(&alive_flag);
163 let attached_node = config.attached_node.clone();
164
165 tokio::spawn(async move {
166 let result = task.await;
167 let node = attached_node.upgrade();
168 if let Err(e) = result {
169 if node.is_some() {
170 warn!(?e, "connection task failed");
171 }
172 }
173 alive_flag.store(false, std::sync::atomic::Ordering::Relaxed);
174 finish_notify.notify_one();
175 if let Some(node) = node {
176 node.remove_edge_connection(peer_id).await;
177 }
178 })
179 };
180 Ok(Self {
181 config: config_clone,
182 alive: alive_flag,
183 outbound: outbound_tx,
184 peer_id,
185 finish_signal,
186 })
187 }
188}