Skip to main content

aurum_actors/cluster/devices/
client.rs

1use crate as aurum_actors;
2use crate::cluster::devices::{
3  Device, DeviceInterval, DeviceServerMsg, DeviceServerRemoteMsg, HBReqSenderRemoteMsg, LOG_LEVEL,
4};
5use crate::cluster::{IntervalStorage, FAILURE_MODE};
6use crate::core::{
7  Actor, ActorContext, ActorRef, Destination, LocalRef, Node, Socket, UdpSerial, UnifiedType,
8};
9use crate::testkit::FailureConfigMap;
10use crate::{info, trace, AurumInterface};
11use async_trait::async_trait;
12use itertools::Itertools;
13use serde::{Deserialize, Serialize};
14use std::collections::{hash_map::Entry::*, HashMap, VecDeque};
15use std::hash::Hash;
16use std::sync::Arc;
17use std::time::Duration;
18use DeviceClientCmd::*;
19use DeviceClientMsg::*;
20use DeviceClientRemoteMsg::*;
21use DeviceServerRemoteMsg::*;
22use HBReqSenderRemoteMsg::*;
23
/// Tunable settings for a [`DeviceClient`].
///
/// The struct is serializable, so a configuration may originate outside this process.
#[derive(Clone, Serialize, Deserialize)]
pub struct DeviceClientConfig {
  /// Threshold compared against the storage's `phi()` on every tick; once the storage value
  /// exceeds it, the client assumes the server is down and re-announces itself.
  pub phi: f64,
  /// First argument handed to `IntervalStorage::new` (cast to `usize`) whenever the client
  /// rebuilds its storage.
  pub storage_capacity: u32,
  /// Third argument handed to `IntervalStorage::new` (cast to `usize`).
  // NOTE(review): exact meaning is defined by `IntervalStorage`; confirm there.
  pub times: u32,
  /// Capacity of the log of recent heartbeat-request senders kept by the client.
  pub log_capacity: u32,
  /// Sockets contacted when no server is currently known. The client also adds the socket of
  /// any newly adopted heartbeat sender to this set at runtime.
  pub seeds: im::HashSet<Socket>,
  /// Heartbeat interval the client starts with; also the delay between its ticks until the
  /// interval is changed.
  pub initial_interval: Duration,
}
33impl Default for DeviceClientConfig {
34  fn default() -> Self {
35    Self {
36      phi: 0.995,
37      storage_capacity: 10,
38      times: 5,
39      log_capacity: 10,
40      seeds: im::hashset![],
41      initial_interval: Duration::from_millis(1000),
42    }
43  }
44}
45impl DeviceClientConfig {
46  fn new_storage(&self, init: Duration) -> IntervalStorage {
47    IntervalStorage::new(self.storage_capacity as usize, init * 2, self.times as usize, None)
48  }
49}
50
/// Notification delivered to subscribers of a [`DeviceClient`].
///
/// Carries the socket of the server the client currently tracks, or `None` when the client
/// has no server (for example after it assumed the server is down).
pub struct Manager(pub Option<Socket>);
52
// Every message the device client actor can receive.
// (Plain comments are used in this block so the input seen by the `AurumInterface` derive is
// unchanged.)
#[derive(AurumInterface)]
#[aurum(local)]
pub enum DeviceClientMsg<U: UnifiedType> {
  // Timer message the client schedules to itself; each tick checks whether the server still
  // looks alive and then schedules the next tick.
  Tick,
  // Message originating from another node (heartbeat requests and interval acknowledgements).
  // NOTE(review): `#[aurum]` presumably exposes this variant for remote delivery; confirm
  // against the `AurumInterface` macro documentation.
  #[aurum]
  Remote(DeviceClientRemoteMsg<U>),
  // Command issued through the `LocalRef<DeviceClientCmd>` returned by `DeviceClient::new`.
  #[aurum(local)]
  Cmd(DeviceClientCmd),
}
62
/// Serializable messages a device client accepts from other nodes.
#[derive(Serialize, Deserialize)]
#[serde(bound = "U: UnifiedType")]
pub enum DeviceClientRemoteMsg<U: UnifiedType> {
  /// A request for a heartbeat. The payload is the reference of the requester, to which the
  /// client replies with a `Heartbeat` message.
  HeartbeatRequest(ActorRef<U, HBReqSenderRemoteMsg>),
  /// Acknowledgement of a heartbeat interval: the socket of the acknowledging server and the
  /// interval (with its clock) as that server currently sees it.
  IntervalAck(Socket, DeviceInterval),
}
69
/// Local commands accepted by a [`DeviceClient`].
pub enum DeviceClientCmd {
  /// Switch to a new heartbeat interval; the client bumps its interval clock and announces
  /// the change to the server (or to the seeds when no server is known).
  SetInterval(Duration),
  /// Register a subscriber. It is immediately sent the currently tracked server and is
  /// notified again whenever the client updates its server.
  Subscribe(LocalRef<Manager>),
}
74
/// Actor state of a device client: it announces its heartbeat interval to a server, answers
/// heartbeat requests, and watches for the server going silent.
pub struct DeviceClient<U: UnifiedType> {
  /// Identity of this device (the socket of the local node), sent along with every interval
  /// announcement.
  my_info: Device,
  /// Heartbeat interval this client wants, paired with a clock that starts at 1 and is
  /// incremented each time the interval is (re)issued.
  interval: DeviceInterval,
  /// Configuration supplied at construction; its seed set grows as new servers are seen.
  config: DeviceClientConfig,
  /// Destination used to address interval announcements to the server actor; built from the
  /// same name the client is spawned under.
  svr_dest: Destination<U, DeviceServerRemoteMsg>,
  /// Socket of the server currently tracked, or `None` when no server is known.
  server: Option<Socket>,
  /// Interval most recently acknowledged by the server (starts with clock 0).
  server_pov: DeviceInterval,
  /// Interval storage updated on heartbeat requests from the tracked server; its `phi()` is
  /// checked against `config.phi` on every tick.
  storage: IntervalStorage,
  /// Bounded log of recent heartbeat-request senders, used to detect more than one sender
  /// alternating.
  server_log: FrequencyBuffer<ActorRef<U, HBReqSenderRemoteMsg>>,
  /// Failure configuration passed to every UDP send.
  fail_map: FailureConfigMap,
  /// Local subscribers told about the tracked server; entries whose send fails are dropped.
  subscribers: Vec<LocalRef<Manager>>,
}
87impl<U: UnifiedType> DeviceClient<U> {
88  pub fn new(
89    node: &Node<U>,
90    config: DeviceClientConfig,
91    name: String,
92    fail_map: FailureConfigMap,
93    subscribers: Vec<LocalRef<Manager>>,
94  ) -> LocalRef<DeviceClientCmd> {
95    let actor = Self {
96      my_info: Device {
97        socket: node.socket().clone(),
98      },
99      interval: DeviceInterval {
100        clock: 1,
101        interval: config.initial_interval,
102      },
103      server: None,
104      server_pov: DeviceInterval {
105        clock: 0,
106        interval: config.initial_interval,
107      },
108      svr_dest: Destination::new::<DeviceServerMsg>(name.clone()),
109      storage: config.new_storage(config.initial_interval),
110      server_log: FrequencyBuffer::new(config.log_capacity),
111      config: config,
112      fail_map: fail_map,
113      subscribers: subscribers,
114    };
115    node.spawn(false, actor, name, true).local().clone().unwrap().transform()
116  }
117
118  async fn notify_server(&self, ctx: &ActorContext<U, DeviceClientMsg<U>>) {
119    let msg = SetHeartbeatInterval(self.my_info.clone(), self.interval);
120    let ser = Arc::new(UdpSerial::msg(&self.svr_dest, &msg));
121    match &self.server {
122      Some(svr) => {
123        trace!(
124          LOG_LEVEL,
125          &ctx.node,
126          format!("On server: SETTING {} to {:?}", svr.udp, self.interval)
127        );
128        ctx.node.udp_select(svr, &ser, FAILURE_MODE, &self.fail_map).await;
129      }
130      None => {
131        trace!(
132          LOG_LEVEL,
133          &ctx.node,
134          format!(
135            "Server undefined: on {:?}, SETTING to {:?}",
136            self.config.seeds.iter().map(|x| x.udp).collect::<Vec<_>>(),
137            self.interval
138          )
139        );
140        for seed in self.config.seeds.iter() {
141          ctx.node.udp_select(seed, &ser, FAILURE_MODE, &self.fail_map).await;
142        }
143      }
144    }
145  }
146
147  async fn new_interval(&mut self, dur: Duration, ctx: &ActorContext<U, DeviceClientMsg<U>>) {
148    self.interval.clock += 1;
149    self.interval.interval = dur;
150    self.notify_server(ctx).await;
151  }
152
153  fn set_server(&mut self, svr: Option<Socket>) {
154    self.subscribers.retain(|s| s.send(Manager(svr.clone())));
155    self.server = svr;
156  }
157}
158#[async_trait]
159impl<U: UnifiedType> Actor<U, DeviceClientMsg<U>> for DeviceClient<U> {
160  async fn pre_start(&mut self, ctx: &ActorContext<U, DeviceClientMsg<U>>) {
161    self.notify_server(ctx).await;
162    ctx.node.schedule_local_msg(self.interval.interval, ctx.local_interface(), Tick);
163  }
164
165  async fn recv(&mut self, ctx: &ActorContext<U, DeviceClientMsg<U>>, msg: DeviceClientMsg<U>) {
166    match msg {
167      Tick => {
168        let phi = self.storage.phi();
169        trace!(LOG_LEVEL, &ctx.node, format!("Received tick; {:?}", self.storage));
170        if phi > self.config.phi {
171          info!(LOG_LEVEL, &ctx.node, "Assuming the server is down");
172          self.set_server(None);
173          self.notify_server(ctx).await;
174        }
175        ctx.node.schedule_local_msg(self.interval.interval, ctx.local_interface(), Tick);
176      }
177      Remote(HeartbeatRequest(sender)) => {
178        trace!(
179          LOG_LEVEL,
180          &ctx.node,
181          format!("Heartbeat request from {} received:", sender.socket.udp)
182        );
183        if self.server.as_ref().filter(|x| *x == &sender.socket).is_none() {
184          self.storage = self.config.new_storage(self.interval.interval);
185          self.set_server(Some(sender.socket.clone()));
186          self.config.seeds.insert(sender.socket.clone());
187        } else {
188          self.storage.push();
189        }
190        let ser = Arc::new(UdpSerial::msg(&sender.dest, &Heartbeat));
191        ctx.node.udp_select(&sender.socket, &ser, FAILURE_MODE, &self.fail_map).await;
192        self.server_log.push(sender);
193        if self.server_log.changes + 1 != self.server_log.frequencies.len() as u32 {
194          let log = format!(
195            "Multiple senders detected: {:?}",
196            self.server_log.frequencies.keys().map(|x| x.socket.to_string()).collect_vec()
197          );
198          info!(LOG_LEVEL, &ctx.node, log);
199          for svr in self.server_log.frequencies.keys() {
200            let ser = Arc::new(UdpSerial::msg(&svr.dest, &MultipleSenders));
201            ctx.node.udp_select(&svr.socket, &ser, FAILURE_MODE, &self.fail_map).await;
202          }
203        }
204      }
205      Remote(IntervalAck(socket, interval)) => {
206        trace!(
207          LOG_LEVEL,
208          &ctx.node,
209          format!(
210            "IntervalAck from {} received: {:?}; previous: {:?}; current: {:?}",
211            socket.udp, interval, self.server_pov, self.interval
212          )
213        );
214        self.set_server(Some(socket));
215        self.server_pov = interval;
216        if self.server_pov == self.interval {
217          self.storage = self.config.new_storage(self.interval.interval);
218        } else if self.server_pov.clock < self.interval.clock {
219          self.notify_server(ctx).await;
220        } else {
221          self.interval.clock = self.server_pov.clock;
222          self.new_interval(self.interval.interval, ctx).await;
223        }
224      }
225      Cmd(SetInterval(dur)) => self.new_interval(dur, ctx).await,
226      Cmd(Subscribe(subr)) => {
227        subr.send(Manager(self.server.clone()));
228        self.subscribers.push(subr);
229      }
230    }
231  }
232}
233
/// Bounded history of the most recently pushed items.
///
/// Alongside the items themselves it tracks how often each distinct item occurs in the
/// history and how many adjacent pairs of entries differ. A history that moves through `d`
/// distinct items without ever returning to an earlier one has exactly `d - 1` changes.
struct FrequencyBuffer<T: Eq + PartialEq + Clone + Hash> {
  /// Retained items; the newest is at the front, the oldest at the back.
  buffer: VecDeque<T>,
  /// Occurrence count of every distinct item currently in `buffer`.
  frequencies: HashMap<T, u32>,
  /// Maximum number of retained items; always at least 1.
  capacity: u32,
  /// Number of adjacent pairs in `buffer` whose two items differ.
  changes: u32,
}
impl<T: Eq + PartialEq + Clone + Hash> FrequencyBuffer<T> {
  /// Creates an empty buffer that retains at most `cap` items.
  ///
  /// A `cap` of 0 is treated as 1: `push` makes room by evicting until the length is below
  /// the capacity, which can never succeed for a zero capacity and would loop forever.
  fn new(cap: u32) -> Self {
    Self {
      buffer: VecDeque::new(),
      frequencies: HashMap::new(),
      capacity: cap.max(1),
      changes: 0,
    }
  }

  /// Evicts the oldest item, if any, keeping `changes` and `frequencies` consistent.
  fn pop(&mut self) {
    if let Some(removed) = self.buffer.pop_back() {
      // The pair formed by the evicted item and its newer neighbour no longer exists.
      if let Some(neighbour) = self.buffer.back() {
        if *neighbour != removed {
          self.changes -= 1;
        }
      }
      if let Occupied(mut o) = self.frequencies.entry(removed) {
        let m = o.get_mut();
        if *m == 1 {
          // Last occurrence: drop the key so `frequencies.len()` counts only present items.
          o.remove();
        } else {
          *m -= 1;
        }
      }
    }
  }

  /// Records `item` as the newest entry, evicting the oldest entries first if the buffer is
  /// full.
  fn push(&mut self, item: T) {
    while self.buffer.len() >= self.capacity as usize {
      self.pop();
    }
    // A new adjacent pair is formed with the previous newest item.
    if let Some(newest) = self.buffer.front() {
      if *newest != item {
        self.changes += 1;
      }
    }
    *self.frequencies.entry(item.clone()).or_insert(0) += 1;
    self.buffer.push_front(item);
  }
}