// atm0s_sdn_network/data_plane.rs

1use std::{
2    collections::HashMap,
3    fmt::{Debug, Display},
4    hash::Hash,
5    net::{AddrParseError, SocketAddr},
6    sync::Arc,
7};
8
9use atm0s_sdn_identity::{ConnId, NodeId};
10use atm0s_sdn_router::{
11    shadow::{ShadowRouter, ShadowRouterHistory},
12    RouteAction, RouteRule, RouterTable,
13};
14use sans_io_runtime::{collections::DynamicDeque, return_if_err, return_if_none, return_if_some, TaskSwitcher, TaskSwitcherBranch, TaskSwitcherChild};
15
16use crate::{
17    base::{
18        Buffer, FeatureControlActor, FeatureWorkerContext, FeatureWorkerInput, FeatureWorkerOutput, NeighboursControl, NetOutgoingMeta, ServiceBuilder, ServiceControlActor, ServiceId,
19        ServiceWorkerCtx, ServiceWorkerInput, ServiceWorkerOutput, TransportMsg, TransportMsgHeader,
20    },
21    features::{Features, FeaturesControl, FeaturesEvent},
22    ExtIn, ExtOut, LogicControl, LogicEvent,
23};
24
25use self::{connection::DataPlaneConnection, features::FeatureWorkerManager, services::ServiceWorkerManager};
26
27mod connection;
28mod features;
29mod services;
30
/// Pair of the local and the remote socket address of one network path.
///
/// Carrying the local address next to the remote one solves ambiguities on
/// hosts that own several IP addresses.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NetPair {
    /// Socket address on this host.
    pub local: SocketAddr,
    /// Socket address of the peer.
    pub remote: SocketAddr,
}

impl NetPair {
    /// Creates a pair from two already-parsed socket addresses.
    pub fn new(local: SocketAddr, remote: SocketAddr) -> Self {
        NetPair { local, remote }
    }

    /// Parses both addresses from text, e.g. `"127.0.0.1:8080"`.
    ///
    /// # Errors
    /// Returns the [`AddrParseError`] of the first address that fails to
    /// parse; `local` is parsed before `remote`.
    pub fn new_str(local: &str, remote: &str) -> Result<Self, AddrParseError> {
        let local: SocketAddr = local.parse()?;
        let remote: SocketAddr = remote.parse()?;
        Ok(Self::new(local, remote))
    }
}

impl Display for NetPair {
    /// Writes the pair as `[local-remote]`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}-{}]", self.local, self.remote)
    }
}
57
58#[derive(Debug)]
59pub enum NetInput {
60    UdpPacket(NetPair, Buffer),
61    #[cfg(feature = "vpn")]
62    TunPacket(Buffer),
63}
64
65#[derive(Debug, Clone)]
66pub enum CrossWorker<UserData, SE> {
67    Feature(UserData, FeaturesEvent),
68    Service(ServiceId, UserData, SE),
69}
70
71#[derive(Debug)]
72pub enum Input<UserData, SC, SE, TW> {
73    Ext(ExtIn<UserData, SC>),
74    Net(NetInput),
75    Event(LogicEvent<UserData, SE, TW>),
76    Worker(CrossWorker<UserData, SE>),
77}
78
79#[derive(Debug)]
80pub enum NetOutput {
81    UdpPacket(NetPair, Buffer),
82    UdpPackets(Vec<NetPair>, Buffer),
83    #[cfg(feature = "vpn")]
84    TunPacket(Buffer),
85}
86
87#[derive(convert_enum::From)]
88pub enum Output<UserData, SC, SE, TC> {
89    Ext(ExtOut<UserData, SE>),
90    Net(NetOutput),
91    Control(LogicControl<UserData, SC, SE, TC>),
92    #[convert_enum(optout)]
93    Worker(u16, CrossWorker<UserData, SE>),
94    #[convert_enum(optout)]
95    OnResourceEmpty,
96    #[convert_enum(optout)]
97    Continue,
98}
99
100#[derive(num_enum::TryFromPrimitive, num_enum::IntoPrimitive)]
101#[repr(usize)]
102enum TaskType {
103    Feature = 0,
104    Service = 1,
105}
106
107pub struct DataPlaneCfg<UserData, SC, SE, TC, TW> {
108    pub worker_id: u16,
109    #[allow(clippy::type_complexity)]
110    pub services: Vec<Arc<dyn ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>>>,
111    pub history: Arc<dyn ShadowRouterHistory>,
112}
113
114pub struct DataPlane<UserData, SC, SE, TC, TW> {
115    tick_count: u64,
116    worker_id: u16,
117    feature_ctx: FeatureWorkerContext,
118    service_ctx: ServiceWorkerCtx,
119    features: TaskSwitcherBranch<FeatureWorkerManager<UserData>, features::Output<UserData>>,
120    #[allow(clippy::type_complexity)]
121    services: TaskSwitcherBranch<ServiceWorkerManager<UserData, SC, SE, TC, TW>, services::Output<UserData, SC, SE, TC>>,
122    conns: HashMap<NetPair, DataPlaneConnection>,
123    conns_reverse: HashMap<ConnId, NetPair>,
124    queue: DynamicDeque<Output<UserData, SC, SE, TC>, 16>,
125    shutdown: bool,
126    switcher: TaskSwitcher,
127}
128
129impl<UserData, SC, SE, TC, TW> DataPlane<UserData, SC, SE, TC, TW>
130where
131    UserData: 'static + Copy + Eq + Hash + Debug,
132{
133    pub fn new(node_id: NodeId, cfg: DataPlaneCfg<UserData, SC, SE, TC, TW>) -> Self {
134        log::info!("Create DataPlane for node: {}", node_id);
135
136        Self {
137            worker_id: cfg.worker_id,
138            tick_count: 0,
139            feature_ctx: FeatureWorkerContext {
140                node_id,
141                router: ShadowRouter::new(node_id, cfg.history),
142            },
143            service_ctx: ServiceWorkerCtx { node_id },
144            features: TaskSwitcherBranch::new(FeatureWorkerManager::new(), TaskType::Feature),
145            services: TaskSwitcherBranch::new(ServiceWorkerManager::new(cfg.services), TaskType::Service),
146            conns: HashMap::new(),
147            conns_reverse: HashMap::new(),
148            queue: DynamicDeque::default(),
149            shutdown: false,
150            switcher: TaskSwitcher::new(2),
151        }
152    }
153
154    pub fn route(&self, rule: RouteRule, source: Option<NodeId>, relay_from: Option<NodeId>) -> RouteAction<NetPair> {
155        self.feature_ctx.router.derive_action(&rule, source, relay_from)
156    }
157
158    pub fn on_tick(&mut self, now_ms: u64) {
159        log::trace!("[DataPlane] on_tick: {}", now_ms);
160        self.features.input(&mut self.switcher).on_tick(&mut self.feature_ctx, now_ms, self.tick_count);
161        self.services.input(&mut self.switcher).on_tick(&self.service_ctx, now_ms, self.tick_count);
162        self.tick_count += 1;
163    }
164
165    pub fn on_event(&mut self, now_ms: u64, event: Input<UserData, SC, SE, TW>) {
166        match event {
167            Input::Ext(ext) => match ext {
168                ExtIn::FeaturesControl(userdata, control) => {
169                    let feature: Features = control.to_feature();
170                    let actor = FeatureControlActor::Worker(self.worker_id, userdata);
171                    self.features
172                        .input(&mut self.switcher)
173                        .on_input(&mut self.feature_ctx, feature, now_ms, FeatureWorkerInput::Control(actor, control));
174                }
175                ExtIn::ServicesControl(service, userdata, control) => {
176                    let actor = ServiceControlActor::Worker(self.worker_id, userdata);
177                    self.services
178                        .input(&mut self.switcher)
179                        .on_input(&self.service_ctx, now_ms, service, ServiceWorkerInput::Control(actor, control));
180                }
181            },
182            Input::Worker(CrossWorker::Feature(userdata, event)) => self.queue.push_back(Output::Ext(ExtOut::FeaturesEvent(userdata, event))),
183            Input::Worker(CrossWorker::Service(service, userdata, event)) => self.queue.push_back(Output::Ext(ExtOut::ServicesEvent(service, userdata, event))),
184            Input::Net(NetInput::UdpPacket(pair, buf)) => {
185                if buf.is_empty() {
186                    return;
187                }
188                if let Ok(control) = NeighboursControl::try_from(&*buf) {
189                    self.queue.push_back(LogicControl::NetNeighbour(pair, control).into());
190                } else {
191                    self.incoming_route(now_ms, pair, buf);
192                }
193            }
194            #[cfg(feature = "vpn")]
195            Input::Net(NetInput::TunPacket(pkt)) => {
196                self.features
197                    .input(&mut self.switcher)
198                    .on_input(&mut self.feature_ctx, Features::Vpn, now_ms, FeatureWorkerInput::TunPkt(pkt));
199            }
200            Input::Event(LogicEvent::Feature(is_broadcast, to)) => {
201                let feature = to.to_feature();
202                self.features
203                    .input(&mut self.switcher)
204                    .on_input(&mut self.feature_ctx, feature, now_ms, FeatureWorkerInput::FromController(is_broadcast, to));
205            }
206            Input::Event(LogicEvent::Service(service, to)) => {
207                self.services
208                    .input(&mut self.switcher)
209                    .on_input(&self.service_ctx, now_ms, service, ServiceWorkerInput::FromController(to));
210            }
211            Input::Event(LogicEvent::ExtFeaturesEvent(worker, userdata, event)) => {
212                assert_eq!(self.worker_id, worker);
213                self.queue.push_back(Output::Ext(ExtOut::FeaturesEvent(userdata, event)));
214            }
215            Input::Event(LogicEvent::ExtServicesEvent(worker, service, userdata, event)) => {
216                assert_eq!(self.worker_id, worker);
217                self.queue.push_back(Output::Ext(ExtOut::ServicesEvent(service, userdata, event)));
218            }
219            Input::Event(LogicEvent::NetNeighbour(pair, control)) => {
220                let buf: Result<Vec<u8>, ()> = (&control).try_into();
221                if let Ok(buf) = buf {
222                    self.queue.push_back(NetOutput::UdpPacket(pair, buf.into()).into());
223                }
224            }
225            Input::Event(LogicEvent::NetDirect(feature, pair, _conn, meta, buf)) => {
226                let header = meta.to_header(feature as u8, RouteRule::Direct, self.feature_ctx.node_id);
227                let conn = return_if_none!(self.conns.get_mut(&pair));
228                let msg = TransportMsg::build_raw(header, buf);
229                if let Some(pkt) = Self::build_send_to_from_mut(now_ms, conn, pair, msg.take()) {
230                    self.queue.push_back(pkt.into());
231                }
232            }
233            Input::Event(LogicEvent::NetRoute(feature, rule, meta, buf)) => self.outgoing_route(now_ms, feature, rule, meta, buf),
234            Input::Event(LogicEvent::Pin(conn, node, pair, secure)) => {
235                self.conns.insert(pair, DataPlaneConnection::new(node, conn, pair, secure));
236                self.conns_reverse.insert(conn, pair);
237            }
238            Input::Event(LogicEvent::UnPin(conn)) => {
239                if let Some(addr) = self.conns_reverse.remove(&conn) {
240                    log::info!("UnPin: conn: {} <--> addr: {}", conn, addr);
241                    self.conns.remove(&addr);
242                }
243            }
244        }
245    }
246
247    pub fn on_shutdown(&mut self, now_ms: u64) {
248        if self.shutdown {
249            return;
250        }
251        log::info!("[DataPlane] Shutdown");
252        self.features.input(&mut self.switcher).on_shutdown(&mut self.feature_ctx, now_ms);
253        self.services.input(&mut self.switcher).on_shutdown(&self.service_ctx, now_ms);
254        self.shutdown = true;
255    }
256
257    fn incoming_route(&mut self, now_ms: u64, pair: NetPair, mut buf: Buffer) {
258        let conn = return_if_none!(self.conns.get_mut(&pair));
259        if TransportMsgHeader::is_secure(buf[0]) {
260            return_if_none!(conn.decrypt_if_need(now_ms, &mut buf));
261        }
262        let header = return_if_err!(TransportMsgHeader::try_from(&buf as &[u8]));
263        let action = self.feature_ctx.router.derive_action(&header.route, header.from_node, Some(conn.node()));
264        log::debug!("[DataPlane] Incoming rule: {:?} from: {pair}, node {:?} => action {:?}", header.route, header.from_node, action);
265        match action {
266            RouteAction::Reject => {}
267            RouteAction::Local => {
268                let feature = return_if_none!(header.feature.try_into().ok());
269                log::debug!("Incoming message for feature: {feature:?} from: {pair}");
270                self.features
271                    .input(&mut self.switcher)
272                    .on_network_raw(&mut self.feature_ctx, feature, now_ms, conn.conn(), pair, header, buf);
273            }
274            RouteAction::Next(pair) => {
275                if !TransportMsgHeader::decrease_ttl(&mut buf) {
276                    log::debug!("TTL is 0, drop packet");
277                }
278                let target_conn = return_if_none!(self.conns.get_mut(&pair));
279                if let Some(out) = Self::build_send_to_from_mut(now_ms, target_conn, pair, buf) {
280                    self.queue.push_back(out.into());
281                }
282            }
283            RouteAction::Broadcast(local, pairs) => {
284                if !TransportMsgHeader::decrease_ttl(&mut buf) {
285                    log::debug!("TTL is 0, drop packet");
286                    return;
287                }
288                if local {
289                    if let Ok(feature) = header.feature.try_into() {
290                        log::debug!("Incoming broadcast feature: {feature:?} from: {pair}");
291                        self.features
292                            .input(&mut self.switcher)
293                            .on_network_raw(&mut self.feature_ctx, feature, now_ms, conn.conn(), pair, header, buf.clone());
294                    }
295                }
296                if !pairs.is_empty() {
297                    log::debug!("Incoming broadcast from: {pair} forward to: {pairs:?}");
298                    if let Some(out) = self.build_send_to_multi_from_mut(now_ms, pairs, buf) {
299                        self.queue.push_back(out.into());
300                    }
301                }
302            }
303        }
304    }
305
306    fn outgoing_route(&mut self, now_ms: u64, feature: Features, rule: RouteRule, mut meta: NetOutgoingMeta, buf: Buffer) {
307        match self.feature_ctx.router.derive_action(&rule, Some(self.feature_ctx.node_id), None) {
308            RouteAction::Reject => {
309                log::debug!("[DataPlane] outgoing route rule {:?} is rejected", rule);
310            }
311            RouteAction::Local => {
312                log::debug!("[DataPlane] outgoing route rule {:?} is processed locally", rule);
313                let meta = meta.to_incoming(self.feature_ctx.node_id);
314                self.features
315                    .input(&mut self.switcher)
316                    .on_input(&mut self.feature_ctx, feature, now_ms, FeatureWorkerInput::Local(meta, buf));
317            }
318            RouteAction::Next(remote) => {
319                log::debug!("[DataPlane] outgoing route rule {:?} is go with remote {remote}", rule);
320                let header = meta.to_header(feature as u8, rule, self.feature_ctx.node_id);
321                let msg = TransportMsg::build_raw(header, buf);
322                let conn = return_if_none!(self.conns.get_mut(&remote));
323                if let Some(out) = Self::build_send_to_from_mut(now_ms, conn, remote, msg.take()) {
324                    self.queue.push_back(out.into());
325                }
326            }
327            RouteAction::Broadcast(local, remotes) => {
328                log::debug!("[DataPlane] outgoing route rule {:?} is go with local {local} and remotes {:?}", rule, remotes);
329                meta.source = true; //Force enable source for broadcast
330
331                let header = meta.to_header(feature as u8, rule, self.feature_ctx.node_id);
332                if local {
333                    let meta = meta.to_incoming(self.feature_ctx.node_id);
334                    self.features
335                        .input(&mut self.switcher)
336                        .on_input(&mut self.feature_ctx, feature, now_ms, FeatureWorkerInput::Local(meta, buf.clone()));
337                }
338                let msg = TransportMsg::build_raw(header, buf);
339                if let Some(out) = self.build_send_to_multi_from_mut(now_ms, remotes, msg.take()) {
340                    self.queue.push_back(out.into());
341                }
342            }
343        }
344    }
345
346    fn pop_features(&mut self, now_ms: u64) {
347        let out = return_if_none!(self.features.pop_output(now_ms, &mut self.switcher));
348        let (feature, out) = match out {
349            features::Output::Output(feature, out) => (feature, out),
350            features::Output::OnResourceEmpty => {
351                log::info!("[DataPlane] Features OnResourceEmpty");
352                return;
353            }
354        };
355        match out {
356            FeatureWorkerOutput::ForwardControlToController(service, control) => self.queue.push_back(LogicControl::FeaturesControl(service, control).into()),
357            FeatureWorkerOutput::ForwardNetworkToController(conn, header, msg) => self.queue.push_back(LogicControl::NetRemote(feature, conn, header, msg).into()),
358            FeatureWorkerOutput::ForwardLocalToController(header, buf) => self.queue.push_back(LogicControl::NetLocal(feature, header, buf).into()),
359            FeatureWorkerOutput::ToController(control) => self.queue.push_back(LogicControl::Feature(control).into()),
360            FeatureWorkerOutput::Event(actor, event) => match actor {
361                FeatureControlActor::Controller(userdata) => self.queue.push_back(Output::Control(LogicControl::ExtFeaturesEvent(userdata, event))),
362                FeatureControlActor::Worker(worker, userdata) => {
363                    if self.worker_id == worker {
364                        self.queue.push_back(Output::Ext(ExtOut::FeaturesEvent(userdata, event)));
365                    } else {
366                        self.queue.push_back(Output::Worker(worker, CrossWorker::Feature(userdata, event)));
367                    }
368                }
369                FeatureControlActor::Service(service) => {
370                    self.services
371                        .input(&mut self.switcher)
372                        .on_input(&self.service_ctx, now_ms, service, ServiceWorkerInput::FeatureEvent(event));
373                }
374            },
375            FeatureWorkerOutput::SendDirect(conn, meta, buf) => {
376                if let Some(addr) = self.conns_reverse.get(&conn) {
377                    let conn = self.conns.get_mut(addr).expect("Should have");
378                    let header = meta.to_header(feature as u8, RouteRule::Direct, self.feature_ctx.node_id);
379                    let msg = TransportMsg::build_raw(header, buf);
380                    self.queue.push_back(Self::build_send_to_from_mut(now_ms, conn, *addr, msg.take()).expect("Should have output").into())
381                }
382            }
383            FeatureWorkerOutput::SendRoute(rule, ttl, buf) => {
384                log::info!("SendRoute: {:?}", rule);
385                self.outgoing_route(now_ms, feature, rule, ttl, buf);
386            }
387            FeatureWorkerOutput::RawDirect(conn, buf) => {
388                if let Some(pair) = self.conns_reverse.get(&conn) {
389                    let conn = self.conns.get_mut(pair).expect("Should have conn");
390                    self.queue.push_back(Self::build_send_to(now_ms, conn, *pair, buf).expect("Should ok for convert RawDirect").into());
391                }
392            }
393            FeatureWorkerOutput::RawBroadcast(conns, buf) => {
394                let addrs = conns.iter().filter_map(|conn| self.conns_reverse.get(conn)).cloned().collect();
395                let out = self.build_send_to_multi(now_ms, addrs, buf).map(|e| e.into()).unwrap_or(Output::Continue);
396                self.queue.push_back(out);
397            }
398            FeatureWorkerOutput::RawDirect2(pair, buf) => {
399                if let Some(conn) = self.conns.get_mut(&pair) {
400                    self.queue.push_back(Self::build_send_to(now_ms, conn, pair, buf).expect("Should ok for convert RawDirect2").into());
401                }
402            }
403            FeatureWorkerOutput::RawBroadcast2(pairs, buf) => {
404                let out = self.build_send_to_multi(now_ms, pairs, buf).map(|e| e.into()).unwrap_or(Output::Continue);
405                self.queue.push_back(out);
406            }
407            #[cfg(feature = "vpn")]
408            FeatureWorkerOutput::TunPkt(pkt) => self.queue.push_back(NetOutput::TunPacket(pkt).into()),
409            FeatureWorkerOutput::OnResourceEmpty => {
410                log::info!("[DataPlane] Feature {feature:?} OnResourceEmpty");
411            }
412        }
413    }
414
415    fn pop_services(&mut self, now_ms: u64) {
416        let out = return_if_none!(self.services.pop_output(now_ms, &mut self.switcher));
417        let (service, out) = match out {
418            services::Output::Output(service, out) => (service, out),
419            services::Output::OnResourceEmpty => {
420                log::info!("[DataPlane] Services OnResourceEmpty");
421                return;
422            }
423        };
424        match out {
425            ServiceWorkerOutput::ForwardControlToController(actor, control) => self.queue.push_back(LogicControl::ServicesControl(actor, service, control).into()),
426            ServiceWorkerOutput::ForwardFeatureEventToController(event) => self.queue.push_back(LogicControl::ServiceEvent(service, event).into()),
427            ServiceWorkerOutput::ToController(tc) => self.queue.push_back(LogicControl::Service(service, tc).into()),
428            ServiceWorkerOutput::FeatureControl(control) => {
429                let feature = control.to_feature();
430                self.features
431                    .input(&mut self.switcher)
432                    .on_input(&mut self.feature_ctx, feature, now_ms, FeatureWorkerInput::Control(FeatureControlActor::Service(service), control));
433            }
434            ServiceWorkerOutput::Event(actor, event) => match actor {
435                ServiceControlActor::Controller(userdata) => self.queue.push_back(Output::Control(LogicControl::ExtServicesEvent(service, userdata, event))),
436                ServiceControlActor::Worker(worker, userdata) => {
437                    if self.worker_id == worker {
438                        self.queue.push_back(Output::Ext(ExtOut::ServicesEvent(service, userdata, event)));
439                    } else {
440                        self.queue.push_back(Output::Worker(worker, CrossWorker::Service(service, userdata, event)));
441                    }
442                }
443            },
444            ServiceWorkerOutput::OnResourceEmpty => {
445                log::info!("[DataPlane] Service {service} OnResourceEmpty");
446            }
447        }
448    }
449
450    fn build_send_to_from_mut(now: u64, conn: &mut DataPlaneConnection, pair: NetPair, mut buf: Buffer) -> Option<NetOutput> {
451        conn.encrypt_if_need(now, &mut buf)?;
452        Some(NetOutput::UdpPacket(pair, buf))
453    }
454
455    fn build_send_to_multi_from_mut(&mut self, now: u64, mut pairs: Vec<NetPair>, mut buf: Buffer) -> Option<NetOutput> {
456        if TransportMsgHeader::is_secure(buf[0]) {
457            let first = pairs.pop()?;
458            for pair in pairs {
459                if let Some(conn) = self.conns.get_mut(&pair) {
460                    let mut buf = Buffer::build(&buf, 0, 12 + 16);
461                    if conn.encrypt_if_need(now, &mut buf).is_some() {
462                        let out = NetOutput::UdpPacket(pair, buf);
463                        self.queue.push_back(Output::Net(out));
464                    }
465                }
466            }
467            let conn = self.conns.get_mut(&first)?;
468            conn.encrypt_if_need(now, &mut buf)?;
469            Some(NetOutput::UdpPacket(first, buf))
470        } else {
471            Some(NetOutput::UdpPackets(pairs, buf))
472        }
473    }
474
475    fn build_send_to_multi(&mut self, now: u64, pairs: Vec<NetPair>, buf: Buffer) -> Option<NetOutput> {
476        if TransportMsgHeader::is_secure(buf[0]) {
477            let buf = Buffer::build(&buf, 0, 12 + 16);
478            self.build_send_to_multi_from_mut(now, pairs, buf)
479        } else {
480            Some(NetOutput::UdpPackets(pairs, buf))
481        }
482    }
483
484    fn build_send_to(now: u64, conn: &mut DataPlaneConnection, pair: NetPair, buf: Buffer) -> Option<NetOutput> {
485        if TransportMsgHeader::is_secure(buf[0]) {
486            let buf = Buffer::build(&buf, 0, 12 + 16);
487            Self::build_send_to_from_mut(now, conn, pair, buf)
488        } else {
489            Some(NetOutput::UdpPacket(pair, buf))
490        }
491    }
492}
493
494impl<UserData, SC, SE, TC, TW> TaskSwitcherChild<Output<UserData, SC, SE, TC>> for DataPlane<UserData, SC, SE, TC, TW>
495where
496    UserData: 'static + Copy + Eq + Hash + Debug,
497{
498    type Time = u64;
499
500    fn empty_event(&self) -> Output<UserData, SC, SE, TC> {
501        Output::OnResourceEmpty
502    }
503
504    fn is_empty(&self) -> bool {
505        self.shutdown && self.queue.is_empty() && self.features.is_empty() && self.services.is_empty()
506    }
507
508    fn pop_output(&mut self, now: u64) -> Option<Output<UserData, SC, SE, TC>> {
509        return_if_some!(self.queue.pop_front());
510
511        while let Some(current) = self.switcher.current() {
512            match current.try_into().ok()? {
513                TaskType::Feature => self.pop_features(now),
514                TaskType::Service => self.pop_services(now),
515            }
516
517            return_if_some!(self.queue.pop_front());
518        }
519
520        None
521    }
522}