pub mod error;
mod manager;
mod router;
pub mod session;
mod table_dependencies;
mod table_extract;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
pub use error::{classify_error, classify_error_str, SubscriptionErrorKind};
pub use manager::SubscriptionManager;
pub use router::{ChangeRouter, SubscriptionUpdate as RouterUpdate};
pub use session::{SessionSubscription, SessionSubscriptionId, SessionSubscriptionManager};
pub use table_dependencies::extract_table_dependencies;
pub use table_extract::extract_table_refs;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionConfig {
#[serde(default = "default_max_per_connection")]
pub max_per_connection: usize,
#[serde(default = "default_max_global")]
pub max_global: usize,
#[serde(default = "default_max_result_rows")]
pub max_result_rows: usize,
#[serde(default = "default_rate_limit_per_second")]
pub rate_limit_per_second: u32,
#[serde(default = "default_channel_buffer_size")]
pub channel_buffer_size: usize,
#[serde(default = "default_slow_consumer_threshold_percent")]
pub slow_consumer_threshold_percent: u8,
}
fn default_max_per_connection() -> usize {
100
}
fn default_max_global() -> usize {
10_000
}
fn default_max_result_rows() -> usize {
10_000
}
fn default_rate_limit_per_second() -> u32 {
10
}
fn default_channel_buffer_size() -> usize {
64
}
fn default_slow_consumer_threshold_percent() -> u8 {
80
}
impl Default for SubscriptionConfig {
fn default() -> Self {
Self {
max_per_connection: default_max_per_connection(),
max_global: default_max_global(),
max_result_rows: default_max_result_rows(),
rate_limit_per_second: default_rate_limit_per_second(),
channel_buffer_size: default_channel_buffer_size(),
slow_consumer_threshold_percent: default_slow_consumer_threshold_percent(),
}
}
}
#[derive(Debug, Clone, Default)]
pub struct SubscriptionMetrics {
pub subscription_id: Option<SubscriptionId>,
pub updates_sent: u64,
pub updates_dropped: u64,
pub channel_buffer_size: usize,
pub channel_capacity: usize,
pub slow_consumer_threshold_percent: u8,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(u64);
impl SubscriptionId {
pub fn new() -> Self {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(1);
Self(COUNTER.fetch_add(1, Ordering::Relaxed))
}
pub fn as_u64(&self) -> u64 {
self.0
}
}
impl Default for SubscriptionId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for SubscriptionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "sub-{}", self.0)
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct SubscriptionRetryPolicy {
pub max_retries: u32,
pub base_delay_ms: u64,
pub max_delay_ms: u64,
pub backoff_multiplier: f64,
}
impl Default for SubscriptionRetryPolicy {
fn default() -> Self {
Self { max_retries: 3, base_delay_ms: 1000, max_delay_ms: 30000, backoff_multiplier: 2.0 }
}
}
impl SubscriptionRetryPolicy {
fn calculate_backoff(&self, attempt: u32) -> Duration {
let backoff_ms = self.base_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32);
let capped_ms = backoff_ms.min(self.max_delay_ms as f64);
Duration::from_millis(capped_ms as u64)
}
}
#[derive(Debug)]
pub struct Subscription {
pub id: SubscriptionId,
pub query: String,
pub tables: HashSet<String>,
pub last_result_hash: u64,
pub last_result: Option<Vec<crate::Row>>,
pub notify_tx: mpsc::Sender<SubscriptionUpdate>,
pub retry_policy: SubscriptionRetryPolicy,
pub retry_count: u32,
pub updates_sent: u64,
pub updates_dropped: u64,
pub channel_buffer_size: usize,
pub slow_consumer_threshold_percent: u8,
}
impl Subscription {
pub fn new(
query: String,
tables: HashSet<String>,
notify_tx: mpsc::Sender<SubscriptionUpdate>,
) -> Self {
Self::with_policy(query, tables, notify_tx, SubscriptionRetryPolicy::default())
}
pub fn with_policy(
query: String,
tables: HashSet<String>,
notify_tx: mpsc::Sender<SubscriptionUpdate>,
retry_policy: SubscriptionRetryPolicy,
) -> Self {
Self {
id: SubscriptionId::new(),
query,
tables,
last_result_hash: 0,
last_result: None,
notify_tx,
retry_policy,
retry_count: 0,
updates_sent: 0,
updates_dropped: 0,
channel_buffer_size: 64, slow_consumer_threshold_percent: 80,
}
}
pub fn with_config(
query: String,
tables: HashSet<String>,
notify_tx: mpsc::Sender<SubscriptionUpdate>,
config: &SubscriptionConfig,
) -> Self {
Self {
id: SubscriptionId::new(),
query,
tables,
last_result_hash: 0,
last_result: None,
notify_tx,
retry_policy: SubscriptionRetryPolicy::default(),
retry_count: 0,
updates_sent: 0,
updates_dropped: 0,
channel_buffer_size: config.channel_buffer_size,
slow_consumer_threshold_percent: config.slow_consumer_threshold_percent,
}
}
}
#[derive(Debug, Clone)]
pub enum SubscriptionUpdate {
Full {
rows: Vec<crate::Row>,
},
Delta {
inserts: Vec<crate::Row>,
updates: Vec<(crate::Row, crate::Row)>,
deletes: Vec<crate::Row>,
},
Error {
message: String,
},
}
#[derive(Debug, thiserror::Error)]
pub enum SubscriptionError {
#[error("Failed to parse query: {0}")]
ParseError(String),
#[error("Query references unknown table: {0}")]
UnknownTable(String),
#[error("Subscription not found: {0}")]
NotFound(SubscriptionId),
#[error("Failed to send notification: channel closed")]
ChannelClosed,
#[error("Connection limit exceeded: {current} subscriptions (max: {max})")]
ConnectionLimitExceeded {
current: usize,
max: usize,
},
#[error("Global limit exceeded: {current} subscriptions (max: {max})")]
GlobalLimitExceeded {
current: usize,
max: usize,
},
#[error("Result set too large: {rows} rows (max: {max})")]
ResultSetTooLarge {
rows: usize,
max: usize,
},
#[error("Rate limited: retry after {retry_after_ms}ms")]
RateLimited {
retry_after_ms: u64,
},
}
pub fn hash_rows(rows: &[crate::Row]) -> u64 {
use std::collections::hash_map::DefaultHasher;
let mut hasher = DefaultHasher::new();
rows.len().hash(&mut hasher);
for row in rows {
for value in &row.values {
format!("{:?}", value).hash(&mut hasher);
}
}
hasher.finish()
}
fn hash_row(row: &crate::Row) -> u64 {
use std::collections::hash_map::DefaultHasher;
let mut hasher = DefaultHasher::new();
for value in &row.values {
value.hash(&mut hasher);
}
hasher.finish()
}
pub fn compute_delta(old: &[crate::Row], new: &[crate::Row]) -> Option<SubscriptionUpdate> {
use std::collections::HashMap;
let mut old_map: HashMap<u64, Vec<&crate::Row>> = HashMap::new();
for row in old {
let hash = hash_row(row);
old_map.entry(hash).or_default().push(row);
}
let mut new_map: HashMap<u64, Vec<&crate::Row>> = HashMap::new();
for row in new {
let hash = hash_row(row);
new_map.entry(hash).or_default().push(row);
}
let mut inserts = Vec::new();
let mut deletes = Vec::new();
for (hash, new_rows) in &new_map {
let old_rows = old_map.get(hash).map(|v| v.as_slice()).unwrap_or(&[]);
if new_rows.len() > old_rows.len() {
for row in new_rows.iter().skip(old_rows.len()) {
inserts.push((*row).clone());
}
}
}
for (hash, old_rows) in &old_map {
let new_rows = new_map.get(hash).map(|v| v.as_slice()).unwrap_or(&[]);
if old_rows.len() > new_rows.len() {
for row in old_rows.iter().skip(new_rows.len()) {
deletes.push((*row).clone());
}
}
}
if inserts.is_empty() && deletes.is_empty() {
return None;
}
let updates = Vec::new();
Some(SubscriptionUpdate::Delta { inserts, updates, deletes })
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_subscription_id_uniqueness() {
let id1 = SubscriptionId::new();
let id2 = SubscriptionId::new();
let id3 = SubscriptionId::new();
assert_ne!(id1, id2);
assert_ne!(id2, id3);
assert_ne!(id1, id3);
}
#[test]
fn test_subscription_id_display() {
let id = SubscriptionId(42);
assert_eq!(format!("{}", id), "sub-42");
}
#[test]
fn test_hash_rows_empty() {
let rows: Vec<crate::Row> = vec![];
let hash = hash_rows(&rows);
assert_eq!(hash, hash_rows(&[]));
}
#[test]
fn test_hash_rows_different_content() {
use vibesql_types::SqlValue;
let rows1 = vec![crate::Row {
values: vec![SqlValue::Integer(1), SqlValue::Varchar("hello".to_string())],
}];
let rows2 = vec![crate::Row {
values: vec![SqlValue::Integer(2), SqlValue::Varchar("hello".to_string())],
}];
let hash1 = hash_rows(&rows1);
let hash2 = hash_rows(&rows2);
assert_ne!(hash1, hash2);
}
#[test]
fn test_hash_rows_same_content() {
use vibesql_types::SqlValue;
let rows1 = vec![crate::Row {
values: vec![SqlValue::Integer(42), SqlValue::Varchar("test".to_string())],
}];
let rows2 = vec![crate::Row {
values: vec![SqlValue::Integer(42), SqlValue::Varchar("test".to_string())],
}];
let hash1 = hash_rows(&rows1);
let hash2 = hash_rows(&rows2);
assert_eq!(hash1, hash2);
}
#[test]
fn test_compute_delta_no_changes() {
use vibesql_types::SqlValue;
let rows = vec![
crate::Row {
values: vec![SqlValue::Integer(1), SqlValue::Varchar("Alice".to_string())],
},
crate::Row { values: vec![SqlValue::Integer(2), SqlValue::Varchar("Bob".to_string())] },
];
let delta = compute_delta(&rows, &rows);
assert!(delta.is_none());
}
#[test]
fn test_compute_delta_single_insert() {
use vibesql_types::SqlValue;
let old = vec![crate::Row {
values: vec![SqlValue::Integer(1), SqlValue::Varchar("Alice".to_string())],
}];
let new = vec![
crate::Row {
values: vec![SqlValue::Integer(1), SqlValue::Varchar("Alice".to_string())],
},
crate::Row { values: vec![SqlValue::Integer(2), SqlValue::Varchar("Bob".to_string())] },
];
let delta = compute_delta(&old, &new);
assert!(delta.is_some());
match delta.unwrap() {
SubscriptionUpdate::Delta { inserts, updates, deletes } => {
assert_eq!(inserts.len(), 1);
assert_eq!(inserts[0].values[0], SqlValue::Integer(2));
assert_eq!(inserts[0].values[1], SqlValue::Varchar("Bob".to_string()));
assert!(updates.is_empty());
assert!(deletes.is_empty());
}
_ => panic!("Expected Delta update"),
}
}
#[test]
fn test_compute_delta_single_delete() {
use vibesql_types::SqlValue;
let old = vec![
crate::Row {
values: vec![SqlValue::Integer(1), SqlValue::Varchar("Alice".to_string())],
},
crate::Row { values: vec![SqlValue::Integer(2), SqlValue::Varchar("Bob".to_string())] },
];
let new = vec![crate::Row {
values: vec![SqlValue::Integer(1), SqlValue::Varchar("Alice".to_string())],
}];
let delta = compute_delta(&old, &new);
assert!(delta.is_some());
match delta.unwrap() {
SubscriptionUpdate::Delta { inserts, updates, deletes } => {
assert!(inserts.is_empty());
assert!(updates.is_empty());
assert_eq!(deletes.len(), 1);
assert_eq!(deletes[0].values[0], SqlValue::Integer(2));
}
_ => panic!("Expected Delta update"),
}
}
#[test]
fn test_compute_delta_insert_and_delete() {
use vibesql_types::SqlValue;
let old = vec![crate::Row {
values: vec![SqlValue::Integer(1), SqlValue::Varchar("Alice".to_string())],
}];
let new = vec![crate::Row {
values: vec![SqlValue::Integer(2), SqlValue::Varchar("Bob".to_string())],
}];
let delta = compute_delta(&old, &new);
assert!(delta.is_some());
match delta.unwrap() {
SubscriptionUpdate::Delta { inserts, updates, deletes } => {
assert_eq!(inserts.len(), 1);
assert_eq!(deletes.len(), 1);
assert!(updates.is_empty());
assert_eq!(inserts[0].values[0], SqlValue::Integer(2));
assert_eq!(deletes[0].values[0], SqlValue::Integer(1));
}
_ => panic!("Expected Delta update"),
}
}
#[test]
fn test_compute_delta_empty_to_rows() {
use vibesql_types::SqlValue;
let old: Vec<crate::Row> = vec![];
let new = vec![
crate::Row {
values: vec![SqlValue::Integer(1), SqlValue::Varchar("Alice".to_string())],
},
crate::Row { values: vec![SqlValue::Integer(2), SqlValue::Varchar("Bob".to_string())] },
];
let delta = compute_delta(&old, &new);
assert!(delta.is_some());
match delta.unwrap() {
SubscriptionUpdate::Delta { inserts, updates, deletes } => {
assert_eq!(inserts.len(), 2);
assert!(updates.is_empty());
assert!(deletes.is_empty());
}
_ => panic!("Expected Delta update"),
}
}
#[test]
fn test_compute_delta_rows_to_empty() {
use vibesql_types::SqlValue;
let old = vec![
crate::Row {
values: vec![SqlValue::Integer(1), SqlValue::Varchar("Alice".to_string())],
},
crate::Row { values: vec![SqlValue::Integer(2), SqlValue::Varchar("Bob".to_string())] },
];
let new: Vec<crate::Row> = vec![];
let delta = compute_delta(&old, &new);
assert!(delta.is_some());
match delta.unwrap() {
SubscriptionUpdate::Delta { inserts, updates, deletes } => {
assert!(inserts.is_empty());
assert!(updates.is_empty());
assert_eq!(deletes.len(), 2);
}
_ => panic!("Expected Delta update"),
}
}
#[test]
fn test_compute_delta_duplicate_rows() {
use vibesql_types::SqlValue;
let old = vec![
crate::Row { values: vec![SqlValue::Integer(1)] },
crate::Row { values: vec![SqlValue::Integer(1)] },
];
let new = vec![
crate::Row { values: vec![SqlValue::Integer(1)] },
crate::Row { values: vec![SqlValue::Integer(1)] },
crate::Row { values: vec![SqlValue::Integer(1)] },
];
let delta = compute_delta(&old, &new);
assert!(delta.is_some());
match delta.unwrap() {
SubscriptionUpdate::Delta { inserts, updates, deletes } => {
assert_eq!(inserts.len(), 1);
assert!(updates.is_empty());
assert!(deletes.is_empty());
}
_ => panic!("Expected Delta update"),
}
}
}