1use crate::cache::EntityCache;
7use serde_json::Value;
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
12#[derive(Debug, Clone, PartialEq)]
14pub enum ViewEffect {
15 NoEffect,
17 Add { key: String },
19 Remove { key: String },
21 Update { key: String },
23 Replace { old_key: String, new_key: String },
25}
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub enum SortOrder {
30 Asc,
31 Desc,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub enum CompareOp {
37 Eq,
38 Ne,
39 Gt,
40 Gte,
41 Lt,
42 Lte,
43}
44
45#[derive(Debug)]
47pub struct MaterializedView {
48 pub id: String,
50 pub source_id: String,
52 current_keys: Arc<RwLock<HashSet<String>>>,
54 pipeline: ViewPipeline,
56}
57
58#[derive(Debug, Clone, Default)]
59pub struct ViewPipeline {
60 pub filter: Option<FilterConfig>,
62 pub sort: Option<SortConfig>,
64 pub limit: Option<usize>,
66}
67
68#[derive(Debug, Clone)]
69pub struct FilterConfig {
70 pub field_path: Vec<String>,
71 pub op: CompareOp,
72 pub value: Value,
73}
74
75#[derive(Debug, Clone)]
76pub struct SortConfig {
77 pub field_path: Vec<String>,
78 pub order: SortOrder,
79}
80
81impl MaterializedView {
82 pub fn new(id: String, source_id: String, pipeline: ViewPipeline) -> Self {
84 Self {
85 id,
86 source_id,
87 current_keys: Arc::new(RwLock::new(HashSet::new())),
88 pipeline,
89 }
90 }
91
92 pub async fn get_keys(&self) -> HashSet<String> {
94 self.current_keys.read().await.clone()
95 }
96
97 pub async fn evaluate_initial(&self, cache: &EntityCache) -> Vec<(String, Value)> {
99 let entities = cache.get_all(&self.source_id).await;
100 self.evaluate_pipeline(entities).await
101 }
102
103 async fn evaluate_pipeline(&self, mut entities: Vec<(String, Value)>) -> Vec<(String, Value)> {
105 if let Some(ref filter) = self.pipeline.filter {
107 entities.retain(|(_, v)| self.matches_filter(v, filter));
108 }
109
110 if let Some(ref sort) = self.pipeline.sort {
112 entities.sort_by(|(_, a), (_, b)| {
113 let a_val = extract_field(a, &sort.field_path);
114 let b_val = extract_field(b, &sort.field_path);
115 let cmp = compare_values(&a_val, &b_val);
116 match sort.order {
117 SortOrder::Asc => cmp,
118 SortOrder::Desc => cmp.reverse(),
119 }
120 });
121 }
122
123 if let Some(limit) = self.pipeline.limit {
125 entities.truncate(limit);
126 }
127
128 let keys: HashSet<String> = entities.iter().map(|(k, _)| k.clone()).collect();
130 *self.current_keys.write().await = keys;
131
132 entities
133 }
134
135 fn matches_filter(&self, entity: &Value, filter: &FilterConfig) -> bool {
137 let field_val = extract_field(entity, &filter.field_path);
138 match filter.op {
139 CompareOp::Eq => field_val == filter.value,
140 CompareOp::Ne => field_val != filter.value,
141 CompareOp::Gt => {
142 compare_values(&field_val, &filter.value) == std::cmp::Ordering::Greater
143 }
144 CompareOp::Gte => compare_values(&field_val, &filter.value) != std::cmp::Ordering::Less,
145 CompareOp::Lt => compare_values(&field_val, &filter.value) == std::cmp::Ordering::Less,
146 CompareOp::Lte => {
147 compare_values(&field_val, &filter.value) != std::cmp::Ordering::Greater
148 }
149 }
150 }
151
152 pub async fn compute_effect(
154 &self,
155 key: &str,
156 new_value: Option<&Value>,
157 _cache: &EntityCache,
158 ) -> ViewEffect {
159 let current_keys = self.current_keys.read().await;
160 let was_in_view = current_keys.contains(key);
161 drop(current_keys);
162
163 let matches_now = match new_value {
165 Some(v) => {
166 if let Some(ref filter) = self.pipeline.filter {
167 self.matches_filter(v, filter)
168 } else {
169 true
170 }
171 }
172 None => false, };
174
175 match (was_in_view, matches_now) {
176 (false, true) => {
177 if self.pipeline.limit == Some(1) {
178 let current_keys = self.current_keys.read().await;
179 if let Some(current_key) = current_keys.iter().next() {
180 if current_key != key {
181 return ViewEffect::Replace {
182 old_key: current_key.clone(),
183 new_key: key.to_string(),
184 };
185 }
186 }
187 }
188 ViewEffect::Add {
189 key: key.to_string(),
190 }
191 }
192 (true, false) => ViewEffect::Remove {
193 key: key.to_string(),
194 },
195 (true, true) => ViewEffect::Update {
196 key: key.to_string(),
197 },
198 (false, false) => ViewEffect::NoEffect,
199 }
200 }
201
202 pub async fn apply_effect(&self, effect: &ViewEffect) {
204 let mut keys = self.current_keys.write().await;
205 match effect {
206 ViewEffect::Add { key } => {
207 keys.insert(key.clone());
208 }
209 ViewEffect::Remove { key } => {
210 keys.remove(key);
211 }
212 ViewEffect::Replace { old_key, new_key } => {
213 keys.remove(old_key);
214 keys.insert(new_key.clone());
215 }
216 ViewEffect::Update { .. } | ViewEffect::NoEffect => {}
217 }
218 }
219}
220
221fn extract_field(value: &Value, path: &[String]) -> Value {
223 let mut current = value;
224 for segment in path {
225 match current.get(segment) {
226 Some(v) => current = v,
227 None => return Value::Null,
228 }
229 }
230 current.clone()
231}
232
233fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
235 match (a, b) {
236 (Value::Number(a), Value::Number(b)) => {
237 let a_f = a.as_f64().unwrap_or(0.0);
238 let b_f = b.as_f64().unwrap_or(0.0);
239 a_f.partial_cmp(&b_f).unwrap_or(std::cmp::Ordering::Equal)
240 }
241 (Value::String(a), Value::String(b)) => a.cmp(b),
242 (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
243 _ => std::cmp::Ordering::Equal,
244 }
245}
246
247#[derive(Default)]
249pub struct MaterializedViewRegistry {
250 views: HashMap<String, Arc<MaterializedView>>,
251 dependencies: HashMap<String, Vec<String>>,
253}
254
255impl MaterializedViewRegistry {
256 pub fn new() -> Self {
257 Self::default()
258 }
259
260 pub fn register(&mut self, view: MaterializedView) {
262 let view_id = view.id.clone();
263 let source_id = view.source_id.clone();
264
265 self.dependencies
266 .entry(source_id)
267 .or_default()
268 .push(view_id.clone());
269
270 self.views.insert(view_id, Arc::new(view));
271 }
272
273 pub fn get(&self, id: &str) -> Option<Arc<MaterializedView>> {
275 self.views.get(id).cloned()
276 }
277
278 pub fn get_dependents(&self, source_id: &str) -> Vec<Arc<MaterializedView>> {
280 self.dependencies
281 .get(source_id)
282 .map(|ids| {
283 ids.iter()
284 .filter_map(|id| self.views.get(id).cloned())
285 .collect()
286 })
287 .unwrap_or_default()
288 }
289}
290
291#[cfg(test)]
292mod tests {
293 use super::*;
294 use serde_json::json;
295
296 #[tokio::test]
297 async fn test_filter_evaluation() {
298 let pipeline = ViewPipeline {
299 filter: Some(FilterConfig {
300 field_path: vec!["status".to_string()],
301 op: CompareOp::Eq,
302 value: json!("active"),
303 }),
304 sort: None,
305 limit: None,
306 };
307
308 let view =
309 MaterializedView::new("test/active".to_string(), "test/list".to_string(), pipeline);
310
311 let entities = vec![
312 ("1".to_string(), json!({"status": "active", "value": 10})),
313 ("2".to_string(), json!({"status": "inactive", "value": 20})),
314 ("3".to_string(), json!({"status": "active", "value": 30})),
315 ];
316
317 let result = view.evaluate_pipeline(entities).await;
318 assert_eq!(result.len(), 2);
319 assert_eq!(result[0].0, "1");
320 assert_eq!(result[1].0, "3");
321 }
322
323 #[tokio::test]
324 async fn test_sort_and_limit() {
325 let pipeline = ViewPipeline {
326 filter: None,
327 sort: Some(SortConfig {
328 field_path: vec!["value".to_string()],
329 order: SortOrder::Desc,
330 }),
331 limit: Some(2),
332 };
333
334 let view =
335 MaterializedView::new("test/top2".to_string(), "test/list".to_string(), pipeline);
336
337 let entities = vec![
338 ("1".to_string(), json!({"value": 10})),
339 ("2".to_string(), json!({"value": 30})),
340 ("3".to_string(), json!({"value": 20})),
341 ];
342
343 let result = view.evaluate_pipeline(entities).await;
344 assert_eq!(result.len(), 2);
345 assert_eq!(result[0].0, "2"); assert_eq!(result[1].0, "3"); }
348}