mod manager;
mod router;
pub mod session;
mod table_dependencies;
mod table_extract;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use tokio::sync::mpsc;
pub use manager::SubscriptionManager;
pub use router::{ChangeRouter, SubscriptionUpdate as RouterUpdate};
pub use session::{SessionSubscription, SessionSubscriptionId, SessionSubscriptionManager};
pub use table_dependencies::extract_table_dependencies;
pub use table_extract::extract_table_refs;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(u64);
impl SubscriptionId {
pub fn new() -> Self {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(1);
Self(COUNTER.fetch_add(1, Ordering::Relaxed))
}
pub fn as_u64(&self) -> u64 {
self.0
}
}
impl Default for SubscriptionId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for SubscriptionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "sub-{}", self.0)
}
}
#[derive(Debug)]
pub struct Subscription {
pub id: SubscriptionId,
pub query: String,
pub tables: HashSet<String>,
pub last_result_hash: u64,
pub notify_tx: mpsc::Sender<SubscriptionUpdate>,
}
impl Subscription {
pub fn new(
query: String,
tables: HashSet<String>,
notify_tx: mpsc::Sender<SubscriptionUpdate>,
) -> Self {
Self {
id: SubscriptionId::new(),
query,
tables,
last_result_hash: 0,
notify_tx,
}
}
}
#[derive(Debug, Clone)]
pub enum SubscriptionUpdate {
Full {
rows: Vec<crate::Row>,
},
#[allow(dead_code)]
Delta {
inserts: Vec<crate::Row>,
updates: Vec<(crate::Row, crate::Row)>,
deletes: Vec<crate::Row>,
},
Error {
message: String,
},
}
#[derive(Debug, thiserror::Error)]
pub enum SubscriptionError {
#[error("Failed to parse query: {0}")]
ParseError(String),
#[error("Query references unknown table: {0}")]
UnknownTable(String),
#[error("Subscription not found: {0}")]
NotFound(SubscriptionId),
#[error("Failed to send notification: channel closed")]
ChannelClosed,
}
pub fn hash_rows(rows: &[crate::Row]) -> u64 {
use std::collections::hash_map::DefaultHasher;
let mut hasher = DefaultHasher::new();
rows.len().hash(&mut hasher);
for row in rows {
for value in &row.values {
format!("{:?}", value).hash(&mut hasher);
}
}
hasher.finish()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_subscription_id_uniqueness() {
let id1 = SubscriptionId::new();
let id2 = SubscriptionId::new();
let id3 = SubscriptionId::new();
assert_ne!(id1, id2);
assert_ne!(id2, id3);
assert_ne!(id1, id3);
}
#[test]
fn test_subscription_id_display() {
let id = SubscriptionId(42);
assert_eq!(format!("{}", id), "sub-42");
}
#[test]
fn test_hash_rows_empty() {
let rows: Vec<crate::Row> = vec![];
let hash = hash_rows(&rows);
assert_eq!(hash, hash_rows(&[]));
}
#[test]
fn test_hash_rows_different_content() {
use vibesql_types::SqlValue;
let rows1 = vec![crate::Row {
values: vec![SqlValue::Integer(1), SqlValue::Varchar("hello".to_string())],
}];
let rows2 = vec![crate::Row {
values: vec![SqlValue::Integer(2), SqlValue::Varchar("hello".to_string())],
}];
let hash1 = hash_rows(&rows1);
let hash2 = hash_rows(&rows2);
assert_ne!(hash1, hash2);
}
#[test]
fn test_hash_rows_same_content() {
use vibesql_types::SqlValue;
let rows1 = vec![crate::Row {
values: vec![SqlValue::Integer(42), SqlValue::Varchar("test".to_string())],
}];
let rows2 = vec![crate::Row {
values: vec![SqlValue::Integer(42), SqlValue::Varchar("test".to_string())],
}];
let hash1 = hash_rows(&rows1);
let hash2 = hash_rows(&rows2);
assert_eq!(hash1, hash2);
}
}