pub struct PgmqNotifyClient { /* private fields */ }Expand description
Unified PGMQ client with comprehensive functionality and notification capabilities
Implementations§
Source§impl PgmqClient
impl PgmqClient
Sourcepub async fn new(database_url: &str) -> Result<Self>
pub async fn new(database_url: &str) -> Result<Self>
Create new unified PGMQ client using connection string
Sourcepub async fn new_with_config(
database_url: &str,
config: PgmqNotifyConfig,
) -> Result<Self>
pub async fn new_with_config( database_url: &str, config: PgmqNotifyConfig, ) -> Result<Self>
Create new unified PGMQ client with custom configuration
Sourcepub async fn new_with_pool(pool: PgPool) -> Self
pub async fn new_with_pool(pool: PgPool) -> Self
Create new unified PGMQ client using existing connection pool (BYOP - Bring Your Own Pool)
Sourcepub async fn new_with_pool_and_config(
pool: PgPool,
config: PgmqNotifyConfig,
) -> Self
pub async fn new_with_pool_and_config( pool: PgPool, config: PgmqNotifyConfig, ) -> Self
Create new unified PGMQ client with existing pool and custom configuration
Sourcepub async fn create_queue(&self, queue_name: &str) -> Result<()>
pub async fn create_queue(&self, queue_name: &str) -> Result<()>
Create queue if it doesn’t exist
Sourcepub async fn send_json_message<T>(
&self,
queue_name: &str,
message: &T,
) -> Result<i64>where
T: Serialize,
pub async fn send_json_message<T>(
&self,
queue_name: &str,
message: &T,
) -> Result<i64>where
T: Serialize,
Send generic JSON message to queue
Sourcepub async fn send_message_with_delay<T>(
&self,
queue_name: &str,
message: &T,
delay_seconds: u64,
) -> Result<i64>where
T: Serialize,
pub async fn send_message_with_delay<T>(
&self,
queue_name: &str,
message: &T,
delay_seconds: u64,
) -> Result<i64>where
T: Serialize,
Send message with visibility timeout (delay)
Sourcepub async fn read_messages(
&self,
queue_name: &str,
visibility_timeout: Option<i32>,
limit: Option<i32>,
) -> Result<Vec<Message<Value>>>
pub async fn read_messages( &self, queue_name: &str, visibility_timeout: Option<i32>, limit: Option<i32>, ) -> Result<Vec<Message<Value>>>
Read messages from queue
Sourcepub async fn pop_message(
&self,
queue_name: &str,
) -> Result<Option<Message<Value>>>
pub async fn pop_message( &self, queue_name: &str, ) -> Result<Option<Message<Value>>>
Read messages from queue with pop (single read and delete)
Sourcepub async fn read_specific_message<T>(
&self,
queue_name: &str,
message_id: i64,
visibility_timeout: i32,
) -> Result<Option<Message<T>>>where
T: DeserializeOwned,
pub async fn read_specific_message<T>(
&self,
queue_name: &str,
message_id: i64,
visibility_timeout: i32,
) -> Result<Option<Message<T>>>where
T: DeserializeOwned,
Read a specific message by ID using custom SQL function (for notification event handling)
Sourcepub async fn delete_message(
&self,
queue_name: &str,
message_id: i64,
) -> Result<()>
pub async fn delete_message( &self, queue_name: &str, message_id: i64, ) -> Result<()>
Delete message from queue
Sourcepub async fn archive_message(
&self,
queue_name: &str,
message_id: i64,
) -> Result<()>
pub async fn archive_message( &self, queue_name: &str, message_id: i64, ) -> Result<()>
Archive message (move to archive)
Sourcepub async fn set_visibility_timeout(
&self,
queue_name: &str,
message_id: i64,
vt_seconds: i32,
) -> Result<()>
pub async fn set_visibility_timeout( &self, queue_name: &str, message_id: i64, vt_seconds: i32, ) -> Result<()>
Set visibility timeout for a message
Extends or resets the visibility timeout for a message. This is useful for:
- Heartbeat during long-running step processing
- Returning a message to the queue immediately (vt_seconds = 0)
§Arguments
queue_name- The queue containing the messagemessage_id- The message ID to updatevt_seconds- New visibility timeout in seconds from now
§Returns
Returns Ok(()) on success, or an error if the message doesn’t exist.
Sourcepub async fn purge_queue(&self, queue_name: &str) -> Result<u64>
pub async fn purge_queue(&self, queue_name: &str) -> Result<u64>
Purge queue (delete all messages)
Sourcepub async fn drop_queue(&self, queue_name: &str) -> Result<()>
pub async fn drop_queue(&self, queue_name: &str) -> Result<()>
Drop queue completely
Sourcepub async fn queue_metrics(&self, queue_name: &str) -> Result<QueueMetrics>
pub async fn queue_metrics(&self, queue_name: &str) -> Result<QueueMetrics>
Get queue metrics/statistics
Sourcepub fn pool(&self) -> &PgPool
pub fn pool(&self) -> &PgPool
Get reference to underlying connection pool for advanced operations
Sourcepub fn config(&self) -> &PgmqNotifyConfig
pub fn config(&self) -> &PgmqNotifyConfig
Get the configuration
Sourcepub fn has_notify_capabilities(&self) -> bool
pub fn has_notify_capabilities(&self) -> bool
Check if this client has notification capabilities enabled
Sourcepub async fn health_check(&self) -> Result<bool>
pub async fn health_check(&self) -> Result<bool>
Health check - verify database connectivity
Sourcepub async fn get_client_status(&self) -> Result<ClientStatus>
pub async fn get_client_status(&self) -> Result<ClientStatus>
Get client status information
Sourcepub fn extract_namespace(&self, queue_name: &str) -> Option<String>
pub fn extract_namespace(&self, queue_name: &str) -> Option<String>
Extract namespace from queue name using configured pattern
Sourcepub async fn create_listener(
&self,
buffer_size: usize,
) -> Result<PgmqNotifyListener>
pub async fn create_listener( &self, buffer_size: usize, ) -> Result<PgmqNotifyListener>
Create a notify listener for this client
§Arguments
buffer_size- MPSC channel buffer size (TAS-51: bounded channels)
§Note
TAS-51: Migrated from unbounded to bounded channel to prevent OOM during notification bursts. Buffer size should come from configuration based on context:
- Orchestration:
config.mpsc_channels.orchestration.event_listeners.pgmq_event_buffer_size - Worker:
config.mpsc_channels.worker.event_listeners.pgmq_event_buffer_size
Source§impl PgmqClient
Helper methods for common queue operations
impl PgmqClient
Helper methods for common queue operations
Sourcepub async fn process_namespace_queue(
&self,
namespace: &str,
visibility_timeout: Option<i32>,
batch_size: i32,
) -> Result<Vec<Message<Value>>>
pub async fn process_namespace_queue( &self, namespace: &str, visibility_timeout: Option<i32>, batch_size: i32, ) -> Result<Vec<Message<Value>>>
Process messages from namespace queue
Sourcepub async fn complete_message(
&self,
namespace: &str,
message_id: i64,
) -> Result<()>
pub async fn complete_message( &self, namespace: &str, message_id: i64, ) -> Result<()>
Complete message processing (delete from queue)
Sourcepub async fn initialize_namespace_queues(
&self,
namespaces: &[&str],
) -> Result<()>
pub async fn initialize_namespace_queues( &self, namespaces: &[&str], ) -> Result<()>
Initialize standard namespace queues
Sourcepub async fn send_with_transaction<T>(
&self,
queue_name: &str,
message: &T,
tx: &mut Transaction<'_, Postgres>,
) -> Result<i64>where
T: Serialize,
pub async fn send_with_transaction<T>(
&self,
queue_name: &str,
message: &T,
tx: &mut Transaction<'_, Postgres>,
) -> Result<i64>where
T: Serialize,
Send message within a transaction (for atomic operations)
Trait Implementations§
Source§impl Clone for PgmqClient
impl Clone for PgmqClient
Source§fn clone(&self) -> PgmqClient
fn clone(&self) -> PgmqClient
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read moreAuto Trait Implementations§
impl Freeze for PgmqClient
impl !RefUnwindSafe for PgmqClient
impl Send for PgmqClient
impl Sync for PgmqClient
impl Unpin for PgmqClient
impl !UnwindSafe for PgmqClient
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more