atm0s_sdn_network/features/pubsub/
worker.rs

1use std::{collections::HashMap, fmt::Debug};
2
3use atm0s_sdn_identity::ConnId;
4use atm0s_sdn_router::{RouteAction, RouterTable};
5use sans_io_runtime::{collections::DynamicDeque, return_if_err, return_if_none, TaskSwitcherChild};
6
7use crate::{
8    base::{Buffer, FeatureControlActor, FeatureWorker, FeatureWorkerContext, FeatureWorkerInput, FeatureWorkerOutput, TransportMsgHeader},
9    data_plane::NetPair,
10};
11
12use super::{
13    msg::{PubsubMessage, RelayControl, RelayId},
14    ChannelControl, ChannelEvent, Control, Event, RelayWorkerControl, ToController, ToWorker,
15};
16
17struct WorkerRelay<UserData> {
18    source: Option<NetPair>,
19    locals: Vec<FeatureControlActor<UserData>>,
20    remotes: Vec<NetPair>,
21    remotes_uuid: HashMap<NetPair, u64>,
22}
23
24impl<UserData> WorkerRelay<UserData> {
25    pub fn is_empty(&self) -> bool {
26        self.locals.is_empty() && self.remotes.is_empty()
27    }
28}
29
30pub struct PubSubFeatureWorker<UserData> {
31    relays: HashMap<RelayId, WorkerRelay<UserData>>,
32    queue: DynamicDeque<FeatureWorkerOutput<UserData, Control, Event, ToController>, 16>,
33    shutdown: bool,
34}
35
36impl<UserData> Default for PubSubFeatureWorker<UserData> {
37    fn default() -> Self {
38        Self {
39            relays: HashMap::new(),
40            queue: Default::default(),
41            shutdown: false,
42        }
43    }
44}
45
46impl<UserData: Eq + Copy + Debug> FeatureWorker<UserData, Control, Event, ToController, ToWorker<UserData>> for PubSubFeatureWorker<UserData> {
47    fn on_network_raw(&mut self, _ctx: &mut FeatureWorkerContext, _now: u64, _conn: ConnId, remote: NetPair, _header: TransportMsgHeader, buf: Buffer) {
48        log::debug!("[PubSubWorker] on_network_raw from {}", remote);
49        let msg = return_if_err!(PubsubMessage::try_from(&buf as &[u8]));
50        match msg {
51            PubsubMessage::Control(relay_id, control) => {
52                log::debug!("[PubSubWorker] received PubsubMessage::RelayControl({:?}, {:?})", relay_id, control);
53                self.queue.push_back(FeatureWorkerOutput::ToController(ToController::RelayControl(remote, relay_id, control)));
54            }
55            PubsubMessage::SourceHint(channel, control) => {
56                log::debug!("[PubSubWorker] received PubsubMessage::SourceHint({:?}, {:?})", channel, control);
57                self.queue.push_back(FeatureWorkerOutput::ToController(ToController::SourceHint(remote, channel, control)));
58            }
59            PubsubMessage::Data(relay_id, data) => {
60                log::debug!("[PubSubWorker] received PubsubMessage::Data({:?}, size {})", relay_id, data.len());
61                let relay = return_if_none!(self.relays.get(&relay_id));
62                // only relay from trusted source
63                if relay.source == Some(remote) {
64                    for actor in &relay.locals {
65                        self.queue
66                            .push_back(FeatureWorkerOutput::Event(*actor, Event(relay_id.0, ChannelEvent::SourceData(relay_id.1, data.to_vec()))));
67                    }
68
69                    if !relay.remotes.is_empty() {
70                        let control = PubsubMessage::Data(relay_id, data);
71                        //TODO avoid copy
72                        self.queue.push_back(FeatureWorkerOutput::RawBroadcast2(relay.remotes.clone(), control.into()));
73                    }
74                } else {
75                    log::warn!("[PubsubWorker] Relay from untrusted source local {:?} != remote {}", relay.source, remote);
76                }
77            }
78        }
79    }
80
81    fn on_input(&mut self, ctx: &mut FeatureWorkerContext, _now: u64, input: FeatureWorkerInput<UserData, Control, ToWorker<UserData>>) {
82        match input {
83            FeatureWorkerInput::FromController(_, ToWorker::RelayControl(relay_id, control)) => match control {
84                RelayWorkerControl::SendSub(uuid, remote) => {
85                    let dest = if let Some(remote) = remote {
86                        log::debug!("[PubsubWorker] SendSub for {:?} to {}", relay_id, remote);
87                        remote
88                    } else if let Some(next) = ctx.router.next(relay_id.1) {
89                        log::debug!("[PubsubWorker] SendSub for {:?} to {} by select from router table", relay_id, next);
90                        next
91                    } else {
92                        log::warn!("[PubsubWorker] SendSub: no route for {:?}", relay_id);
93                        return;
94                    };
95                    let control = PubsubMessage::Control(relay_id, RelayControl::Sub(uuid));
96                    self.queue.push_back(FeatureWorkerOutput::RawDirect2(dest, control.into()));
97                }
98                RelayWorkerControl::SendFeedback(fb, remote) => {
99                    log::debug!("[PubsubWorker] SendFeedback for {:?} to {:?}", relay_id, remote);
100                    let control = PubsubMessage::Control(relay_id, RelayControl::Feedback(fb));
101                    self.queue.push_back(FeatureWorkerOutput::RawDirect2(remote, control.into()));
102                }
103                RelayWorkerControl::SendUnsub(uuid, remote) => {
104                    log::debug!("[PubsubWorker] SendUnsub for {:?} to {:?}", relay_id, remote);
105                    let control = PubsubMessage::Control(relay_id, RelayControl::Unsub(uuid));
106                    self.queue.push_back(FeatureWorkerOutput::RawDirect2(remote, control.into()));
107                }
108                RelayWorkerControl::SendSubOk(uuid, remote) => {
109                    log::debug!("[PubsubWorker] SendSubOk for {:?} to {:?}", relay_id, remote);
110                    let control = PubsubMessage::Control(relay_id, RelayControl::SubOK(uuid));
111                    self.queue.push_back(FeatureWorkerOutput::RawDirect2(remote, control.into()));
112                }
113                RelayWorkerControl::SendUnsubOk(uuid, remote) => {
114                    log::debug!("[PubsubWorker] SendUnsubOk for {:?} to {:?}", relay_id, remote);
115                    let control = PubsubMessage::Control(relay_id, RelayControl::UnsubOK(uuid));
116                    self.queue.push_back(FeatureWorkerOutput::RawDirect2(remote, control.into()));
117                }
118                RelayWorkerControl::SendRouteChanged => {
119                    let relay = return_if_none!(self.relays.get(&relay_id));
120                    log::debug!("[PubsubWorker] SendRouteChanged for {:?} to remotes {:?}", relay_id, relay.remotes);
121                    for (addr, uuid) in relay.remotes_uuid.iter() {
122                        let control = PubsubMessage::Control(relay_id, RelayControl::RouteChanged(*uuid));
123                        self.queue.push_back(FeatureWorkerOutput::RawDirect2(*addr, control.into()));
124                    }
125                }
126                RelayWorkerControl::RouteSetSource(source) => {
127                    log::info!("[PubsubWorker] RouteSetSource for {:?} to {:?}", relay_id, source);
128                    let entry: &mut WorkerRelay<UserData> = self.relays.entry(relay_id).or_insert(WorkerRelay {
129                        source: None,
130                        locals: vec![],
131                        remotes: vec![],
132                        remotes_uuid: HashMap::new(),
133                    });
134
135                    entry.source = Some(source);
136                }
137                RelayWorkerControl::RouteDelSource(source) => {
138                    log::info!("[PubsubWorker] RouteDelSource for {:?} to {:?}", relay_id, source);
139                    if let Some(entry) = self.relays.get_mut(&relay_id) {
140                        if entry.source == Some(source) {
141                            entry.source = None;
142                        } else {
143                            log::warn!("[PubsubWorker] RelayDel: relay {:?} source mismatch locked {:?} vs {}", relay_id, entry.source, source);
144                        }
145                    } else {
146                        log::warn!("[PubsubWorker] RelayDel: relay not found {:?}", relay_id);
147                    }
148                }
149                RelayWorkerControl::RouteSetLocal(actor) => {
150                    log::debug!("[PubsubWorker] RouteSetLocal for {:?} to {:?}", relay_id, actor);
151                    let entry: &mut WorkerRelay<UserData> = self.relays.entry(relay_id).or_insert(WorkerRelay {
152                        source: None,
153                        locals: vec![],
154                        remotes: vec![],
155                        remotes_uuid: HashMap::new(),
156                    });
157
158                    entry.locals.push(actor);
159                }
160                RelayWorkerControl::RouteDelLocal(actor) => {
161                    log::debug!("[PubsubWorker] RouteDelLocal for {:?} to {:?}", relay_id, actor);
162                    if let Some(entry) = self.relays.get_mut(&relay_id) {
163                        if let Some(pos) = entry.locals.iter().position(|x| *x == actor) {
164                            entry.locals.swap_remove(pos);
165                        }
166                        if entry.is_empty() {
167                            self.relays.remove(&relay_id);
168                        }
169                    } else {
170                        log::warn!("[PubsubWorker] RelayDelLocal: relay not found {:?}", relay_id);
171                    }
172                }
173                RelayWorkerControl::RouteSetRemote(remote, uuid) => {
174                    log::debug!("[PubsubWorker] RouteSetRemote for {:?} to {:?}", relay_id, remote);
175                    let entry: &mut WorkerRelay<UserData> = self.relays.entry(relay_id).or_insert(WorkerRelay {
176                        source: None,
177                        locals: vec![],
178                        remotes: vec![],
179                        remotes_uuid: HashMap::new(),
180                    });
181
182                    entry.remotes.push(remote);
183                    entry.remotes_uuid.insert(remote, uuid);
184                }
185                RelayWorkerControl::RouteDelRemote(remote) => {
186                    log::debug!("[PubsubWorker] RouteDelRemote for {:?} to {:?}", relay_id, remote);
187                    if let Some(entry) = self.relays.get_mut(&relay_id) {
188                        if let Some(pos) = entry.remotes.iter().position(|x| *x == remote) {
189                            entry.remotes.swap_remove(pos);
190                        }
191                        if entry.is_empty() {
192                            self.relays.remove(&relay_id);
193                        }
194                    } else {
195                        log::warn!("RelayDelSub: relay not found {:?}", relay_id);
196                    }
197                }
198            },
199            FeatureWorkerInput::FromController(_, ToWorker::SourceHint(channel, remote, data)) => {
200                if let Some(remote) = remote {
201                    let control = PubsubMessage::SourceHint(channel, data);
202                    self.queue.push_back(FeatureWorkerOutput::RawDirect2(remote, control.into()));
203                } else {
204                    let next = ctx.router.path_to_key(*channel as u32);
205                    if let RouteAction::Next(remote) = next {
206                        let control = PubsubMessage::SourceHint(channel, data);
207                        self.queue.push_back(FeatureWorkerOutput::RawDirect2(remote, control.into()));
208                    }
209                }
210            }
211            FeatureWorkerInput::FromController(_, ToWorker::RelayData(relay_id, data)) => {
212                let relay = return_if_none!(self.relays.get(&relay_id));
213                if relay.remotes.is_empty() {
214                    log::warn!("RelayData: no remote for {:?}", relay_id);
215                    return;
216                }
217                let control = PubsubMessage::Data(relay_id, data);
218                self.queue.push_back(FeatureWorkerOutput::RawBroadcast2(relay.remotes.clone(), control.into()));
219            }
220            FeatureWorkerInput::Control(actor, control) => match control {
221                Control(channel, ChannelControl::PubData(data)) => {
222                    let relay_id = RelayId(channel, ctx.node_id);
223                    let relay = return_if_none!(self.relays.get(&relay_id));
224
225                    for actor in &relay.locals {
226                        self.queue
227                            .push_back(FeatureWorkerOutput::Event(*actor, Event(channel, ChannelEvent::SourceData(ctx.node_id, data.clone()))));
228                    }
229
230                    if !relay.remotes.is_empty() {
231                        let control = PubsubMessage::Data(relay_id, data);
232                        self.queue.push_back(FeatureWorkerOutput::RawBroadcast2(relay.remotes.clone(), control.into()));
233                    }
234                }
235                _ => self.queue.push_back(FeatureWorkerOutput::ForwardControlToController(actor, control)),
236            },
237            _ => {}
238        }
239    }
240
241    fn on_shutdown(&mut self, _ctx: &mut FeatureWorkerContext, _now: u64) {
242        self.shutdown = true;
243    }
244}
245
246impl<UserData> TaskSwitcherChild<FeatureWorkerOutput<UserData, Control, Event, ToController>> for PubSubFeatureWorker<UserData> {
247    type Time = u64;
248
249    fn is_empty(&self) -> bool {
250        self.shutdown && self.queue.is_empty()
251    }
252
253    fn empty_event(&self) -> FeatureWorkerOutput<UserData, Control, Event, ToController> {
254        FeatureWorkerOutput::OnResourceEmpty
255    }
256
257    fn pop_output(&mut self, _now: u64) -> Option<FeatureWorkerOutput<UserData, Control, Event, ToController>> {
258        self.queue.pop_front()
259    }
260}