1use std::collections::HashMap;
2use std::sync::RwLock;
3
4#[derive(Debug, Clone, Copy, PartialEq, Eq)]
5pub enum EcReducer {
6 Sum,
7 Max,
8 Min,
9 Count,
10 Average,
11 Last,
12}
13
14impl EcReducer {
15 pub fn from_str(s: &str) -> Self {
16 match s.to_lowercase().as_str() {
17 "max" => Self::Max,
18 "min" => Self::Min,
19 "count" => Self::Count,
20 "average" | "avg" => Self::Average,
21 "last" | "lww" => Self::Last,
22 _ => Self::Sum,
23 }
24 }
25
26 pub fn as_str(&self) -> &'static str {
27 match self {
28 Self::Sum => "sum",
29 Self::Max => "max",
30 Self::Min => "min",
31 Self::Count => "count",
32 Self::Average => "average",
33 Self::Last => "last",
34 }
35 }
36
37 pub fn apply(&self, current: f64, incoming: f64, count: u64) -> f64 {
38 match self {
39 Self::Sum => current + incoming,
40 Self::Max => current.max(incoming),
41 Self::Min => {
42 if current == 0.0 && count == 0 {
43 incoming
44 } else {
45 current.min(incoming)
46 }
47 }
48 Self::Count => current + 1.0,
49 Self::Average => {
50 if count == 0 {
51 incoming
52 } else {
53 (current * count as f64 + incoming) / (count as f64 + 1.0)
54 }
55 }
56 Self::Last => incoming,
57 }
58 }
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub enum EcMode {
63 Sync,
64 Async,
65}
66
67impl EcMode {
68 pub fn from_str(s: &str) -> Self {
69 match s.to_lowercase().as_str() {
70 "sync" | "immediate" => Self::Sync,
71 _ => Self::Async,
72 }
73 }
74}
75
76#[derive(Debug, Clone)]
77pub struct EcFieldConfig {
78 pub collection: String,
79 pub field: String,
80 pub field_path: Option<String>,
81 pub reducer: EcReducer,
82 pub initial_value: f64,
83 pub mode: EcMode,
84 pub consolidation_interval_secs: u64,
85 pub consolidation_window_hours: u64,
86 pub retention_days: u64,
87}
88
89impl EcFieldConfig {
90 pub fn new(collection: &str, field: &str) -> Self {
91 Self {
92 collection: collection.to_string(),
93 field: field.to_string(),
94 field_path: None,
95 reducer: EcReducer::Sum,
96 initial_value: 0.0,
97 mode: EcMode::Async,
98 consolidation_interval_secs: 60,
99 consolidation_window_hours: 24,
100 retention_days: 7,
101 }
102 }
103
104 pub fn tx_collection_name(&self) -> String {
105 format!("red_ec_tx_{}_{}", self.collection, self.field)
106 }
107}
108
109pub struct EcRegistry {
110 fields: RwLock<HashMap<(String, String), EcFieldConfig>>,
111}
112
113impl EcRegistry {
114 pub fn new() -> Self {
115 Self {
116 fields: RwLock::new(HashMap::new()),
117 }
118 }
119
120 pub fn register(&self, config: EcFieldConfig) {
121 let key = (config.collection.clone(), config.field.clone());
122 self.fields
123 .write()
124 .unwrap_or_else(|e| e.into_inner())
125 .insert(key, config);
126 }
127
128 pub fn get(&self, collection: &str, field: &str) -> Option<EcFieldConfig> {
129 self.fields
130 .read()
131 .unwrap_or_else(|e| e.into_inner())
132 .get(&(collection.to_string(), field.to_string()))
133 .cloned()
134 }
135
136 pub fn is_ec_field(&self, collection: &str, field: &str) -> bool {
137 self.fields
138 .read()
139 .unwrap_or_else(|e| e.into_inner())
140 .contains_key(&(collection.to_string(), field.to_string()))
141 }
142
143 pub fn all_configs(&self) -> Vec<EcFieldConfig> {
144 self.fields
145 .read()
146 .unwrap_or_else(|e| e.into_inner())
147 .values()
148 .cloned()
149 .collect()
150 }
151
152 pub fn async_configs(&self) -> Vec<EcFieldConfig> {
153 self.fields
154 .read()
155 .unwrap_or_else(|e| e.into_inner())
156 .values()
157 .filter(|c| c.mode == EcMode::Async)
158 .cloned()
159 .collect()
160 }
161
162 pub fn load_from_config_store(&self, store: &crate::storage::unified::store::UnifiedStore) {
163 let manager = match store.get_collection("red_config") {
164 Some(m) => m,
165 None => return,
166 };
167
168 let mut ec_collections: HashMap<String, Vec<String>> = HashMap::new();
169
170 manager.for_each_entity(|entity| {
171 if let Some(row) = entity.data.as_row() {
172 let key = row.get_field("key").and_then(|v| match v {
173 crate::storage::schema::Value::Text(s) => Some(s.as_ref()),
174 _ => None,
175 });
176 if let Some(k) = key {
177 if let Some(rest) = k.strip_prefix("red.config.ec.") {
178 if let Some(val) = row.get_field("value") {
179 if rest.ends_with(".fields") {
180 let collection = rest.trim_end_matches(".fields");
181 if let crate::storage::schema::Value::Text(fields_str) = val {
182 let fields: Vec<String> = fields_str
183 .trim_matches(|c| c == '[' || c == ']')
184 .split(',')
185 .map(|s| {
186 s.trim()
187 .trim_matches('"')
188 .trim_matches('\'')
189 .to_string()
190 })
191 .filter(|s| !s.is_empty())
192 .collect();
193 ec_collections.insert(collection.to_string(), fields);
194 }
195 }
196 }
197 }
198 }
199 }
200 true
201 });
202
203 for (collection, fields) in ec_collections {
204 for field in fields {
205 let mut config = EcFieldConfig::new(&collection, &field);
206
207 let prefix = format!("red.config.ec.{}.{}", collection, field);
209 manager.for_each_entity(|entity| {
210 if let Some(row) = entity.data.as_row() {
211 let key = row.get_field("key").and_then(|v| match v {
212 crate::storage::schema::Value::Text(s) => Some(s.clone()),
213 _ => None,
214 });
215 let val = row.get_field("value");
216 if let (Some(k), Some(v)) = (key, val) {
217 if &*k == format!("{}.reducer", prefix).as_str() {
218 if let crate::storage::schema::Value::Text(s) = v {
219 config.reducer = EcReducer::from_str(s);
220 }
221 } else if &*k == format!("{}.mode", prefix).as_str() {
222 if let crate::storage::schema::Value::Text(s) = v {
223 config.mode = EcMode::from_str(s);
224 }
225 } else if &*k == format!("{}.interval_secs", prefix).as_str() {
226 if let crate::storage::schema::Value::Integer(n) = v {
227 config.consolidation_interval_secs = *n as u64;
228 }
229 }
230 }
231 }
232 true
233 });
234
235 self.register(config);
236 }
237 }
238 }
239}