atm0s_sdn_router/shadow/
mod.rs

1use std::{fmt::Debug, hash::Hash, sync::Arc};
2
3use atm0s_sdn_identity::{NodeId, NodeIdType};
4
5use crate::{RouteAction, RouterTable, ServiceBroadcastLevel};
6
7use self::{service::Service, table::ShadowTable};
8
9mod service;
10mod table;
11
12#[mockall::automock]
13pub trait ShadowRouterHistory: Send + Sync {
14    /// This method will check if the broadcast message is already received or not
15    /// If not received, it will cache the message and return true
16    fn already_received_broadcast(&self, from: Option<NodeId>, service: u8, seq: u16) -> bool;
17
18    /// For set current time ms
19    fn set_ts(&self, now: u64);
20}
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub enum ShadowRouterDelta<Conn, Remote> {
24    SetTable {
25        layer: u8,
26        index: u8,
27        conn: Conn,
28        remote: Remote,
29    },
30    DelTable {
31        layer: u8,
32        index: u8,
33    },
34    SetServiceRemote {
35        service: u8,
36        conn: Conn,
37        remote: Remote,
38        next: NodeId,
39        dest: NodeId,
40        score: u32,
41    },
42    DelServiceRemote {
43        service: u8,
44        conn: Conn,
45    },
46    SetServiceLocal {
47        service: u8,
48    },
49    DelServiceLocal {
50        service: u8,
51    },
52}
53
54pub struct ShadowRouter<Conn: Debug + Hash + Eq + Clone + Copy, Remote: Debug + Hash + Eq + Clone + Copy> {
55    node_id: NodeId,
56    local_registries: [bool; 256],
57    remote_registry: [Service<Conn, Remote>; 256],
58    tables: [ShadowTable<Remote>; 4],
59    cached: Arc<dyn ShadowRouterHistory>,
60}
61
62impl<Conn: Debug + Hash + Eq + Clone + Copy, Remote: Debug + Hash + Eq + Clone + Copy> ShadowRouter<Conn, Remote> {
63    pub fn new(node_id: NodeId, cached: Arc<dyn ShadowRouterHistory>) -> Self {
64        Self {
65            node_id,
66            local_registries: [false; 256],
67            remote_registry: std::array::from_fn(|_| Service::new()),
68            tables: [ShadowTable::new(0), ShadowTable::new(1), ShadowTable::new(2), ShadowTable::new(3)],
69            cached,
70        }
71    }
72
73    pub fn apply_delta(&mut self, delta: ShadowRouterDelta<Conn, Remote>) {
74        match delta {
75            ShadowRouterDelta::SetTable { layer, index, conn: _, remote } => {
76                self.tables[layer as usize].set(index, remote);
77            }
78            ShadowRouterDelta::DelTable { layer, index } => {
79                self.tables[layer as usize].del(index);
80            }
81            ShadowRouterDelta::SetServiceRemote {
82                service,
83                conn,
84                remote,
85                next,
86                dest,
87                score,
88            } => {
89                self.remote_registry[service as usize].set_conn(conn, remote, next, dest, score);
90            }
91            ShadowRouterDelta::DelServiceRemote { service, conn } => {
92                self.remote_registry[service as usize].del_conn(conn);
93            }
94            ShadowRouterDelta::SetServiceLocal { service } => {
95                self.local_registries[service as usize] = true;
96            }
97            ShadowRouterDelta::DelServiceLocal { service } => {
98                self.local_registries[service as usize] = false;
99            }
100        }
101    }
102}
103
104impl<Conn: Debug + Hash + Eq + Clone + Copy, Remote: Debug + Hash + Eq + Clone + Copy> RouterTable<Remote> for ShadowRouter<Conn, Remote> {
105    fn next(&self, dest: NodeId) -> Option<Remote> {
106        let eq_util_layer = self.node_id.eq_util_layer(&dest) as usize;
107        debug_assert!(eq_util_layer <= 4);
108        if eq_util_layer == 0 {
109            None
110        } else {
111            self.tables[eq_util_layer - 1].next(dest)
112        }
113    }
114
115    fn closest_for(&self, key: NodeId) -> Option<Remote> {
116        for i in [3, 2, 1, 0] {
117            let key_index = key.layer(i);
118            if let Some((remote, _next_index, next_distance)) = self.tables[i as usize].closest_for(key_index) {
119                let current_index = self.node_id.layer(i);
120                let current_distance = key_index ^ current_index;
121                if current_distance > next_distance {
122                    return Some(remote);
123                }
124            } else {
125                //if find nothing => that mean this layer is empty trying to find closest node in next layer
126                continue;
127            };
128        }
129        None
130    }
131
132    fn path_to_key(&self, key: NodeId) -> RouteAction<Remote> {
133        match self.closest_for(key) {
134            Some(remote) => RouteAction::Next(remote),
135            None => RouteAction::Local,
136        }
137    }
138
139    fn path_to_node(&self, dest: NodeId) -> RouteAction<Remote> {
140        if dest == self.node_id {
141            return RouteAction::Local;
142        }
143        match self.next(dest) {
144            Some(remote) => RouteAction::Next(remote),
145            None => RouteAction::Reject,
146        }
147    }
148
149    fn path_to_service(&self, service_id: u8) -> RouteAction<Remote> {
150        if self.local_registries[service_id as usize] {
151            RouteAction::Local
152        } else {
153            self.remote_registry[service_id as usize].best_conn().map(RouteAction::Next).unwrap_or(RouteAction::Reject)
154        }
155    }
156
157    fn path_to_services(&self, service_id: u8, seq: u16, level: ServiceBroadcastLevel, source: Option<NodeId>, relay_from: Option<NodeId>) -> RouteAction<Remote> {
158        if self.cached.already_received_broadcast(source, service_id, seq) {
159            return RouteAction::Reject;
160        }
161        let local = self.local_registries[service_id as usize];
162        if let Some(nexts) = self.remote_registry[service_id as usize].broadcast_dests(self.node_id, level, relay_from) {
163            RouteAction::Broadcast(local, nexts)
164        } else if local {
165            RouteAction::Local
166        } else {
167            RouteAction::Reject
168        }
169    }
170}
171
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::{shadow::MockShadowRouterHistory, RouteAction, RouterTable, ServiceBroadcastLevel};

    use super::{ShadowRouter, ShadowRouterDelta};

    #[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
    struct Conn(u8);

    #[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
    struct Remote(u8);

    type TestRouter = ShadowRouter<Conn, Remote>;

    /// Builds a router for node 1 backed by the given mocked history.
    fn router_with(history: MockShadowRouterHistory) -> TestRouter {
        ShadowRouter::new(1, Arc::new(history))
    }

    /// Builds a mocked history whose duplicate check always answers `already_received`.
    fn history_answering(already_received: bool) -> MockShadowRouterHistory {
        let mut history = MockShadowRouterHistory::new();
        history.expect_already_received_broadcast().return_const(already_received);
        history
    }

    /// Registers a remote connection for service 1, using `peer` for both the
    /// connection and the remote handle.
    fn set_service_remote(router: &mut TestRouter, peer: u8, next: super::NodeId, dest: super::NodeId, score: u32) {
        router.apply_delta(ShadowRouterDelta::SetServiceRemote {
            service: 1,
            conn: Conn(peer),
            remote: Remote(peer),
            next,
            dest,
            score,
        });
    }

    #[test]
    fn should_route_to_next_service_local() {
        let mut router = router_with(MockShadowRouterHistory::new());
        router.apply_delta(ShadowRouterDelta::SetServiceLocal { service: 1 });

        assert_eq!(router.path_to_service(0), RouteAction::Reject);
        assert_eq!(router.path_to_service(1), RouteAction::Local);
    }

    #[test]
    fn should_route_to_next_service_remote() {
        let mut router = router_with(MockShadowRouterHistory::new());
        set_service_remote(&mut router, 2, 2, 3, 4);

        assert_eq!(router.path_to_service(0), RouteAction::Reject);
        assert_eq!(router.path_to_service(1), RouteAction::Next(Remote(2)));
    }

    #[test]
    fn should_broadcast_to_next_service_local() {
        let mut router = router_with(history_answering(false));
        router.apply_delta(ShadowRouterDelta::SetServiceLocal { service: 1 });

        assert_eq!(router.path_to_services(1, 1, ServiceBroadcastLevel::Global, None, None), RouteAction::Local);
    }

    #[test]
    fn should_broadcast_to_next_service_remote() {
        let mut router = router_with(history_answering(false));
        set_service_remote(&mut router, 2, 2, 3, 4);
        set_service_remote(&mut router, 3, 3, 6, 2);
        set_service_remote(&mut router, 4, 4, 3, 1);

        // Only remote registrations exist: broadcast without local delivery.
        assert_eq!(
            router.path_to_services(1, 1, ServiceBroadcastLevel::Global, None, None),
            RouteAction::Broadcast(false, vec![Remote(4), Remote(3)])
        );

        // After a local registration the broadcast also targets the local node.
        router.apply_delta(ShadowRouterDelta::SetServiceLocal { service: 1 });
        assert_eq!(
            router.path_to_services(1, 2, ServiceBroadcastLevel::Global, None, None),
            RouteAction::Broadcast(true, vec![Remote(4), Remote(3)])
        );

        // Connection 4 now reports destination 5 and the message is relayed from node 4.
        set_service_remote(&mut router, 4, 4, 5, 1);
        assert_eq!(
            router.path_to_services(1, 3, ServiceBroadcastLevel::Global, None, Some(4)),
            RouteAction::Broadcast(true, vec![Remote(3), Remote(2)])
        );
    }

    #[test]
    fn reject_received_broadcast_message() {
        let mut router = router_with(history_answering(true));
        router.apply_delta(ShadowRouterDelta::SetServiceLocal { service: 100 });

        // should not broadcast if already received
        assert_eq!(router.path_to_services(100, 1, ServiceBroadcastLevel::Global, None, None), RouteAction::Reject);
    }
}
290}