use std::collections::HashSet;
use tokio::sync::mpsc;
use super::config::{SubscriptionConfig, SubscriptionRetryPolicy};
use super::delta::PartialRowDelta;
use super::selective::SelectiveColumnConfig;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(u64);
impl SubscriptionId {
pub fn new() -> Self {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(1);
Self(COUNTER.fetch_add(1, Ordering::Relaxed))
}
pub fn as_u64(&self) -> u64 {
self.0
}
}
impl Default for SubscriptionId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for SubscriptionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "sub-{}", self.0)
}
}
#[derive(Debug, Clone, Default)]
pub struct SubscriptionMetrics {
pub subscription_id: Option<SubscriptionId>,
pub updates_sent: u64,
pub updates_dropped: u64,
pub channel_buffer_size: usize,
pub channel_capacity: usize,
pub slow_consumer_threshold_percent: u8,
}
#[derive(Debug)]
pub struct Subscription {
pub id: SubscriptionId,
pub query: String,
pub tables: HashSet<String>,
pub last_result_hash: u64,
pub last_result: Option<Vec<crate::Row>>,
pub notify_tx: mpsc::Sender<SubscriptionUpdate>,
pub retry_policy: SubscriptionRetryPolicy,
pub retry_count: u32,
pub updates_sent: u64,
pub updates_dropped: u64,
pub channel_buffer_size: usize,
pub slow_consumer_threshold_percent: u8,
pub connection_id: Option<String>,
pub wire_subscription_id: Option<[u8; 16]>,
pub filter: Option<String>,
pub pk_columns: Vec<usize>,
pub selective_eligible: bool,
pub selective_updates_override: Option<SelectiveColumnConfig>,
}
impl Subscription {
pub fn new(
query: String,
tables: HashSet<String>,
notify_tx: mpsc::Sender<SubscriptionUpdate>,
) -> Self {
Self::with_policy(query, tables, notify_tx, SubscriptionRetryPolicy::default())
}
pub fn with_policy(
query: String,
tables: HashSet<String>,
notify_tx: mpsc::Sender<SubscriptionUpdate>,
retry_policy: SubscriptionRetryPolicy,
) -> Self {
Self {
id: SubscriptionId::new(),
query,
tables,
last_result_hash: 0,
last_result: None,
notify_tx,
retry_policy,
retry_count: 0,
updates_sent: 0,
updates_dropped: 0,
channel_buffer_size: 64, slow_consumer_threshold_percent: 80,
connection_id: None,
wire_subscription_id: None,
filter: None,
pk_columns: vec![0], selective_eligible: false,
selective_updates_override: None,
}
}
pub fn with_config(
query: String,
tables: HashSet<String>,
notify_tx: mpsc::Sender<SubscriptionUpdate>,
config: &SubscriptionConfig,
) -> Self {
Self {
id: SubscriptionId::new(),
query,
tables,
last_result_hash: 0,
last_result: None,
notify_tx,
retry_policy: SubscriptionRetryPolicy::default(),
retry_count: 0,
updates_sent: 0,
updates_dropped: 0,
channel_buffer_size: config.channel_buffer_size,
slow_consumer_threshold_percent: config.slow_consumer_threshold_percent,
connection_id: None,
wire_subscription_id: None,
filter: None,
pk_columns: vec![0], selective_eligible: false,
selective_updates_override: None,
}
}
pub fn for_connection(
query: String,
tables: HashSet<String>,
notify_tx: mpsc::Sender<SubscriptionUpdate>,
connection_id: String,
wire_subscription_id: [u8; 16],
filter: Option<String>,
config: &SubscriptionConfig,
) -> Self {
Self::for_connection_with_pk(
query,
tables,
notify_tx,
connection_id,
wire_subscription_id,
filter,
config,
vec![0], )
}
#[allow(clippy::too_many_arguments)]
pub fn for_connection_with_pk(
query: String,
tables: HashSet<String>,
notify_tx: mpsc::Sender<SubscriptionUpdate>,
connection_id: String,
wire_subscription_id: [u8; 16],
filter: Option<String>,
config: &SubscriptionConfig,
pk_columns: Vec<usize>,
) -> Self {
Self {
id: SubscriptionId::new(),
query,
tables,
last_result_hash: 0,
last_result: None,
notify_tx,
retry_policy: SubscriptionRetryPolicy::default(),
retry_count: 0,
updates_sent: 0,
updates_dropped: 0,
channel_buffer_size: config.channel_buffer_size,
slow_consumer_threshold_percent: config.slow_consumer_threshold_percent,
connection_id: Some(connection_id),
wire_subscription_id: Some(wire_subscription_id),
filter,
pk_columns,
selective_eligible: false,
selective_updates_override: None,
}
}
pub fn set_pk_columns(&mut self, pk_columns: Vec<usize>) {
self.pk_columns = pk_columns;
}
pub fn set_pk_columns_with_eligibility(
&mut self,
pk_columns: Vec<usize>,
confident: bool,
) -> bool {
self.pk_columns = pk_columns;
let was_eligible = self.selective_eligible;
self.selective_eligible = confident;
!was_eligible && confident
}
pub fn set_selective_updates_override(&mut self, config: SelectiveColumnConfig) {
self.selective_updates_override = Some(config);
}
pub fn clear_selective_updates_override(&mut self) {
self.selective_updates_override = None;
}
pub fn get_effective_selective_config(
&self,
server_config: &SelectiveColumnConfig,
) -> SelectiveColumnConfig {
match &self.selective_updates_override {
Some(override_config) => {
override_config.with_pk_columns(self.pk_columns.clone())
}
None => {
server_config.with_pk_columns(self.pk_columns.clone())
}
}
}
}
#[derive(Debug, Clone)]
pub enum SubscriptionUpdate {
Full {
subscription_id: SubscriptionId,
rows: Vec<crate::Row>,
},
Delta {
subscription_id: SubscriptionId,
inserts: Vec<crate::Row>,
updates: Vec<(crate::Row, crate::Row)>,
deletes: Vec<crate::Row>,
},
Error {
subscription_id: SubscriptionId,
message: String,
},
Partial {
subscription_id: SubscriptionId,
updates: Vec<PartialRowDelta>,
},
}
impl SubscriptionUpdate {
pub fn subscription_id(&self) -> SubscriptionId {
match self {
SubscriptionUpdate::Full { subscription_id, .. } => *subscription_id,
SubscriptionUpdate::Delta { subscription_id, .. } => *subscription_id,
SubscriptionUpdate::Error { subscription_id, .. } => *subscription_id,
SubscriptionUpdate::Partial { subscription_id, .. } => *subscription_id,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum SubscriptionError {
#[error("Failed to parse query: {0}")]
ParseError(String),
#[error("Query references unknown table: {0}")]
UnknownTable(String),
#[error("Subscription not found: {0}")]
NotFound(SubscriptionId),
#[error("Failed to send notification: channel closed")]
ChannelClosed,
#[error("Connection limit exceeded: {current} subscriptions (max: {max})")]
ConnectionLimitExceeded {
current: usize,
max: usize,
},
#[error("Global limit exceeded: {current} subscriptions (max: {max})")]
GlobalLimitExceeded {
current: usize,
max: usize,
},
#[error("Result set too large: {rows} rows (max: {max})")]
ResultSetTooLarge {
rows: usize,
max: usize,
},
#[error("Rate limited: retry after {retry_after_ms}ms")]
RateLimited {
retry_after_ms: u64,
},
}