aurum_actors/cluster/devices/
client.rs1use crate as aurum_actors;
2use crate::cluster::devices::{
3 Device, DeviceInterval, DeviceServerMsg, DeviceServerRemoteMsg, HBReqSenderRemoteMsg, LOG_LEVEL,
4};
5use crate::cluster::{IntervalStorage, FAILURE_MODE};
6use crate::core::{
7 Actor, ActorContext, ActorRef, Destination, LocalRef, Node, Socket, UdpSerial, UnifiedType,
8};
9use crate::testkit::FailureConfigMap;
10use crate::{info, trace, AurumInterface};
11use async_trait::async_trait;
12use itertools::Itertools;
13use serde::{Deserialize, Serialize};
14use std::collections::{hash_map::Entry::*, HashMap, VecDeque};
15use std::hash::Hash;
16use std::sync::Arc;
17use std::time::Duration;
18use DeviceClientCmd::*;
19use DeviceClientMsg::*;
20use DeviceClientRemoteMsg::*;
21use DeviceServerRemoteMsg::*;
22use HBReqSenderRemoteMsg::*;
23
24#[derive(Clone, Serialize, Deserialize)]
25pub struct DeviceClientConfig {
26 pub phi: f64,
27 pub storage_capacity: u32,
28 pub times: u32,
29 pub log_capacity: u32,
30 pub seeds: im::HashSet<Socket>,
31 pub initial_interval: Duration,
32}
33impl Default for DeviceClientConfig {
34 fn default() -> Self {
35 Self {
36 phi: 0.995,
37 storage_capacity: 10,
38 times: 5,
39 log_capacity: 10,
40 seeds: im::hashset![],
41 initial_interval: Duration::from_millis(1000),
42 }
43 }
44}
45impl DeviceClientConfig {
46 fn new_storage(&self, init: Duration) -> IntervalStorage {
47 IntervalStorage::new(self.storage_capacity as usize, init * 2, self.times as usize, None)
48 }
49}
50
51pub struct Manager(pub Option<Socket>);
52
53#[derive(AurumInterface)]
54#[aurum(local)]
55pub enum DeviceClientMsg<U: UnifiedType> {
56 Tick,
57 #[aurum]
58 Remote(DeviceClientRemoteMsg<U>),
59 #[aurum(local)]
60 Cmd(DeviceClientCmd),
61}
62
63#[derive(Serialize, Deserialize)]
64#[serde(bound = "U: UnifiedType")]
65pub enum DeviceClientRemoteMsg<U: UnifiedType> {
66 HeartbeatRequest(ActorRef<U, HBReqSenderRemoteMsg>),
67 IntervalAck(Socket, DeviceInterval),
68}
69
70pub enum DeviceClientCmd {
71 SetInterval(Duration),
72 Subscribe(LocalRef<Manager>),
73}
74
75pub struct DeviceClient<U: UnifiedType> {
76 my_info: Device,
77 interval: DeviceInterval,
78 config: DeviceClientConfig,
79 svr_dest: Destination<U, DeviceServerRemoteMsg>,
80 server: Option<Socket>,
81 server_pov: DeviceInterval,
82 storage: IntervalStorage,
83 server_log: FrequencyBuffer<ActorRef<U, HBReqSenderRemoteMsg>>,
84 fail_map: FailureConfigMap,
85 subscribers: Vec<LocalRef<Manager>>,
86}
87impl<U: UnifiedType> DeviceClient<U> {
88 pub fn new(
89 node: &Node<U>,
90 config: DeviceClientConfig,
91 name: String,
92 fail_map: FailureConfigMap,
93 subscribers: Vec<LocalRef<Manager>>,
94 ) -> LocalRef<DeviceClientCmd> {
95 let actor = Self {
96 my_info: Device {
97 socket: node.socket().clone(),
98 },
99 interval: DeviceInterval {
100 clock: 1,
101 interval: config.initial_interval,
102 },
103 server: None,
104 server_pov: DeviceInterval {
105 clock: 0,
106 interval: config.initial_interval,
107 },
108 svr_dest: Destination::new::<DeviceServerMsg>(name.clone()),
109 storage: config.new_storage(config.initial_interval),
110 server_log: FrequencyBuffer::new(config.log_capacity),
111 config: config,
112 fail_map: fail_map,
113 subscribers: subscribers,
114 };
115 node.spawn(false, actor, name, true).local().clone().unwrap().transform()
116 }
117
118 async fn notify_server(&self, ctx: &ActorContext<U, DeviceClientMsg<U>>) {
119 let msg = SetHeartbeatInterval(self.my_info.clone(), self.interval);
120 let ser = Arc::new(UdpSerial::msg(&self.svr_dest, &msg));
121 match &self.server {
122 Some(svr) => {
123 trace!(
124 LOG_LEVEL,
125 &ctx.node,
126 format!("On server: SETTING {} to {:?}", svr.udp, self.interval)
127 );
128 ctx.node.udp_select(svr, &ser, FAILURE_MODE, &self.fail_map).await;
129 }
130 None => {
131 trace!(
132 LOG_LEVEL,
133 &ctx.node,
134 format!(
135 "Server undefined: on {:?}, SETTING to {:?}",
136 self.config.seeds.iter().map(|x| x.udp).collect::<Vec<_>>(),
137 self.interval
138 )
139 );
140 for seed in self.config.seeds.iter() {
141 ctx.node.udp_select(seed, &ser, FAILURE_MODE, &self.fail_map).await;
142 }
143 }
144 }
145 }
146
147 async fn new_interval(&mut self, dur: Duration, ctx: &ActorContext<U, DeviceClientMsg<U>>) {
148 self.interval.clock += 1;
149 self.interval.interval = dur;
150 self.notify_server(ctx).await;
151 }
152
153 fn set_server(&mut self, svr: Option<Socket>) {
154 self.subscribers.retain(|s| s.send(Manager(svr.clone())));
155 self.server = svr;
156 }
157}
158#[async_trait]
159impl<U: UnifiedType> Actor<U, DeviceClientMsg<U>> for DeviceClient<U> {
160 async fn pre_start(&mut self, ctx: &ActorContext<U, DeviceClientMsg<U>>) {
161 self.notify_server(ctx).await;
162 ctx.node.schedule_local_msg(self.interval.interval, ctx.local_interface(), Tick);
163 }
164
165 async fn recv(&mut self, ctx: &ActorContext<U, DeviceClientMsg<U>>, msg: DeviceClientMsg<U>) {
166 match msg {
167 Tick => {
168 let phi = self.storage.phi();
169 trace!(LOG_LEVEL, &ctx.node, format!("Received tick; {:?}", self.storage));
170 if phi > self.config.phi {
171 info!(LOG_LEVEL, &ctx.node, "Assuming the server is down");
172 self.set_server(None);
173 self.notify_server(ctx).await;
174 }
175 ctx.node.schedule_local_msg(self.interval.interval, ctx.local_interface(), Tick);
176 }
177 Remote(HeartbeatRequest(sender)) => {
178 trace!(
179 LOG_LEVEL,
180 &ctx.node,
181 format!("Heartbeat request from {} received:", sender.socket.udp)
182 );
183 if self.server.as_ref().filter(|x| *x == &sender.socket).is_none() {
184 self.storage = self.config.new_storage(self.interval.interval);
185 self.set_server(Some(sender.socket.clone()));
186 self.config.seeds.insert(sender.socket.clone());
187 } else {
188 self.storage.push();
189 }
190 let ser = Arc::new(UdpSerial::msg(&sender.dest, &Heartbeat));
191 ctx.node.udp_select(&sender.socket, &ser, FAILURE_MODE, &self.fail_map).await;
192 self.server_log.push(sender);
193 if self.server_log.changes + 1 != self.server_log.frequencies.len() as u32 {
194 let log = format!(
195 "Multiple senders detected: {:?}",
196 self.server_log.frequencies.keys().map(|x| x.socket.to_string()).collect_vec()
197 );
198 info!(LOG_LEVEL, &ctx.node, log);
199 for svr in self.server_log.frequencies.keys() {
200 let ser = Arc::new(UdpSerial::msg(&svr.dest, &MultipleSenders));
201 ctx.node.udp_select(&svr.socket, &ser, FAILURE_MODE, &self.fail_map).await;
202 }
203 }
204 }
205 Remote(IntervalAck(socket, interval)) => {
206 trace!(
207 LOG_LEVEL,
208 &ctx.node,
209 format!(
210 "IntervalAck from {} received: {:?}; previous: {:?}; current: {:?}",
211 socket.udp, interval, self.server_pov, self.interval
212 )
213 );
214 self.set_server(Some(socket));
215 self.server_pov = interval;
216 if self.server_pov == self.interval {
217 self.storage = self.config.new_storage(self.interval.interval);
218 } else if self.server_pov.clock < self.interval.clock {
219 self.notify_server(ctx).await;
220 } else {
221 self.interval.clock = self.server_pov.clock;
222 self.new_interval(self.interval.interval, ctx).await;
223 }
224 }
225 Cmd(SetInterval(dur)) => self.new_interval(dur, ctx).await,
226 Cmd(Subscribe(subr)) => {
227 subr.send(Manager(self.server.clone()));
228 self.subscribers.push(subr);
229 }
230 }
231 }
232}
233
/// Bounded log of the most recently pushed items that also tracks how often each distinct item
/// occurs and how many times consecutive items differ.
struct FrequencyBuffer<T: Eq + PartialEq + Clone + Hash> {
    /// Logged items; the newest is at the front, the oldest at the back.
    buffer: VecDeque<T>,
    /// Number of occurrences of each distinct item currently in `buffer`.
    frequencies: HashMap<T, u32>,
    /// Maximum number of items kept; a value of 0 is treated as 1.
    capacity: u32,
    /// Number of adjacent pairs in `buffer` whose two items are not equal.
    changes: u32,
}
impl<T: Eq + PartialEq + Clone + Hash> FrequencyBuffer<T> {
    /// Creates an empty buffer that keeps at most `cap` items.
    fn new(cap: u32) -> Self {
        Self {
            buffer: VecDeque::new(),
            frequencies: HashMap::new(),
            capacity: cap,
            changes: 0,
        }
    }

    /// Removes the oldest item, if any, and updates `changes` and `frequencies` accordingly.
    fn pop(&mut self) {
        if let Some(removed) = self.buffer.pop_back() {
            // The pair formed by the removed item and its successor no longer exists; if that
            // pair counted as a change, take it back.
            if matches!(self.buffer.back(), Some(next) if *next != removed) {
                self.changes -= 1;
            }
            if let Occupied(mut slot) = self.frequencies.entry(removed) {
                let count = slot.get_mut();
                if *count == 1 {
                    // Last occurrence: drop the key so `frequencies.len()` stays the number of
                    // distinct items.
                    slot.remove();
                } else {
                    *count -= 1;
                }
            }
        }
    }

    /// Logs `item` as the newest entry, evicting the oldest entries first when the buffer is
    /// full.
    fn push(&mut self, item: T) {
        // Clamp to at least one slot: with a limit of zero the eviction loop could never make
        // progress on an empty buffer and would spin forever.
        let limit = (self.capacity as usize).max(1);
        while self.buffer.len() >= limit {
            self.pop();
        }
        if matches!(self.buffer.front(), Some(newest) if *newest != item) {
            self.changes += 1;
        }
        *self.frequencies.entry(item.clone()).or_insert(0) += 1;
        self.buffer.push_front(item);
    }
}
277}