1use std::{
2 collections::{HashMap, VecDeque},
3 fmt::Debug,
4};
5
6use crate::{
7 base::{ConnectionEvent, Feature, FeatureContext, FeatureControlActor, FeatureInput, FeatureOutput, FeatureSharedInput},
8 data_plane::NetPair,
9};
10
11use self::source_hint::SourceHintLogic;
12
13use super::{
14 msg::{ChannelId, Feedback, RelayControl, RelayId, SourceHint},
15 ChannelControl, ChannelEvent, Control, Event, RelayWorkerControl, ToController, ToWorker,
16};
17
18pub const RELAY_TIMEOUT: u64 = 10_000;
19pub const RELAY_STICKY_MS: u64 = 5 * 60 * 1000; mod consumers;
22mod feedbacks;
23mod local_relay;
24mod remote_relay;
25mod source_hint;
26
27use atm0s_sdn_identity::NodeId;
28use local_relay::LocalRelay;
29use remote_relay::RemoteRelay;
30use sans_io_runtime::TaskSwitcherChild;
31
32#[derive(Debug, PartialEq, Eq)]
33pub enum GenericRelayOutput<UserData> {
34 ToWorker(RelayWorkerControl<UserData>),
35 RouteChanged(FeatureControlActor<UserData>),
36 Feedback(Vec<FeatureControlActor<UserData>>, Feedback),
37}
38
39pub trait GenericRelay<UserData> {
40 fn on_tick(&mut self, now: u64);
41 fn on_pub_start(&mut self, actor: FeatureControlActor<UserData>);
42 fn on_pub_stop(&mut self, actor: FeatureControlActor<UserData>);
43 fn on_local_sub(&mut self, now: u64, actor: FeatureControlActor<UserData>);
44 fn on_local_feedback(&mut self, now: u64, actor: FeatureControlActor<UserData>, feedback: Feedback);
45 fn on_local_unsub(&mut self, now: u64, actor: FeatureControlActor<UserData>);
46 fn on_remote(&mut self, now: u64, remote: NetPair, control: RelayControl);
47 fn conn_disconnected(&mut self, now: u64, remote: NetPair);
48 fn should_clear(&self) -> bool;
49 fn relay_dests(&self) -> Option<(&[FeatureControlActor<UserData>], bool)>;
50 fn pop_output(&mut self) -> Option<GenericRelayOutput<UserData>>;
51}
52
53pub struct PubSubFeature<UserData> {
54 relays: HashMap<RelayId, Box<dyn GenericRelay<UserData>>>,
55 source_hints: HashMap<ChannelId, SourceHintLogic<UserData>>,
56 queue: VecDeque<FeatureOutput<UserData, Event, ToWorker<UserData>>>,
57 shutdown: bool,
58}
59
60impl<UserData: 'static + Eq + Copy + Debug> Default for PubSubFeature<UserData> {
61 fn default() -> Self {
62 Self::new()
63 }
64}
65
66impl<UserData: 'static + Eq + Copy + Debug> PubSubFeature<UserData> {
67 pub fn new() -> Self {
68 Self {
69 relays: HashMap::new(),
70 source_hints: HashMap::new(),
71 queue: VecDeque::new(),
72 shutdown: false,
73 }
74 }
75
76 fn get_relay(&mut self, ctx: &FeatureContext, relay_id: RelayId, auto_create: bool) -> Option<&mut Box<dyn GenericRelay<UserData>>> {
77 if !self.relays.contains_key(&relay_id) && auto_create {
78 let relay: Box<dyn GenericRelay<UserData>> = if ctx.node_id == relay_id.1 {
79 log::info!("[PubSubFeatureController] Creating new LocalRelay: {:?}", relay_id);
80 Box::new(LocalRelay::default())
81 } else {
82 log::info!("[PubSubFeatureController] Creating new RemoteRelay: {:?}", relay_id);
83 Box::new(RemoteRelay::new(ctx.session))
84 };
85 self.relays.insert(relay_id, relay);
86 }
87 self.relays.get_mut(&relay_id)
88 }
89
90 fn get_source_hint(&mut self, node_id: NodeId, session: u64, channel: ChannelId, auto_create: bool) -> Option<&mut SourceHintLogic<UserData>> {
91 if !self.source_hints.contains_key(&channel) && auto_create {
92 log::info!("[PubSubFeatureController] Creating new SourceHintLogic: {}", channel);
93 self.source_hints.insert(channel, SourceHintLogic::new(node_id, session));
94 }
95 self.source_hints.get_mut(&channel)
96 }
97
98 fn on_local(&mut self, ctx: &FeatureContext, now: u64, actor: FeatureControlActor<UserData>, channel: ChannelId, control: ChannelControl) {
99 match control {
100 ChannelControl::SubAuto => {
101 log::info!("[PubSubFeatureController] SubAuto for {} from {:?}", channel, actor);
102 let sh = self.get_source_hint(ctx.node_id, ctx.session, channel, true).expect("Should create");
103 sh.on_local(now, actor, source_hint::LocalCmd::Subscribe);
104 self.pop_single_source_hint(ctx, now, channel);
105 }
106 ChannelControl::UnsubAuto => {
107 log::info!("[PubSubFeatureController] UnsubAuto for {} from {:?}", channel, actor);
108 if let Some(sh) = self.get_source_hint(ctx.node_id, ctx.session, channel, false) {
109 sh.on_local(now, actor, source_hint::LocalCmd::Unsubscribe);
110 self.pop_single_source_hint(ctx, now, channel);
111 }
112 }
113 ChannelControl::PubStart => {
114 log::info!("[PubSubFeatureController] PubStart for {} from {:?}", channel, actor);
115 let relay_id = RelayId(channel, ctx.node_id);
116 let relay = self.get_relay(ctx, relay_id, true).expect("Should create");
117 relay.on_pub_start(actor);
118 Self::pop_single_relay(relay_id, self.relays.get_mut(&relay_id).expect("Should have"), &mut self.queue);
119
120 let sh = self.get_source_hint(ctx.node_id, ctx.session, channel, true).expect("Should create");
121 sh.on_local(now, actor, source_hint::LocalCmd::Register);
122 self.pop_single_source_hint(ctx, now, channel);
123 }
124 ChannelControl::PubStop => {
125 log::info!("[PubSubFeatureController] PubStop for {} from {:?}", channel, actor);
126 let relay_id = RelayId(channel, ctx.node_id);
127 if let Some(relay) = self.relays.get_mut(&relay_id) {
128 relay.on_pub_stop(actor);
129 Self::pop_single_relay(relay_id, self.relays.get_mut(&relay_id).expect("Should have"), &mut self.queue);
130 }
131
132 if let Some(sh) = self.get_source_hint(ctx.node_id, ctx.session, channel, false) {
133 sh.on_local(now, actor, source_hint::LocalCmd::Unregister);
134 self.pop_single_source_hint(ctx, now, channel);
135 }
136 }
137 ChannelControl::SubSource(source) => {
138 log::info!("[PubSubFeatureController] SubSource(source) for {} from {:?}", channel, actor);
139 let relay_id = RelayId(channel, source);
140 let relay = self.get_relay(ctx, relay_id, true).expect("Should create");
141 log::debug!("[PubSubFeatureController] Sub for {:?} from {:?}", relay_id, actor);
142 relay.on_local_sub(now, actor);
143 Self::pop_single_relay(relay_id, self.relays.get_mut(&relay_id).expect("Should have"), &mut self.queue);
144 }
145 ChannelControl::FeedbackAuto(fb) => {
146 if let Some(sh) = self.get_source_hint(ctx.node_id, ctx.session, channel, false) {
147 for source in sh.sources() {
148 let relay_id = RelayId(channel, source);
149 let relay = self.get_relay(ctx, relay_id, true).expect("Should create");
150 log::debug!("[PubSubFeatureController] Feedback for {:?} from {:?}", relay_id, actor);
151 relay.on_local_feedback(now, actor, fb);
152 Self::pop_single_relay(relay_id, self.relays.get_mut(&relay_id).expect("Should have"), &mut self.queue);
153 }
154 }
155 }
156 ChannelControl::UnsubSource(source) => {
157 log::info!("[PubSubFeatureController] UnsubSource(source) for {} from {:?}", channel, actor);
158 let relay_id = RelayId(channel, source);
159 if let Some(relay) = self.relays.get_mut(&relay_id) {
160 log::debug!("[PubSubFeatureController] Unsub for {:?} from {:?}", relay_id, actor);
161 relay.on_local_unsub(now, actor);
162 Self::pop_single_relay(relay_id, relay, &mut self.queue);
163 if relay.should_clear() {
164 self.relays.remove(&relay_id);
165 }
166 } else {
167 log::warn!("[PubSubFeatureController] Unsub for unknown relay {:?}", relay_id);
168 }
169 }
170 ChannelControl::PubData(data) => {
171 let relay_id = RelayId(channel, ctx.node_id);
172 if let Some(relay) = self.relays.get(&relay_id) {
173 if let Some((locals, has_remote)) = relay.relay_dests() {
174 log::debug!(
175 "[PubSubFeatureController] Pub for {:?} from {:?} to {:?} locals, has remote {has_remote}",
176 relay_id,
177 actor,
178 locals.len()
179 );
180 for local in locals {
181 self.queue.push_back(FeatureOutput::Event(*local, Event(channel, ChannelEvent::SourceData(ctx.node_id, data.clone()))));
182 }
183
184 if has_remote {
185 self.queue.push_back(FeatureOutput::ToWorker(true, ToWorker::RelayData(relay_id, data)));
186 }
187 } else {
188 log::debug!("[PubSubFeatureController] No subscribers for {:?}, dropping data from {:?}", relay_id, actor)
189 }
190 } else {
191 log::warn!("[PubSubFeatureController] Pub for unknown relay {:?}", relay_id);
192 }
193 }
194 }
195 }
196
197 fn on_remote_relay_control(&mut self, ctx: &FeatureContext, now: u64, remote: NetPair, relay_id: RelayId, control: RelayControl) {
198 if self.get_relay(ctx, relay_id, control.should_create()).is_some() {
199 let relay: &mut Box<dyn GenericRelay<UserData>> = self.relays.get_mut(&relay_id).expect("Should have relay");
200 log::debug!("[PubSubFeatureController] Remote control for {:?} from {:?}: {:?}", relay_id, remote, control);
201 relay.on_remote(now, remote, control);
202 Self::pop_single_relay(relay_id, relay, &mut self.queue);
203 if relay.should_clear() {
204 self.relays.remove(&relay_id);
205 }
206 } else {
207 log::warn!("[PubSubFeatureController] Remote control for unknown relay {:?}", relay_id);
208 }
209 }
210
211 fn on_remote_source_hint_control(&mut self, ctx: &FeatureContext, now: u64, remote: NetPair, channel: ChannelId, control: SourceHint) {
212 if let Some(sh) = self.get_source_hint(ctx.node_id, ctx.session, channel, control.should_create()) {
213 log::debug!("[PubSubFeatureController] SourceHint control for {:?} from {:?}: {:?}", channel, remote, control);
214 sh.on_remote(now, remote, control);
215 self.pop_single_source_hint(ctx, now, channel);
216
217 let sh = self.source_hints.get_mut(&channel).expect("Should have source hint");
218 if sh.should_clear() {
219 self.source_hints.remove(&channel);
220 }
221 } else {
222 log::warn!("[PubSubFeatureController] Remote control for unknown channel {:?}", channel);
223 }
224 }
225
226 fn pop_single_relay(relay_id: RelayId, relay: &mut Box<dyn GenericRelay<UserData>>, queue: &mut VecDeque<FeatureOutput<UserData, Event, ToWorker<UserData>>>) {
227 while let Some(control) = relay.pop_output() {
228 match control {
229 GenericRelayOutput::ToWorker(control) => queue.push_back(FeatureOutput::ToWorker(true, ToWorker::RelayControl(relay_id, control))),
230 GenericRelayOutput::RouteChanged(actor) => queue.push_back(FeatureOutput::Event(actor, Event(relay_id.0, ChannelEvent::RouteChanged(relay_id.1)))),
231 GenericRelayOutput::Feedback(actors, fb) => {
232 log::debug!("[PubsubController] Feedback for {:?} {:?} to actors {:?}", relay_id, fb, actors);
233 for actor in actors {
234 queue.push_back(FeatureOutput::Event(actor, Event(relay_id.0, ChannelEvent::FeedbackData(fb))));
235 }
236 }
237 };
238 }
239 }
240
241 fn pop_single_source_hint(&mut self, ctx: &FeatureContext, now: u64, channel: ChannelId) {
242 loop {
243 let sh = self.source_hints.get_mut(&channel).expect("Should have source hint");
244 let out = if let Some(out) = sh.pop_output() {
245 out
246 } else {
247 return;
248 };
249 match out {
250 source_hint::Output::SendRemote(dest, control) => {
251 self.queue.push_back(FeatureOutput::ToWorker(true, ToWorker::SourceHint(channel, dest, control)));
252 }
253 source_hint::Output::SubscribeSource(actors, source) => {
254 for actor in actors {
255 self.on_local(ctx, now, actor, channel, ChannelControl::SubSource(source));
256 }
257 }
258 source_hint::Output::UnsubscribeSource(actors, source) => {
259 for actor in actors {
260 self.on_local(ctx, now, actor, channel, ChannelControl::UnsubSource(source));
261 }
262 }
263 }
264 }
265 }
266}
267
268impl<UserData: 'static + Eq + Copy + Debug> Feature<UserData, Control, Event, ToController, ToWorker<UserData>> for PubSubFeature<UserData> {
269 fn on_shared_input(&mut self, ctx: &FeatureContext, now: u64, input: FeatureSharedInput) {
270 match input {
271 FeatureSharedInput::Tick(_) => {
272 let mut clears = vec![];
273 for (relay_id, relay) in self.relays.iter_mut() {
274 if relay.should_clear() {
275 clears.push(*relay_id);
276 } else {
277 relay.on_tick(now);
278 Self::pop_single_relay(*relay_id, relay, &mut self.queue);
279 }
280 }
281 for relay_id in clears {
282 self.relays.remove(&relay_id);
283 }
284
285 let mut clears = vec![];
286 let mut not_clears = vec![];
287 for (channel, sh) in self.source_hints.iter_mut() {
288 if sh.should_clear() {
289 clears.push(*channel);
290 } else {
291 sh.on_tick(now);
292 not_clears.push(*channel);
293 }
294 }
295 for channel in clears {
296 self.source_hints.remove(&channel);
297 }
298 for channel in not_clears {
299 self.pop_single_source_hint(ctx, now, channel);
300 }
301 }
302 FeatureSharedInput::Connection(event) => {
303 if let ConnectionEvent::Disconnected(ctx) = event {
304 for (relay_id, relay) in self.relays.iter_mut() {
305 relay.conn_disconnected(now, ctx.pair);
306 Self::pop_single_relay(*relay_id, relay, &mut self.queue);
307 }
308 }
309 }
310 }
311 }
312
313 fn on_input(&mut self, ctx: &FeatureContext, now_ms: u64, input: FeatureInput<'_, UserData, Control, ToController>) {
314 match input {
315 FeatureInput::FromWorker(ToController::RelayControl(remote, relay_id, control)) => {
316 self.on_remote_relay_control(ctx, now_ms, remote, relay_id, control);
317 }
318 FeatureInput::FromWorker(ToController::SourceHint(remote, channel, control)) => {
319 self.on_remote_source_hint_control(ctx, now_ms, remote, channel, control);
320 }
321 FeatureInput::Control(actor, Control(channel, control)) => {
322 self.on_local(ctx, now_ms, actor, channel, control);
323 }
324 _ => panic!("Unexpected input"),
325 }
326 }
327
328 fn on_shutdown(&mut self, _ctx: &FeatureContext, _now: u64) {
329 log::info!("[PubSubFeatureWorker] Shutdown");
330 self.shutdown = true;
331 }
332}
333
334impl<UserData> TaskSwitcherChild<FeatureOutput<UserData, Event, ToWorker<UserData>>> for PubSubFeature<UserData> {
335 type Time = u64;
336
337 fn is_empty(&self) -> bool {
338 self.shutdown && self.queue.is_empty()
339 }
340
341 fn empty_event(&self) -> FeatureOutput<UserData, Event, ToWorker<UserData>> {
342 FeatureOutput::OnResourceEmpty
343 }
344
345 fn pop_output(&mut self, _now: u64) -> Option<FeatureOutput<UserData, Event, ToWorker<UserData>>> {
346 self.queue.pop_front()
347 }
348}