use chrono::{DateTime, Utc};
use uuid::Uuid;
use super::readset::ReadSet;
use super::session::SessionId;
/// Identifier of a single subscription: a thin newtype around a `Uuid`.
///
/// The type is `Copy` and usable as a hash-map key (`Eq` + `Hash`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(pub Uuid);

impl SubscriptionId {
    /// Creates an identifier backed by a UUID obtained from `Uuid::new_v4()`.
    pub fn new() -> Self {
        let generated = Uuid::new_v4();
        SubscriptionId(generated)
    }

    /// Wraps an already existing UUID without modifying it.
    pub fn from_uuid(id: Uuid) -> Self {
        SubscriptionId(id)
    }

    /// Returns a copy of the wrapped UUID.
    pub fn as_uuid(&self) -> Uuid {
        let SubscriptionId(inner) = *self;
        inner
    }
}

impl Default for SubscriptionId {
    /// Same as [`SubscriptionId::new`]: each call yields a newly generated id,
    /// not a fixed "zero" value.
    fn default() -> Self {
        SubscriptionId::new()
    }
}

impl std::fmt::Display for SubscriptionId {
    /// Writes the wrapped UUID using the UUID's own `Display` output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let SubscriptionId(inner) = self;
        f.write_fmt(format_args!("{}", inner))
    }
}
/// Snapshot of what is currently known about a subscription's result of type `T`.
#[derive(Debug, Clone)]
pub struct SubscriptionState<T> {
    /// `true` in the default/initial state; `false` for states built with
    /// `with_data` or `with_error`.
    pub loading: bool,
    /// The result payload, if one is available.
    pub data: Option<T>,
    /// The error message, if the state was built from an error.
    pub error: Option<String>,
    /// Flag toggled by `mark_stale` / `clear_stale`; `false` in every
    /// constructor of this type.
    pub stale: bool,
}

impl<T> Default for SubscriptionState<T> {
    /// The initial state: loading, with neither data nor error, not stale.
    ///
    /// Implemented by hand (rather than derived) so that no `T: Default`
    /// bound is required.
    fn default() -> Self {
        SubscriptionState {
            loading: true,
            data: None,
            error: None,
            stale: false,
        }
    }
}

impl<T> SubscriptionState<T> {
    /// Returns the initial "still loading" state (identical to `default()`).
    pub fn loading() -> Self {
        Default::default()
    }

    /// Builds a non-loading state that carries `data` and no error.
    pub fn with_data(data: T) -> Self {
        Self {
            loading: false,
            data: Some(data),
            // `error: None` and `stale: false` are taken from the default state.
            ..Self::default()
        }
    }

    /// Builds a non-loading state that carries an error message and no data.
    pub fn with_error(error: impl Into<String>) -> Self {
        let message: String = error.into();
        Self {
            loading: false,
            error: Some(message),
            // `data: None` and `stale: false` are taken from the default state.
            ..Self::default()
        }
    }

    /// Sets the `stale` flag; all other fields are left untouched.
    pub fn mark_stale(&mut self) {
        self.stale = true;
    }

    /// Clears the `stale` flag; all other fields are left untouched.
    pub fn clear_stale(&mut self) {
        self.stale = false;
    }
}
/// Bookkeeping record for one subscription: which query it runs, with which
/// arguments, and statistics about its executions.
#[derive(Debug, Clone)]
pub struct SubscriptionInfo {
    /// Identifier generated when the record is created by `new`.
    pub id: SubscriptionId,
    /// Session passed to `new` (presumably the owning session — confirm with callers).
    pub session_id: SessionId,
    /// Name of the subscribed query.
    pub query_name: String,
    /// JSON arguments the query is invoked with.
    pub args: serde_json::Value,
    /// Hash derived from `query_name` and `args` via `compute_query_hash`.
    pub query_hash: String,
    /// Read set supplied to the most recent `record_execution` call
    /// (`ReadSet::new()` until then).
    pub read_set: ReadSet,
    /// Result hash supplied to the most recent `record_execution` call;
    /// `None` before the first one.
    pub last_result_hash: Option<String>,
    /// UTC timestamp taken when `new` was called.
    pub created_at: DateTime<Utc>,
    /// UTC timestamp of the most recent `record_execution` call, if any.
    pub last_executed_at: Option<DateTime<Utc>>,
    /// Number of `record_execution` calls so far.
    pub execution_count: u64,
    /// Memory estimate computed by `record_execution`; `0` before the first call.
    pub memory_bytes: usize,
}

impl SubscriptionInfo {
    /// Creates the record for a subscription that has not been executed yet.
    ///
    /// A new `SubscriptionId` is generated, the query hash is computed from
    /// `query_name` and `args`, and all execution statistics start out
    /// empty or zero.
    pub fn new(
        session_id: SessionId,
        query_name: impl Into<String>,
        args: serde_json::Value,
    ) -> Self {
        let query_name: String = query_name.into();
        let query_hash = compute_query_hash(query_name.as_str(), &args);

        let id = SubscriptionId::new();
        let read_set = ReadSet::new();
        let created_at = Utc::now();

        SubscriptionInfo {
            id,
            session_id,
            query_name,
            args,
            query_hash,
            read_set,
            last_result_hash: None,
            created_at,
            last_executed_at: None,
            execution_count: 0,
            memory_bytes: 0,
        }
    }

    /// Records the outcome of one execution of the query.
    ///
    /// Replaces the stored read set and result hash, stamps the execution
    /// time with the current UTC time, increments the execution counter and
    /// refreshes the memory estimate.
    pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
        // Constant added to every estimate on top of the measured sizes.
        const FIXED_OVERHEAD_BYTES: usize = 128;

        self.read_set = read_set;

        // Estimate = size reported by the new read set + query-name bytes + constant.
        let read_set_bytes = self.read_set.memory_bytes();
        let name_bytes = self.query_name.len();
        self.memory_bytes = read_set_bytes + name_bytes + FIXED_OVERHEAD_BYTES;

        self.last_result_hash = Some(result_hash);
        self.last_executed_at = Some(Utc::now());
        self.execution_count += 1;
    }

    /// Returns whether `change` invalidates the currently stored read set,
    /// as decided by `Change::invalidates`.
    pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
        let current_read_set = &self.read_set;
        change.invalidates(current_read_set)
    }
}
/// Derives a fingerprint for a query invocation from its name and arguments.
///
/// The query name and the JSON text of `args` (as produced by `to_string()`)
/// are fed, in that order, into a `DefaultHasher`; the 64-bit result is
/// returned as 16 zero-padded lowercase hexadecimal characters.
///
/// NOTE(review): the standard library does not guarantee that
/// `DefaultHasher`'s algorithm stays the same across Rust releases, so these
/// values look suitable only for in-process comparison — confirm they are
/// never persisted or exchanged between differently built binaries.
fn compute_query_hash(query_name: &str, args: &serde_json::Value) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let serialized_args: String = args.to_string();

    let mut state = DefaultHasher::new();
    <str as Hash>::hash(query_name, &mut state);
    <str as Hash>::hash(serialized_args.as_str(), &mut state);

    let digest: u64 = state.finish();
    format!("{:016x}", digest)
}
/// A set of changes split into three lists; serializable with serde.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Delta<T> {
    /// Items listed as added.
    pub added: Vec<T>,
    /// Strings describing removed items (presumably their identifiers —
    /// confirm with producers of this type).
    pub removed: Vec<String>,
    /// Items listed as updated.
    pub updated: Vec<T>,
}

impl<T> Default for Delta<T> {
    /// A delta with all three lists empty.
    ///
    /// Implemented by hand (rather than derived) so that no `T: Default`
    /// bound is required.
    fn default() -> Self {
        Delta {
            added: vec![],
            removed: vec![],
            updated: vec![],
        }
    }
}

impl<T> Delta<T> {
    /// Returns a delta that contains no changes (identical to `default()`).
    pub fn empty() -> Self {
        Default::default()
    }

    /// Returns `true` when none of the three lists contains an entry.
    pub fn is_empty(&self) -> bool {
        let Delta {
            added,
            removed,
            updated,
        } = self;
        [added.len(), removed.len(), updated.len()]
            .iter()
            .all(|&len| len == 0)
    }

    /// Returns the total number of entries across the three lists.
    pub fn change_count(&self) -> usize {
        let Delta {
            added,
            removed,
            updated,
        } = self;
        added.len() + removed.len() + updated.len()
    }
}
/// Unit tests for the subscription identifier, state, metadata record,
/// query hashing and delta types defined in this file.
#[cfg(test)]
mod tests {
    use super::*;

    /// Two freshly generated identifiers must differ.
    #[test]
    fn test_subscription_id_generation() {
        let id1 = SubscriptionId::new();
        let id2 = SubscriptionId::new();
        assert_ne!(id1, id2);
    }

    /// `default()` delegates to `new()`, so it must also yield distinct ids.
    #[test]
    fn test_subscription_id_default_is_fresh() {
        assert_ne!(SubscriptionId::default(), SubscriptionId::default());
    }

    /// Wrapping a UUID and reading it back is lossless, and `Display`
    /// prints exactly what the wrapped UUID prints.
    #[test]
    fn test_subscription_id_uuid_round_trip() {
        let raw = Uuid::nil();
        let id = SubscriptionId::from_uuid(raw);
        assert_eq!(id.as_uuid(), raw);
        assert_eq!(id, SubscriptionId(raw));
        assert_eq!(id.to_string(), raw.to_string());
    }

    /// The default state is "loading" with nothing else set.
    #[test]
    fn test_subscription_state_default() {
        let state: SubscriptionState<String> = SubscriptionState::default();
        assert!(state.loading);
        assert!(state.data.is_none());
        assert!(state.error.is_none());
        assert!(!state.stale);
    }

    /// `loading()` must produce the same field values as `default()`.
    #[test]
    fn test_subscription_state_loading_matches_default() {
        let state: SubscriptionState<String> = SubscriptionState::loading();
        assert!(state.loading);
        assert!(state.data.is_none());
        assert!(state.error.is_none());
        assert!(!state.stale);
    }

    /// A state built from data is no longer loading and carries no error.
    #[test]
    fn test_subscription_state_with_data() {
        let state = SubscriptionState::with_data(vec![1, 2, 3]);
        assert!(!state.loading);
        assert_eq!(state.data, Some(vec![1, 2, 3]));
        assert!(state.error.is_none());
        assert!(!state.stale);
    }

    /// A state built from an error is no longer loading and carries no data.
    #[test]
    fn test_subscription_state_with_error() {
        let state: SubscriptionState<String> = SubscriptionState::with_error("boom");
        assert!(!state.loading);
        assert!(state.data.is_none());
        assert_eq!(state.error.as_deref(), Some("boom"));
        assert!(!state.stale);
    }

    /// The stale flag can be set and cleared without touching the payload.
    #[test]
    fn test_subscription_state_stale_toggle() {
        let mut state = SubscriptionState::with_data(7);
        state.mark_stale();
        assert!(state.stale);
        assert_eq!(state.data, Some(7));
        state.clear_stale();
        assert!(!state.stale);
        assert_eq!(state.data, Some(7));
    }

    /// A new record keeps its inputs and starts with empty statistics.
    #[test]
    fn test_subscription_info_creation() {
        let session_id = SessionId::new();
        let info = SubscriptionInfo::new(
            session_id,
            "get_projects",
            serde_json::json!({"userId": "abc"}),
        );
        assert_eq!(info.query_name, "get_projects");
        assert_eq!(info.execution_count, 0);
        assert!(!info.query_hash.is_empty());
        assert!(info.last_result_hash.is_none());
        assert!(info.last_executed_at.is_none());
        assert_eq!(info.memory_bytes, 0);
    }

    /// Recording an execution updates the counter, result hash, timestamp
    /// and memory estimate.
    #[test]
    fn test_subscription_info_record_execution() {
        let mut info = SubscriptionInfo::new(
            SessionId::new(),
            "get_projects",
            serde_json::json!({"userId": "abc"}),
        );

        info.record_execution(ReadSet::new(), "hash-1".to_string());
        assert_eq!(info.execution_count, 1);
        assert_eq!(info.last_result_hash.as_deref(), Some("hash-1"));
        assert!(info.last_executed_at.is_some());
        // The estimate always includes the query-name bytes plus the fixed 128.
        assert!(info.memory_bytes >= "get_projects".len() + 128);

        info.record_execution(ReadSet::new(), "hash-2".to_string());
        assert_eq!(info.execution_count, 2);
        assert_eq!(info.last_result_hash.as_deref(), Some("hash-2"));
    }

    /// Equal inputs hash equally; different arguments hash differently.
    #[test]
    fn test_query_hash_consistency() {
        let hash1 = compute_query_hash("get_projects", &serde_json::json!({"userId": "abc"}));
        let hash2 = compute_query_hash("get_projects", &serde_json::json!({"userId": "abc"}));
        let hash3 = compute_query_hash("get_projects", &serde_json::json!({"userId": "xyz"}));
        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
    }

    /// The hash is rendered as exactly 16 hexadecimal characters and also
    /// depends on the query name.
    #[test]
    fn test_query_hash_format_and_name_sensitivity() {
        let args = serde_json::json!({"userId": "abc"});
        let projects = compute_query_hash("get_projects", &args);
        let users = compute_query_hash("get_users", &args);
        assert_eq!(projects.len(), 16);
        assert!(projects.chars().all(|c| c.is_ascii_hexdigit()));
        assert_ne!(projects, users);
    }

    /// An empty delta reports no changes.
    #[test]
    fn test_delta_empty() {
        let delta: Delta<String> = Delta::empty();
        assert!(delta.is_empty());
        assert_eq!(delta.change_count(), 0);
    }

    /// Each of the three lists contributes to the change count.
    #[test]
    fn test_delta_with_changes() {
        let delta = Delta {
            added: vec!["a".to_string()],
            removed: vec!["b".to_string()],
            updated: vec!["c".to_string()],
        };
        assert!(!delta.is_empty());
        assert_eq!(delta.change_count(), 3);
    }

    /// A delta with entries in only one list is still non-empty.
    #[test]
    fn test_delta_single_list_is_not_empty() {
        let delta: Delta<String> = Delta {
            added: Vec::new(),
            removed: vec!["gone".to_string()],
            updated: Vec::new(),
        };
        assert!(!delta.is_empty());
        assert_eq!(delta.change_count(), 1);
    }
}