tasker_pgmq/events.rs
1//! # Event types for PGMQ notifications
2//!
3//! This module defines the event types that are emitted by PGMQ operations
4//! and can be listened to for real-time processing. Events are designed to be
5//! lightweight and contain only essential information to minimize notification overhead.
6//!
7//! ## Event Types
8//!
9//! - [`QueueCreatedEvent`] - Emitted when a new queue is created
10//! - [`MessageReadyEvent`] - Emitted when a message is enqueued (signal only, large messages)
11//! - [`MessageWithPayloadEvent`] - Emitted when a message is enqueued with full payload (TAS-133)
12//! - [`BatchReadyEvent`] - Emitted when a batch of messages is enqueued
13//!
14//! ## TAS-133: Full Payload Notifications
15//!
16//! When a message is small enough (< 7KB), the full payload is included in the notification
17//! via `MessageWithPayloadEvent`. This enables RabbitMQ-style direct processing without
18//! a separate fetch operation. For large messages, `MessageReadyEvent` is used as a signal
19//! that requires fetching the message via `pgmq_read_specific_message`.
20//!
21//! ## Usage
22//!
23//! ```rust
24//! use tasker_pgmq::{PgmqNotifyEvent, MessageReadyEvent, MessageWithPayloadEvent};
25//! use chrono::Utc;
26//!
27//! // Signal-only event (large messages)
28//! let signal_event = PgmqNotifyEvent::MessageReady(MessageReadyEvent {
29//! queue_name: "tasks_queue".to_string(),
30//! namespace: "tasks".to_string(),
31//! msg_id: 12345,
32//! ready_at: Utc::now(),
33//! metadata: Default::default(),
34//! visibility_timeout_seconds: Some(30),
35//! });
36//!
37//! // Full payload event (small messages, TAS-133)
38//! let payload_event = PgmqNotifyEvent::MessageWithPayload(MessageWithPayloadEvent {
39//! queue_name: "tasks_queue".to_string(),
40//! namespace: "tasks".to_string(),
41//! msg_id: 12346,
42//! message: serde_json::json!({"task": "process", "data": "small"}),
43//! ready_at: Utc::now(),
44//! delay_seconds: 0,
45//! });
46//!
47//! // Serialize to JSON for notification
48//! let json = serde_json::to_string(&signal_event).unwrap();
49//! assert!(json.contains("message_ready"));
50//! ```
51
52use chrono::{DateTime, Utc};
53use serde::{Deserialize, Serialize};
54use std::collections::HashMap;
55
56/// Union of all possible PGMQ notification events
57///
58/// This enum represents all event types that can be emitted by PGMQ operations.
59/// Events are tagged for JSON serialization and can be pattern-matched for handling.
60///
61/// # Examples
62///
63/// ```rust
64/// use tasker_pgmq::{PgmqNotifyEvent, QueueCreatedEvent};
65/// use chrono::Utc;
66///
67/// let event = PgmqNotifyEvent::QueueCreated(QueueCreatedEvent {
68/// queue_name: "new_queue".to_string(),
69/// namespace: "default".to_string(),
70/// created_at: Utc::now(),
71/// metadata: Default::default(),
72/// });
73///
74/// match event {
75/// PgmqNotifyEvent::QueueCreated(e) => {
76/// println!("Queue created: {}", e.queue_name);
77/// }
78/// PgmqNotifyEvent::MessageReady(e) => {
79/// println!("Message ready: {}", e.msg_id);
80/// }
81/// PgmqNotifyEvent::MessageWithPayload(e) => {
82/// println!("Message with payload: {}", e.msg_id);
83/// }
84/// PgmqNotifyEvent::BatchReady(e) => {
85/// println!("Batch ready: {} messages", e.message_count);
86/// }
87/// }
88/// ```
89#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
90#[serde(tag = "event_type", rename_all = "snake_case")]
91pub enum PgmqNotifyEvent {
92 /// Queue was created
93 QueueCreated(QueueCreatedEvent),
94 /// Message is ready for processing in a queue (signal only, requires fetch)
95 /// Used for large messages (>= 7KB) that don't fit in pg_notify payload
96 MessageReady(MessageReadyEvent),
97 /// Message with full payload included (TAS-133)
98 /// Used for small messages (< 7KB) - enables direct processing without fetch
99 MessageWithPayload(MessageWithPayloadEvent),
100 /// Batch of messages are ready for processing in a queue
101 BatchReady(BatchReadyEvent),
102}
103
104/// Event emitted when a new PGMQ queue is created
105///
106/// This event is triggered when a new queue is created in the PGMQ system.
107/// It includes the queue name, extracted namespace, and creation timestamp.
108///
109/// # Examples
110///
111/// ```rust
112/// use tasker_pgmq::QueueCreatedEvent;
113/// use chrono::Utc;
114/// use std::collections::HashMap;
115///
116/// let event = QueueCreatedEvent {
117/// queue_name: "orders_queue".to_string(),
118/// namespace: "orders".to_string(),
119/// created_at: Utc::now(),
120/// metadata: HashMap::new(),
121/// };
122///
123/// assert_eq!(event.queue_name, "orders_queue");
124/// assert_eq!(event.namespace, "orders");
125/// ```
126#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
127pub struct QueueCreatedEvent {
128 /// Name of the queue that was created
129 pub queue_name: String,
130 /// Extracted namespace from the queue name
131 pub namespace: String,
132 /// When the queue was created
133 pub created_at: DateTime<Utc>,
134 /// Optional metadata about the queue
135 #[serde(default)]
136 pub metadata: HashMap<String, String>,
137}
138
139/// Event emitted when a message is ready for processing
140///
141/// This event is triggered when a message is enqueued in PGMQ and becomes
142/// available for processing by workers. It provides the message ID and queue
143/// information needed to claim and process the message.
144///
145/// # Examples
146///
147/// ```rust
148/// use tasker_pgmq::MessageReadyEvent;
149/// use chrono::Utc;
150/// use std::collections::HashMap;
151///
152/// let event = MessageReadyEvent {
153/// msg_id: 42,
154/// queue_name: "tasks_queue".to_string(),
155/// namespace: "tasks".to_string(),
156/// ready_at: Utc::now(),
157/// metadata: HashMap::new(),
158/// visibility_timeout_seconds: Some(30),
159/// };
160///
161/// assert_eq!(event.msg_id, 42);
162/// assert_eq!(event.queue_name, "tasks_queue");
163/// assert_eq!(event.namespace, "tasks");
164/// ```
165#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
166pub struct MessageReadyEvent {
167 /// ID of the message that's ready
168 pub msg_id: i64,
169 /// Queue where the message is available
170 pub queue_name: String,
171 /// Extracted namespace from the queue name
172 pub namespace: String,
173 /// When the message became ready
174 pub ready_at: DateTime<Utc>,
175 /// Optional message metadata (limited by payload size)
176 #[serde(default)]
177 pub metadata: HashMap<String, String>,
178 /// Visibility timeout if applicable
179 pub visibility_timeout_seconds: Option<i32>,
180}
181
182impl PgmqNotifyEvent {
183 /// Get the namespace for any event type
184 #[must_use]
185 pub fn namespace(&self) -> &str {
186 match self {
187 PgmqNotifyEvent::QueueCreated(event) => &event.namespace,
188 PgmqNotifyEvent::MessageReady(event) => &event.namespace,
189 PgmqNotifyEvent::MessageWithPayload(event) => &event.namespace,
190 PgmqNotifyEvent::BatchReady(event) => &event.namespace,
191 }
192 }
193
194 /// Get the queue name for any event type
195 #[must_use]
196 pub fn queue_name(&self) -> &str {
197 match self {
198 PgmqNotifyEvent::QueueCreated(event) => &event.queue_name,
199 PgmqNotifyEvent::MessageReady(event) => &event.queue_name,
200 PgmqNotifyEvent::MessageWithPayload(event) => &event.queue_name,
201 PgmqNotifyEvent::BatchReady(event) => &event.queue_name,
202 }
203 }
204
205 /// Get the timestamp for any event type
206 #[must_use]
207 pub fn timestamp(&self) -> DateTime<Utc> {
208 match self {
209 PgmqNotifyEvent::QueueCreated(event) => event.created_at,
210 PgmqNotifyEvent::MessageReady(event) => event.ready_at,
211 PgmqNotifyEvent::MessageWithPayload(event) => event.ready_at,
212 PgmqNotifyEvent::BatchReady(event) => event.ready_at,
213 }
214 }
215
216 /// Get metadata for any event type
217 ///
218 /// Note: `MessageWithPayload` events don't have metadata, returns empty HashMap
219 #[must_use]
220 pub fn metadata(&self) -> &HashMap<String, String> {
221 // Static empty map for variants without metadata
222 static EMPTY: std::sync::OnceLock<HashMap<String, String>> = std::sync::OnceLock::new();
223 match self {
224 PgmqNotifyEvent::QueueCreated(event) => &event.metadata,
225 PgmqNotifyEvent::MessageReady(event) => &event.metadata,
226 PgmqNotifyEvent::MessageWithPayload(_) => EMPTY.get_or_init(HashMap::new),
227 PgmqNotifyEvent::BatchReady(event) => &event.metadata,
228 }
229 }
230
231 /// Check if event matches a specific namespace
232 #[must_use]
233 pub fn matches_namespace(&self, namespace: &str) -> bool {
234 self.namespace() == namespace
235 }
236
237 /// Get the event type as a string
238 #[must_use]
239 pub fn event_type(&self) -> &'static str {
240 match self {
241 PgmqNotifyEvent::QueueCreated(_) => "queue_created",
242 PgmqNotifyEvent::MessageReady(_) => "message_ready",
243 PgmqNotifyEvent::MessageWithPayload(_) => "message_with_payload",
244 PgmqNotifyEvent::BatchReady(_) => "batch_ready",
245 }
246 }
247
248 /// Get the message ID if this event is message-related
249 ///
250 /// Returns `Some(msg_id)` for `MessageReady` and `MessageWithPayload` events,
251 /// `None` for queue creation and batch events.
252 #[must_use]
253 pub fn msg_id(&self) -> Option<i64> {
254 match self {
255 PgmqNotifyEvent::MessageReady(event) => Some(event.msg_id),
256 PgmqNotifyEvent::MessageWithPayload(event) => Some(event.msg_id),
257 PgmqNotifyEvent::QueueCreated(_) | PgmqNotifyEvent::BatchReady(_) => None,
258 }
259 }
260
261 /// Check if this event includes the full message payload (TAS-133)
262 ///
263 /// Returns `true` for `MessageWithPayload` events where the message
264 /// can be processed directly without a separate fetch.
265 #[must_use]
266 pub fn has_payload(&self) -> bool {
267 matches!(self, PgmqNotifyEvent::MessageWithPayload(_))
268 }
269
270 /// Get the message payload if available (TAS-133)
271 ///
272 /// Returns `Some(&Value)` for `MessageWithPayload` events,
273 /// `None` for all other event types.
274 #[must_use]
275 pub fn payload(&self) -> Option<&serde_json::Value> {
276 match self {
277 PgmqNotifyEvent::MessageWithPayload(event) => Some(&event.message),
278 _ => None,
279 }
280 }
281}
282
283impl QueueCreatedEvent {
284 /// Create a new queue created event
285 pub fn new<S: Into<String>>(queue_name: S, namespace: S) -> Self {
286 Self {
287 queue_name: queue_name.into(),
288 namespace: namespace.into(),
289 created_at: Utc::now(),
290 metadata: HashMap::new(),
291 }
292 }
293
294 /// Create with custom timestamp
295 pub fn with_timestamp<S: Into<String>>(
296 queue_name: S,
297 namespace: S,
298 created_at: DateTime<Utc>,
299 ) -> Self {
300 Self {
301 queue_name: queue_name.into(),
302 namespace: namespace.into(),
303 created_at,
304 metadata: HashMap::new(),
305 }
306 }
307
308 /// Add metadata to the event
309 #[must_use]
310 pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
311 self.metadata = metadata;
312 self
313 }
314
315 /// Add a single metadata entry
316 pub fn add_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
317 self.metadata.insert(key.into(), value.into());
318 self
319 }
320}
321
322impl MessageReadyEvent {
323 /// Create a new message ready event
324 pub fn new<S: Into<String>>(msg_id: i64, queue_name: S, namespace: S) -> Self {
325 Self {
326 msg_id,
327 queue_name: queue_name.into(),
328 namespace: namespace.into(),
329 ready_at: Utc::now(),
330 metadata: HashMap::new(),
331 visibility_timeout_seconds: None,
332 }
333 }
334
335 /// Create with custom timestamp
336 pub fn with_timestamp<S: Into<String>>(
337 msg_id: i64,
338 queue_name: S,
339 namespace: S,
340 ready_at: DateTime<Utc>,
341 ) -> Self {
342 Self {
343 msg_id,
344 queue_name: queue_name.into(),
345 namespace: namespace.into(),
346 ready_at,
347 metadata: HashMap::new(),
348 visibility_timeout_seconds: None,
349 }
350 }
351
352 /// Add metadata to the event
353 #[must_use]
354 pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
355 self.metadata = metadata;
356 self
357 }
358
359 /// Add a single metadata entry
360 pub fn add_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
361 self.metadata.insert(key.into(), value.into());
362 self
363 }
364
365 /// Set visibility timeout
366 #[must_use]
367 pub fn with_visibility_timeout(mut self, timeout_seconds: i32) -> Self {
368 self.visibility_timeout_seconds = Some(timeout_seconds);
369 self
370 }
371}
372
373/// Event emitted when a message is ready with full payload included (TAS-133)
374///
375/// This event is triggered for messages smaller than 7KB, where the full payload
376/// can fit within pg_notify's ~8KB limit. This enables RabbitMQ-style direct
377/// processing without a separate database fetch operation.
378///
379/// For larger messages, [`MessageReadyEvent`] is used instead (signal-only).
380///
381/// # Benefits
382///
383/// - **Reduced latency**: No separate fetch required
384/// - **Unified consumer code**: Same processing pattern as RabbitMQ
385/// - **Direct ack**: Can delete message by msg_id after processing
386///
387/// # Examples
388///
389/// ```rust
390/// use tasker_pgmq::MessageWithPayloadEvent;
391/// use chrono::Utc;
392///
393/// let event = MessageWithPayloadEvent {
394/// msg_id: 42,
395/// queue_name: "tasks_queue".to_string(),
396/// namespace: "tasks".to_string(),
397/// message: serde_json::json!({"task": "process", "data": [1, 2, 3]}),
398/// ready_at: Utc::now(),
399/// delay_seconds: 0,
400/// };
401///
402/// // Process message directly without fetching
403/// let task_data = &event.message["task"];
404/// assert_eq!(task_data, "process");
405/// ```
406#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
407pub struct MessageWithPayloadEvent {
408 /// ID of the message (for ack/delete after processing)
409 pub msg_id: i64,
410 /// Queue where the message is stored
411 pub queue_name: String,
412 /// Extracted namespace from the queue name
413 pub namespace: String,
414 /// The full message payload (included because size < 7KB)
415 pub message: serde_json::Value,
416 /// When the message became ready
417 pub ready_at: DateTime<Utc>,
418 /// Delay in seconds before message became visible (if any)
419 #[serde(default)]
420 pub delay_seconds: i32,
421}
422
423impl MessageWithPayloadEvent {
424 /// Create a new message with payload event
425 pub fn new<S: Into<String>>(
426 msg_id: i64,
427 queue_name: S,
428 namespace: S,
429 message: serde_json::Value,
430 ) -> Self {
431 Self {
432 msg_id,
433 queue_name: queue_name.into(),
434 namespace: namespace.into(),
435 message,
436 ready_at: Utc::now(),
437 delay_seconds: 0,
438 }
439 }
440
441 /// Create with custom timestamp
442 pub fn with_timestamp<S: Into<String>>(
443 msg_id: i64,
444 queue_name: S,
445 namespace: S,
446 message: serde_json::Value,
447 ready_at: DateTime<Utc>,
448 ) -> Self {
449 Self {
450 msg_id,
451 queue_name: queue_name.into(),
452 namespace: namespace.into(),
453 message,
454 ready_at,
455 delay_seconds: 0,
456 }
457 }
458
459 /// Set delay seconds
460 #[must_use]
461 pub fn with_delay_seconds(mut self, delay_seconds: i32) -> Self {
462 self.delay_seconds = delay_seconds;
463 self
464 }
465}
466
467/// Event emitted when a batch of messages is ready for processing
468///
469/// This event is triggered when multiple messages are enqueued in PGMQ via batch
470/// operations and become available for processing. It provides the message IDs and
471/// queue information needed to claim and process the batch.
472///
473/// # Examples
474///
475/// ```rust
476/// use tasker_pgmq::BatchReadyEvent;
477/// use chrono::Utc;
478/// use std::collections::HashMap;
479///
480/// let event = BatchReadyEvent {
481/// msg_ids: vec![1, 2, 3],
482/// queue_name: "tasks_queue".to_string(),
483/// namespace: "tasks".to_string(),
484/// message_count: 3,
485/// ready_at: Utc::now(),
486/// metadata: HashMap::new(),
487/// delay_seconds: 0,
488/// };
489///
490/// assert_eq!(event.msg_ids, vec![1, 2, 3]);
491/// assert_eq!(event.message_count, 3);
492/// ```
493#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
494pub struct BatchReadyEvent {
495 /// IDs of the messages in the batch
496 pub msg_ids: Vec<i64>,
497 /// Queue where the messages are available
498 pub queue_name: String,
499 /// Extracted namespace from the queue name
500 pub namespace: String,
501 /// Number of messages in the batch
502 pub message_count: i64,
503 /// When the batch became ready
504 pub ready_at: DateTime<Utc>,
505 /// Optional message metadata (limited by payload size)
506 #[serde(default)]
507 pub metadata: HashMap<String, String>,
508 /// Delay in seconds before messages become visible
509 pub delay_seconds: i32,
510}
511
512impl BatchReadyEvent {
513 /// Create a new batch ready event
514 pub fn new<S: Into<String>>(msg_ids: Vec<i64>, queue_name: S, namespace: S) -> Self {
515 let message_count = msg_ids.len() as i64;
516 Self {
517 msg_ids,
518 queue_name: queue_name.into(),
519 namespace: namespace.into(),
520 message_count,
521 ready_at: Utc::now(),
522 metadata: HashMap::new(),
523 delay_seconds: 0,
524 }
525 }
526
527 /// Create with custom timestamp
528 pub fn with_timestamp<S: Into<String>>(
529 msg_ids: Vec<i64>,
530 queue_name: S,
531 namespace: S,
532 ready_at: DateTime<Utc>,
533 ) -> Self {
534 let message_count = msg_ids.len() as i64;
535 Self {
536 msg_ids,
537 queue_name: queue_name.into(),
538 namespace: namespace.into(),
539 message_count,
540 ready_at,
541 metadata: HashMap::new(),
542 delay_seconds: 0,
543 }
544 }
545
546 /// Add metadata to the event
547 #[must_use]
548 pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
549 self.metadata = metadata;
550 self
551 }
552
553 /// Add a single metadata entry
554 pub fn add_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
555 self.metadata.insert(key.into(), value.into());
556 self
557 }
558
559 /// Set delay seconds
560 #[must_use]
561 pub fn with_delay_seconds(mut self, delay_seconds: i32) -> Self {
562 self.delay_seconds = delay_seconds;
563 self
564 }
565}
566
#[cfg(test)]
mod tests {
    use super::*;

    /// `QueueCreatedEvent::new` stores both names and starts with no metadata.
    #[test]
    fn test_queue_created_event() {
        let created = QueueCreatedEvent::new("orders_queue", "orders");

        assert_eq!(created.queue_name, "orders_queue");
        assert_eq!(created.namespace, "orders");
        assert!(created.metadata.is_empty());
    }

    /// `MessageReadyEvent::new` stores its arguments and leaves the optional parts unset.
    #[test]
    fn test_message_ready_event() {
        let ready = MessageReadyEvent::new(123, "inventory_queue", "inventory");

        assert_eq!(ready.msg_id, 123);
        assert_eq!(ready.queue_name, "inventory_queue");
        assert_eq!(ready.namespace, "inventory");
        assert!(ready.metadata.is_empty());
        assert_eq!(ready.visibility_timeout_seconds, None);
    }

    /// The variant-independent accessors on `PgmqNotifyEvent` read through to the wrapped event.
    #[test]
    fn test_event_common_methods() {
        let created = PgmqNotifyEvent::QueueCreated(QueueCreatedEvent::new("test_queue", "test"));
        let ready =
            PgmqNotifyEvent::MessageReady(MessageReadyEvent::new(456, "test_queue", "test"));

        // Queue-created variant
        assert_eq!(created.namespace(), "test");
        assert_eq!(created.queue_name(), "test_queue");
        assert_eq!(created.event_type(), "queue_created");
        assert!(created.matches_namespace("test"));
        assert!(!created.matches_namespace("other"));

        // Message-ready variant
        assert_eq!(ready.namespace(), "test");
        assert_eq!(ready.queue_name(), "test_queue");
        assert_eq!(ready.event_type(), "message_ready");
        assert!(ready.matches_namespace("test"));
    }

    /// An event survives a JSON round trip unchanged.
    #[test]
    fn test_event_serialization() {
        let inner = QueueCreatedEvent::new("orders_queue", "orders")
            .add_metadata("created_by", "system");
        let original = PgmqNotifyEvent::QueueCreated(inner);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: PgmqNotifyEvent = serde_json::from_str(&encoded).unwrap();

        assert_eq!(original, decoded);
    }

    /// `with_metadata` installs a whole map and `add_metadata` then adds to it.
    #[test]
    fn test_metadata_builder() {
        let initial = HashMap::from([
            ("key1".to_string(), "value1".to_string()),
            ("key2".to_string(), "value2".to_string()),
        ]);

        let built = QueueCreatedEvent::new("test_queue", "test")
            .with_metadata(initial)
            .add_metadata("single_key", "single_value");

        let lookup = |key: &str| built.metadata.get(key).map(String::as_str);
        assert_eq!(lookup("single_key"), Some("single_value"));
        assert_eq!(lookup("key1"), Some("value1"));
        assert_eq!(lookup("key2"), Some("value2"));
    }
}