pub struct SubscriptionManager { /* private fields */ }
Manager for query subscriptions
Tracks all active subscriptions, indexes them by table dependencies, and handles notifications when data changes.
§Thread Safety
The manager stores subscriptions in a DashMap, a concurrent hash map that synchronizes access internally. Multiple threads can subscribe, unsubscribe, and process change events concurrently without the caller taking any explicit lock.
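To illustrate what "no explicit locking by the caller" can look like, here is a minimal std-only sketch of a sharded map with a lock per shard. It is an analogue for illustration, not DashMap's implementation and not the manager's internals; `ShardedMap`, `subscribe_from_threads`, and the shard count are all hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical sharded map: each shard has its own lock, so writers that
/// land on different shards do not block each other.
struct ShardedMap {
    shards: Vec<RwLock<HashMap<u64, String>>>,
}

impl ShardedMap {
    /// `shard_count` must be at least 1.
    fn new(shard_count: usize) -> Self {
        assert!(shard_count > 0, "need at least one shard");
        Self {
            shards: (0..shard_count).map(|_| RwLock::new(HashMap::new())).collect(),
        }
    }

    /// Pick the shard that owns `key` by hashing it.
    fn shard(&self, key: u64) -> &RwLock<HashMap<u64, String>> {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        &self.shards[(hasher.finish() as usize) % self.shards.len()]
    }

    /// Takes `&self`: the locking happens inside, not at the call site.
    fn insert(&self, key: u64, query: String) {
        self.shard(key).write().unwrap().insert(key, query);
    }

    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.read().unwrap().len()).sum()
    }
}

/// Insert from several threads at once and report the final entry count.
fn subscribe_from_threads(threads: u64, per_thread: u64) -> usize {
    let map = Arc::new(ShardedMap::new(8));
    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let map = Arc::clone(&map);
            thread::spawn(move || {
                for i in 0..per_thread {
                    map.insert(t * per_thread + i, format!("SELECT {}", i));
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    map.len()
}
```

Every thread calls `insert` through a shared reference, which mirrors how the manager's methods take `&self`.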
§Performance
The manager uses a table-based index to quickly find subscriptions affected by a change event. Finding the subscriptions for a table name is an average-case O(1) hash lookup, rather than a scan over all subscriptions.
§Connection Tracking
The manager supports tracking subscriptions by connection/session ID. This enables efficient cleanup when a connection closes, and allows both HTTP SSE and wire protocol clients to use the same subscription infrastructure.
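A minimal sketch of how a per-connection index can make cleanup cheap. The `ConnectionIndex` type, its fields, and its single-count return value are assumptions made for illustration; they are not the manager's actual internals or return types.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the crate's `SubscriptionId`.
type SubscriptionId = u64;

/// Illustrative index from connection ID to the subscriptions it owns.
#[derive(Default)]
struct ConnectionIndex {
    subscriptions: HashMap<SubscriptionId, String>, // id -> query text
    by_connection: HashMap<String, HashSet<SubscriptionId>>,
}

impl ConnectionIndex {
    /// Record a subscription and remember which connection owns it.
    fn add(&mut self, connection_id: &str, id: SubscriptionId, query: &str) {
        self.subscriptions.insert(id, query.to_string());
        self.by_connection
            .entry(connection_id.to_string())
            .or_default()
            .insert(id);
    }

    /// Remove everything owned by `connection_id` without scanning the
    /// subscriptions of other connections. Returns how many were removed.
    fn close_connection(&mut self, connection_id: &str) -> usize {
        let ids = self.by_connection.remove(connection_id).unwrap_or_default();
        ids.iter()
            .filter(|id| self.subscriptions.remove(*id).is_some())
            .count()
    }
}
```

Because the index is keyed by an opaque connection ID string, the same structure works whether the ID names an HTTP SSE session or a wire protocol connection.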
Implementations§
impl SubscriptionManager
pub fn find_affected_subscriptions(
    &self,
    table_name: &str,
) -> Vec<SubscriptionId>
Find all subscriptions affected by a change to a given table
This is the core lookup operation for fanout during change handling. Uses the table index for O(1) lookup of the subscription ID set.
§Arguments
- table_name: The table that changed
§Returns
Vector of subscription IDs that depend on this table
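The lookup described above can be sketched with a plain hash map from table name to a set of IDs. This is an illustration under assumptions: `TableIndex` and its methods are hypothetical, `SubscriptionId` is modelled as `u64`, and the real manager uses a concurrent map rather than `std::collections::HashMap`.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the crate's `SubscriptionId`.
type SubscriptionId = u64;

/// Illustrative table index: table name -> IDs of subscriptions that read it.
#[derive(Default)]
struct TableIndex {
    by_table: HashMap<String, HashSet<SubscriptionId>>,
}

impl TableIndex {
    /// Register `id` under every table its query depends on.
    fn insert(&mut self, id: SubscriptionId, tables: &[&str]) {
        for table in tables {
            self.by_table.entry(table.to_string()).or_default().insert(id);
        }
    }

    /// One hash lookup finds the ID set. Copying it out is linear in the
    /// number of affected subscriptions, not in the total number registered.
    fn find_affected(&self, table_name: &str) -> Vec<SubscriptionId> {
        let mut ids: Vec<SubscriptionId> = self
            .by_table
            .get(table_name)
            .map(|set| set.iter().copied().collect())
            .unwrap_or_default();
        ids.sort_unstable(); // deterministic order, for the example only
        ids
    }
}
```

A subscription whose query joins two tables appears under both table names, so a change to either table finds it.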
pub async fn handle_change(&self, event: ChangeEvent, db: &Database)
Handle a change event from the storage layer
Finds all subscriptions affected by the change and checks if their results have changed. Sends notifications for changed results.
§Arguments
- event: The change event to process (from storage layer)
- db: Database to re-execute queries against
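The "check whether results changed, then notify" step can be sketched as a hash comparison. This is a simplified, synchronous illustration: the `Subscription` struct, `hash_rows`, and `notify_if_changed` are hypothetical, rows are modelled as `Vec<String>`, and a std channel stands in for whatever channel type the crate uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc::Sender;

/// Hypothetical row representation for this sketch.
type Row = Vec<String>;

/// Hash a whole result set so two results can be compared cheaply.
fn hash_rows(rows: &[Row]) -> u64 {
    let mut hasher = DefaultHasher::new();
    rows.hash(&mut hasher);
    hasher.finish()
}

/// Illustrative per-subscription state.
struct Subscription {
    last_result_hash: u64,
    notify_tx: Sender<Vec<Row>>,
}

/// Compare the freshly executed result against the stored hash and notify
/// only when it differs. Returns true when a notification was sent.
fn notify_if_changed(sub: &mut Subscription, fresh: Vec<Row>) -> bool {
    let fresh_hash = hash_rows(&fresh);
    if fresh_hash == sub.last_result_hash {
        return false; // the table changed but this query's result did not
    }
    sub.last_result_hash = fresh_hash;
    sub.notify_tx.send(fresh).is_ok()
}
```

Comparing hashes avoids sending an update when a write to a watched table does not affect the rows a query returns.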
pub async fn run_event_loop(
    &self,
    change_rx: ChangeEventReceiver,
    db: Arc<Database>,
)
Run the subscription manager event loop
Listens for change events from the storage layer and processes them. This method runs indefinitely until the change channel is closed.
§Arguments
- change_rx: Receiver for change events from the storage layer
- db: Database reference for re-executing subscription queries
§Note
This method should be spawned as a tokio task at server startup using tokio::spawn.
It will poll the change receiver and handle events until closed.
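The "run until the change channel is closed" behaviour can be sketched with a blocking std channel and a thread. This is an analogue only: the real method is async and is meant to run as a tokio task, and the `ChangeEvent` field and both function bodies here are hypothetical.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical change event carrying only the affected table name.
struct ChangeEvent {
    table_name: String,
}

/// Process events until the channel closes, then return what was handled.
fn run_event_loop(change_rx: Receiver<ChangeEvent>) -> Vec<String> {
    let mut handled = Vec::new();
    // `recv` returns Err once every sender has been dropped, ending the loop.
    while let Ok(event) = change_rx.recv() {
        handled.push(event.table_name);
    }
    handled
}

/// Spawn the loop in the background, feed it events, then close the channel.
fn spawn_and_feed(tables: &[&str]) -> Vec<String> {
    let (change_tx, change_rx) = channel();
    let worker = thread::spawn(move || run_event_loop(change_rx));
    for table in tables {
        change_tx
            .send(ChangeEvent { table_name: table.to_string() })
            .expect("event loop should still be running");
    }
    drop(change_tx); // closing the channel is what stops the loop
    worker.join().expect("event loop panicked")
}
```

The loop has no separate shutdown signal in this sketch: dropping the last sender is the only way it ends, which matches the "until the change channel is closed" description.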
impl SubscriptionManager
pub fn subscribe(
    &self,
    query: String,
    notify_tx: Sender<SubscriptionUpdate>,
) -> Result<SubscriptionId, SubscriptionError>
Create a new subscription for a query
Parses the query to extract table dependencies and registers the subscription for notifications.
§Arguments
- query: SQL query to monitor
- notify_tx: Channel to send updates to the subscriber
§Returns
The subscription ID on success, or an error if parsing fails or a limit is exceeded
§Errors
- ParseError if the query cannot be parsed or references no tables
- GlobalLimitExceeded if the global subscription limit is reached
§Example
let manager = SubscriptionManager::new();
let (tx, mut rx) = mpsc::channel(16);
let id = manager.subscribe("SELECT * FROM users".to_string(), tx)?;
println!("Subscribed with ID: {}", id);
pub fn subscribe_for_connection(
    &self,
    query: String,
    notify_tx: Sender<SubscriptionUpdate>,
    connection_id: String,
    wire_subscription_id: [u8; 16],
    table_dependencies: HashSet<String>,
    filter: Option<String>,
) -> Result<SubscriptionId, SubscriptionError>
Create a new subscription for a specific connection (wire protocol)
This is the primary method for wire protocol subscriptions. It:
- Checks both global and per-connection limits
- Associates the subscription with a connection ID for cleanup
- Stores the wire protocol UUID for lookup
§Arguments
- query: SQL query to monitor
- notify_tx: Channel to send updates to the subscriber
- connection_id: The connection/session ID that owns this subscription
- wire_subscription_id: The wire protocol UUID for this subscription
- table_dependencies: Pre-extracted table dependencies (from AST parsing)
§Returns
The subscription ID on success, or an error if a limit is exceeded
§Errors
- GlobalLimitExceeded if the global subscription limit is reached
- ConnectionLimitExceeded if the per-connection limit is reached
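A minimal sketch of checking both limits before registering a subscription. The two error variant names come from the documentation above; everything else (the `Limits` and `Counts` types, their field names, and the order of the checks) is an assumption for illustration.

```rust
use std::collections::HashMap;

/// Stand-in error type using the two variant names documented above.
#[derive(Debug, PartialEq)]
enum SubscriptionError {
    GlobalLimitExceeded,
    ConnectionLimitExceeded,
}

/// Hypothetical limit settings.
struct Limits {
    max_global: usize,
    max_per_connection: usize,
}

/// Hypothetical bookkeeping of how many subscriptions exist.
#[derive(Default)]
struct Counts {
    total: usize,
    per_connection: HashMap<String, usize>,
}

/// Reject the registration if either limit is already reached; otherwise
/// count it against both the global total and the owning connection.
fn try_register(
    limits: &Limits,
    counts: &mut Counts,
    connection_id: &str,
) -> Result<(), SubscriptionError> {
    if counts.total >= limits.max_global {
        return Err(SubscriptionError::GlobalLimitExceeded);
    }
    let used = counts.per_connection.get(connection_id).copied().unwrap_or(0);
    if used >= limits.max_per_connection {
        return Err(SubscriptionError::ConnectionLimitExceeded);
    }
    counts.total += 1;
    *counts.per_connection.entry(connection_id.to_string()).or_insert(0) += 1;
    Ok(())
}
```

Checking before mutating either counter keeps a rejected request from leaving partial state behind.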
pub fn unsubscribe(&self, id: SubscriptionId) -> bool
pub fn unsubscribe_by_wire_id(&self, wire_id: &[u8; 16]) -> bool
Remove a subscription by its wire protocol ID
This is used by wire protocol clients that use UUID-based subscription IDs.
§Arguments
- wire_id: The wire protocol subscription ID (UUID bytes)
§Returns
true if the removed subscription was selective-eligible, false otherwise.
Returns false if the subscription was not found.
pub fn unsubscribe_all_for_connection(
    &self,
    connection_id: &str,
) -> (usize, usize)
impl SubscriptionManager
pub fn subscription_count(&self) -> usize
Get the number of active subscriptions
pub fn watched_tables(&self) -> Vec<(String, usize)>
Get the tables being watched and their subscription counts
pub fn config(&self) -> &SubscriptionConfig
Get the current configuration
pub fn limit_exceeded_count(&self) -> usize
Get the number of times the global limit was exceeded (for metrics)
pub fn result_set_exceeded_count(&self) -> usize
Get the number of times a result set was too large (for metrics)
pub fn get_subscription_metrics(
    &self,
    id: SubscriptionId,
) -> Option<SubscriptionMetrics>
Get metrics for a specific subscription
Returns metrics including updates sent, dropped, and channel health. Returns None if the subscription doesn’t exist.
pub fn get_all_metrics(&self) -> Vec<SubscriptionMetrics>
Get metrics for all active subscriptions
Returns a vector of metrics for all subscriptions, useful for monitoring and alerting on subscription health.
impl SubscriptionManager
pub async fn send_initial_results(
    &self,
    id: SubscriptionId,
    db: &Database,
) -> Result<(), SubscriptionError>
Send initial results to a new subscriber
Executes the query and sends the initial results. This should be called right after subscribing to provide immediate data. The initial results are always sent as a Full update.
§Errors
- NotFound if the subscription doesn’t exist
- ParseError if the query fails to execute
- ResultSetTooLarge if the result set exceeds the configured limit
- ChannelClosed if the notification channel is closed
impl SubscriptionManager
pub fn find_subscription_by_wire_id(
    &self,
    wire_id: &[u8; 16],
) -> Option<SubscriptionId>
pub fn connection_subscription_count(&self, connection_id: &str) -> usize
pub fn get_affected_subscriptions_for_wire_protocol(
    &self,
    table: &str,
) -> Vec<([u8; 16], String, u64, Option<Vec<Row>>)>
Get affected subscription details for wire protocol
This method finds all subscriptions that depend on the given table and returns their details needed for wire protocol notifications.
§Arguments
- table: The table name to find affected subscriptions for
§Returns
Vector of (wire_subscription_id, query, last_result_hash, last_result) for each affected subscription that has a wire ID.
pub fn get_affected_subscriptions_for_connection(
    &self,
    table: &str,
    connection_id: &str,
) -> Vec<([u8; 16], String, u64, Option<Vec<Row>>, Option<String>)>
Get affected subscription details for wire protocol filtered by connection
This method finds subscriptions for a specific connection that depend on the given table and returns their details needed for wire protocol notifications.
§Arguments
- table: The table name to find affected subscriptions for
- connection_id: The connection ID to filter by
§Returns
Vector of (wire_subscription_id, query, last_result_hash, last_result, filter) for each affected subscription that belongs to the specified connection.
pub fn update_result_by_wire_id(
    &self,
    wire_id: &[u8; 16],
    result_hash: u64,
    result: Vec<Row>,
)
Update the stored result for a subscription by wire ID
This is used by wire protocol to store results for delta computation.
§Arguments
- wire_id: The wire protocol subscription ID (UUID bytes)
- result_hash: Hash of the new result set
- result: The new result set
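To show why the previous result is stored, here is one way a delta between the stored and the new result set could be computed by keying rows on their primary key columns. This is a sketch under assumptions, not the crate's delta algorithm: `Delta`, `key`, and `compute_delta` are hypothetical, and rows are modelled as `Vec<String>`.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical row representation for this sketch.
type Row = Vec<String>;

/// Rows that appeared, changed, or disappeared between two result sets.
#[derive(Debug, Default, PartialEq)]
struct Delta {
    inserted: Vec<Row>,
    updated: Vec<Row>,
    deleted: Vec<Row>,
}

/// Extract a row's primary key. Panics if an index is out of range.
fn key(row: &Row, pk_columns: &[usize]) -> Vec<String> {
    pk_columns.iter().map(|&i| row[i].clone()).collect()
}

/// Diff `new` against `old`, matching rows by primary key.
fn compute_delta(old: &[Row], new: &[Row], pk_columns: &[usize]) -> Delta {
    let old_by_key: HashMap<Vec<String>, &Row> =
        old.iter().map(|row| (key(row, pk_columns), row)).collect();
    let mut seen: HashSet<Vec<String>> = HashSet::new();
    let mut delta = Delta::default();

    for row in new {
        let k = key(row, pk_columns);
        match old_by_key.get(&k) {
            None => delta.inserted.push(row.clone()),
            Some(prev) if *prev != row => delta.updated.push(row.clone()),
            Some(_) => {} // same key, same contents: nothing to report
        }
        seen.insert(k);
    }
    for row in old {
        if !seen.contains(&key(row, pk_columns)) {
            delta.deleted.push(row.clone());
        }
    }
    delta
}
```

The sketch assumes primary keys are unique within a result set; with duplicate keys, later rows would shadow earlier ones in `old_by_key`.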
pub fn update_pk_columns_by_wire_id(
    &self,
    wire_id: &[u8; 16],
    pk_columns: Vec<usize>,
)
Update the primary key columns for a subscription by wire ID
This is called after PK detection to set the actual PK column indices for selective column updates.
pub fn update_pk_columns_with_eligibility_by_wire_id(
    &self,
    wire_id: &[u8; 16],
    pk_columns: Vec<usize>,
    confident: bool,
) -> bool
Update PK columns with eligibility tracking
Sets the PK columns and marks whether the subscription is eligible for selective column updates (based on confident PK detection).
Returns true if the subscription is newly marked as selective-eligible.
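The "newly marked" return value can be sketched as a comparison of the eligibility flag before and after the update. The `SubscriptionState` struct and the rule that eligibility simply follows `confident` are assumptions for illustration; the actual bookkeeping may differ.

```rust
/// Hypothetical per-subscription state for this sketch.
struct SubscriptionState {
    pk_columns: Vec<usize>,
    selective_eligible: bool,
}

/// Store the detected PK columns and report whether this call is the one
/// that turned eligibility on (false -> true), not merely whether it is on.
fn update_pk_columns_with_eligibility(
    state: &mut SubscriptionState,
    pk_columns: Vec<usize>,
    confident: bool,
) -> bool {
    let was_eligible = state.selective_eligible;
    state.pk_columns = pk_columns;
    state.selective_eligible = confident; // assumption: eligibility tracks confidence
    !was_eligible && confident
}
```

Returning only the transition lets a caller keep an accurate count of eligible subscriptions without counting the same subscription twice.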
pub fn update_pk_columns_with_eligibility(
    &self,
    id: SubscriptionId,
    pk_columns: Vec<usize>,
    confident: bool,
) -> bool
Update PK columns with eligibility tracking by internal subscription ID
Sets the PK columns and marks whether the subscription is eligible for selective column updates (based on confident PK detection). This variant is used for HTTP SSE subscriptions that don’t have wire IDs.
Returns true if the subscription is newly marked as selective-eligible.
pub fn update_selective_updates(
    &self,
    id: SubscriptionId,
    config: SelectiveColumnConfig,
)
Update selective updates configuration for a subscription
Allows per-subscription overrides of the server-level selective updates config.
§Arguments
- id: The subscription ID
- config: The new selective updates configuration
pub fn get_pk_columns_by_wire_id(&self, wire_id: &[u8; 16]) -> Vec<usize>
Get the primary key columns for a subscription by wire ID
Returns the PK column indices, or the default [0] if the subscription is not found.
pub fn set_selective_updates_override_by_wire_id(
    &self,
    wire_id: &[u8; 16],
    config: SelectiveColumnConfig,
)
Set per-subscription selective updates configuration override by wire ID
Allows clients to override server-level selective update thresholds on a per-subscription basis via the wire protocol.
pub fn get_effective_selective_config_by_wire_id(
    &self,
    wire_id: &[u8; 16],
    server_config: &SelectiveColumnConfig,
) -> SelectiveColumnConfig
Get the effective selective column config for a subscription by wire ID
Returns the per-subscription override config if set, otherwise creates a config using the server-level settings with subscription-specific pk_columns.
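The fallback rule described above can be sketched as follows. The struct layouts are assumptions: the real `SelectiveColumnConfig` is opaque here, so this stand-in has one hypothetical setting (`max_changed_columns`) plus the `pk_columns` field the description mentions.

```rust
/// Stand-in config; `max_changed_columns` is a hypothetical setting.
#[derive(Clone, Debug, PartialEq)]
struct SelectiveColumnConfig {
    max_changed_columns: usize,
    pk_columns: Vec<usize>,
}

/// Hypothetical per-subscription state.
struct SubscriptionState {
    pk_columns: Vec<usize>,
    selective_override: Option<SelectiveColumnConfig>,
}

/// Use the override when one is set; otherwise take the server-level
/// settings and substitute this subscription's own PK columns.
fn effective_config(
    state: &SubscriptionState,
    server_config: &SelectiveColumnConfig,
) -> SelectiveColumnConfig {
    match &state.selective_override {
        Some(config) => config.clone(),
        None => SelectiveColumnConfig {
            pk_columns: state.pk_columns.clone(),
            ..server_config.clone()
        },
    }
}
```

In this sketch an override is returned unchanged, including its own `pk_columns`; only the fallback path mixes server settings with subscription-specific columns.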
impl SubscriptionManager
pub fn with_config(config: SubscriptionConfig) -> Self
Create a new subscription manager with custom configuration
Trait Implementations§
Auto Trait Implementations§
impl !Freeze for SubscriptionManager
impl !RefUnwindSafe for SubscriptionManager
impl Send for SubscriptionManager
impl Sync for SubscriptionManager
impl Unpin for SubscriptionManager
impl UnsafeUnpin for SubscriptionManager
impl UnwindSafe for SubscriptionManager
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<A, B, T> HttpServerConnExec<A, B> for T
where
    B: Body,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request