pg_queue/lib.rs
1//! # pg-queue
2//!
3//! PostgreSQL-based job queue, pub/sub, and cache — a Redis replacement.
4//!
5//! - **Queues**: SKIP LOCKED for concurrent job processing
6//! - **Pub/Sub**: LISTEN/NOTIFY for real-time messaging
7//! - **Cache**: UNLOGGED tables with per-query TTL checks
8//! - **Request-Response**: RPC over PostgreSQL with correlation IDs
9//!
10//! ## Setup
11//!
12//! Run `migrations/setup.sql` against your database, then create queues:
13//!
14//! ```sql
15//! SELECT pg_queue_create_queue('emails');
16//! SELECT pg_queue_create_queue('jobs');
17//! ```
18//!
19//! ## Usage
20//!
21//! ```rust,no_run
22//! use pg_queue::{PgQueueManager, QueueName};
23//!
24//! # async fn example() -> pg_queue::Result<()> {
25//! let pool = sqlx::PgPool::connect("postgres://localhost/mydb").await.unwrap();
26//! let mgr = PgQueueManager::new(pool);
27//!
28//! let emails = QueueName::new("emails")?;
29//!
30//! // Push a job
31//! mgr.queue.push(&emails, &serde_json::json!({"to": "user@example.com"})).await?;
32//!
33//! // Pop and process
34//! if let Some(job) = mgr.queue.pop::<serde_json::Value>(&emails).await? {
35//! println!("Processing job {}: {:?}", job.id, job.payload);
36//! mgr.queue.complete(&emails, job.id).await?;
37//! }
38//! # Ok(())
39//! # }
40//! ```
41
42pub mod cache;
43pub mod errors;
44pub mod listen;
45pub mod notify;
46pub mod queue;
47pub mod request_response;
48
49pub use cache::CacheRepository;
50pub use errors::{PgQueueError, Result};
51pub use listen::{ListenerService, Notification};
52pub use notify::NotifyService;
53pub use queue::{Job, JobStatus, QueueName, QueueRepository};
54pub use request_response::{RequestResponseService, RequestWrapper};
55
56pub use sqlx::PgPool;
57
58/// Main manager combining all PostgreSQL queue/pubsub/cache functionality
59#[derive(Clone)]
60pub struct PgQueueManager {
61 pub queue: QueueRepository,
62 pub notify: NotifyService,
63 pub cache: CacheRepository,
64 pub request_response: RequestResponseService,
65 pool: PgPool,
66}
67
68impl PgQueueManager {
69 /// Create a new PgQueueManager with the given connection pool
70 pub fn new(pool: PgPool) -> Self {
71 let queue = QueueRepository::new(pool.clone());
72 Self {
73 notify: NotifyService::new(pool.clone()),
74 cache: CacheRepository::new(pool.clone()),
75 request_response: RequestResponseService::new(pool.clone(), queue.clone()),
76 queue,
77 pool,
78 }
79 }
80
81 /// Get a reference to the connection pool
82 pub fn pool(&self) -> &PgPool {
83 &self.pool
84 }
85
86 /// Create a new listener service for pub/sub
87 pub async fn create_listener(&self) -> Result<ListenerService> {
88 ListenerService::new(&self.pool).await
89 }
90}
91
92#[cfg(test)]
93mod tests {
94 use super::*;
95
96 #[test]
97 fn test_queue_name_construction() {
98 let q = QueueName::new("orders").unwrap();
99 assert_eq!(q.table_name(), "queue_orders");
100 assert_eq!(q.channel_name(), "queue_orders");
101 }
102}