atm0s_sdn_network/features/dht_kv/
mod.rs

//! DHT-based key-value storage feature.
//!
//! This is a dead-simple DHT key-value store which supports multiple sub-keys inside a key.
//! A value is a map, which is stored on the node whose route key is the key as u32.
//!
//! To resolve conflicts, each sub_key is attached to a lock value, which is a pair (node, lock_session).
//! Here, node is the node that locked the value, and lock_session is the session of that lock.
8
9use std::fmt::Debug;
10
11use atm0s_sdn_identity::NodeId;
12use derivative::Derivative;
13use sans_io_runtime::{collections::DynamicDeque, TaskSwitcherChild};
14
15use crate::base::{Feature, FeatureContext, FeatureInput, FeatureOutput, FeatureSharedInput, FeatureWorker, FeatureWorkerInput, FeatureWorkerOutput, NetOutgoingMeta};
16
17use self::{
18    internal::InternalOutput,
19    msg::{NodeSession, Version},
20};
21
22mod client;
23mod internal;
24mod msg;
25mod server;
26
27pub use self::msg::{Key, Map};
28
/// Numeric identifier of the DHT key-value feature.
pub const FEATURE_ID: u8 = 4;
/// Name of the DHT key-value feature.
pub const FEATURE_NAME: &str = "dht_kv";
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub enum MapControl {
34    Set(Key, Vec<u8>),
35    Del(Key),
36    Sub,
37    Unsub,
38}
39
40impl MapControl {
41    pub fn is_creator(&self) -> bool {
42        matches!(self, MapControl::Set(_, _) | MapControl::Sub)
43    }
44}
45
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum Control {
48    MapCmd(Map, MapControl),
49    MapGet(Map),
50}
51
/// Failure reasons carried in the `Err` side of a map-get result.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GetError {
    /// The request timed out.
    Timeout,
    /// Nothing was found for the request.
    NotFound,
}

impl std::fmt::Display for GetError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            GetError::Timeout => "timeout",
            GetError::NotFound => "not found",
        };
        f.write_str(text)
    }
}

// Lets callers use `GetError` with `?` and `Box<dyn Error>`.
impl std::error::Error for GetError {}
57
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub enum MapEvent {
60    OnSet(Key, NodeId, Vec<u8>),
61    OnDel(Key, NodeId),
62    OnRelaySelected(NodeId),
63}
64
65type MapGetRs = Result<Vec<(Key, NodeSession, Version, Vec<u8>)>, GetError>;
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum Event {
69    MapEvent(Map, MapEvent),
70    MapGetRes(Map, MapGetRs),
71}
72
/// Marker type used as the `ToWorker` parameter of this feature's `Feature` and
/// `FeatureWorker` impls; it carries no data.
#[derive(Debug, Clone)]
pub struct ToWorker;
75
/// Marker type used as the `ToController` parameter of this feature's `Feature` and
/// `FeatureWorker` impls; it carries no data.
#[derive(Debug, Clone)]
pub struct ToController;
78
79pub type Output<UserData> = FeatureOutput<UserData, Event, ToWorker>;
80pub type WorkerOutput<UserData> = FeatureWorkerOutput<UserData, Control, Event, ToController>;
81
82pub struct DhtKvFeature<UserData> {
83    internal: internal::DhtKvInternal<UserData>,
84    shutdown: bool,
85}
86
87impl<UserData: Eq + Copy + Debug> DhtKvFeature<UserData> {
88    pub fn new(node_id: NodeId, session: u64) -> Self {
89        Self {
90            internal: internal::DhtKvInternal::new(NodeSession(node_id, session)),
91            shutdown: false,
92        }
93    }
94}
95
96impl<UserData: Eq + Copy + Debug> Feature<UserData, Control, Event, ToController, ToWorker> for DhtKvFeature<UserData> {
97    fn on_shared_input(&mut self, _ctx: &FeatureContext, now: u64, input: FeatureSharedInput) {
98        if let FeatureSharedInput::Tick(_) = input {
99            self.internal.on_tick(now);
100        }
101    }
102
103    fn on_input(&mut self, _ctx: &FeatureContext, now_ms: u64, input: FeatureInput<'_, UserData, Control, ToController>) {
104        match input {
105            FeatureInput::Control(actor, control) => {
106                log::debug!("[DhtKv] on ext input: actor={:?}, control={:?}", actor, control);
107                self.internal.on_local(now_ms, actor, control);
108            }
109            FeatureInput::Local(_header, buf) => {
110                if let Ok(cmd) = bincode::deserialize(&buf) {
111                    self.internal.on_remote(now_ms, cmd)
112                }
113            }
114            FeatureInput::Net(_conn, meta, buf) => {
115                if !meta.secure {
116                    //only allow secure message
117                    log::warn!("[DhtKv] reject unsecure message");
118                    return;
119                }
120                if let Ok(cmd) = bincode::deserialize(&buf) {
121                    self.internal.on_remote(now_ms, cmd)
122                }
123            }
124            _ => {}
125        }
126    }
127
128    fn on_shutdown(&mut self, _ctx: &FeatureContext, _now: u64) {
129        log::info!("[DhtKvFeature] Shutdown");
130        self.shutdown = true;
131    }
132}
133
134impl<UserData: Eq + Copy + Debug> TaskSwitcherChild<FeatureOutput<UserData, Event, ToWorker>> for DhtKvFeature<UserData> {
135    type Time = u64;
136
137    fn is_empty(&self) -> bool {
138        self.shutdown
139    }
140
141    fn empty_event(&self) -> FeatureOutput<UserData, Event, ToWorker> {
142        FeatureOutput::OnResourceEmpty
143    }
144
145    fn pop_output(&mut self, _now: u64) -> Option<FeatureOutput<UserData, Event, ToWorker>> {
146        match self.internal.pop_action()? {
147            InternalOutput::Local(service, event) => Some(FeatureOutput::Event(service, event)),
148            InternalOutput::Remote(rule, cmd) => Some(FeatureOutput::SendRoute(
149                rule,
150                NetOutgoingMeta::new(false, Default::default(), 0, true),
151                bincode::serialize(&cmd).expect("Should to bytes").into(),
152            )),
153        }
154    }
155}
156
157#[derive(Derivative)]
158#[derivative(Default(bound = ""))]
159pub struct DhtKvFeatureWorker<UserData> {
160    queue: DynamicDeque<FeatureWorkerOutput<UserData, Control, Event, ToController>, 1>,
161    shutdown: bool,
162}
163
164impl<UserData> FeatureWorker<UserData, Control, Event, ToController, ToWorker> for DhtKvFeatureWorker<UserData> {
165    fn on_input(&mut self, _ctx: &mut crate::base::FeatureWorkerContext, _now: u64, input: crate::base::FeatureWorkerInput<UserData, Control, ToWorker>) {
166        match input {
167            FeatureWorkerInput::Control(actor, control) => self.queue.push_back(FeatureWorkerOutput::ForwardControlToController(actor, control)),
168            FeatureWorkerInput::Network(conn, header, buf) => self.queue.push_back(FeatureWorkerOutput::ForwardNetworkToController(conn, header, buf)),
169            #[cfg(feature = "vpn")]
170            FeatureWorkerInput::TunPkt(..) => {}
171            FeatureWorkerInput::FromController(..) => {
172                log::warn!("No handler for FromController");
173            }
174            FeatureWorkerInput::Local(header, buf) => self.queue.push_back(FeatureWorkerOutput::ForwardLocalToController(header, buf)),
175        }
176    }
177
178    fn on_shutdown(&mut self, _ctx: &mut crate::base::FeatureWorkerContext, _now: u64) {
179        self.shutdown = true;
180    }
181}
182
183impl<UserData> TaskSwitcherChild<FeatureWorkerOutput<UserData, Control, Event, ToController>> for DhtKvFeatureWorker<UserData> {
184    type Time = u64;
185
186    fn is_empty(&self) -> bool {
187        self.shutdown && self.queue.is_empty()
188    }
189
190    fn empty_event(&self) -> FeatureWorkerOutput<UserData, Control, Event, ToController> {
191        FeatureWorkerOutput::OnResourceEmpty
192    }
193
194    fn pop_output(&mut self, _now: u64) -> Option<FeatureWorkerOutput<UserData, Control, Event, ToController>> {
195        self.queue.pop_front()
196    }
197}