1use std::{collections::VecDeque, sync::Arc};
2
3use async_std::task::JoinHandle;
4use atm0s_sdn_identity::{ConnId, NodeId};
5use atm0s_sdn_network::{
6 behaviour::{BehaviorContext, ConnectionHandler, NetworkBehavior, NetworkBehaviorAction},
7 msg::{MsgHeader, TransportMsg},
8 transport::{ConnectionRejectReason, ConnectionSender, OutgoingConnectionError},
9};
10use atm0s_sdn_pub_sub::{Publisher, PubsubSdk};
11use atm0s_sdn_router::RouteRule;
12use bytes::Bytes;
13use parking_lot::Mutex;
14
15use crate::{
16 handler::NodeAliasHandler,
17 internal::{ServiceInternal, ServiceInternalAction},
18 msg::{BroadcastMsg, SdkControl},
19 sdk::NodeAliasSdk,
20 NODE_ALIAS_SERVICE_ID,
21};
22
23const NODE_ALIAS_BROADCAST_CHANNEL: u32 = 0x13ba2c; pub struct NodeAliasBehavior {
26 node_id: NodeId,
27 pubsub_sdk: PubsubSdk,
28 pub_channel: Arc<Publisher>,
29 pubsub_task: Option<JoinHandle<()>>,
30 incomming_broadcast_queue: Arc<Mutex<VecDeque<(NodeId, BroadcastMsg)>>>,
31 sdk: NodeAliasSdk,
32 internal: Arc<Mutex<ServiceInternal>>,
33}
34
35impl NodeAliasBehavior {
36 pub fn new(node_id: NodeId, pubsub_sdk: PubsubSdk) -> (Self, NodeAliasSdk) {
37 let sdk = NodeAliasSdk::default();
38 let instance = Self {
39 node_id,
40 pub_channel: Arc::new(pubsub_sdk.create_publisher(NODE_ALIAS_BROADCAST_CHANNEL)),
41 pubsub_sdk,
42 pubsub_task: None,
43 incomming_broadcast_queue: Arc::new(Mutex::new(VecDeque::new())),
44 sdk: sdk.clone(),
45 internal: Arc::new(Mutex::new(ServiceInternal::new(node_id))),
46 };
47
48 (instance, sdk)
49 }
50}
51
52impl<BE, HE, SE> NetworkBehavior<BE, HE, SE> for NodeAliasBehavior {
53 fn service_id(&self) -> u8 {
54 NODE_ALIAS_SERVICE_ID
55 }
56
57 fn on_started(&mut self, ctx: &BehaviorContext, _now_ms: u64) {
58 self.sdk.set_awaker(ctx.awaker.clone());
59 let node_id = self.node_id;
60 let sub_channel = self.pubsub_sdk.create_consumer(NODE_ALIAS_BROADCAST_CHANNEL, None);
61 let awaker = ctx.awaker.clone();
62 let incomming_broadcast_msg = self.incomming_broadcast_queue.clone();
63 self.pubsub_task = Some(async_std::task::spawn(async move {
64 loop {
65 if let Some((_, source, _, msg)) = sub_channel.recv().await {
66 if source == node_id {
67 continue;
68 }
69 if let Ok(msg) = bincode::deserialize(&msg) {
70 incomming_broadcast_msg.lock().push_back((source, msg));
71 awaker.notify();
72 }
73 }
74 }
75 }));
76 }
77
78 fn on_tick(&mut self, _ctx: &BehaviorContext, now_ms: u64, _interval_ms: u64) {
79 self.internal.lock().on_tick(now_ms);
80 }
81
82 fn on_awake(&mut self, _ctx: &BehaviorContext, now_ms: u64) {
83 let mut incomming_broadcast_msg = self.incomming_broadcast_queue.lock();
84 while let Some((source, msg)) = incomming_broadcast_msg.pop_front() {
85 self.internal.lock().on_incomming_broadcast(now_ms, source, msg);
86 }
87 while let Some(msg) = self.sdk.pop_control() {
88 match msg {
89 SdkControl::Register(alias) => {
90 self.internal.lock().register(now_ms, alias);
91 }
92 SdkControl::Unregister(alias) => {
93 self.internal.lock().unregister(now_ms, &alias);
94 }
95 SdkControl::Query(alias, sender) => {
96 self.internal.lock().find_alias(now_ms, &alias, sender);
97 }
98 }
99 }
100 }
101
102 fn on_sdk_msg(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _from_service: u8, _event: SE) {}
103
104 fn on_local_msg(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _msg: TransportMsg) {
105 panic!("Should not happend");
106 }
107
108 fn check_incoming_connection(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _node: NodeId, _conn_id: ConnId) -> Result<(), ConnectionRejectReason> {
109 Ok(())
110 }
111
112 fn check_outgoing_connection(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _node: NodeId, _conn_id: ConnId) -> Result<(), ConnectionRejectReason> {
113 Ok(())
114 }
115
116 fn on_incoming_connection_connected(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _conn: Arc<dyn ConnectionSender>) -> Option<Box<dyn ConnectionHandler<BE, HE>>> {
117 Some(Box::new(NodeAliasHandler {
118 node_id: self.node_id,
119 internal: self.internal.clone(),
120 pub_channel: self.pub_channel.clone(),
121 }))
122 }
123
124 fn on_outgoing_connection_connected(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _conn: Arc<dyn ConnectionSender>) -> Option<Box<dyn ConnectionHandler<BE, HE>>> {
125 Some(Box::new(NodeAliasHandler {
126 node_id: self.node_id,
127 internal: self.internal.clone(),
128 pub_channel: self.pub_channel.clone(),
129 }))
130 }
131
132 fn on_incoming_connection_disconnected(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _node_id: NodeId, _conn_id: ConnId) {}
133
134 fn on_outgoing_connection_disconnected(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _node_id: NodeId, _conn_id: ConnId) {}
135
136 fn on_outgoing_connection_error(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _node_id: NodeId, _conn_id: ConnId, _err: &OutgoingConnectionError) {}
137
138 fn on_handler_event(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _node_id: NodeId, _conn_id: ConnId, _event: BE) {}
139
140 fn on_stopped(&mut self, _ctx: &BehaviorContext, _now_ms: u64) {
141 if let Some(task) = self.pubsub_task.take() {
142 async_std::task::spawn(async move {
143 task.cancel().await;
144 });
145 }
146 }
147
148 fn pop_action(&mut self) -> Option<NetworkBehaviorAction<HE, SE>> {
149 match self.internal.lock().pop_action() {
150 Some(ServiceInternalAction::Broadcast(msg)) => {
151 log::info!("[NodeAliasBehavior {}] Broadcasting: {:?}", self.node_id, msg);
152 let msg = bincode::serialize(&msg).unwrap();
153 self.pub_channel.send(Bytes::from(msg));
154 None
155 }
156 Some(ServiceInternalAction::Unicast(dest, msg)) => {
157 log::info!("[NodeAliasBehavior {}] Unicasting to {}: {:?}", self.node_id, dest, msg);
158 let header = MsgHeader::build(NODE_ALIAS_SERVICE_ID, NODE_ALIAS_SERVICE_ID, RouteRule::ToNode(dest)).set_from_node(Some(self.node_id));
159 Some(NetworkBehaviorAction::ToNet(TransportMsg::from_payload_bincode(header, &msg)))
160 }
161 None => None,
162 }
163 }
164}