use crate::ast::Document;
use crate::execution::{ExecutionContext, ExecutionResult, QueryExecutor};
use crate::types::Schema;
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, RwLock};
use tokio::time::interval;
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Tuning knobs for the live query subsystem.
///
/// Consumed by [`LiveQueryManager`]; a disabled config turns registration into
/// an error and change notification into a no-op.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveQueryConfig {
/// Master switch for the whole subsystem.
pub enabled: bool,
/// Minimum time between two executions of the same query (ms).
pub throttle_interval_ms: u64,
/// Period of the background loop that drains queued RDF changes (ms).
pub batch_interval_ms: u64,
/// Per-connection cap enforced at registration time.
pub max_queries_per_connection: usize,
/// When true, updates may be sent as JSON diffs instead of full results.
pub enable_diffing: bool,
/// Results whose serialized size exceeds this are never diffed (bytes).
pub max_diff_size: usize,
/// Period of the background loop that evicts stale queries (seconds).
pub cleanup_interval_secs: u64,
/// A query idle (not re-executed) longer than this is evicted (seconds).
pub query_timeout_secs: u64,
}
impl Default for LiveQueryConfig {
fn default() -> Self {
Self {
enabled: true,
throttle_interval_ms: 100,
batch_interval_ms: 50,
max_queries_per_connection: 100,
enable_diffing: true,
max_diff_size: 10000,
cleanup_interval_secs: 300,
query_timeout_secs: 3600,
}
}
}
/// A subject/predicate/object pattern over RDF triples.
///
/// Each component is either bound to a concrete value (`Some`) or acts as a
/// wildcard (`None`); see [`TriplePattern::matches`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TriplePattern {
// None matches any subject.
pub subject: Option<String>,
// None matches any predicate.
pub predicate: Option<String>,
// None matches any object.
pub object: Option<String>,
}
impl TriplePattern {
pub fn matches(&self, subject: &str, predicate: &str, object: &str) -> bool {
let subject_matches = self.subject.as_ref().map_or(true, |s| s == subject);
let predicate_matches = self.predicate.as_ref().map_or(true, |p| p == predicate);
let object_matches = self.object.as_ref().map_or(true, |o| o == object);
subject_matches && predicate_matches && object_matches
}
pub fn wildcard() -> Self {
Self {
subject: None,
predicate: None,
object: None,
}
}
}
/// The set of RDF data a registered query depends on, used to decide whether
/// an incoming change should invalidate the query's cached result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryDependencies {
// Fully-bound (subject, predicate, object) triples; O(1) membership test.
pub exact_triples: HashSet<(String, String, String)>,
// Wildcard patterns, checked linearly after the exact set misses.
pub patterns: HashSet<TriplePattern>,
// Named graphs of interest. NOTE(review): stored but not consulted by
// `is_affected_by` in this file — confirm intended use with callers.
pub graphs: HashSet<String>,
}
impl Default for QueryDependencies {
fn default() -> Self {
Self::new()
}
}
impl QueryDependencies {
    /// Creates an empty dependency set that is affected by nothing.
    pub fn new() -> Self {
        Self {
            exact_triples: HashSet::default(),
            patterns: HashSet::default(),
            graphs: HashSet::default(),
        }
    }

    /// Returns true when the given triple is listed exactly or matched by at
    /// least one registered pattern. The `graphs` set is not consulted here.
    pub fn is_affected_by(&self, subject: &str, predicate: &str, object: &str) -> bool {
        let key = (
            subject.to_string(),
            predicate.to_string(),
            object.to_string(),
        );
        self.exact_triples.contains(&key)
            || self
                .patterns
                .iter()
                .any(|pattern| pattern.matches(subject, predicate, object))
    }
}
/// A registered, continuously re-evaluated query and its bookkeeping state.
#[derive(Debug, Clone)]
pub struct LiveQuery {
// UUID assigned at registration; key into the manager's maps.
pub id: String,
// Owning connection; used to enforce the per-connection limit.
pub connection_id: String,
// Parsed query document to re-execute on invalidation.
pub document: Document,
// Variables/fragments/etc. used for each re-execution.
pub context: ExecutionContext,
// RDF data this query reads; drives change matching.
pub dependencies: QueryDependencies,
// Last execution time; drives throttling and staleness eviction.
pub last_executed: Instant,
// Cached previous result, used as the baseline for diffing.
pub last_result: Option<serde_json::Value>,
// Number of successful executions delivered to the subscriber.
pub execution_count: u64,
// Set when a relevant change arrives; cleared after an update is sent.
pub pending_update: bool,
// Creation time. NOTE(review): not read elsewhere in this file —
// cleanup keys off `last_executed`, not `created_at`.
pub created_at: Instant,
}
impl LiveQuery {
pub fn new(
connection_id: String,
document: Document,
context: ExecutionContext,
dependencies: QueryDependencies,
) -> Self {
Self {
id: Uuid::new_v4().to_string(),
connection_id,
document,
context,
dependencies,
last_executed: Instant::now(),
last_result: None,
execution_count: 0,
pending_update: false,
created_at: Instant::now(),
}
}
}
/// A single message pushed to a live query's subscriber channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveQueryUpdate {
// Id of the query this update belongs to.
pub query_id: String,
// Whether `data` is a full result, a diff against the last result, or an error payload.
pub update_type: UpdateType,
// GraphQL-style JSON payload (result, diff, or `{"errors": [...]}`).
pub data: serde_json::Value,
// RFC 3339 timestamp of when the update was produced.
pub timestamp: String,
}
/// Interpretation of [`LiveQueryUpdate::data`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpdateType {
// Complete query result.
Full,
// JSON diff against the previously delivered result.
Diff,
// Execution failure; `data` carries an errors array.
Error,
}
/// A mutation of the underlying RDF store, reported via
/// [`LiveQueryManager::notify_change`].
#[derive(Debug, Clone)]
pub struct RdfChange {
// Insert, delete, or update of the triple below.
pub change_type: ChangeType,
pub subject: String,
pub predicate: String,
pub object: String,
// Named graph the triple belongs to, if any.
pub graph: Option<String>,
// When the change was observed.
pub timestamp: Instant,
}
/// Kind of RDF store mutation carried by an [`RdfChange`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeType {
Insert,
Delete,
Update,
}
/// Coordinates live queries: registration, change matching, throttled
/// re-execution, diffing, and delivery over per-query mpsc channels.
pub struct LiveQueryManager {
config: LiveQueryConfig,
// All registered queries, keyed by query id.
queries: Arc<RwLock<HashMap<String, LiveQuery>>>,
// Update channel senders, keyed by the same query ids as `queries`.
query_senders: Arc<RwLock<HashMap<String, mpsc::Sender<LiveQueryUpdate>>>>,
// Changes accumulated between `process_changes` passes.
change_queue: Arc<RwLock<Vec<RdfChange>>>,
// Executor used to re-run queries; None until a schema/executor is set.
executor: Arc<RwLock<Option<QueryExecutor>>>,
}
impl LiveQueryManager {
pub fn new(config: LiveQueryConfig) -> Self {
Self {
config,
queries: Arc::new(RwLock::new(HashMap::new())),
query_senders: Arc::new(RwLock::new(HashMap::new())),
change_queue: Arc::new(RwLock::new(Vec::new())),
executor: Arc::new(RwLock::new(None)),
}
}
pub fn with_schema(config: LiveQueryConfig, schema: Schema) -> Self {
Self {
config,
queries: Arc::new(RwLock::new(HashMap::new())),
query_senders: Arc::new(RwLock::new(HashMap::new())),
change_queue: Arc::new(RwLock::new(Vec::new())),
executor: Arc::new(RwLock::new(Some(QueryExecutor::new(schema)))),
}
}
pub async fn set_executor(&self, executor: QueryExecutor) {
let mut guard = self.executor.write().await;
*guard = Some(executor);
}
pub async fn set_schema(&self, schema: Schema) {
let mut guard = self.executor.write().await;
*guard = Some(QueryExecutor::new(schema));
}
pub async fn register_query(
&self,
connection_id: String,
document: Document,
context: ExecutionContext,
dependencies: QueryDependencies,
) -> Result<(String, mpsc::Receiver<LiveQueryUpdate>)> {
if !self.config.enabled {
return Err(anyhow!("Live queries are disabled"));
}
let queries = self.queries.read().await;
let connection_query_count = queries
.values()
.filter(|q| q.connection_id == connection_id)
.count();
if connection_query_count >= self.config.max_queries_per_connection {
return Err(anyhow!(
"Maximum live queries per connection exceeded: {}",
self.config.max_queries_per_connection
));
}
drop(queries);
let live_query = LiveQuery::new(connection_id, document, context, dependencies);
let query_id = live_query.id.clone();
let (tx, rx) = mpsc::channel(100);
self.queries
.write()
.await
.insert(query_id.clone(), live_query);
self.query_senders
.write()
.await
.insert(query_id.clone(), tx);
info!("Live query registered: {}", query_id);
Ok((query_id, rx))
}
pub async fn unregister_query(&self, query_id: &str) -> Result<()> {
self.queries.write().await.remove(query_id);
self.query_senders.write().await.remove(query_id);
info!("Live query unregistered: {}", query_id);
Ok(())
}
pub async fn notify_change(&self, change: RdfChange) -> Result<()> {
if !self.config.enabled {
return Ok(());
}
self.change_queue.write().await.push(change);
Ok(())
}
pub async fn process_changes(&self) -> Result<usize> {
let changes = {
let mut queue = self.change_queue.write().await;
let changes = queue.drain(..).collect::<Vec<_>>();
changes
};
if changes.is_empty() {
return Ok(0);
}
debug!("Processing {} RDF changes", changes.len());
let mut affected_queries = HashSet::new();
let queries = self.queries.read().await;
for change in &changes {
for (query_id, query) in queries.iter() {
if query.dependencies.is_affected_by(
&change.subject,
&change.predicate,
&change.object,
) {
affected_queries.insert(query_id.clone());
}
}
}
drop(queries);
let mut queries_mut = self.queries.write().await;
for query_id in &affected_queries {
if let Some(query) = queries_mut.get_mut(query_id) {
query.pending_update = true;
}
}
Ok(affected_queries.len())
}
pub async fn execute_pending_updates(&self) -> Result<usize> {
let mut updated_count = 0;
let queries = self.queries.read().await;
let pending_query_ids: Vec<String> = queries
.iter()
.filter(|(_, q)| {
q.pending_update
&& q.last_executed.elapsed()
>= Duration::from_millis(self.config.throttle_interval_ms)
})
.map(|(id, _)| id.clone())
.collect();
drop(queries);
for query_id in pending_query_ids {
let queries = self.queries.read().await;
let query = match queries.get(&query_id) {
Some(q) => q.clone(),
None => continue,
};
drop(queries);
let new_result = match self.execute_query(&query).await {
Ok(result) => result,
Err(e) => {
warn!("Failed to execute live query {}: {}", query_id, e);
let query_senders = self.query_senders.read().await;
if let Some(sender) = query_senders.get(&query_id) {
let error_update = LiveQueryUpdate {
query_id: query_id.clone(),
update_type: UpdateType::Error,
data: serde_json::json!({
"errors": [{
"message": e.to_string()
}]
}),
timestamp: chrono::Utc::now().to_rfc3339(),
};
let _ = sender.send(error_update).await;
}
continue;
}
};
let (update_type, update_data) = if let Some(ref last_result) = query.last_result {
if let Some(diff) = self.compute_diff(last_result, &new_result) {
let diff_size = diff.to_string().len();
let full_size = new_result.to_string().len();
if diff_size < full_size / 2 {
(UpdateType::Diff, diff)
} else {
(UpdateType::Full, new_result.clone())
}
} else {
continue;
}
} else {
(UpdateType::Full, new_result.clone())
};
let query_senders = self.query_senders.read().await;
if let Some(sender) = query_senders.get(&query_id) {
let update = LiveQueryUpdate {
query_id: query_id.clone(),
update_type,
data: update_data,
timestamp: chrono::Utc::now().to_rfc3339(),
};
if sender.send(update).await.is_ok() {
updated_count += 1;
let mut queries_mut = self.queries.write().await;
if let Some(query) = queries_mut.get_mut(&query_id) {
query.pending_update = false;
query.last_executed = Instant::now();
query.execution_count += 1;
query.last_result = Some(new_result);
}
}
}
}
Ok(updated_count)
}
async fn execute_query(&self, query: &LiveQuery) -> Result<serde_json::Value> {
let executor_guard = self.executor.read().await;
if let Some(ref executor) = *executor_guard {
let result = executor.execute(&query.document, &query.context).await?;
let json_result = self.execution_result_to_json(&result);
debug!(
"Executed live query {}: success={}",
query.id,
!result.has_errors()
);
Ok(json_result)
} else {
debug!(
"No executor configured for live query {}, returning placeholder",
query.id
);
Ok(serde_json::json!({
"data": null,
"errors": [{
"message": "Query executor not configured"
}]
}))
}
}
fn execution_result_to_json(&self, result: &ExecutionResult) -> serde_json::Value {
let mut json = serde_json::json!({});
if let Some(ref data) = result.data {
json["data"] = data.clone();
} else {
json["data"] = serde_json::Value::Null;
}
if !result.errors.is_empty() {
let errors: Vec<serde_json::Value> = result
.errors
.iter()
.map(|e| {
serde_json::json!({
"message": e.message,
"path": e.path,
"locations": e.locations.iter().map(|l| {
serde_json::json!({
"line": l.line,
"column": l.column
})
}).collect::<Vec<_>>(),
"extensions": e.extensions
})
})
.collect();
json["errors"] = serde_json::json!(errors);
}
json
}
fn compute_diff(
&self,
old_result: &serde_json::Value,
new_result: &serde_json::Value,
) -> Option<serde_json::Value> {
if old_result == new_result {
return None;
}
if !self.config.enable_diffing {
return Some(new_result.clone());
}
let old_size = old_result.to_string().len();
let new_size = new_result.to_string().len();
if old_size > self.config.max_diff_size || new_size > self.config.max_diff_size {
return Some(new_result.clone());
}
self.compute_json_diff(old_result, new_result)
}
#[allow(clippy::only_used_in_recursion)]
fn compute_json_diff(
&self,
old: &serde_json::Value,
new: &serde_json::Value,
) -> Option<serde_json::Value> {
use serde_json::Value;
match (old, new) {
(Value::Object(old_obj), Value::Object(new_obj)) => {
let mut diff = serde_json::Map::new();
let mut has_changes = false;
for (key, new_val) in new_obj {
if let Some(old_val) = old_obj.get(key) {
if old_val != new_val {
if let Some(nested_diff) = self.compute_json_diff(old_val, new_val) {
diff.insert(key.clone(), nested_diff);
has_changes = true;
}
}
} else {
diff.insert(key.clone(), new_val.clone());
has_changes = true;
}
}
for key in old_obj.keys() {
if !new_obj.contains_key(key) {
diff.insert(key.clone(), Value::Null);
has_changes = true;
}
}
if has_changes {
Some(Value::Object(diff))
} else {
None
}
}
_ => {
if old != new {
Some(new.clone())
} else {
None
}
}
}
}
pub async fn cleanup_stale_queries(&self) -> Result<usize> {
let timeout = Duration::from_secs(self.config.query_timeout_secs);
let mut removed_count = 0;
let mut queries = self.queries.write().await;
let mut senders = self.query_senders.write().await;
let to_remove: Vec<String> = queries
.iter()
.filter(|(_, q)| q.last_executed.elapsed() > timeout)
.map(|(id, _)| id.clone())
.collect();
for query_id in &to_remove {
queries.remove(query_id);
senders.remove(query_id);
removed_count += 1;
}
if removed_count > 0 {
info!("Cleaned up {} stale live queries", removed_count);
}
Ok(removed_count)
}
pub async fn start_background_tasks(self: Arc<Self>) {
let manager = Arc::clone(&self);
tokio::spawn(async move {
let mut batch_interval =
interval(Duration::from_millis(manager.config.batch_interval_ms));
loop {
batch_interval.tick().await;
if let Err(e) = manager.process_changes().await {
warn!("Error processing changes: {}", e);
}
if let Err(e) = manager.execute_pending_updates().await {
warn!("Error executing updates: {}", e);
}
}
});
let manager = Arc::clone(&self);
tokio::spawn(async move {
let mut cleanup_interval =
interval(Duration::from_secs(manager.config.cleanup_interval_secs));
loop {
cleanup_interval.tick().await;
if let Err(e) = manager.cleanup_stale_queries().await {
warn!("Error cleaning up queries: {}", e);
}
}
});
}
pub async fn get_stats(&self) -> LiveQueryStats {
let queries = self.queries.read().await;
LiveQueryStats {
total_queries: queries.len(),
total_executions: queries.values().map(|q| q.execution_count).sum(),
pending_updates: queries.values().filter(|q| q.pending_update).count(),
}
}
}
/// Point-in-time counters reported by [`LiveQueryManager::get_stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveQueryStats {
// Number of currently registered live queries.
pub total_queries: usize,
// Sum of per-query execution counts.
pub total_executions: u64,
// Queries currently flagged for re-execution.
pub pending_updates: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty GraphQL document for tests that never actually execute a query.
    fn empty_document() -> Document {
        Document {
            definitions: Vec::new(),
        }
    }

    /// Minimal execution context with a fresh request id.
    fn test_context() -> ExecutionContext {
        ExecutionContext {
            variables: HashMap::new(),
            operation_name: None,
            request_id: Uuid::new_v4().to_string(),
            fragments: HashMap::new(),
        }
    }

    #[test]
    fn test_live_query_config_default() {
        let config = LiveQueryConfig::default();
        assert!(config.enabled);
        assert_eq!(config.throttle_interval_ms, 100);
        assert_eq!(config.batch_interval_ms, 50);
    }

    #[test]
    fn test_triple_pattern_wildcard() {
        let pattern = TriplePattern::wildcard();
        assert!(pattern.matches("s", "p", "o"));
        assert!(pattern.matches("x", "y", "z"));
    }

    #[test]
    fn test_triple_pattern_subject_only() {
        let pattern = TriplePattern {
            subject: Some("http://example.org/s".to_string()),
            predicate: None,
            object: None,
        };
        assert!(pattern.matches("http://example.org/s", "p", "o"));
        assert!(!pattern.matches("http://example.org/x", "p", "o"));
    }

    #[test]
    fn test_triple_pattern_full_match() {
        let pattern = TriplePattern {
            subject: Some("s".to_string()),
            predicate: Some("p".to_string()),
            object: Some("o".to_string()),
        };
        assert!(pattern.matches("s", "p", "o"));
        assert!(!pattern.matches("s", "p", "x"));
        assert!(!pattern.matches("x", "p", "o"));
    }

    #[test]
    fn test_query_dependencies_exact_triple() {
        let mut deps = QueryDependencies::new();
        deps.exact_triples
            .insert(("s".to_string(), "p".to_string(), "o".to_string()));
        assert!(deps.is_affected_by("s", "p", "o"));
        assert!(!deps.is_affected_by("x", "p", "o"));
    }

    #[test]
    fn test_query_dependencies_pattern() {
        let mut deps = QueryDependencies::new();
        deps.patterns.insert(TriplePattern {
            subject: Some("s".to_string()),
            predicate: None,
            object: None,
        });
        assert!(deps.is_affected_by("s", "p1", "o1"));
        assert!(deps.is_affected_by("s", "p2", "o2"));
        assert!(!deps.is_affected_by("x", "p1", "o1"));
    }

    #[tokio::test]
    async fn test_live_query_manager_creation() {
        let manager = LiveQueryManager::new(LiveQueryConfig::default());
        let stats = manager.get_stats().await;
        assert_eq!(stats.total_queries, 0);
    }

    #[tokio::test]
    async fn test_live_query_registration() {
        let manager = LiveQueryManager::new(LiveQueryConfig::default());
        let result = manager
            .register_query(
                "conn-1".to_string(),
                empty_document(),
                test_context(),
                QueryDependencies::new(),
            )
            .await;
        assert!(result.is_ok());
        let stats = manager.get_stats().await;
        assert_eq!(stats.total_queries, 1);
    }

    #[tokio::test]
    async fn test_live_query_limit() {
        let config = LiveQueryConfig {
            max_queries_per_connection: 2,
            ..Default::default()
        };
        let manager = LiveQueryManager::new(config);
        // Fill the connection up to its limit.
        for _ in 0..2 {
            manager
                .register_query(
                    "conn-1".to_string(),
                    empty_document(),
                    test_context(),
                    QueryDependencies::new(),
                )
                .await
                .expect("registration under the limit should succeed");
        }
        // The registration past the limit must be rejected.
        let result = manager
            .register_query(
                "conn-1".to_string(),
                empty_document(),
                test_context(),
                QueryDependencies::new(),
            )
            .await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_rdf_change_notification() {
        let manager = LiveQueryManager::new(LiveQueryConfig::default());
        let change = RdfChange {
            change_type: ChangeType::Insert,
            subject: "s".to_string(),
            predicate: "p".to_string(),
            object: "o".to_string(),
            graph: None,
            timestamp: Instant::now(),
        };
        assert!(manager.notify_change(change).await.is_ok());
    }

    #[tokio::test]
    async fn test_change_processing() {
        let manager = LiveQueryManager::new(LiveQueryConfig::default());
        let mut dependencies = QueryDependencies::new();
        dependencies
            .exact_triples
            .insert(("s".to_string(), "p".to_string(), "o".to_string()));
        let (_query_id, _rx) = manager
            .register_query(
                "conn-1".to_string(),
                empty_document(),
                test_context(),
                dependencies,
            )
            .await
            .expect("should succeed");
        let change = RdfChange {
            change_type: ChangeType::Insert,
            subject: "s".to_string(),
            predicate: "p".to_string(),
            object: "o".to_string(),
            graph: None,
            timestamp: Instant::now(),
        };
        manager.notify_change(change).await.expect("should succeed");
        let affected = manager.process_changes().await.expect("should succeed");
        assert_eq!(affected, 1);
    }

    #[test]
    fn test_update_type() {
        assert_eq!(UpdateType::Full, UpdateType::Full);
        assert_ne!(UpdateType::Full, UpdateType::Diff);
    }

    #[test]
    fn test_change_type() {
        assert_eq!(ChangeType::Insert, ChangeType::Insert);
        assert_ne!(ChangeType::Insert, ChangeType::Delete);
    }

    #[test]
    fn test_live_query_manager_with_schema() {
        let manager = LiveQueryManager::with_schema(LiveQueryConfig::default(), Schema::new());
        assert!(manager.config.enabled);
    }

    #[test]
    fn test_compute_json_diff_same_values() {
        let manager = LiveQueryManager::new(LiveQueryConfig::default());
        let old = serde_json::json!({"a": 1, "b": 2});
        let new = serde_json::json!({"a": 1, "b": 2});
        assert!(manager.compute_diff(&old, &new).is_none());
    }

    #[test]
    fn test_compute_json_diff_changed_value() {
        let manager = LiveQueryManager::new(LiveQueryConfig::default());
        let old = serde_json::json!({"a": 1, "b": 2});
        let new = serde_json::json!({"a": 1, "b": 3});
        let diff = manager.compute_diff(&old, &new).expect("should succeed");
        assert_eq!(diff["b"], 3);
    }

    #[test]
    fn test_compute_json_diff_added_key() {
        let manager = LiveQueryManager::new(LiveQueryConfig::default());
        let old = serde_json::json!({"a": 1});
        let new = serde_json::json!({"a": 1, "b": 2});
        let diff = manager.compute_diff(&old, &new).expect("should succeed");
        assert_eq!(diff["b"], 2);
    }

    #[test]
    fn test_compute_json_diff_removed_key() {
        let manager = LiveQueryManager::new(LiveQueryConfig::default());
        let old = serde_json::json!({"a": 1, "b": 2});
        let new = serde_json::json!({"a": 1});
        let diff = manager.compute_diff(&old, &new).expect("should succeed");
        // Removed keys are reported as explicit nulls.
        assert!(diff["b"].is_null());
    }

    #[test]
    fn test_compute_json_diff_nested_change() {
        let manager = LiveQueryManager::new(LiveQueryConfig::default());
        let old = serde_json::json!({"a": {"b": 1}});
        let new = serde_json::json!({"a": {"b": 2}});
        let diff = manager.compute_diff(&old, &new).expect("should succeed");
        assert_eq!(diff["a"]["b"], 2);
    }

    #[test]
    fn test_compute_json_diff_array_change() {
        let manager = LiveQueryManager::new(LiveQueryConfig::default());
        let old = serde_json::json!({"items": [1, 2, 3]});
        let new = serde_json::json!({"items": [1, 2, 4]});
        assert!(manager.compute_diff(&old, &new).is_some());
    }

    #[test]
    fn test_diffing_disabled() {
        let config = LiveQueryConfig {
            enable_diffing: false,
            ..Default::default()
        };
        let manager = LiveQueryManager::new(config);
        let old = serde_json::json!({"a": 1});
        let new = serde_json::json!({"a": 2});
        let diff = manager.compute_diff(&old, &new);
        assert!(diff.is_some());
        // With diffing off, the full new result is always returned.
        assert_eq!(diff.expect("should succeed"), new);
    }

    #[test]
    fn test_max_diff_size() {
        let config = LiveQueryConfig {
            enable_diffing: true,
            max_diff_size: 10,
            ..Default::default()
        };
        let manager = LiveQueryManager::new(config);
        let old = serde_json::json!({"key": "old_value_that_is_quite_long"});
        let new = serde_json::json!({"key": "new_value_that_is_also_long"});
        let diff = manager.compute_diff(&old, &new);
        assert!(diff.is_some());
        // Oversized payloads fall back to the full new result.
        assert_eq!(diff.expect("should succeed"), new);
    }

    #[tokio::test]
    async fn test_set_schema() {
        let manager = LiveQueryManager::new(LiveQueryConfig::default());
        manager.set_schema(Schema::new()).await;
    }
}