phi_accrual_detector/
lib.rs1use std::error::Error;
71use std::ops::Sub;
72use std::sync::{Arc};
73use tokio::sync::{RwLock, RwLockReadGuard};
74use async_trait::async_trait;
75use libm::{erf, log10};
76use chrono::{DateTime, Local, TimeDelta};
77
78#[derive(Clone, Debug)]
80pub struct Statistics {
81 arrival_intervals: Vec<u64>,
82 last_arrived_at: DateTime<Local>,
83 window_length: u32,
84 n: u32,
85}
86
87#[derive(Debug)]
89pub struct Detector {
90 statistics: RwLock<Statistics>,
91 acceptable_pause: TimeDelta,
92}
93
94impl Detector {
95 pub fn new(window_length: u32) -> Self {
97 Detector {
98 statistics: RwLock::new(Statistics::new(window_length)),
99 acceptable_pause: TimeDelta::milliseconds(0),
100 }
101 }
102
103 pub fn with_acceptable_pause(window_length: u32, acceptable_pause: TimeDelta) -> Self {
105 Detector {
106 statistics: RwLock::new(Statistics::new(window_length)),
107 acceptable_pause,
108 }
109 }
110}
111
112impl Statistics {
113 pub fn new(window_length: u32) -> Self {
115 Self {
116 arrival_intervals: vec![],
117 last_arrived_at: Local::now(),
118 window_length,
119 n: 0,
120 }
121 }
122
123 pub fn insert(&mut self, arrived_at: DateTime<Local>) {
125
126 if self.n == 0 {
128 self.last_arrived_at = arrived_at;
129 self.n += 1;
130 return;
131 }
132
133
134 if self.n - 1 == self.window_length {
135 self.arrival_intervals.remove(0);
136 self.n -= 1;
137 }
138 if self.n != 0 {
139 let arrival_interval = arrived_at.sub(self.last_arrived_at).num_milliseconds() as u64;
140 self.arrival_intervals.push(arrival_interval);
141 }
142 self.last_arrived_at = arrived_at;
143 self.n += 1;
144 }
145}
146
147#[async_trait]
149trait PhiCore {
150 async fn mean_with_stats<'a>(&self, stats: Arc<RwLockReadGuard<'a, Statistics>>) -> Result<f64, Box<dyn Error>>;
152
153 async fn variance_and_mean(&self) -> Result<(f64, f64), Box<dyn Error>>;
155}
156
157#[async_trait]
159pub trait PhiInteraction {
160 async fn insert(&self, arrived_at: DateTime<Local>) -> Result<(), Box<dyn Error>>;
162
163 async fn phi(&self, t: DateTime<Local>) -> Result<f64, Box<dyn Error>>;
165
166 async fn last_arrived_at(&self) -> Result<DateTime<Local>, Box<dyn Error>>;
168}
169
170#[async_trait]
172impl PhiCore for Detector {
173 async fn mean_with_stats<'a>(&self, stats: Arc<RwLockReadGuard<'a, Statistics>>) -> Result<f64, Box<dyn Error>> {
174 let mut mean: f64 = 0.;
175 let len = &stats.arrival_intervals.len();
176 for v in &stats.arrival_intervals {
177 mean += *v as f64 / *len as f64;
178 }
179 Ok(mean)
180 }
181
182 async fn variance_and_mean(&self) -> Result<(f64, f64), Box<dyn Error>> {
183 let mut variance: f64 = 0.;
184 let stats = Arc::new(self.statistics.read().await);
185 let mu = self.mean_with_stats(Arc::clone(&stats)).await?;
186 let len = &stats.arrival_intervals.len();
187 for v in &stats.arrival_intervals {
188 let val = ((*v as f64 - mu) * (*v as f64 - mu)) / *len as f64;
189 variance += val;
190 }
191 Ok((variance, mu))
192 }
193}
194
195fn normal_cdf(t: f64, mu: f64, sigma: f64) -> f64 {
197 if sigma == 0. {
198 return if t == mu {
199 1.
200 } else {
201 0.
202 };
203 }
204
205 let z = (t - mu) / sigma;
206 0.5 + 0.5 * (erf(z))
207}
208
209#[async_trait]
211impl PhiInteraction for Detector {
212 async fn insert(&self, arrived_at: DateTime<Local>) -> Result<(), Box<dyn Error>> {
213 let mut stats = self.statistics.write().await;
214 stats.insert(arrived_at);
215 Ok(())
216 }
217
218 async fn phi(&self, t: DateTime<Local>) -> Result<f64, Box<dyn Error>> {
219 let (sigma_sq, mu) = self.variance_and_mean().await?;
220 let sigma = sigma_sq.sqrt();
221 let last_arrived_at = self.last_arrived_at().await?;
222 let time_diff = t.sub(last_arrived_at).sub(self.acceptable_pause);
223 let ft = normal_cdf(time_diff.num_milliseconds() as f64, mu, sigma);
224 let phi = -log10(1. - ft);
225 Ok(phi)
226 }
227
228 async fn last_arrived_at(&self) -> Result<DateTime<Local>, Box<dyn Error>> {
229 Ok(self.statistics.read().await.last_arrived_at)
230 }
231}
232
233#[cfg(test)]
234mod tests {
235 use std::ops::Add;
236 use chrono::{Duration, Local, TimeDelta};
237 use tokio::sync::RwLock;
238 use crate::{Detector, PhiCore, PhiInteraction, Statistics};
239
240 #[tokio::test]
241 async fn test_variant_mean_and_variance_combo_calculation() {
242 let mut stats = Statistics::new(10);
243 let mut i = 0;
244 let mut curr_time = Local::now();
245 &stats.insert(curr_time.clone());
246 let expect_vals = [1630, 4421, 1514, 216, 231, 931, 4182, 102, 104, 241, 5132];
247 while i < expect_vals.len() {
248 curr_time = curr_time.add(Duration::milliseconds(expect_vals[i]));
249 let arrived_at = curr_time;
250 &stats.insert(arrived_at);
251 i += 1;
252 }
253 let detector = Detector {
254 statistics: RwLock::new(stats),
255 acceptable_pause: TimeDelta::milliseconds(0),
256 };
257 let (mut variance, mut mean) = detector.variance_and_mean().await.unwrap();
258 mean = (mean * 100.0).round() * 0.01;
259 variance = (variance * 100.0).round() * 0.01;
260 assert_eq!(1707.4, mean);
261 assert_eq!(3755791.64, variance);
262
263 let mut suspicion_level: Vec<f64> = vec![];
264 for i in 1..10 {
265 curr_time = curr_time.add(Duration::milliseconds(250));
266 suspicion_level.push(detector.phi(curr_time).await.unwrap())
267 }
268 println!("suspicion -> {:?}", suspicion_level);
269 for i in 1..suspicion_level.len() {
270 assert!(suspicion_level[i] > suspicion_level[i - 1]);
271 }
272 }
273
274 #[tokio::test]
275 async fn test_constant_phi_with_constant_pings_calculation() {
276 let stats = Statistics::new(10);
277 let detector = Detector {
278 statistics: RwLock::new(stats),
279 acceptable_pause: TimeDelta::milliseconds(0),
280 };
281 let mut i = 0;
282 let mut curr_time = Local::now();
283 while i <= 100 {
284 let arrived_at = curr_time;
285 &detector.insert(arrived_at).await;
286 curr_time = curr_time.add(Duration::milliseconds(10));
287 i += 10;
288 }
289 let (mut variance, mut mean) = detector.variance_and_mean().await.unwrap();
290 mean = (mean * 100.0).round() * 0.01;
291 variance = (variance * 100.0).round() * 0.01;
292 assert_eq!(10., mean);
293 assert_eq!(0., variance);
294 curr_time = curr_time.add(Duration::milliseconds(10));
295 assert_eq!(0., detector.phi(curr_time).await.unwrap());
296 }
297}