Skip to main content

reddb_server/ec/
config.rs

1use std::collections::HashMap;
2use std::sync::RwLock;
3
4#[derive(Debug, Clone, Copy, PartialEq, Eq)]
5pub enum EcReducer {
6    Sum,
7    Max,
8    Min,
9    Count,
10    Average,
11    Last,
12}
13
14impl EcReducer {
15    pub fn from_str(s: &str) -> Self {
16        match s.to_lowercase().as_str() {
17            "max" => Self::Max,
18            "min" => Self::Min,
19            "count" => Self::Count,
20            "average" | "avg" => Self::Average,
21            "last" | "lww" => Self::Last,
22            _ => Self::Sum,
23        }
24    }
25
26    pub fn as_str(&self) -> &'static str {
27        match self {
28            Self::Sum => "sum",
29            Self::Max => "max",
30            Self::Min => "min",
31            Self::Count => "count",
32            Self::Average => "average",
33            Self::Last => "last",
34        }
35    }
36
37    pub fn apply(&self, current: f64, incoming: f64, count: u64) -> f64 {
38        match self {
39            Self::Sum => current + incoming,
40            Self::Max => current.max(incoming),
41            Self::Min => {
42                if current == 0.0 && count == 0 {
43                    incoming
44                } else {
45                    current.min(incoming)
46                }
47            }
48            Self::Count => current + 1.0,
49            Self::Average => {
50                if count == 0 {
51                    incoming
52                } else {
53                    (current * count as f64 + incoming) / (count as f64 + 1.0)
54                }
55            }
56            Self::Last => incoming,
57        }
58    }
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub enum EcMode {
63    Sync,
64    Async,
65}
66
67impl EcMode {
68    pub fn from_str(s: &str) -> Self {
69        match s.to_lowercase().as_str() {
70            "sync" | "immediate" => Self::Sync,
71            _ => Self::Async,
72        }
73    }
74}
75
76#[derive(Debug, Clone)]
77pub struct EcFieldConfig {
78    pub collection: String,
79    pub field: String,
80    pub field_path: Option<String>,
81    pub reducer: EcReducer,
82    pub initial_value: f64,
83    pub mode: EcMode,
84    pub consolidation_interval_secs: u64,
85    pub consolidation_window_hours: u64,
86    pub retention_days: u64,
87}
88
89impl EcFieldConfig {
90    pub fn new(collection: &str, field: &str) -> Self {
91        Self {
92            collection: collection.to_string(),
93            field: field.to_string(),
94            field_path: None,
95            reducer: EcReducer::Sum,
96            initial_value: 0.0,
97            mode: EcMode::Async,
98            consolidation_interval_secs: 60,
99            consolidation_window_hours: 24,
100            retention_days: 7,
101        }
102    }
103
104    pub fn tx_collection_name(&self) -> String {
105        format!("red_ec_tx_{}_{}", self.collection, self.field)
106    }
107}
108
109pub struct EcRegistry {
110    fields: RwLock<HashMap<(String, String), EcFieldConfig>>,
111}
112
113impl EcRegistry {
114    pub fn new() -> Self {
115        Self {
116            fields: RwLock::new(HashMap::new()),
117        }
118    }
119
120    pub fn register(&self, config: EcFieldConfig) {
121        let key = (config.collection.clone(), config.field.clone());
122        self.fields
123            .write()
124            .unwrap_or_else(|e| e.into_inner())
125            .insert(key, config);
126    }
127
128    pub fn get(&self, collection: &str, field: &str) -> Option<EcFieldConfig> {
129        self.fields
130            .read()
131            .unwrap_or_else(|e| e.into_inner())
132            .get(&(collection.to_string(), field.to_string()))
133            .cloned()
134    }
135
136    pub fn is_ec_field(&self, collection: &str, field: &str) -> bool {
137        self.fields
138            .read()
139            .unwrap_or_else(|e| e.into_inner())
140            .contains_key(&(collection.to_string(), field.to_string()))
141    }
142
143    pub fn all_configs(&self) -> Vec<EcFieldConfig> {
144        self.fields
145            .read()
146            .unwrap_or_else(|e| e.into_inner())
147            .values()
148            .cloned()
149            .collect()
150    }
151
152    pub fn async_configs(&self) -> Vec<EcFieldConfig> {
153        self.fields
154            .read()
155            .unwrap_or_else(|e| e.into_inner())
156            .values()
157            .filter(|c| c.mode == EcMode::Async)
158            .cloned()
159            .collect()
160    }
161
162    pub fn load_from_config_store(&self, store: &crate::storage::unified::store::UnifiedStore) {
163        let manager = match store.get_collection("red_config") {
164            Some(m) => m,
165            None => return,
166        };
167
168        let mut ec_collections: HashMap<String, Vec<String>> = HashMap::new();
169
170        manager.for_each_entity(|entity| {
171            if let Some(row) = entity.data.as_row() {
172                let key = row.get_field("key").and_then(|v| match v {
173                    crate::storage::schema::Value::Text(s) => Some(s.as_ref()),
174                    _ => None,
175                });
176                if let Some(k) = key {
177                    if let Some(rest) = k.strip_prefix("red.config.ec.") {
178                        if let Some(val) = row.get_field("value") {
179                            if rest.ends_with(".fields") {
180                                let collection = rest.trim_end_matches(".fields");
181                                if let crate::storage::schema::Value::Text(fields_str) = val {
182                                    let fields: Vec<String> = fields_str
183                                        .trim_matches(|c| c == '[' || c == ']')
184                                        .split(',')
185                                        .map(|s| {
186                                            s.trim()
187                                                .trim_matches('"')
188                                                .trim_matches('\'')
189                                                .to_string()
190                                        })
191                                        .filter(|s| !s.is_empty())
192                                        .collect();
193                                    ec_collections.insert(collection.to_string(), fields);
194                                }
195                            }
196                        }
197                    }
198                }
199            }
200            true
201        });
202
203        for (collection, fields) in ec_collections {
204            for field in fields {
205                let mut config = EcFieldConfig::new(&collection, &field);
206
207                // Load per-field overrides from red_config
208                let prefix = format!("red.config.ec.{}.{}", collection, field);
209                manager.for_each_entity(|entity| {
210                    if let Some(row) = entity.data.as_row() {
211                        let key = row.get_field("key").and_then(|v| match v {
212                            crate::storage::schema::Value::Text(s) => Some(s.clone()),
213                            _ => None,
214                        });
215                        let val = row.get_field("value");
216                        if let (Some(k), Some(v)) = (key, val) {
217                            if &*k == format!("{}.reducer", prefix).as_str() {
218                                if let crate::storage::schema::Value::Text(s) = v {
219                                    config.reducer = EcReducer::from_str(s);
220                                }
221                            } else if &*k == format!("{}.mode", prefix).as_str() {
222                                if let crate::storage::schema::Value::Text(s) = v {
223                                    config.mode = EcMode::from_str(s);
224                                }
225                            } else if &*k == format!("{}.interval_secs", prefix).as_str() {
226                                if let crate::storage::schema::Value::Integer(n) = v {
227                                    config.consolidation_interval_secs = *n as u64;
228                                }
229                            }
230                        }
231                    }
232                    true
233                });
234
235                self.register(config);
236            }
237        }
238    }
239}