1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use crate::db::MMAVDatabase;
use crate::traits::IDatabase;
use crate::types::AggregateFn;
pub struct DatabaseTestFactory {
db_path: String,
databases: std::collections::HashMap<
String,
std::sync::Arc<std::sync::RwLock<dyn IDatabase>>,
>,
}
/// Best-effort cleanup: deletes every directory named by a key of
/// `databases`, followed by the directory at `db_path`.
impl Drop for DatabaseTestFactory {
    fn drop(&mut self) {
        let database_dirs = self.databases.keys().map(String::as_str);
        let base_dir = std::iter::once(self.db_path.as_str());

        for dir in database_dirs.chain(base_dir) {
            // Any error from the removal is discarded on purpose; `drop`
            // has no way to report it.
            let _ = std::fs::remove_dir_all(dir);
        }
    }
}
impl DatabaseTestFactory {
    /// Builds a factory holding one [`MMAVDatabase`] registered under the key
    /// `"{db_path}_mmav"`.
    ///
    /// The database is created via `MMAVDatabase::new_with_all` and receives
    /// two aggregate callbacks, `"test-0"` and `"test-1"`, which share a
    /// single closure. For every JSON value carrying a non-null `temp` field
    /// the closure updates `temp_sum`, `temp_sum_count` and `temp_avg` in the
    /// JSON document stored in the aggregate buffer.
    ///
    /// `db_path` itself is only recorded so it can be removed on drop.
    pub fn new(db_path: &str) -> Self {
        let test_fn = Arc::new(Mutex::new(
            |_: &str, value: &[u8], aggregate: &Arc<Mutex<Vec<u8>>>| {
                // Unparseable input becomes `Value::Null`, whose `temp` is
                // null as well, so it is skipped by the guard below.
                let sample =
                    serde_json::from_slice::<serde_json::Value>(value).unwrap_or_default();
                let temp = &sample["temp"];
                if temp.is_null() {
                    return;
                }

                // A failed (poisoned) lock is ignored: the sample is dropped.
                if let Ok(mut stored) = aggregate.lock() {
                    let mut summary =
                        serde_json::from_slice::<serde_json::Value>(&stored).unwrap_or_default();

                    // Missing or non-numeric fields count as 0.
                    let temp_sum = summary["temp_sum"].as_f64().unwrap_or_default()
                        + temp.as_f64().unwrap_or_default();
                    let temp_sum_count =
                        summary["temp_sum_count"].as_f64().unwrap_or_default() + 1.;

                    summary["temp_sum"] = serde_json::json!(temp_sum);
                    summary["temp_sum_count"] = serde_json::json!(temp_sum_count);
                    summary["temp_avg"] = serde_json::json!(temp_sum / temp_sum_count);

                    *stored = summary.to_string().into_bytes();
                }
            },
        ));

        // Both names point at the same callback instance.
        let mut aggregates_fn: HashMap<String, AggregateFn> = HashMap::new();
        aggregates_fn.insert(String::from("test-0"), test_fn.clone());
        aggregates_fn.insert(String::from("test-1"), test_fn);

        let mmav_db_path = format!("{db_path}_mmav");
        let mmav_db: Arc<std::sync::RwLock<dyn IDatabase>> = Arc::new(std::sync::RwLock::new(
            MMAVDatabase::new_with_all(&mmav_db_path, aggregates_fn.clone()),
        ));

        let mut databases = HashMap::new();
        databases.insert(mmav_db_path, mmav_db);

        Self {
            db_path: db_path.to_owned(),
            databases,
        }
    }

    /// Returns a shared reference to the map of databases owned by this
    /// factory, keyed by the path string each one was created with.
    pub fn get_instance(&self) -> &HashMap<String, Arc<std::sync::RwLock<dyn IDatabase>>> {
        &self.databases
    }
}