atm0s_sdn_node_alias/
behavior.rs

1use std::{collections::VecDeque, sync::Arc};
2
3use async_std::task::JoinHandle;
4use atm0s_sdn_identity::{ConnId, NodeId};
5use atm0s_sdn_network::{
6    behaviour::{BehaviorContext, ConnectionHandler, NetworkBehavior, NetworkBehaviorAction},
7    msg::{MsgHeader, TransportMsg},
8    transport::{ConnectionRejectReason, ConnectionSender, OutgoingConnectionError},
9};
10use atm0s_sdn_pub_sub::{Publisher, PubsubSdk};
11use atm0s_sdn_router::RouteRule;
12use bytes::Bytes;
13use parking_lot::Mutex;
14
15use crate::{
16    handler::NodeAliasHandler,
17    internal::{ServiceInternal, ServiceInternalAction},
18    msg::{BroadcastMsg, SdkControl},
19    sdk::NodeAliasSdk,
20    NODE_ALIAS_SERVICE_ID,
21};
22
/// Pub/sub channel id used for node-alias broadcast messages.
///
/// TODO: the value is hard-coded for now; it is meant to become a hash of
/// "atm0s.node_alias.broadcast".
const NODE_ALIAS_BROADCAST_CHANNEL: u32 = 0x0013_ba2c;
24
25pub struct NodeAliasBehavior {
26    node_id: NodeId,
27    pubsub_sdk: PubsubSdk,
28    pub_channel: Arc<Publisher>,
29    pubsub_task: Option<JoinHandle<()>>,
30    incomming_broadcast_queue: Arc<Mutex<VecDeque<(NodeId, BroadcastMsg)>>>,
31    sdk: NodeAliasSdk,
32    internal: Arc<Mutex<ServiceInternal>>,
33}
34
35impl NodeAliasBehavior {
36    pub fn new(node_id: NodeId, pubsub_sdk: PubsubSdk) -> (Self, NodeAliasSdk) {
37        let sdk = NodeAliasSdk::default();
38        let instance = Self {
39            node_id,
40            pub_channel: Arc::new(pubsub_sdk.create_publisher(NODE_ALIAS_BROADCAST_CHANNEL)),
41            pubsub_sdk,
42            pubsub_task: None,
43            incomming_broadcast_queue: Arc::new(Mutex::new(VecDeque::new())),
44            sdk: sdk.clone(),
45            internal: Arc::new(Mutex::new(ServiceInternal::new(node_id))),
46        };
47
48        (instance, sdk)
49    }
50}
51
52impl<BE, HE, SE> NetworkBehavior<BE, HE, SE> for NodeAliasBehavior {
53    fn service_id(&self) -> u8 {
54        NODE_ALIAS_SERVICE_ID
55    }
56
57    fn on_started(&mut self, ctx: &BehaviorContext, _now_ms: u64) {
58        self.sdk.set_awaker(ctx.awaker.clone());
59        let node_id = self.node_id;
60        let sub_channel = self.pubsub_sdk.create_consumer(NODE_ALIAS_BROADCAST_CHANNEL, None);
61        let awaker = ctx.awaker.clone();
62        let incomming_broadcast_msg = self.incomming_broadcast_queue.clone();
63        self.pubsub_task = Some(async_std::task::spawn(async move {
64            loop {
65                if let Some((_, source, _, msg)) = sub_channel.recv().await {
66                    if source == node_id {
67                        continue;
68                    }
69                    if let Ok(msg) = bincode::deserialize(&msg) {
70                        incomming_broadcast_msg.lock().push_back((source, msg));
71                        awaker.notify();
72                    }
73                }
74            }
75        }));
76    }
77
78    fn on_tick(&mut self, _ctx: &BehaviorContext, now_ms: u64, _interval_ms: u64) {
79        self.internal.lock().on_tick(now_ms);
80    }
81
82    fn on_awake(&mut self, _ctx: &BehaviorContext, now_ms: u64) {
83        let mut incomming_broadcast_msg = self.incomming_broadcast_queue.lock();
84        while let Some((source, msg)) = incomming_broadcast_msg.pop_front() {
85            self.internal.lock().on_incomming_broadcast(now_ms, source, msg);
86        }
87        while let Some(msg) = self.sdk.pop_control() {
88            match msg {
89                SdkControl::Register(alias) => {
90                    self.internal.lock().register(now_ms, alias);
91                }
92                SdkControl::Unregister(alias) => {
93                    self.internal.lock().unregister(now_ms, &alias);
94                }
95                SdkControl::Query(alias, sender) => {
96                    self.internal.lock().find_alias(now_ms, &alias, sender);
97                }
98            }
99        }
100    }
101
102    fn on_sdk_msg(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _from_service: u8, _event: SE) {}
103
104    fn on_local_msg(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _msg: TransportMsg) {
105        panic!("Should not happend");
106    }
107
108    fn check_incoming_connection(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _node: NodeId, _conn_id: ConnId) -> Result<(), ConnectionRejectReason> {
109        Ok(())
110    }
111
112    fn check_outgoing_connection(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _node: NodeId, _conn_id: ConnId) -> Result<(), ConnectionRejectReason> {
113        Ok(())
114    }
115
116    fn on_incoming_connection_connected(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _conn: Arc<dyn ConnectionSender>) -> Option<Box<dyn ConnectionHandler<BE, HE>>> {
117        Some(Box::new(NodeAliasHandler {
118            node_id: self.node_id,
119            internal: self.internal.clone(),
120            pub_channel: self.pub_channel.clone(),
121        }))
122    }
123
124    fn on_outgoing_connection_connected(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _conn: Arc<dyn ConnectionSender>) -> Option<Box<dyn ConnectionHandler<BE, HE>>> {
125        Some(Box::new(NodeAliasHandler {
126            node_id: self.node_id,
127            internal: self.internal.clone(),
128            pub_channel: self.pub_channel.clone(),
129        }))
130    }
131
132    fn on_incoming_connection_disconnected(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _node_id: NodeId, _conn_id: ConnId) {}
133
134    fn on_outgoing_connection_disconnected(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _node_id: NodeId, _conn_id: ConnId) {}
135
136    fn on_outgoing_connection_error(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _node_id: NodeId, _conn_id: ConnId, _err: &OutgoingConnectionError) {}
137
138    fn on_handler_event(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _node_id: NodeId, _conn_id: ConnId, _event: BE) {}
139
140    fn on_stopped(&mut self, _ctx: &BehaviorContext, _now_ms: u64) {
141        if let Some(task) = self.pubsub_task.take() {
142            async_std::task::spawn(async move {
143                task.cancel().await;
144            });
145        }
146    }
147
148    fn pop_action(&mut self) -> Option<NetworkBehaviorAction<HE, SE>> {
149        match self.internal.lock().pop_action() {
150            Some(ServiceInternalAction::Broadcast(msg)) => {
151                log::info!("[NodeAliasBehavior {}] Broadcasting: {:?}", self.node_id, msg);
152                let msg = bincode::serialize(&msg).unwrap();
153                self.pub_channel.send(Bytes::from(msg));
154                None
155            }
156            Some(ServiceInternalAction::Unicast(dest, msg)) => {
157                log::info!("[NodeAliasBehavior {}] Unicasting to {}: {:?}", self.node_id, dest, msg);
158                let header = MsgHeader::build(NODE_ALIAS_SERVICE_ID, NODE_ALIAS_SERVICE_ID, RouteRule::ToNode(dest)).set_from_node(Some(self.node_id));
159                Some(NetworkBehaviorAction::ToNet(TransportMsg::from_payload_bincode(header, &msg)))
160            }
161            None => None,
162        }
163    }
164}