atm0s_sdn_network/features/pubsub/
controller.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    fmt::Debug,
4};
5
6use crate::{
7    base::{ConnectionEvent, Feature, FeatureContext, FeatureControlActor, FeatureInput, FeatureOutput, FeatureSharedInput},
8    data_plane::NetPair,
9};
10
11use self::source_hint::SourceHintLogic;
12
13use super::{
14    msg::{ChannelId, Feedback, RelayControl, RelayId, SourceHint},
15    ChannelControl, ChannelEvent, Control, Event, RelayWorkerControl, ToController, ToWorker,
16};
17
18pub const RELAY_TIMEOUT: u64 = 10_000;
19pub const RELAY_STICKY_MS: u64 = 5 * 60 * 1000; //sticky route path in 5 minutes
20
21mod consumers;
22mod feedbacks;
23mod local_relay;
24mod remote_relay;
25mod source_hint;
26
27use atm0s_sdn_identity::NodeId;
28use local_relay::LocalRelay;
29use remote_relay::RemoteRelay;
30use sans_io_runtime::TaskSwitcherChild;
31
32#[derive(Debug, PartialEq, Eq)]
33pub enum GenericRelayOutput<UserData> {
34    ToWorker(RelayWorkerControl<UserData>),
35    RouteChanged(FeatureControlActor<UserData>),
36    Feedback(Vec<FeatureControlActor<UserData>>, Feedback),
37}
38
39pub trait GenericRelay<UserData> {
40    fn on_tick(&mut self, now: u64);
41    fn on_pub_start(&mut self, actor: FeatureControlActor<UserData>);
42    fn on_pub_stop(&mut self, actor: FeatureControlActor<UserData>);
43    fn on_local_sub(&mut self, now: u64, actor: FeatureControlActor<UserData>);
44    fn on_local_feedback(&mut self, now: u64, actor: FeatureControlActor<UserData>, feedback: Feedback);
45    fn on_local_unsub(&mut self, now: u64, actor: FeatureControlActor<UserData>);
46    fn on_remote(&mut self, now: u64, remote: NetPair, control: RelayControl);
47    fn conn_disconnected(&mut self, now: u64, remote: NetPair);
48    fn should_clear(&self) -> bool;
49    fn relay_dests(&self) -> Option<(&[FeatureControlActor<UserData>], bool)>;
50    fn pop_output(&mut self) -> Option<GenericRelayOutput<UserData>>;
51}
52
53pub struct PubSubFeature<UserData> {
54    relays: HashMap<RelayId, Box<dyn GenericRelay<UserData>>>,
55    source_hints: HashMap<ChannelId, SourceHintLogic<UserData>>,
56    queue: VecDeque<FeatureOutput<UserData, Event, ToWorker<UserData>>>,
57    shutdown: bool,
58}
59
60impl<UserData: 'static + Eq + Copy + Debug> Default for PubSubFeature<UserData> {
61    fn default() -> Self {
62        Self::new()
63    }
64}
65
66impl<UserData: 'static + Eq + Copy + Debug> PubSubFeature<UserData> {
67    pub fn new() -> Self {
68        Self {
69            relays: HashMap::new(),
70            source_hints: HashMap::new(),
71            queue: VecDeque::new(),
72            shutdown: false,
73        }
74    }
75
76    fn get_relay(&mut self, ctx: &FeatureContext, relay_id: RelayId, auto_create: bool) -> Option<&mut Box<dyn GenericRelay<UserData>>> {
77        if !self.relays.contains_key(&relay_id) && auto_create {
78            let relay: Box<dyn GenericRelay<UserData>> = if ctx.node_id == relay_id.1 {
79                log::info!("[PubSubFeatureController] Creating new LocalRelay: {:?}", relay_id);
80                Box::new(LocalRelay::default())
81            } else {
82                log::info!("[PubSubFeatureController] Creating new RemoteRelay: {:?}", relay_id);
83                Box::new(RemoteRelay::new(ctx.session))
84            };
85            self.relays.insert(relay_id, relay);
86        }
87        self.relays.get_mut(&relay_id)
88    }
89
90    fn get_source_hint(&mut self, node_id: NodeId, session: u64, channel: ChannelId, auto_create: bool) -> Option<&mut SourceHintLogic<UserData>> {
91        if !self.source_hints.contains_key(&channel) && auto_create {
92            log::info!("[PubSubFeatureController] Creating new SourceHintLogic: {}", channel);
93            self.source_hints.insert(channel, SourceHintLogic::new(node_id, session));
94        }
95        self.source_hints.get_mut(&channel)
96    }
97
98    fn on_local(&mut self, ctx: &FeatureContext, now: u64, actor: FeatureControlActor<UserData>, channel: ChannelId, control: ChannelControl) {
99        match control {
100            ChannelControl::SubAuto => {
101                log::info!("[PubSubFeatureController] SubAuto for {} from {:?}", channel, actor);
102                let sh = self.get_source_hint(ctx.node_id, ctx.session, channel, true).expect("Should create");
103                sh.on_local(now, actor, source_hint::LocalCmd::Subscribe);
104                self.pop_single_source_hint(ctx, now, channel);
105            }
106            ChannelControl::UnsubAuto => {
107                log::info!("[PubSubFeatureController] UnsubAuto for {} from {:?}", channel, actor);
108                if let Some(sh) = self.get_source_hint(ctx.node_id, ctx.session, channel, false) {
109                    sh.on_local(now, actor, source_hint::LocalCmd::Unsubscribe);
110                    self.pop_single_source_hint(ctx, now, channel);
111                }
112            }
113            ChannelControl::PubStart => {
114                log::info!("[PubSubFeatureController] PubStart for {} from {:?}", channel, actor);
115                let relay_id = RelayId(channel, ctx.node_id);
116                let relay = self.get_relay(ctx, relay_id, true).expect("Should create");
117                relay.on_pub_start(actor);
118                Self::pop_single_relay(relay_id, self.relays.get_mut(&relay_id).expect("Should have"), &mut self.queue);
119
120                let sh = self.get_source_hint(ctx.node_id, ctx.session, channel, true).expect("Should create");
121                sh.on_local(now, actor, source_hint::LocalCmd::Register);
122                self.pop_single_source_hint(ctx, now, channel);
123            }
124            ChannelControl::PubStop => {
125                log::info!("[PubSubFeatureController] PubStop for {} from {:?}", channel, actor);
126                let relay_id = RelayId(channel, ctx.node_id);
127                if let Some(relay) = self.relays.get_mut(&relay_id) {
128                    relay.on_pub_stop(actor);
129                    Self::pop_single_relay(relay_id, self.relays.get_mut(&relay_id).expect("Should have"), &mut self.queue);
130                }
131
132                if let Some(sh) = self.get_source_hint(ctx.node_id, ctx.session, channel, false) {
133                    sh.on_local(now, actor, source_hint::LocalCmd::Unregister);
134                    self.pop_single_source_hint(ctx, now, channel);
135                }
136            }
137            ChannelControl::SubSource(source) => {
138                log::info!("[PubSubFeatureController] SubSource(source) for {} from {:?}", channel, actor);
139                let relay_id = RelayId(channel, source);
140                let relay = self.get_relay(ctx, relay_id, true).expect("Should create");
141                log::debug!("[PubSubFeatureController] Sub for {:?} from {:?}", relay_id, actor);
142                relay.on_local_sub(now, actor);
143                Self::pop_single_relay(relay_id, self.relays.get_mut(&relay_id).expect("Should have"), &mut self.queue);
144            }
145            ChannelControl::FeedbackAuto(fb) => {
146                if let Some(sh) = self.get_source_hint(ctx.node_id, ctx.session, channel, false) {
147                    for source in sh.sources() {
148                        let relay_id = RelayId(channel, source);
149                        let relay = self.get_relay(ctx, relay_id, true).expect("Should create");
150                        log::debug!("[PubSubFeatureController] Feedback for {:?} from {:?}", relay_id, actor);
151                        relay.on_local_feedback(now, actor, fb);
152                        Self::pop_single_relay(relay_id, self.relays.get_mut(&relay_id).expect("Should have"), &mut self.queue);
153                    }
154                }
155            }
156            ChannelControl::UnsubSource(source) => {
157                log::info!("[PubSubFeatureController] UnsubSource(source) for {} from {:?}", channel, actor);
158                let relay_id = RelayId(channel, source);
159                if let Some(relay) = self.relays.get_mut(&relay_id) {
160                    log::debug!("[PubSubFeatureController] Unsub for {:?} from {:?}", relay_id, actor);
161                    relay.on_local_unsub(now, actor);
162                    Self::pop_single_relay(relay_id, relay, &mut self.queue);
163                    if relay.should_clear() {
164                        self.relays.remove(&relay_id);
165                    }
166                } else {
167                    log::warn!("[PubSubFeatureController] Unsub for unknown relay {:?}", relay_id);
168                }
169            }
170            ChannelControl::PubData(data) => {
171                let relay_id = RelayId(channel, ctx.node_id);
172                if let Some(relay) = self.relays.get(&relay_id) {
173                    if let Some((locals, has_remote)) = relay.relay_dests() {
174                        log::debug!(
175                            "[PubSubFeatureController] Pub for {:?} from {:?} to {:?} locals, has remote {has_remote}",
176                            relay_id,
177                            actor,
178                            locals.len()
179                        );
180                        for local in locals {
181                            self.queue.push_back(FeatureOutput::Event(*local, Event(channel, ChannelEvent::SourceData(ctx.node_id, data.clone()))));
182                        }
183
184                        if has_remote {
185                            self.queue.push_back(FeatureOutput::ToWorker(true, ToWorker::RelayData(relay_id, data)));
186                        }
187                    } else {
188                        log::debug!("[PubSubFeatureController] No subscribers for {:?}, dropping data from {:?}", relay_id, actor)
189                    }
190                } else {
191                    log::warn!("[PubSubFeatureController] Pub for unknown relay {:?}", relay_id);
192                }
193            }
194        }
195    }
196
197    fn on_remote_relay_control(&mut self, ctx: &FeatureContext, now: u64, remote: NetPair, relay_id: RelayId, control: RelayControl) {
198        if self.get_relay(ctx, relay_id, control.should_create()).is_some() {
199            let relay: &mut Box<dyn GenericRelay<UserData>> = self.relays.get_mut(&relay_id).expect("Should have relay");
200            log::debug!("[PubSubFeatureController] Remote control for {:?} from {:?}: {:?}", relay_id, remote, control);
201            relay.on_remote(now, remote, control);
202            Self::pop_single_relay(relay_id, relay, &mut self.queue);
203            if relay.should_clear() {
204                self.relays.remove(&relay_id);
205            }
206        } else {
207            log::warn!("[PubSubFeatureController] Remote control for unknown relay {:?}", relay_id);
208        }
209    }
210
211    fn on_remote_source_hint_control(&mut self, ctx: &FeatureContext, now: u64, remote: NetPair, channel: ChannelId, control: SourceHint) {
212        if let Some(sh) = self.get_source_hint(ctx.node_id, ctx.session, channel, control.should_create()) {
213            log::debug!("[PubSubFeatureController] SourceHint control for {:?} from {:?}: {:?}", channel, remote, control);
214            sh.on_remote(now, remote, control);
215            self.pop_single_source_hint(ctx, now, channel);
216
217            let sh = self.source_hints.get_mut(&channel).expect("Should have source hint");
218            if sh.should_clear() {
219                self.source_hints.remove(&channel);
220            }
221        } else {
222            log::warn!("[PubSubFeatureController] Remote control for unknown channel {:?}", channel);
223        }
224    }
225
226    fn pop_single_relay(relay_id: RelayId, relay: &mut Box<dyn GenericRelay<UserData>>, queue: &mut VecDeque<FeatureOutput<UserData, Event, ToWorker<UserData>>>) {
227        while let Some(control) = relay.pop_output() {
228            match control {
229                GenericRelayOutput::ToWorker(control) => queue.push_back(FeatureOutput::ToWorker(true, ToWorker::RelayControl(relay_id, control))),
230                GenericRelayOutput::RouteChanged(actor) => queue.push_back(FeatureOutput::Event(actor, Event(relay_id.0, ChannelEvent::RouteChanged(relay_id.1)))),
231                GenericRelayOutput::Feedback(actors, fb) => {
232                    log::debug!("[PubsubController] Feedback for {:?} {:?} to actors {:?}", relay_id, fb, actors);
233                    for actor in actors {
234                        queue.push_back(FeatureOutput::Event(actor, Event(relay_id.0, ChannelEvent::FeedbackData(fb))));
235                    }
236                }
237            };
238        }
239    }
240
241    fn pop_single_source_hint(&mut self, ctx: &FeatureContext, now: u64, channel: ChannelId) {
242        loop {
243            let sh = self.source_hints.get_mut(&channel).expect("Should have source hint");
244            let out = if let Some(out) = sh.pop_output() {
245                out
246            } else {
247                return;
248            };
249            match out {
250                source_hint::Output::SendRemote(dest, control) => {
251                    self.queue.push_back(FeatureOutput::ToWorker(true, ToWorker::SourceHint(channel, dest, control)));
252                }
253                source_hint::Output::SubscribeSource(actors, source) => {
254                    for actor in actors {
255                        self.on_local(ctx, now, actor, channel, ChannelControl::SubSource(source));
256                    }
257                }
258                source_hint::Output::UnsubscribeSource(actors, source) => {
259                    for actor in actors {
260                        self.on_local(ctx, now, actor, channel, ChannelControl::UnsubSource(source));
261                    }
262                }
263            }
264        }
265    }
266}
267
268impl<UserData: 'static + Eq + Copy + Debug> Feature<UserData, Control, Event, ToController, ToWorker<UserData>> for PubSubFeature<UserData> {
269    fn on_shared_input(&mut self, ctx: &FeatureContext, now: u64, input: FeatureSharedInput) {
270        match input {
271            FeatureSharedInput::Tick(_) => {
272                let mut clears = vec![];
273                for (relay_id, relay) in self.relays.iter_mut() {
274                    if relay.should_clear() {
275                        clears.push(*relay_id);
276                    } else {
277                        relay.on_tick(now);
278                        Self::pop_single_relay(*relay_id, relay, &mut self.queue);
279                    }
280                }
281                for relay_id in clears {
282                    self.relays.remove(&relay_id);
283                }
284
285                let mut clears = vec![];
286                let mut not_clears = vec![];
287                for (channel, sh) in self.source_hints.iter_mut() {
288                    if sh.should_clear() {
289                        clears.push(*channel);
290                    } else {
291                        sh.on_tick(now);
292                        not_clears.push(*channel);
293                    }
294                }
295                for channel in clears {
296                    self.source_hints.remove(&channel);
297                }
298                for channel in not_clears {
299                    self.pop_single_source_hint(ctx, now, channel);
300                }
301            }
302            FeatureSharedInput::Connection(event) => {
303                if let ConnectionEvent::Disconnected(ctx) = event {
304                    for (relay_id, relay) in self.relays.iter_mut() {
305                        relay.conn_disconnected(now, ctx.pair);
306                        Self::pop_single_relay(*relay_id, relay, &mut self.queue);
307                    }
308                }
309            }
310        }
311    }
312
313    fn on_input(&mut self, ctx: &FeatureContext, now_ms: u64, input: FeatureInput<'_, UserData, Control, ToController>) {
314        match input {
315            FeatureInput::FromWorker(ToController::RelayControl(remote, relay_id, control)) => {
316                self.on_remote_relay_control(ctx, now_ms, remote, relay_id, control);
317            }
318            FeatureInput::FromWorker(ToController::SourceHint(remote, channel, control)) => {
319                self.on_remote_source_hint_control(ctx, now_ms, remote, channel, control);
320            }
321            FeatureInput::Control(actor, Control(channel, control)) => {
322                self.on_local(ctx, now_ms, actor, channel, control);
323            }
324            _ => panic!("Unexpected input"),
325        }
326    }
327
328    fn on_shutdown(&mut self, _ctx: &FeatureContext, _now: u64) {
329        log::info!("[PubSubFeatureWorker] Shutdown");
330        self.shutdown = true;
331    }
332}
333
334impl<UserData> TaskSwitcherChild<FeatureOutput<UserData, Event, ToWorker<UserData>>> for PubSubFeature<UserData> {
335    type Time = u64;
336
337    fn is_empty(&self) -> bool {
338        self.shutdown && self.queue.is_empty()
339    }
340
341    fn empty_event(&self) -> FeatureOutput<UserData, Event, ToWorker<UserData>> {
342        FeatureOutput::OnResourceEmpty
343    }
344
345    fn pop_output(&mut self, _now: u64) -> Option<FeatureOutput<UserData, Event, ToWorker<UserData>>> {
346        self.queue.pop_front()
347    }
348}