use std::sync::Arc;
use chrono::{DateTime, Utc};
use uuid::Uuid;
use super::readset::ReadSet;
use super::session::SessionId;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(pub Uuid);
impl SubscriptionId {
pub fn new() -> Self {
Self(Uuid::new_v4())
}
pub fn from_uuid(id: Uuid) -> Self {
Self(id)
}
pub fn as_uuid(&self) -> Uuid {
self.0
}
}
impl Default for SubscriptionId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for SubscriptionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct QueryGroupId(pub u32);
impl std::fmt::Display for QueryGroupId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "qg-{}", self.0)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriberId(pub u32);
impl std::fmt::Display for SubscriberId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "sub-{}", self.0)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AuthScope {
pub principal_id: Option<String>,
pub tenant_id: Option<String>,
}
impl AuthScope {
pub fn from_auth(auth: &crate::function::AuthContext) -> Self {
Self {
principal_id: auth.principal_id(),
tenant_id: auth
.claim("tenant_id")
.and_then(|v| v.as_str())
.map(ToString::to_string),
}
}
}
pub struct QueryGroup {
pub id: QueryGroupId,
pub query_name: String,
pub args: Arc<serde_json::Value>,
pub auth_scope: AuthScope,
pub auth_context: crate::function::AuthContext,
pub table_deps: &'static [&'static str],
pub selected_cols: &'static [&'static str],
pub read_set: ReadSet,
pub last_result_hash: Option<String>,
pub subscribers: Vec<SubscriberId>,
pub created_at: DateTime<Utc>,
pub execution_count: u64,
}
impl QueryGroup {
pub fn compute_lookup_key(
query_name: &str,
args: &serde_json::Value,
auth_scope: &AuthScope,
) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
query_name.hash(&mut hasher);
Self::hash_json_canonical(args, &mut hasher);
auth_scope.hash(&mut hasher);
hasher.finish()
}
fn hash_json_canonical(value: &serde_json::Value, hasher: &mut impl std::hash::Hasher) {
use std::hash::Hash;
match value {
serde_json::Value::Object(map) => {
let mut keys: Vec<&String> = map.keys().collect();
keys.sort();
for key in keys {
key.hash(hasher);
Self::hash_json_canonical(&map[key], hasher);
}
}
other => other.to_string().hash(hasher),
}
}
pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
self.read_set = read_set;
self.last_result_hash = Some(result_hash);
self.execution_count += 1;
}
pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
if !change.invalidates(&self.read_set) {
return false;
}
if !change.invalidates_columns(self.selected_cols) {
return false;
}
true
}
}
pub struct Subscriber {
pub id: SubscriberId,
pub session_id: SessionId,
pub client_sub_id: String,
pub group_id: QueryGroupId,
pub subscription_id: SubscriptionId,
}
#[derive(Debug, Clone)]
pub struct SubscriptionState<T> {
pub loading: bool,
pub data: Option<T>,
pub error: Option<String>,
pub stale: bool,
}
impl<T> Default for SubscriptionState<T> {
fn default() -> Self {
Self {
loading: true,
data: None,
error: None,
stale: false,
}
}
}
impl<T> SubscriptionState<T> {
pub fn loading() -> Self {
Self::default()
}
pub fn with_data(data: T) -> Self {
Self {
loading: false,
data: Some(data),
error: None,
stale: false,
}
}
pub fn with_error(error: impl Into<String>) -> Self {
Self {
loading: false,
data: None,
error: Some(error.into()),
stale: false,
}
}
pub fn mark_stale(&mut self) {
self.stale = true;
}
pub fn clear_stale(&mut self) {
self.stale = false;
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Delta<T> {
pub added: Vec<T>,
pub removed: Vec<String>,
pub updated: Vec<T>,
}
impl<T> Default for Delta<T> {
fn default() -> Self {
Self {
added: Vec::new(),
removed: Vec::new(),
updated: Vec::new(),
}
}
}
impl<T> Delta<T> {
pub fn empty() -> Self {
Self::default()
}
pub fn is_empty(&self) -> bool {
self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
}
pub fn change_count(&self) -> usize {
self.added.len() + self.removed.len() + self.updated.len()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
use super::*;
#[test]
fn test_subscription_id_generation() {
let id1 = SubscriptionId::new();
let id2 = SubscriptionId::new();
assert_ne!(id1, id2);
}
#[test]
fn test_subscription_state_default() {
let state: SubscriptionState<String> = SubscriptionState::default();
assert!(state.loading);
assert!(state.data.is_none());
assert!(state.error.is_none());
assert!(!state.stale);
}
#[test]
fn test_subscription_state_with_data() {
let state = SubscriptionState::with_data(vec![1, 2, 3]);
assert!(!state.loading);
assert_eq!(state.data, Some(vec![1, 2, 3]));
assert!(state.error.is_none());
}
#[test]
fn test_query_group_lookup_key() {
let scope = AuthScope {
principal_id: Some("user-1".to_string()),
tenant_id: None,
};
let key1 = QueryGroup::compute_lookup_key(
"get_projects",
&serde_json::json!({"userId": "abc"}),
&scope,
);
let key2 = QueryGroup::compute_lookup_key(
"get_projects",
&serde_json::json!({"userId": "abc"}),
&scope,
);
assert_eq!(key1, key2);
let other_scope = AuthScope {
principal_id: Some("user-2".to_string()),
tenant_id: None,
};
let key3 = QueryGroup::compute_lookup_key(
"get_projects",
&serde_json::json!({"userId": "abc"}),
&other_scope,
);
assert_ne!(key1, key3);
}
#[test]
fn test_delta_empty() {
let delta: Delta<String> = Delta::empty();
assert!(delta.is_empty());
assert_eq!(delta.change_count(), 0);
}
#[test]
fn test_delta_with_changes() {
let delta = Delta {
added: vec!["a".to_string()],
removed: vec!["b".to_string()],
updated: vec!["c".to_string()],
};
assert!(!delta.is_empty());
assert_eq!(delta.change_count(), 3);
}
}