1use std::{collections::HashMap, fmt::Debug};
2
3use atm0s_sdn_identity::ConnId;
4use atm0s_sdn_router::{RouteAction, RouterTable};
5use sans_io_runtime::{collections::DynamicDeque, return_if_err, return_if_none, TaskSwitcherChild};
6
7use crate::{
8 base::{Buffer, FeatureControlActor, FeatureWorker, FeatureWorkerContext, FeatureWorkerInput, FeatureWorkerOutput, TransportMsgHeader},
9 data_plane::NetPair,
10};
11
12use super::{
13 msg::{PubsubMessage, RelayControl, RelayId},
14 ChannelControl, ChannelEvent, Control, Event, RelayWorkerControl, ToController, ToWorker,
15};
16
17struct WorkerRelay<UserData> {
18 source: Option<NetPair>,
19 locals: Vec<FeatureControlActor<UserData>>,
20 remotes: Vec<NetPair>,
21 remotes_uuid: HashMap<NetPair, u64>,
22}
23
24impl<UserData> WorkerRelay<UserData> {
25 pub fn is_empty(&self) -> bool {
26 self.locals.is_empty() && self.remotes.is_empty()
27 }
28}
29
30pub struct PubSubFeatureWorker<UserData> {
31 relays: HashMap<RelayId, WorkerRelay<UserData>>,
32 queue: DynamicDeque<FeatureWorkerOutput<UserData, Control, Event, ToController>, 16>,
33 shutdown: bool,
34}
35
36impl<UserData> Default for PubSubFeatureWorker<UserData> {
37 fn default() -> Self {
38 Self {
39 relays: HashMap::new(),
40 queue: Default::default(),
41 shutdown: false,
42 }
43 }
44}
45
46impl<UserData: Eq + Copy + Debug> FeatureWorker<UserData, Control, Event, ToController, ToWorker<UserData>> for PubSubFeatureWorker<UserData> {
47 fn on_network_raw(&mut self, _ctx: &mut FeatureWorkerContext, _now: u64, _conn: ConnId, remote: NetPair, _header: TransportMsgHeader, buf: Buffer) {
48 log::debug!("[PubSubWorker] on_network_raw from {}", remote);
49 let msg = return_if_err!(PubsubMessage::try_from(&buf as &[u8]));
50 match msg {
51 PubsubMessage::Control(relay_id, control) => {
52 log::debug!("[PubSubWorker] received PubsubMessage::RelayControl({:?}, {:?})", relay_id, control);
53 self.queue.push_back(FeatureWorkerOutput::ToController(ToController::RelayControl(remote, relay_id, control)));
54 }
55 PubsubMessage::SourceHint(channel, control) => {
56 log::debug!("[PubSubWorker] received PubsubMessage::SourceHint({:?}, {:?})", channel, control);
57 self.queue.push_back(FeatureWorkerOutput::ToController(ToController::SourceHint(remote, channel, control)));
58 }
59 PubsubMessage::Data(relay_id, data) => {
60 log::debug!("[PubSubWorker] received PubsubMessage::Data({:?}, size {})", relay_id, data.len());
61 let relay = return_if_none!(self.relays.get(&relay_id));
62 if relay.source == Some(remote) {
64 for actor in &relay.locals {
65 self.queue
66 .push_back(FeatureWorkerOutput::Event(*actor, Event(relay_id.0, ChannelEvent::SourceData(relay_id.1, data.to_vec()))));
67 }
68
69 if !relay.remotes.is_empty() {
70 let control = PubsubMessage::Data(relay_id, data);
71 self.queue.push_back(FeatureWorkerOutput::RawBroadcast2(relay.remotes.clone(), control.into()));
73 }
74 } else {
75 log::warn!("[PubsubWorker] Relay from untrusted source local {:?} != remote {}", relay.source, remote);
76 }
77 }
78 }
79 }
80
81 fn on_input(&mut self, ctx: &mut FeatureWorkerContext, _now: u64, input: FeatureWorkerInput<UserData, Control, ToWorker<UserData>>) {
82 match input {
83 FeatureWorkerInput::FromController(_, ToWorker::RelayControl(relay_id, control)) => match control {
84 RelayWorkerControl::SendSub(uuid, remote) => {
85 let dest = if let Some(remote) = remote {
86 log::debug!("[PubsubWorker] SendSub for {:?} to {}", relay_id, remote);
87 remote
88 } else if let Some(next) = ctx.router.next(relay_id.1) {
89 log::debug!("[PubsubWorker] SendSub for {:?} to {} by select from router table", relay_id, next);
90 next
91 } else {
92 log::warn!("[PubsubWorker] SendSub: no route for {:?}", relay_id);
93 return;
94 };
95 let control = PubsubMessage::Control(relay_id, RelayControl::Sub(uuid));
96 self.queue.push_back(FeatureWorkerOutput::RawDirect2(dest, control.into()));
97 }
98 RelayWorkerControl::SendFeedback(fb, remote) => {
99 log::debug!("[PubsubWorker] SendFeedback for {:?} to {:?}", relay_id, remote);
100 let control = PubsubMessage::Control(relay_id, RelayControl::Feedback(fb));
101 self.queue.push_back(FeatureWorkerOutput::RawDirect2(remote, control.into()));
102 }
103 RelayWorkerControl::SendUnsub(uuid, remote) => {
104 log::debug!("[PubsubWorker] SendUnsub for {:?} to {:?}", relay_id, remote);
105 let control = PubsubMessage::Control(relay_id, RelayControl::Unsub(uuid));
106 self.queue.push_back(FeatureWorkerOutput::RawDirect2(remote, control.into()));
107 }
108 RelayWorkerControl::SendSubOk(uuid, remote) => {
109 log::debug!("[PubsubWorker] SendSubOk for {:?} to {:?}", relay_id, remote);
110 let control = PubsubMessage::Control(relay_id, RelayControl::SubOK(uuid));
111 self.queue.push_back(FeatureWorkerOutput::RawDirect2(remote, control.into()));
112 }
113 RelayWorkerControl::SendUnsubOk(uuid, remote) => {
114 log::debug!("[PubsubWorker] SendUnsubOk for {:?} to {:?}", relay_id, remote);
115 let control = PubsubMessage::Control(relay_id, RelayControl::UnsubOK(uuid));
116 self.queue.push_back(FeatureWorkerOutput::RawDirect2(remote, control.into()));
117 }
118 RelayWorkerControl::SendRouteChanged => {
119 let relay = return_if_none!(self.relays.get(&relay_id));
120 log::debug!("[PubsubWorker] SendRouteChanged for {:?} to remotes {:?}", relay_id, relay.remotes);
121 for (addr, uuid) in relay.remotes_uuid.iter() {
122 let control = PubsubMessage::Control(relay_id, RelayControl::RouteChanged(*uuid));
123 self.queue.push_back(FeatureWorkerOutput::RawDirect2(*addr, control.into()));
124 }
125 }
126 RelayWorkerControl::RouteSetSource(source) => {
127 log::info!("[PubsubWorker] RouteSetSource for {:?} to {:?}", relay_id, source);
128 let entry: &mut WorkerRelay<UserData> = self.relays.entry(relay_id).or_insert(WorkerRelay {
129 source: None,
130 locals: vec![],
131 remotes: vec![],
132 remotes_uuid: HashMap::new(),
133 });
134
135 entry.source = Some(source);
136 }
137 RelayWorkerControl::RouteDelSource(source) => {
138 log::info!("[PubsubWorker] RouteDelSource for {:?} to {:?}", relay_id, source);
139 if let Some(entry) = self.relays.get_mut(&relay_id) {
140 if entry.source == Some(source) {
141 entry.source = None;
142 } else {
143 log::warn!("[PubsubWorker] RelayDel: relay {:?} source mismatch locked {:?} vs {}", relay_id, entry.source, source);
144 }
145 } else {
146 log::warn!("[PubsubWorker] RelayDel: relay not found {:?}", relay_id);
147 }
148 }
149 RelayWorkerControl::RouteSetLocal(actor) => {
150 log::debug!("[PubsubWorker] RouteSetLocal for {:?} to {:?}", relay_id, actor);
151 let entry: &mut WorkerRelay<UserData> = self.relays.entry(relay_id).or_insert(WorkerRelay {
152 source: None,
153 locals: vec![],
154 remotes: vec![],
155 remotes_uuid: HashMap::new(),
156 });
157
158 entry.locals.push(actor);
159 }
160 RelayWorkerControl::RouteDelLocal(actor) => {
161 log::debug!("[PubsubWorker] RouteDelLocal for {:?} to {:?}", relay_id, actor);
162 if let Some(entry) = self.relays.get_mut(&relay_id) {
163 if let Some(pos) = entry.locals.iter().position(|x| *x == actor) {
164 entry.locals.swap_remove(pos);
165 }
166 if entry.is_empty() {
167 self.relays.remove(&relay_id);
168 }
169 } else {
170 log::warn!("[PubsubWorker] RelayDelLocal: relay not found {:?}", relay_id);
171 }
172 }
173 RelayWorkerControl::RouteSetRemote(remote, uuid) => {
174 log::debug!("[PubsubWorker] RouteSetRemote for {:?} to {:?}", relay_id, remote);
175 let entry: &mut WorkerRelay<UserData> = self.relays.entry(relay_id).or_insert(WorkerRelay {
176 source: None,
177 locals: vec![],
178 remotes: vec![],
179 remotes_uuid: HashMap::new(),
180 });
181
182 entry.remotes.push(remote);
183 entry.remotes_uuid.insert(remote, uuid);
184 }
185 RelayWorkerControl::RouteDelRemote(remote) => {
186 log::debug!("[PubsubWorker] RouteDelRemote for {:?} to {:?}", relay_id, remote);
187 if let Some(entry) = self.relays.get_mut(&relay_id) {
188 if let Some(pos) = entry.remotes.iter().position(|x| *x == remote) {
189 entry.remotes.swap_remove(pos);
190 }
191 if entry.is_empty() {
192 self.relays.remove(&relay_id);
193 }
194 } else {
195 log::warn!("RelayDelSub: relay not found {:?}", relay_id);
196 }
197 }
198 },
199 FeatureWorkerInput::FromController(_, ToWorker::SourceHint(channel, remote, data)) => {
200 if let Some(remote) = remote {
201 let control = PubsubMessage::SourceHint(channel, data);
202 self.queue.push_back(FeatureWorkerOutput::RawDirect2(remote, control.into()));
203 } else {
204 let next = ctx.router.path_to_key(*channel as u32);
205 if let RouteAction::Next(remote) = next {
206 let control = PubsubMessage::SourceHint(channel, data);
207 self.queue.push_back(FeatureWorkerOutput::RawDirect2(remote, control.into()));
208 }
209 }
210 }
211 FeatureWorkerInput::FromController(_, ToWorker::RelayData(relay_id, data)) => {
212 let relay = return_if_none!(self.relays.get(&relay_id));
213 if relay.remotes.is_empty() {
214 log::warn!("RelayData: no remote for {:?}", relay_id);
215 return;
216 }
217 let control = PubsubMessage::Data(relay_id, data);
218 self.queue.push_back(FeatureWorkerOutput::RawBroadcast2(relay.remotes.clone(), control.into()));
219 }
220 FeatureWorkerInput::Control(actor, control) => match control {
221 Control(channel, ChannelControl::PubData(data)) => {
222 let relay_id = RelayId(channel, ctx.node_id);
223 let relay = return_if_none!(self.relays.get(&relay_id));
224
225 for actor in &relay.locals {
226 self.queue
227 .push_back(FeatureWorkerOutput::Event(*actor, Event(channel, ChannelEvent::SourceData(ctx.node_id, data.clone()))));
228 }
229
230 if !relay.remotes.is_empty() {
231 let control = PubsubMessage::Data(relay_id, data);
232 self.queue.push_back(FeatureWorkerOutput::RawBroadcast2(relay.remotes.clone(), control.into()));
233 }
234 }
235 _ => self.queue.push_back(FeatureWorkerOutput::ForwardControlToController(actor, control)),
236 },
237 _ => {}
238 }
239 }
240
241 fn on_shutdown(&mut self, _ctx: &mut FeatureWorkerContext, _now: u64) {
242 self.shutdown = true;
243 }
244}
245
246impl<UserData> TaskSwitcherChild<FeatureWorkerOutput<UserData, Control, Event, ToController>> for PubSubFeatureWorker<UserData> {
247 type Time = u64;
248
249 fn is_empty(&self) -> bool {
250 self.shutdown && self.queue.is_empty()
251 }
252
253 fn empty_event(&self) -> FeatureWorkerOutput<UserData, Control, Event, ToController> {
254 FeatureWorkerOutput::OnResourceEmpty
255 }
256
257 fn pop_output(&mut self, _now: u64) -> Option<FeatureWorkerOutput<UserData, Control, Event, ToController>> {
258 self.queue.pop_front()
259 }
260}