atm0s_sdn_network/features/dht_kv/
mod.rs1use std::fmt::Debug;
10
11use atm0s_sdn_identity::NodeId;
12use derivative::Derivative;
13use sans_io_runtime::{collections::DynamicDeque, TaskSwitcherChild};
14
15use crate::base::{Feature, FeatureContext, FeatureInput, FeatureOutput, FeatureSharedInput, FeatureWorker, FeatureWorkerInput, FeatureWorkerOutput, NetOutgoingMeta};
16
17use self::{
18 internal::InternalOutput,
19 msg::{NodeSession, Version},
20};
21
22mod client;
23mod internal;
24mod msg;
25mod server;
26
27pub use self::msg::{Key, Map};
28
/// Numeric id of the DHT key-value feature.
pub const FEATURE_ID: u8 = 4;
/// Name of the DHT key-value feature.
pub const FEATURE_NAME: &str = "dht_kv";
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub enum MapControl {
34 Set(Key, Vec<u8>),
35 Del(Key),
36 Sub,
37 Unsub,
38}
39
40impl MapControl {
41 pub fn is_creator(&self) -> bool {
42 matches!(self, MapControl::Set(_, _) | MapControl::Sub)
43 }
44}
45
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum Control {
48 MapCmd(Map, MapControl),
49 MapGet(Map),
50}
51
/// Error half of the result carried by a map get response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GetError {
    /// The get request timed out.
    Timeout,
    /// Nothing was found for the request.
    NotFound,
}
57
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub enum MapEvent {
60 OnSet(Key, NodeId, Vec<u8>),
61 OnDel(Key, NodeId),
62 OnRelaySelected(NodeId),
63}
64
65type MapGetRs = Result<Vec<(Key, NodeSession, Version, Vec<u8>)>, GetError>;
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum Event {
69 MapEvent(Map, MapEvent),
70 MapGetRes(Map, MapGetRs),
71}
72
/// Data-less marker used as the "to worker" message type of this feature.
#[derive(Debug, Clone)]
pub struct ToWorker;
75
/// Data-less marker used as the "to controller" message type of this feature.
#[derive(Debug, Clone)]
pub struct ToController;
78
79pub type Output<UserData> = FeatureOutput<UserData, Event, ToWorker>;
80pub type WorkerOutput<UserData> = FeatureWorkerOutput<UserData, Control, Event, ToController>;
81
82pub struct DhtKvFeature<UserData> {
83 internal: internal::DhtKvInternal<UserData>,
84 shutdown: bool,
85}
86
87impl<UserData: Eq + Copy + Debug> DhtKvFeature<UserData> {
88 pub fn new(node_id: NodeId, session: u64) -> Self {
89 Self {
90 internal: internal::DhtKvInternal::new(NodeSession(node_id, session)),
91 shutdown: false,
92 }
93 }
94}
95
96impl<UserData: Eq + Copy + Debug> Feature<UserData, Control, Event, ToController, ToWorker> for DhtKvFeature<UserData> {
97 fn on_shared_input(&mut self, _ctx: &FeatureContext, now: u64, input: FeatureSharedInput) {
98 if let FeatureSharedInput::Tick(_) = input {
99 self.internal.on_tick(now);
100 }
101 }
102
103 fn on_input(&mut self, _ctx: &FeatureContext, now_ms: u64, input: FeatureInput<'_, UserData, Control, ToController>) {
104 match input {
105 FeatureInput::Control(actor, control) => {
106 log::debug!("[DhtKv] on ext input: actor={:?}, control={:?}", actor, control);
107 self.internal.on_local(now_ms, actor, control);
108 }
109 FeatureInput::Local(_header, buf) => {
110 if let Ok(cmd) = bincode::deserialize(&buf) {
111 self.internal.on_remote(now_ms, cmd)
112 }
113 }
114 FeatureInput::Net(_conn, meta, buf) => {
115 if !meta.secure {
116 log::warn!("[DhtKv] reject unsecure message");
118 return;
119 }
120 if let Ok(cmd) = bincode::deserialize(&buf) {
121 self.internal.on_remote(now_ms, cmd)
122 }
123 }
124 _ => {}
125 }
126 }
127
128 fn on_shutdown(&mut self, _ctx: &FeatureContext, _now: u64) {
129 log::info!("[DhtKvFeature] Shutdown");
130 self.shutdown = true;
131 }
132}
133
134impl<UserData: Eq + Copy + Debug> TaskSwitcherChild<FeatureOutput<UserData, Event, ToWorker>> for DhtKvFeature<UserData> {
135 type Time = u64;
136
137 fn is_empty(&self) -> bool {
138 self.shutdown
139 }
140
141 fn empty_event(&self) -> FeatureOutput<UserData, Event, ToWorker> {
142 FeatureOutput::OnResourceEmpty
143 }
144
145 fn pop_output(&mut self, _now: u64) -> Option<FeatureOutput<UserData, Event, ToWorker>> {
146 match self.internal.pop_action()? {
147 InternalOutput::Local(service, event) => Some(FeatureOutput::Event(service, event)),
148 InternalOutput::Remote(rule, cmd) => Some(FeatureOutput::SendRoute(
149 rule,
150 NetOutgoingMeta::new(false, Default::default(), 0, true),
151 bincode::serialize(&cmd).expect("Should to bytes").into(),
152 )),
153 }
154 }
155}
156
157#[derive(Derivative)]
158#[derivative(Default(bound = ""))]
159pub struct DhtKvFeatureWorker<UserData> {
160 queue: DynamicDeque<FeatureWorkerOutput<UserData, Control, Event, ToController>, 1>,
161 shutdown: bool,
162}
163
164impl<UserData> FeatureWorker<UserData, Control, Event, ToController, ToWorker> for DhtKvFeatureWorker<UserData> {
165 fn on_input(&mut self, _ctx: &mut crate::base::FeatureWorkerContext, _now: u64, input: crate::base::FeatureWorkerInput<UserData, Control, ToWorker>) {
166 match input {
167 FeatureWorkerInput::Control(actor, control) => self.queue.push_back(FeatureWorkerOutput::ForwardControlToController(actor, control)),
168 FeatureWorkerInput::Network(conn, header, buf) => self.queue.push_back(FeatureWorkerOutput::ForwardNetworkToController(conn, header, buf)),
169 #[cfg(feature = "vpn")]
170 FeatureWorkerInput::TunPkt(..) => {}
171 FeatureWorkerInput::FromController(..) => {
172 log::warn!("No handler for FromController");
173 }
174 FeatureWorkerInput::Local(header, buf) => self.queue.push_back(FeatureWorkerOutput::ForwardLocalToController(header, buf)),
175 }
176 }
177
178 fn on_shutdown(&mut self, _ctx: &mut crate::base::FeatureWorkerContext, _now: u64) {
179 self.shutdown = true;
180 }
181}
182
183impl<UserData> TaskSwitcherChild<FeatureWorkerOutput<UserData, Control, Event, ToController>> for DhtKvFeatureWorker<UserData> {
184 type Time = u64;
185
186 fn is_empty(&self) -> bool {
187 self.shutdown && self.queue.is_empty()
188 }
189
190 fn empty_event(&self) -> FeatureWorkerOutput<UserData, Control, Event, ToController> {
191 FeatureWorkerOutput::OnResourceEmpty
192 }
193
194 fn pop_output(&mut self, _now: u64) -> Option<FeatureWorkerOutput<UserData, Control, Event, ToController>> {
195 self.queue.pop_front()
196 }
197}