use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use tracing::{info, span, Level};
use crate::annotations::TripleAnnotation;
use crate::StarResult;
/// Shared predicate deciding whether an annotation is kept in a view.
type AnnotationFilter = Arc<dyn Fn(&TripleAnnotation) -> bool + Send + Sync>;
/// Declarative description of a materialized view, assembled with
/// `ViewDefinition::new` and the `with_*` builder methods.
#[derive(Clone)]
pub struct ViewDefinition {
    /// Key of the view inside a `MaterializedViewManager`; creating another
    /// view with the same name replaces the existing one.
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Annotations for which this returns `true` are copied into the view on
    /// refresh; `None` keeps every annotation.
    filter: Option<AnnotationFilter>,
    // NOTE(review): `projection` has no builder setter and is not read by any
    // refresh or query logic in this file; only the `Debug` impl prints it.
    projection: Option<Vec<String>>,
    /// Aggregate recomputed on every refresh, if set.
    aggregation: Option<AggregationType>,
    /// Controls whether the manager refreshes this view automatically after
    /// each source-data insert.
    pub refresh_strategy: RefreshStrategy,
    /// Names this view depends on; when one of them is refreshed through
    /// `MaterializedViewManager::refresh_view`, this view is refreshed too.
    pub dependencies: Vec<String>,
}
impl std::fmt::Debug for ViewDefinition {
    /// Prints every field of the definition. The filter closure is not
    /// `Debug`, so only its presence is reported, as `has_filter`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to `ViewDefinition` breaks
        // compilation here until the new field is printed (or skipped).
        let Self {
            name,
            description,
            filter,
            projection,
            aggregation,
            refresh_strategy,
            dependencies,
        } = self;

        let mut builder = f.debug_struct("ViewDefinition");
        builder.field("name", name);
        builder.field("description", description);
        builder.field("has_filter", &filter.is_some());
        builder.field("projection", projection);
        builder.field("aggregation", aggregation);
        builder.field("refresh_strategy", refresh_strategy);
        builder.field("dependencies", dependencies);
        builder.finish()
    }
}
/// Aggregate a view can maintain over its current contents; the result is
/// stored as an `AggregatedResult` after each refresh.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AggregationType {
    /// Number of annotations in the view.
    Count,
    /// Mean of the annotations' `confidence` values that are present;
    /// 0.0 when none are present.
    AverageConfidence,
    /// Sum of the annotations' `quality_score` values that are present.
    SumQuality,
    /// Smallest `trust_score()` in the view; 0.0 for an empty view.
    MinTrust,
    /// Largest `trust_score()` in the view; 1.0 for an empty view.
    MaxTrust,
    /// Number of annotations per `source`; annotations without a source are
    /// not counted.
    GroupBySource,
}
/// When a view is recomputed after the manager's source data changes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RefreshStrategy {
    /// Recomputed after every `insert_annotation`.
    Immediate,
    /// Recomputed only by an explicit `refresh_view` / `refresh_all_views`
    /// (or as a dependent of a refreshed view).
    Deferred,
    // NOTE(review): nothing in this file reads the payload or schedules
    // periodic refreshes, so `Periodic` views currently behave like
    // `Deferred`. The unit of the interval is not established here — confirm.
    Periodic(u64),
}
impl ViewDefinition {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
description: None,
filter: None,
projection: None,
aggregation: None,
refresh_strategy: RefreshStrategy::Immediate,
dependencies: Vec::new(),
}
}
pub fn with_description(mut self, desc: impl Into<String>) -> Self {
self.description = Some(desc.into());
self
}
pub fn with_filter<F>(mut self, filter: F) -> Self
where
F: Fn(&TripleAnnotation) -> bool + Send + Sync + 'static,
{
self.filter = Some(Arc::new(filter));
self
}
pub fn with_refresh_strategy(mut self, strategy: RefreshStrategy) -> Self {
self.refresh_strategy = strategy;
self
}
pub fn with_aggregation(mut self, aggregation: AggregationType) -> Self {
self.aggregation = Some(aggregation);
self
}
pub fn with_dependencies(mut self, deps: Vec<String>) -> Self {
self.dependencies = deps;
self
}
}
/// A view definition together with its last computed contents.
#[derive(Debug, Clone)]
struct MaterializedView {
    /// The definition this view was created from.
    definition: ViewDefinition,
    /// Annotations accepted by the definition's filter, keyed by triple hash.
    data: HashMap<u64, TripleAnnotation>,
    /// Result of the configured aggregation as of the last refresh; stays
    /// `None` when the definition has no aggregation.
    aggregated_result: Option<AggregatedResult>,
    /// Set at creation and at the end of every refresh.
    last_refresh: std::time::Instant,
    /// Incremented by the manager's `query_view` and `get_aggregated_result`.
    hit_count: usize,
    // NOTE(review): initialised to 0 and never incremented anywhere in this
    // file; only summed into statistics.
    miss_count: usize,
}
/// Value produced by a view's `AggregationType`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AggregatedResult {
    /// Produced by `AggregationType::Count`.
    Count(usize),
    /// Produced by `AggregationType::AverageConfidence`.
    Average(f64),
    /// Produced by `AggregationType::SumQuality`.
    Sum(f64),
    /// Produced by `AggregationType::MinTrust`.
    Min(f64),
    /// Produced by `AggregationType::MaxTrust`.
    Max(f64),
    /// Produced by `AggregationType::GroupBySource`: source name -> count.
    GroupBy(HashMap<String, usize>),
}
impl MaterializedView {
fn new(definition: ViewDefinition) -> Self {
Self {
definition,
data: HashMap::new(),
aggregated_result: None,
last_refresh: std::time::Instant::now(),
hit_count: 0,
miss_count: 0,
}
}
fn refresh(&mut self, all_annotations: &HashMap<u64, TripleAnnotation>) {
self.data.clear();
if let Some(ref filter) = self.definition.filter {
for (hash, annotation) in all_annotations {
if filter(annotation) {
self.data.insert(*hash, annotation.clone());
}
}
} else {
self.data = all_annotations.clone();
}
if let Some(agg_type) = self.definition.aggregation {
self.aggregated_result = Some(self.compute_aggregation(agg_type));
}
self.last_refresh = std::time::Instant::now();
}
fn compute_aggregation(&self, agg_type: AggregationType) -> AggregatedResult {
match agg_type {
AggregationType::Count => AggregatedResult::Count(self.data.len()),
AggregationType::AverageConfidence => {
let sum: f64 = self.data.values().filter_map(|ann| ann.confidence).sum();
let count = self
.data
.values()
.filter(|ann| ann.confidence.is_some())
.count();
let avg = if count > 0 { sum / count as f64 } else { 0.0 };
AggregatedResult::Average(avg)
}
AggregationType::SumQuality => {
let sum: f64 = self.data.values().filter_map(|ann| ann.quality_score).sum();
AggregatedResult::Sum(sum)
}
AggregationType::MinTrust => {
let min = self
.data
.values()
.map(|ann| ann.trust_score())
.min_by(|a, b| a.partial_cmp(b).expect("f64 comparison"))
.unwrap_or(0.0);
AggregatedResult::Min(min)
}
AggregationType::MaxTrust => {
let max = self
.data
.values()
.map(|ann| ann.trust_score())
.max_by(|a, b| a.partial_cmp(b).expect("f64 comparison"))
.unwrap_or(1.0);
AggregatedResult::Max(max)
}
AggregationType::GroupBySource => {
let mut groups: HashMap<String, usize> = HashMap::new();
for annotation in self.data.values() {
if let Some(ref source) = annotation.source {
*groups.entry(source.clone()).or_insert(0) += 1;
}
}
AggregatedResult::GroupBy(groups)
}
}
}
}
/// Owns a set of named materialized views over a shared table of triple
/// annotations and keeps them in sync according to each view's
/// [`RefreshStrategy`].
pub struct MaterializedViewManager {
    /// Views keyed by their definition name.
    views: Arc<RwLock<HashMap<String, MaterializedView>>>,
    /// Base data every view is computed from, keyed by triple hash.
    source_data: Arc<RwLock<HashMap<u64, TripleAnnotation>>>,
    /// Maps a dependency name to the names of the views that declared it,
    /// i.e. the views to refresh after that dependency is refreshed.
    dependency_graph: HashMap<String, HashSet<String>>,
}

/// Point-in-time usage summary of the views held by a
/// [`MaterializedViewManager`].
#[derive(Debug, Clone, Default)]
pub struct ViewStatistics {
    /// Number of registered views.
    pub view_count: usize,
    /// Sum of per-view hit counters (bumped by `query_view` and
    /// `get_aggregated_result`).
    pub total_hits: usize,
    /// Sum of per-view miss counters.
    // NOTE(review): no code in this module increments a view's miss counter,
    // so this is currently always 0.
    pub total_misses: usize,
    /// `total_hits / (total_hits + total_misses)`, or 0.0 with no requests.
    pub hit_rate: f64,
    /// Shallow estimate: stored entries x `size_of::<TripleAnnotation>()`.
    /// Heap data owned by annotations and map overhead are not counted.
    pub memory_bytes: usize,
}

impl MaterializedViewManager {
    /// Creates a manager with no views and no source data.
    pub fn new() -> Self {
        Self {
            views: Arc::new(RwLock::new(HashMap::new())),
            source_data: Arc::new(RwLock::new(HashMap::new())),
            dependency_graph: HashMap::new(),
        }
    }

    /// Registers `definition` and immediately materializes it from the
    /// current source data.
    ///
    /// A view with the same name is replaced. Dependency edges recorded for
    /// the replaced definition are discarded first, so they cannot trigger
    /// spurious refreshes later.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`.
    pub fn create_view(&mut self, definition: ViewDefinition) -> StarResult<()> {
        let span = span!(Level::INFO, "create_view");
        let _enter = span.enter();
        let name = definition.name.clone();

        self.unregister_dependent(&name);
        for dep in &definition.dependencies {
            self.dependency_graph
                .entry(dep.clone())
                .or_default()
                .insert(name.clone());
        }

        let mut view = MaterializedView::new(definition);
        {
            // Scoped so the source read lock is released before the views
            // write lock is taken.
            let source_data = self.source_data.read().unwrap_or_else(|e| e.into_inner());
            view.refresh(&source_data);
        }
        self.views
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(name.clone(), view);
        info!("Created materialized view: {}", name);
        Ok(())
    }

    /// Removes the view `name` (a no-op if it does not exist) together with
    /// every dependency edge pointing to or from it.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`.
    pub fn drop_view(&mut self, name: &str) -> StarResult<()> {
        self.views
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .remove(name);
        // Views that declared `name` as a dependency lose that edge.
        self.dependency_graph.remove(name);
        self.unregister_dependent(name);
        Ok(())
    }

    /// Inserts (or replaces) the annotation for `triple_hash`, then refreshes
    /// every [`RefreshStrategy::Immediate`] view from the full source data.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`.
    pub fn insert_annotation(
        &mut self,
        triple_hash: u64,
        annotation: TripleAnnotation,
    ) -> StarResult<()> {
        // The write guard is a temporary dropped at the end of this statement,
        // before `refresh_immediate_views` takes the read lock.
        self.source_data
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(triple_hash, annotation);
        self.refresh_immediate_views()?;
        Ok(())
    }

    /// Returns a copy of the view's contents (in unspecified order) and counts
    /// a hit, or `None` if no view has that name.
    pub fn query_view(&self, view_name: &str) -> Option<Vec<TripleAnnotation>> {
        // Write lock because the hit counter is mutated.
        let mut views = self.views.write().unwrap_or_else(|e| e.into_inner());
        let view = views.get_mut(view_name)?;
        view.hit_count += 1;
        Some(view.data.values().cloned().collect())
    }

    /// Returns the view's aggregate from its last refresh and counts a hit.
    /// `None` if the view does not exist or has no aggregation configured.
    pub fn get_aggregated_result(&self, view_name: &str) -> Option<AggregatedResult> {
        let mut views = self.views.write().unwrap_or_else(|e| e.into_inner());
        let view = views.get_mut(view_name)?;
        view.hit_count += 1;
        view.aggregated_result.clone()
    }

    /// Recomputes `view_name` and, transitively, every view that depends on
    /// it, directly or through other views. Does nothing if the view does
    /// not exist.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`.
    pub fn refresh_view(&mut self, view_name: &str) -> StarResult<()> {
        let source_data = self.source_data.read().unwrap_or_else(|e| e.into_inner());
        let mut views = self.views.write().unwrap_or_else(|e| e.into_inner());
        let Some(view) = views.get_mut(view_name) else {
            return Ok(());
        };
        view.refresh(&source_data);

        // Walk the dependents graph; `visited` refreshes each view once and
        // keeps dependency cycles from looping forever.
        let mut visited: HashSet<&str> = HashSet::new();
        visited.insert(view_name);
        let mut pending = vec![view_name];
        while let Some(current) = pending.pop() {
            let Some(dependents) = self.dependency_graph.get(current) else {
                continue;
            };
            for dependent in dependents {
                if !visited.insert(dependent.as_str()) {
                    continue;
                }
                if let Some(dependent_view) = views.get_mut(dependent.as_str()) {
                    dependent_view.refresh(&source_data);
                }
                pending.push(dependent.as_str());
            }
        }
        Ok(())
    }

    /// Recomputes every view regardless of its refresh strategy.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`.
    pub fn refresh_all_views(&mut self) -> StarResult<()> {
        let source_data = self.source_data.read().unwrap_or_else(|e| e.into_inner());
        let mut views = self.views.write().unwrap_or_else(|e| e.into_inner());
        for view in views.values_mut() {
            view.refresh(&source_data);
        }
        Ok(())
    }

    /// Recomputes only the views whose strategy is `Immediate`.
    fn refresh_immediate_views(&mut self) -> StarResult<()> {
        let source_data = self.source_data.read().unwrap_or_else(|e| e.into_inner());
        let mut views = self.views.write().unwrap_or_else(|e| e.into_inner());
        for view in views.values_mut() {
            if matches!(view.definition.refresh_strategy, RefreshStrategy::Immediate) {
                view.refresh(&source_data);
            }
        }
        Ok(())
    }

    /// Returns usage statistics computed from the current state of every view.
    ///
    /// Computed on each call, so hits recorded by `query_view` /
    /// `get_aggregated_result` and data changes from inserts or refreshes are
    /// reflected immediately rather than only after create/drop.
    pub fn statistics(&self) -> ViewStatistics {
        let views = self.views.read().unwrap_or_else(|e| e.into_inner());
        let total_hits: usize = views.values().map(|v| v.hit_count).sum();
        let total_misses: usize = views.values().map(|v| v.miss_count).sum();
        let total_requests = total_hits + total_misses;
        let hit_rate = if total_requests > 0 {
            total_hits as f64 / total_requests as f64
        } else {
            0.0
        };
        let memory_bytes = views
            .values()
            .map(|v| v.data.len() * std::mem::size_of::<TripleAnnotation>())
            .sum();
        ViewStatistics {
            view_count: views.len(),
            total_hits,
            total_misses,
            hit_rate,
            memory_bytes,
        }
    }

    /// Returns the names of all registered views, in unspecified order.
    pub fn list_views(&self) -> Vec<String> {
        self.views
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .keys()
            .cloned()
            .collect()
    }

    /// Removes `name` from every dependents set and prunes sets left empty.
    fn unregister_dependent(&mut self, name: &str) {
        self.dependency_graph.retain(|_, dependents| {
            dependents.remove(name);
            !dependents.is_empty()
        });
    }
}
impl Default for MaterializedViewManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the `Count` aggregate of `view`, panicking with the actual
    /// value otherwise so a wrong variant can never pass silently.
    fn count_of(manager: &MaterializedViewManager, view: &str) -> usize {
        match manager.get_aggregated_result(view) {
            Some(AggregatedResult::Count(count)) => count,
            other => panic!("expected Count for {view}, got {other:?}"),
        }
    }

    #[test]
    fn test_view_manager_creation() {
        let manager = MaterializedViewManager::new();
        assert_eq!(manager.statistics().view_count, 0);
    }

    #[test]
    fn test_create_view() {
        let mut manager = MaterializedViewManager::new();
        let view_def = ViewDefinition::new("test_view").with_description("Test view");
        manager.create_view(view_def).unwrap();
        assert_eq!(manager.statistics().view_count, 1);
        assert!(manager.list_views().contains(&"test_view".to_string()));
    }

    #[test]
    fn test_view_with_filter() {
        let mut manager = MaterializedViewManager::new();
        let view_def = ViewDefinition::new("high_confidence")
            .with_filter(|ann| ann.confidence.is_some_and(|c| c > 0.8));
        manager.create_view(view_def).unwrap();
        let ann1 = TripleAnnotation::new().with_confidence(0.9);
        let ann2 = TripleAnnotation::new().with_confidence(0.7);
        manager.insert_annotation(1, ann1).unwrap();
        manager.insert_annotation(2, ann2).unwrap();
        let results = manager.query_view("high_confidence").unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].confidence, Some(0.9));
    }

    #[test]
    fn test_aggregation_count() {
        let mut manager = MaterializedViewManager::new();
        let view_def = ViewDefinition::new("count_view").with_aggregation(AggregationType::Count);
        manager.create_view(view_def).unwrap();
        for i in 0..5 {
            let ann = TripleAnnotation::new().with_confidence(0.8);
            manager.insert_annotation(i, ann).unwrap();
        }
        assert_eq!(count_of(&manager, "count_view"), 5);
    }

    #[test]
    fn test_aggregation_average() {
        let mut manager = MaterializedViewManager::new();
        let view_def =
            ViewDefinition::new("avg_view").with_aggregation(AggregationType::AverageConfidence);
        manager.create_view(view_def).unwrap();
        for (hash, confidence) in [(1, 0.8), (2, 0.9), (3, 0.7)] {
            manager
                .insert_annotation(hash, TripleAnnotation::new().with_confidence(confidence))
                .unwrap();
        }
        match manager.get_aggregated_result("avg_view") {
            Some(AggregatedResult::Average(avg)) => assert!((avg - 0.8).abs() < 0.01),
            other => panic!("expected average result, got {other:?}"),
        }
    }

    #[test]
    fn test_drop_view() {
        let mut manager = MaterializedViewManager::new();
        let view_def = ViewDefinition::new("temp_view");
        manager.create_view(view_def).unwrap();
        assert_eq!(manager.statistics().view_count, 1);
        manager.drop_view("temp_view").unwrap();
        assert_eq!(manager.statistics().view_count, 0);
        assert!(manager.list_views().is_empty());
    }

    #[test]
    fn test_refresh_strategy() {
        let mut manager = MaterializedViewManager::new();
        let view_def = ViewDefinition::new("deferred_view")
            .with_refresh_strategy(RefreshStrategy::Deferred)
            .with_aggregation(AggregationType::Count);
        manager.create_view(view_def).unwrap();
        manager
            .insert_annotation(1, TripleAnnotation::new().with_confidence(0.9))
            .unwrap();
        // A deferred view must not pick up inserts until explicitly refreshed.
        assert_eq!(count_of(&manager, "deferred_view"), 0);
        manager.refresh_view("deferred_view").unwrap();
        assert_eq!(count_of(&manager, "deferred_view"), 1);
    }

    #[test]
    fn test_refresh_updates_dependent_view() {
        let mut manager = MaterializedViewManager::new();
        manager
            .create_view(
                ViewDefinition::new("base")
                    .with_refresh_strategy(RefreshStrategy::Deferred)
                    .with_aggregation(AggregationType::Count),
            )
            .unwrap();
        manager
            .create_view(
                ViewDefinition::new("derived")
                    .with_refresh_strategy(RefreshStrategy::Deferred)
                    .with_aggregation(AggregationType::Count)
                    .with_dependencies(vec!["base".to_string()]),
            )
            .unwrap();
        manager
            .insert_annotation(1, TripleAnnotation::new().with_confidence(0.5))
            .unwrap();
        assert_eq!(count_of(&manager, "derived"), 0);
        manager.refresh_view("base").unwrap();
        assert_eq!(count_of(&manager, "derived"), 1);
    }

    #[test]
    fn test_unknown_view_lookups() {
        let mut manager = MaterializedViewManager::new();
        assert!(manager.query_view("missing").is_none());
        assert!(manager.get_aggregated_result("missing").is_none());
        assert!(manager.refresh_view("missing").is_ok());
        assert!(manager.drop_view("missing").is_ok());
    }
}