Struct SubscriptionManager 

pub struct SubscriptionManager { /* private fields */ }

Manager for query subscriptions

Tracks all active subscriptions, indexes them by table dependencies, and handles notifications when data changes.

§Thread Safety

The manager stores subscriptions in DashMap, a concurrent hash map that is sharded and locked per shard internally (it is not lock-free). Multiple threads can subscribe, unsubscribe, and process change events concurrently without callers taking an explicit lock.
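
As a rough illustration of what a sharded concurrent map does, the sketch below splits a std HashMap into independently locked shards. It is not DashMap's implementation and not the manager's code; `ShardedMap` and its methods are hypothetical names used only here.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::RwLock;

/// Hypothetical sharded map: each shard has its own lock, so writers to
/// different shards do not block each other.
pub struct ShardedMap<V> {
    shards: Vec<RwLock<HashMap<u64, V>>>,
}

impl<V: Clone> ShardedMap<V> {
    pub fn new(shard_count: usize) -> Self {
        // Always keep at least one shard so the modulo below is defined.
        let shards = (0..shard_count.max(1))
            .map(|_| RwLock::new(HashMap::new()))
            .collect();
        Self { shards }
    }

    /// Pick the shard for a key by hashing it.
    fn shard_for(&self, key: u64) -> &RwLock<HashMap<u64, V>> {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let index = (hasher.finish() as usize) % self.shards.len();
        &self.shards[index]
    }

    /// Takes `&self`, not `&mut self`: locking is internal and per shard.
    pub fn insert(&self, key: u64, value: V) -> Option<V> {
        self.shard_for(key).write().unwrap().insert(key, value)
    }

    pub fn get(&self, key: u64) -> Option<V> {
        self.shard_for(key).read().unwrap().get(&key).cloned()
    }

    pub fn remove(&self, key: u64) -> Option<V> {
        self.shard_for(key).write().unwrap().remove(&key)
    }
}
```

Because every method takes `&self`, a value like this can be shared across threads behind an `Arc` without an outer `Mutex`, which is the property the paragraph above describes.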

§Performance

The manager uses a table-based index to find the subscriptions affected by a change event. Looking up the subscription set for a table name is an average-case O(1) hash lookup, rather than an O(n) scan of all subscriptions.

§Connection Tracking

The manager supports tracking subscriptions by connection/session ID. This enables efficient cleanup when a connection closes, and allows both HTTP SSE and wire protocol clients to use the same subscription infrastructure.

Implementations§

impl SubscriptionManager

pub fn find_affected_subscriptions( &self, table_name: &str, ) -> Vec<SubscriptionId>

Find all subscriptions affected by a change to a given table

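This is the core lookup operation for fanout during change handling. Uses the table index to find the subscription ID set in average-case O(1); copying the IDs into the returned vector takes time proportional to the number of matches.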

§Arguments
  • table_name - The table that changed
§Returns

Vector of subscription IDs that depend on this table
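
The lookup described above can be sketched with a plain hash map from table name to a set of IDs. This is a minimal illustration, not the manager's code: `TableIndex`, `add`, and `find_affected` are hypothetical names, and `SubscriptionId` is assumed to be a `u64` for the example.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the crate's SubscriptionId.
pub type SubscriptionId = u64;

/// Maps a table name to the IDs of subscriptions whose queries read it.
#[derive(Default)]
pub struct TableIndex {
    by_table: HashMap<String, HashSet<SubscriptionId>>,
}

impl TableIndex {
    /// Register a subscription under every table it depends on.
    pub fn add(&mut self, id: SubscriptionId, tables: &[&str]) {
        for table in tables {
            self.by_table.entry(table.to_string()).or_default().insert(id);
        }
    }

    /// One hash lookup finds the ID set; copying it out costs time
    /// proportional to the number of matching subscriptions.
    pub fn find_affected(&self, table_name: &str) -> Vec<SubscriptionId> {
        let mut ids: Vec<SubscriptionId> = self
            .by_table
            .get(table_name)
            .map(|set| set.iter().copied().collect())
            .unwrap_or_default();
        ids.sort_unstable(); // deterministic order for the example only
        ids
    }
}
```

A subscription that joins two tables is registered under both names, so a change to either table finds it.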

pub async fn handle_change(&self, event: ChangeEvent, db: &Database)

Handle a change event from the storage layer

Finds all subscriptions affected by the change, re-executes their queries against db, and sends a notification to each subscription whose result has changed.

§Arguments
  • event - The change event to process (from storage layer)
  • db - Database to re-execute queries against
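
One way to decide whether a result has changed is to hash the fresh rows and compare against a stored hash. The sketch below shows that idea with std types only. `Row`, `Subscription`, `hash_rows`, and `notify_if_changed` are hypothetical, and a std channel stands in for the real notification channel.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc::Sender;

/// Hypothetical row type: one Vec<String> per row.
pub type Row = Vec<String>;

/// Hypothetical per-subscription state.
pub struct Subscription {
    pub last_result_hash: u64,
    pub notify_tx: Sender<Vec<Row>>,
}

/// Hash a result set so two runs of a query can be compared cheaply.
pub fn hash_rows(rows: &[Row]) -> u64 {
    let mut hasher = DefaultHasher::new();
    rows.hash(&mut hasher);
    hasher.finish()
}

/// Compare fresh rows against the stored hash and notify only on a change.
/// Returns true if the result changed.
pub fn notify_if_changed(sub: &mut Subscription, fresh: Vec<Row>) -> bool {
    let new_hash = hash_rows(&fresh);
    if new_hash == sub.last_result_hash {
        return false; // the change did not affect this query's result
    }
    sub.last_result_hash = new_hash;
    // A send error means the subscriber went away; the sketch ignores it.
    let _ = sub.notify_tx.send(fresh);
    true
}
```

Comparing hashes avoids keeping a second full copy of the rows just to detect a change, at the cost of a small collision risk.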

pub async fn run_event_loop( &self, change_rx: ChangeEventReceiver, db: Arc<Database>, )

Run the subscription manager event loop

Listens for change events from the storage layer and processes them. This method runs indefinitely until the change channel is closed.

§Arguments
  • change_rx - Receiver for change events from the storage layer
  • db - Database reference for re-executing subscription queries
§Note

This method should be spawned as a tokio task at server startup using tokio::spawn. It polls the change receiver and handles events until the channel is closed.
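
The "run until the channel is closed" shape can be sketched with the standard library. Note the substitution: this example uses `std::sync::mpsc` and `std::thread` in place of tokio so that it has no dependencies, and `ChangeEvent` here is a hypothetical one-field struct, not the crate's type.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical change event: just the name of the table that changed.
pub struct ChangeEvent {
    pub table_name: String,
}

/// Drain events until every sender is dropped, then return.
/// Returns the tables seen, in order, so the caller can inspect them.
pub fn run_event_loop(change_rx: Receiver<ChangeEvent>) -> Vec<String> {
    let mut handled = Vec::new();
    // `recv` blocks for the next event and errors once the channel closes.
    while let Ok(event) = change_rx.recv() {
        handled.push(event.table_name);
    }
    handled
}

/// Spawn the loop in the background, send two events, then close the channel.
pub fn demo() -> Vec<String> {
    let (change_tx, change_rx) = channel();
    let worker = thread::spawn(move || run_event_loop(change_rx));
    for table in ["users", "orders"] {
        change_tx
            .send(ChangeEvent { table_name: table.to_string() })
            .unwrap();
    }
    drop(change_tx); // closing the channel ends the loop
    worker.join().unwrap()
}
```

The loop has no explicit shutdown flag: dropping the last sender is the shutdown signal, which matches the "runs indefinitely until the change channel is closed" behavior described above.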

impl SubscriptionManager

pub fn subscribe( &self, query: String, notify_tx: Sender<SubscriptionUpdate>, ) -> Result<SubscriptionId, SubscriptionError>

Create a new subscription for a query

Parses the query to extract table dependencies and registers the subscription for notifications.

§Arguments
  • query - SQL query to monitor
  • notify_tx - Channel to send updates to the subscriber
§Returns

The subscription ID on success, or an error if parsing fails or a limit is exceeded

§Errors
  • ParseError if the query cannot be parsed or references no tables
  • GlobalLimitExceeded if the global subscription limit is reached
§Example
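use tokio::sync::mpsc; // assumption: Sender in the signature is tokio's mpsc sender

let manager = SubscriptionManager::new();
// `rx` receives the SubscriptionUpdate messages for this subscription.
let (tx, mut rx) = mpsc::channel(16);

let id = manager.subscribe("SELECT * FROM users".to_string(), tx)?;
println!("Subscribed with ID: {}", id);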

pub fn subscribe_for_connection( &self, query: String, notify_tx: Sender<SubscriptionUpdate>, connection_id: String, wire_subscription_id: [u8; 16], table_dependencies: HashSet<String>, filter: Option<String>, ) -> Result<SubscriptionId, SubscriptionError>

Create a new subscription for a specific connection (wire protocol)

This is the primary method for wire protocol subscriptions. It:

  • Checks both global and per-connection limits
  • Associates the subscription with a connection ID for cleanup
  • Stores the wire protocol UUID for lookup
§Arguments
  • query - SQL query to monitor
  • notify_tx - Channel to send updates to the subscriber
  • connection_id - The connection/session ID that owns this subscription
  • wire_subscription_id - The wire protocol UUID for this subscription
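  • table_dependencies - Pre-extracted table dependencies (from AST parsing)
  • filter - Optional filter stored with the subscription; it is returned alongside the other details by get_affected_subscriptions_for_connection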
§Returns

The subscription ID on success, or an error if a limit is exceeded

§Errors
  • GlobalLimitExceeded if the global subscription limit is reached
  • ConnectionLimitExceeded if the per-connection limit is reached
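
The two limit checks can be sketched as counters that are consulted before anything is recorded. This is an illustration only: `Limits`, `LimitError`, and `try_register` are hypothetical names, and checking the global limit before the per-connection limit is an assumption, not something the documentation above states.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum LimitError {
    GlobalLimitExceeded,
    ConnectionLimitExceeded,
}

/// Hypothetical registry that only tracks counts.
pub struct Limits {
    pub max_total: usize,
    pub max_per_connection: usize,
    total: usize,
    per_connection: HashMap<String, usize>,
}

impl Limits {
    pub fn new(max_total: usize, max_per_connection: usize) -> Self {
        Self {
            max_total,
            max_per_connection,
            total: 0,
            per_connection: HashMap::new(),
        }
    }

    /// Check both limits before recording the new subscription, so a
    /// rejected request leaves the counters untouched.
    pub fn try_register(&mut self, connection_id: &str) -> Result<(), LimitError> {
        if self.total >= self.max_total {
            return Err(LimitError::GlobalLimitExceeded);
        }
        let current = self.per_connection.get(connection_id).copied().unwrap_or(0);
        if current >= self.max_per_connection {
            return Err(LimitError::ConnectionLimitExceeded);
        }
        self.total += 1;
        *self.per_connection.entry(connection_id.to_string()).or_insert(0) += 1;
        Ok(())
    }
}
```

With a global limit of 3 and a per-connection limit of 2, a third request on one connection is rejected with the connection error while other connections can still subscribe until the global limit is reached.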

pub fn unsubscribe(&self, id: SubscriptionId) -> bool

Remove a subscription

Unregisters the subscription and removes it from all indexes.

§Arguments
  • id - The subscription ID to remove
§Returns

true if the removed subscription was selective-eligible, false otherwise. The return value reports selective-update eligibility, not whether a subscription was removed.

pub fn unsubscribe_by_wire_id(&self, wire_id: &[u8; 16]) -> bool

Remove a subscription by its wire protocol ID

This is used by wire protocol clients that use UUID-based subscription IDs.

§Arguments
  • wire_id - The wire protocol subscription ID (UUID bytes)
§Returns

true if the removed subscription was selective-eligible, false otherwise. Returns false if the subscription was not found.

pub fn unsubscribe_all_for_connection( &self, connection_id: &str, ) -> (usize, usize)

Remove all subscriptions for a connection

Call this when a connection closes to clean up all of its subscriptions. This matters most for wire protocol connections, whose subscriptions are registered against a connection ID.

§Arguments
  • connection_id - The connection ID to clean up
§Returns

A tuple of (total_removed, selective_eligible_removed)
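
A connection-keyed index makes this cleanup a single lookup followed by one removal per owned subscription. The sketch below is hypothetical (`Registry`, `add`, and `remove_all_for_connection` are illustrative names, and `SubscriptionId` is assumed to be `u64`); it only shows how the two counts in the returned tuple could be produced.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the crate's SubscriptionId.
pub type SubscriptionId = u64;

/// Hypothetical registry with a connection-to-subscriptions index.
#[derive(Default)]
pub struct Registry {
    /// subscription ID -> whether it is selective-eligible
    subscriptions: HashMap<SubscriptionId, bool>,
    /// connection ID -> IDs of the subscriptions it owns
    by_connection: HashMap<String, HashSet<SubscriptionId>>,
}

impl Registry {
    pub fn add(&mut self, id: SubscriptionId, connection_id: &str, selective_eligible: bool) {
        self.subscriptions.insert(id, selective_eligible);
        self.by_connection
            .entry(connection_id.to_string())
            .or_default()
            .insert(id);
    }

    /// Remove everything owned by one connection.
    /// Returns (total_removed, selective_eligible_removed).
    pub fn remove_all_for_connection(&mut self, connection_id: &str) -> (usize, usize) {
        let ids = self.by_connection.remove(connection_id).unwrap_or_default();
        let mut total = 0;
        let mut selective = 0;
        for id in ids {
            if let Some(eligible) = self.subscriptions.remove(&id) {
                total += 1;
                if eligible {
                    selective += 1;
                }
            }
        }
        (total, selective)
    }

    pub fn len(&self) -> usize {
        self.subscriptions.len()
    }
}
```

Calling the cleanup a second time for the same connection finds no index entry and returns `(0, 0)`, so it is safe to call from more than one shutdown path.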

impl SubscriptionManager

pub fn subscription_count(&self) -> usize

Get the number of active subscriptions

pub fn watched_tables(&self) -> Vec<(String, usize)>

Get the tables being watched and their subscription counts

pub fn config(&self) -> &SubscriptionConfig

Get the current configuration

pub fn limit_exceeded_count(&self) -> usize

Get the number of times the global limit was exceeded (for metrics)

pub fn result_set_exceeded_count(&self) -> usize

Get the number of times a result set was too large (for metrics)

pub fn get_subscription_metrics( &self, id: SubscriptionId, ) -> Option<SubscriptionMetrics>

Get metrics for a specific subscription

Returns metrics including updates sent, dropped, and channel health. Returns None if the subscription doesn’t exist.

pub fn get_all_metrics(&self) -> Vec<SubscriptionMetrics>

Get metrics for all active subscriptions

Returns a vector of metrics for all subscriptions, useful for monitoring and alerting on subscription health.

impl SubscriptionManager

pub async fn send_initial_results( &self, id: SubscriptionId, db: &Database, ) -> Result<(), SubscriptionError>

Send initial results to a new subscriber

Executes the query and sends the initial results. This should be called right after subscribing to provide immediate data. The initial results are always sent as a Full update.

§Errors
  • NotFound if the subscription doesn’t exist
  • ParseError if the query fails to execute
  • ResultSetTooLarge if the result set exceeds the configured limit
  • ChannelClosed if the notification channel is closed

impl SubscriptionManager

pub fn find_subscription_by_wire_id( &self, wire_id: &[u8; 16], ) -> Option<SubscriptionId>

Find a subscription by its wire protocol ID

§Arguments
  • wire_id - The wire protocol subscription ID (UUID bytes)
§Returns

The internal subscription ID if found
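
A secondary index keyed by the 16 UUID bytes is enough to support this lookup. The sketch below is an assumption about shape, not the manager's code: `WireIndex`, `register`, and `find` are hypothetical names, and `SubscriptionId` is assumed to be `u64`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the crate's SubscriptionId.
pub type SubscriptionId = u64;
/// Wire protocol subscription ID: the 16 raw bytes of a UUID.
pub type WireId = [u8; 16];

/// Hypothetical secondary index from wire ID to internal ID.
#[derive(Default)]
pub struct WireIndex {
    by_wire_id: HashMap<WireId, SubscriptionId>,
}

impl WireIndex {
    pub fn register(&mut self, wire_id: WireId, id: SubscriptionId) {
        self.by_wire_id.insert(wire_id, id);
    }

    /// Returns the internal ID, or None if the wire ID is unknown.
    pub fn find(&self, wire_id: &WireId) -> Option<SubscriptionId> {
        self.by_wire_id.get(wire_id).copied()
    }
}
```

Byte arrays implement `Hash` and `Eq`, so the UUID bytes can be used as the map key directly without converting them to a string.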

pub fn connection_subscription_count(&self, connection_id: &str) -> usize

Get the subscription count for a specific connection

§Arguments
  • connection_id - The connection ID to check
§Returns

Number of subscriptions for this connection

pub fn get_affected_subscriptions_for_wire_protocol( &self, table: &str, ) -> Vec<([u8; 16], String, u64, Option<Vec<Row>>)>

Get affected subscription details for wire protocol

This method finds all subscriptions that depend on the given table and returns their details needed for wire protocol notifications.

§Arguments
  • table - The table name to find affected subscriptions for
§Returns

Vector of (wire_subscription_id, query, last_result_hash, last_result) for each affected subscription that has a wire ID.

pub fn get_affected_subscriptions_for_connection( &self, table: &str, connection_id: &str, ) -> Vec<([u8; 16], String, u64, Option<Vec<Row>>, Option<String>)>

Get affected subscription details for wire protocol filtered by connection

This method finds subscriptions for a specific connection that depend on the given table and returns their details needed for wire protocol notifications.

§Arguments
  • table - The table name to find affected subscriptions for
  • connection_id - The connection ID to filter by
§Returns

Vector of (wire_subscription_id, query, last_result_hash, last_result, filter) for each affected subscription that belongs to the specified connection.

pub fn update_result_by_wire_id( &self, wire_id: &[u8; 16], result_hash: u64, result: Vec<Row>, )

Update the stored result for a subscription by wire ID

This is used by wire protocol to store results for delta computation.

§Arguments
  • wire_id - The wire protocol subscription ID (UUID bytes)
  • result_hash - Hash of the new result set
  • result - The new result set
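
To show why the previous result is stored, here is one simple form a delta computation could take: a set difference between the old and new rows. This is a sketch under assumptions, not the wire protocol's actual delta format. `Row`, `Delta`, and `compute_delta` are hypothetical.

```rust
use std::collections::HashSet;

/// Hypothetical row type: one Vec<String> per row.
pub type Row = Vec<String>;

/// Rows present only in the new result, and rows present only in the old one.
#[derive(Debug, PartialEq)]
pub struct Delta {
    pub added: Vec<Row>,
    pub removed: Vec<Row>,
}

/// Set-based diff; duplicate rows and row order are ignored in this sketch.
pub fn compute_delta(old: &[Row], new: &[Row]) -> Delta {
    let old_set: HashSet<&Row> = old.iter().collect();
    let new_set: HashSet<&Row> = new.iter().collect();
    Delta {
        added: new
            .iter()
            .filter(|row| !old_set.contains(*row))
            .cloned()
            .collect(),
        removed: old
            .iter()
            .filter(|row| !new_set.contains(*row))
            .cloned()
            .collect(),
    }
}
```

Without the stored result there would be nothing to diff against, and every notification would have to carry the full result set.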

pub fn update_pk_columns_by_wire_id( &self, wire_id: &[u8; 16], pk_columns: Vec<usize>, )

Update the primary key columns for a subscription by wire ID

This is called after PK detection to set the actual PK column indices for selective column updates.

pub fn update_pk_columns_with_eligibility_by_wire_id( &self, wire_id: &[u8; 16], pk_columns: Vec<usize>, confident: bool, ) -> bool

Update PK columns with eligibility tracking

Sets the PK columns and marks whether the subscription is eligible for selective column updates (based on confident PK detection).

Returns true if the subscription is newly marked as selective-eligible.
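
The "newly marked" return value can be read as reporting a state transition rather than the current state. The sketch below illustrates that reading. `PkState` and `set_pk_columns` are hypothetical, and leaving an already eligible subscription eligible when a later call passes `confident = false` is an assumption made for the example.

```rust
/// Hypothetical per-subscription state for PK tracking.
#[derive(Debug, Default)]
pub struct PkState {
    pub pk_columns: Vec<usize>,
    pub selective_eligible: bool,
}

/// Store the PK columns and update eligibility.
/// Returns true only on the transition from not-eligible to eligible, so a
/// caller can keep a count of eligible subscriptions without double counting.
pub fn set_pk_columns(state: &mut PkState, pk_columns: Vec<usize>, confident: bool) -> bool {
    state.pk_columns = pk_columns;
    let newly_eligible = confident && !state.selective_eligible;
    if confident {
        state.selective_eligible = true;
    }
    newly_eligible
}
```

Under this reading, a second confident call for the same subscription updates the columns but returns false, because the subscription was already eligible.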

pub fn update_pk_columns_with_eligibility( &self, id: SubscriptionId, pk_columns: Vec<usize>, confident: bool, ) -> bool

Update PK columns with eligibility tracking by internal subscription ID

Sets the PK columns and marks whether the subscription is eligible for selective column updates (based on confident PK detection). This variant is used for HTTP SSE subscriptions that don’t have wire IDs.

Returns true if the subscription is newly marked as selective-eligible.

pub fn update_selective_updates( &self, id: SubscriptionId, config: SelectiveColumnConfig, )

Update selective updates configuration for a subscription

Allows per-subscription overrides of the server-level selective updates config.

§Arguments
  • id - The subscription ID
  • config - The new selective updates configuration

pub fn get_pk_columns_by_wire_id(&self, wire_id: &[u8; 16]) -> Vec<usize>

Get the primary key columns for a subscription by wire ID

Returns the PK column indices, or the default [0] if the subscription is not found.

pub fn set_selective_updates_override_by_wire_id( &self, wire_id: &[u8; 16], config: SelectiveColumnConfig, )

Set per-subscription selective updates configuration override by wire ID

Allows clients to override server-level selective update thresholds on a per-subscription basis via the wire protocol.

pub fn get_effective_selective_config_by_wire_id( &self, wire_id: &[u8; 16], server_config: &SelectiveColumnConfig, ) -> SelectiveColumnConfig

Get the effective selective column config for a subscription by wire ID

Returns the per-subscription override config if set, otherwise creates a config using the server-level settings with subscription-specific pk_columns.
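
The fallback rule above (use the override if present, otherwise the server settings with the subscription's own pk_columns) can be sketched as a small function. The struct below is hypothetical: only `pk_columns` is named in the documentation, and the `enabled` field is invented to stand in for whatever server-level settings the real SelectiveColumnConfig carries.

```rust
/// Hypothetical shape; only `pk_columns` is named in the text above.
#[derive(Clone, Debug, PartialEq)]
pub struct SelectiveColumnConfig {
    pub enabled: bool,
    pub pk_columns: Vec<usize>,
}

/// Prefer the per-subscription override; otherwise copy the server-level
/// settings and substitute the subscription's own PK columns.
pub fn effective_config(
    override_config: Option<&SelectiveColumnConfig>,
    subscription_pk_columns: &[usize],
    server_config: &SelectiveColumnConfig,
) -> SelectiveColumnConfig {
    match override_config {
        Some(config) => config.clone(),
        None => SelectiveColumnConfig {
            pk_columns: subscription_pk_columns.to_vec(),
            ..server_config.clone()
        },
    }
}
```

In this sketch an override replaces the server settings wholesale, including its own pk_columns; whether the real method merges individual fields instead is not stated in the documentation.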

impl SubscriptionManager

pub fn new() -> Self

Create a new subscription manager with default configuration

pub fn with_config(config: SubscriptionConfig) -> Self

Create a new subscription manager with custom configuration

Trait Implementations§

impl Default for SubscriptionManager

fn default() -> Self

Returns the “default value” for a type.

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<A, B, T> HttpServerConnExec<A, B> for T
where B: Body,

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<L> LayerExt<L> for L

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.

impl<T> MaybeSend for T
where T: Send,

impl<T> Pointable for T

const ALIGN: usize

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Sized + Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Sized + Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<G1, G2> Within<G2> for G1
where G2: Contains<G1>,

fn is_within(&self, b: &G2) -> bool