use crate::cache::EntityCache;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
/// The change a single entity mutation causes to a materialized view's
/// membership.
///
/// Produced by `MaterializedView::compute_effect` and consumed by
/// `MaterializedView::apply_effect`.
///
/// Every payload is a plain `String`, so equality is total and the type
/// implements `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ViewEffect {
    /// The mutation does not touch the view.
    NoEffect,
    /// The entity `key` enters the view.
    Add { key: String },
    /// The entity `key` leaves the view.
    Remove { key: String },
    /// The entity `key` was in the view and still is; membership is unchanged.
    Update { key: String },
    /// `new_key` enters the view and takes the place of `old_key`.
    Replace { old_key: String, new_key: String },
}
/// Direction in which a view's sort stage orders entities.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortOrder {
    /// Keep the comparison result as-is (smallest first).
    Asc,
    /// Reverse the comparison result (largest first).
    Desc,
}
/// Comparison operator applied by a view filter between an entity field
/// (left-hand side) and the filter's configured value (right-hand side).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompareOp {
    /// Field equals the value.
    Eq,
    /// Field differs from the value.
    Ne,
    /// Field is strictly greater than the value.
    Gt,
    /// Field is greater than or equal to the value.
    Gte,
    /// Field is strictly less than the value.
    Lt,
    /// Field is less than or equal to the value.
    Lte,
}
/// A derived view over one source collection, maintained as a set of member
/// keys plus the pipeline that decides membership.
#[derive(Debug)]
pub struct MaterializedView {
    /// Identifier of this view.
    pub id: String,
    /// Identifier of the source collection the view reads from.
    pub source_id: String,
    /// Keys of the entities currently in the view. Overwritten by a full
    /// pipeline evaluation and adjusted by `apply_effect`.
    current_keys: Arc<RwLock<HashSet<String>>>,
    /// Filter / sort / limit configuration that defines the view.
    pipeline: ViewPipeline,
}
/// Declarative description of a view. During a full evaluation the stages are
/// applied in the order filter, then sort, then limit; a stage set to `None`
/// is skipped. The default pipeline has no stages.
#[derive(Debug, Clone, Default)]
pub struct ViewPipeline {
    /// Keeps only entities satisfying this predicate.
    pub filter: Option<FilterConfig>,
    /// Orders the surviving entities.
    pub sort: Option<SortConfig>,
    /// Truncates the result to at most this many entities.
    pub limit: Option<usize>,
}
/// A single-field predicate: `<entity field> <op> <value>`.
#[derive(Debug, Clone)]
pub struct FilterConfig {
    /// Object keys followed from the entity root to reach the tested field.
    /// A path that cannot be followed yields JSON `null`.
    pub field_path: Vec<String>,
    /// Comparison applied between the field and `value`.
    pub op: CompareOp,
    /// Right-hand side of the comparison.
    pub value: Value,
}
/// Sort stage configuration: which field to order by and in which direction.
#[derive(Debug, Clone)]
pub struct SortConfig {
    /// Object keys followed from the entity root to reach the sort field.
    pub field_path: Vec<String>,
    /// Ascending or descending order.
    pub order: SortOrder,
}
impl MaterializedView {
pub fn new(id: String, source_id: String, pipeline: ViewPipeline) -> Self {
Self {
id,
source_id,
current_keys: Arc::new(RwLock::new(HashSet::new())),
pipeline,
}
}
pub async fn get_keys(&self) -> HashSet<String> {
self.current_keys.read().await.clone()
}
pub async fn evaluate_initial(&self, cache: &EntityCache) -> Vec<(String, Value)> {
let entities = cache.get_all(&self.source_id).await;
self.evaluate_pipeline(entities).await
}
async fn evaluate_pipeline(&self, mut entities: Vec<(String, Value)>) -> Vec<(String, Value)> {
if let Some(ref filter) = self.pipeline.filter {
entities.retain(|(_, v)| self.matches_filter(v, filter));
}
if let Some(ref sort) = self.pipeline.sort {
entities.sort_by(|(_, a), (_, b)| {
let a_val = extract_field(a, &sort.field_path);
let b_val = extract_field(b, &sort.field_path);
let cmp = compare_values(&a_val, &b_val);
match sort.order {
SortOrder::Asc => cmp,
SortOrder::Desc => cmp.reverse(),
}
});
}
if let Some(limit) = self.pipeline.limit {
entities.truncate(limit);
}
let keys: HashSet<String> = entities.iter().map(|(k, _)| k.clone()).collect();
*self.current_keys.write().await = keys;
entities
}
fn matches_filter(&self, entity: &Value, filter: &FilterConfig) -> bool {
let field_val = extract_field(entity, &filter.field_path);
match filter.op {
CompareOp::Eq => field_val == filter.value,
CompareOp::Ne => field_val != filter.value,
CompareOp::Gt => {
compare_values(&field_val, &filter.value) == std::cmp::Ordering::Greater
}
CompareOp::Gte => compare_values(&field_val, &filter.value) != std::cmp::Ordering::Less,
CompareOp::Lt => compare_values(&field_val, &filter.value) == std::cmp::Ordering::Less,
CompareOp::Lte => {
compare_values(&field_val, &filter.value) != std::cmp::Ordering::Greater
}
}
}
pub async fn compute_effect(
&self,
key: &str,
new_value: Option<&Value>,
_cache: &EntityCache,
) -> ViewEffect {
let current_keys = self.current_keys.read().await;
let was_in_view = current_keys.contains(key);
drop(current_keys);
let matches_now = match new_value {
Some(v) => {
if let Some(ref filter) = self.pipeline.filter {
self.matches_filter(v, filter)
} else {
true
}
}
None => false, };
match (was_in_view, matches_now) {
(false, true) => {
if self.pipeline.limit == Some(1) {
let current_keys = self.current_keys.read().await;
if let Some(current_key) = current_keys.iter().next() {
if current_key != key {
return ViewEffect::Replace {
old_key: current_key.clone(),
new_key: key.to_string(),
};
}
}
}
ViewEffect::Add {
key: key.to_string(),
}
}
(true, false) => ViewEffect::Remove {
key: key.to_string(),
},
(true, true) => ViewEffect::Update {
key: key.to_string(),
},
(false, false) => ViewEffect::NoEffect,
}
}
pub async fn apply_effect(&self, effect: &ViewEffect) {
let mut keys = self.current_keys.write().await;
match effect {
ViewEffect::Add { key } => {
keys.insert(key.clone());
}
ViewEffect::Remove { key } => {
keys.remove(key);
}
ViewEffect::Replace { old_key, new_key } => {
keys.remove(old_key);
keys.insert(new_key.clone());
}
ViewEffect::Update { .. } | ViewEffect::NoEffect => {}
}
}
}
/// Follows `path` key by key from `value` and returns a clone of the value
/// found at the end.
///
/// Returns `Value::Null` as soon as a segment cannot be resolved. An empty
/// path returns a clone of `value` itself.
fn extract_field(value: &Value, path: &[String]) -> Value {
    path.iter()
        // `try_fold` stops at the first segment whose lookup yields `None`.
        .try_fold(value, |node, segment| node.get(segment))
        .cloned()
        .unwrap_or(Value::Null)
}
/// Orders two JSON values of the same scalar kind.
///
/// * Numbers: compared exactly as `i64` or `u64` when both sides are
///   representable as such, otherwise as `f64`.
/// * Strings and booleans: compared with their standard ordering.
/// * Any other pair (mixed kinds, `null`, arrays, objects): `Equal`.
fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
    use std::cmp::Ordering;

    match (a, b) {
        (Value::Number(a), Value::Number(b)) => {
            // Integers are compared without going through `f64`: above 2^53
            // the float conversion rounds, so distinct large integers (ids,
            // nanosecond timestamps) would otherwise compare as equal.
            if let (Some(x), Some(y)) = (a.as_i64(), b.as_i64()) {
                return x.cmp(&y);
            }
            if let (Some(x), Some(y)) = (a.as_u64(), b.as_u64()) {
                return x.cmp(&y);
            }
            // At least one side is a float, or the two integers do not share
            // an integer representation (negative vs. beyond `i64::MAX`).
            let a_f = a.as_f64().unwrap_or(0.0);
            let b_f = b.as_f64().unwrap_or(0.0);
            a_f.partial_cmp(&b_f).unwrap_or(Ordering::Equal)
        }
        (Value::String(a), Value::String(b)) => a.cmp(b),
        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
        _ => Ordering::Equal,
    }
}
/// Registry of materialized views, indexed by view id and by the source
/// collection each view depends on.
#[derive(Debug, Default)]
pub struct MaterializedViewRegistry {
    /// Registered views keyed by view id.
    views: HashMap<String, Arc<MaterializedView>>,
    /// For each source id, the ids of the views registered against it.
    dependencies: HashMap<String, Vec<String>>,
}
impl MaterializedViewRegistry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `view`, replacing any view previously registered under the
    /// same id.
    ///
    /// The dependency index is kept consistent on re-registration: the view
    /// id is listed at most once per source, and if the replaced view read
    /// from a different source its entry under that source is removed.
    pub fn register(&mut self, view: MaterializedView) {
        let view_id = view.id.clone();
        let source_id = view.source_id.clone();

        if let Some(previous) = self.views.insert(view_id.clone(), Arc::new(view)) {
            if previous.source_id != source_id {
                // Drop the stale edge so the old source no longer reports
                // this view as a dependent.
                let now_empty = self
                    .dependencies
                    .get_mut(&previous.source_id)
                    .map_or(false, |ids| {
                        ids.retain(|id| *id != view_id);
                        ids.is_empty()
                    });
                if now_empty {
                    self.dependencies.remove(&previous.source_id);
                }
            }
        }

        let dependents = self.dependencies.entry(source_id).or_default();
        // Avoid a duplicate entry, which would make `get_dependents` return
        // the same view more than once.
        if !dependents.contains(&view_id) {
            dependents.push(view_id);
        }
    }

    /// Returns the view registered under `id`, if any.
    pub fn get(&self, id: &str) -> Option<Arc<MaterializedView>> {
        self.views.get(id).cloned()
    }

    /// Returns the views registered against `source_id`, in registration
    /// order; empty when the source has no dependents.
    pub fn get_dependents(&self, source_id: &str) -> Vec<Arc<MaterializedView>> {
        self.dependencies
            .get(source_id)
            .map(|ids| {
                ids.iter()
                    .filter_map(|id| self.views.get(id).cloned())
                    .collect()
            })
            .unwrap_or_default()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A filter on `status == "active"` keeps only the matching entities and
    /// preserves their incoming order.
    #[tokio::test]
    async fn test_filter_evaluation() {
        let only_active = FilterConfig {
            field_path: vec!["status".to_string()],
            op: CompareOp::Eq,
            value: json!("active"),
        };
        let view = MaterializedView::new(
            "test/active".to_string(),
            "test/list".to_string(),
            ViewPipeline {
                filter: Some(only_active),
                ..ViewPipeline::default()
            },
        );

        let input = vec![
            ("1".to_string(), json!({"status": "active", "value": 10})),
            ("2".to_string(), json!({"status": "inactive", "value": 20})),
            ("3".to_string(), json!({"status": "active", "value": 30})),
        ];

        let output = view.evaluate_pipeline(input).await;
        let kept: Vec<&str> = output.iter().map(|(key, _)| key.as_str()).collect();
        assert_eq!(kept, vec!["1", "3"]);
    }

    /// Sorting by `value` descending and limiting to two keeps the two
    /// largest entities, largest first.
    #[tokio::test]
    async fn test_sort_and_limit() {
        let by_value_desc = SortConfig {
            field_path: vec!["value".to_string()],
            order: SortOrder::Desc,
        };
        let view = MaterializedView::new(
            "test/top2".to_string(),
            "test/list".to_string(),
            ViewPipeline {
                sort: Some(by_value_desc),
                limit: Some(2),
                ..ViewPipeline::default()
            },
        );

        let input = vec![
            ("1".to_string(), json!({"value": 10})),
            ("2".to_string(), json!({"value": 30})),
            ("3".to_string(), json!({"value": 20})),
        ];

        let output = view.evaluate_pipeline(input).await;
        let kept: Vec<&str> = output.iter().map(|(key, _)| key.as_str()).collect();
        assert_eq!(kept, vec!["2", "3"]);
    }
}