Skip to main content

tasker_pgmq/
events.rs

1//! # Event types for PGMQ notifications
2//!
3//! This module defines the event types that are emitted by PGMQ operations
4//! and can be listened to for real-time processing. Events are designed to be
5//! lightweight and contain only essential information to minimize notification overhead.
6//!
7//! ## Event Types
8//!
9//! - [`QueueCreatedEvent`] - Emitted when a new queue is created
10//! - [`MessageReadyEvent`] - Emitted when a message is enqueued (signal only, large messages)
11//! - [`MessageWithPayloadEvent`] - Emitted when a message is enqueued with full payload (TAS-133)
12//! - [`BatchReadyEvent`] - Emitted when a batch of messages is enqueued
13//!
14//! ## TAS-133: Full Payload Notifications
15//!
16//! When a message is small enough (< 7KB), the full payload is included in the notification
17//! via `MessageWithPayloadEvent`. This enables RabbitMQ-style direct processing without
18//! a separate fetch operation. For large messages, `MessageReadyEvent` is used as a signal
19//! that requires fetching the message via `pgmq_read_specific_message`.
20//!
21//! ## Usage
22//!
23//! ```rust
24//! use tasker_pgmq::{PgmqNotifyEvent, MessageReadyEvent, MessageWithPayloadEvent};
25//! use chrono::Utc;
26//!
27//! // Signal-only event (large messages)
28//! let signal_event = PgmqNotifyEvent::MessageReady(MessageReadyEvent {
29//!     queue_name: "tasks_queue".to_string(),
30//!     namespace: "tasks".to_string(),
31//!     msg_id: 12345,
32//!     ready_at: Utc::now(),
33//!     metadata: Default::default(),
34//!     visibility_timeout_seconds: Some(30),
35//! });
36//!
37//! // Full payload event (small messages, TAS-133)
38//! let payload_event = PgmqNotifyEvent::MessageWithPayload(MessageWithPayloadEvent {
39//!     queue_name: "tasks_queue".to_string(),
40//!     namespace: "tasks".to_string(),
41//!     msg_id: 12346,
42//!     message: serde_json::json!({"task": "process", "data": "small"}),
43//!     ready_at: Utc::now(),
44//!     delay_seconds: 0,
45//! });
46//!
47//! // Serialize to JSON for notification
48//! let json = serde_json::to_string(&signal_event).unwrap();
49//! assert!(json.contains("message_ready"));
50//! ```
51
52use chrono::{DateTime, Utc};
53use serde::{Deserialize, Serialize};
54use std::collections::HashMap;
55
56/// Union of all possible PGMQ notification events
57///
58/// This enum represents all event types that can be emitted by PGMQ operations.
59/// Events are tagged for JSON serialization and can be pattern-matched for handling.
60///
61/// # Examples
62///
63/// ```rust
64/// use tasker_pgmq::{PgmqNotifyEvent, QueueCreatedEvent};
65/// use chrono::Utc;
66///
67/// let event = PgmqNotifyEvent::QueueCreated(QueueCreatedEvent {
68///     queue_name: "new_queue".to_string(),
69///     namespace: "default".to_string(),
70///     created_at: Utc::now(),
71///     metadata: Default::default(),
72/// });
73///
74/// match event {
75///     PgmqNotifyEvent::QueueCreated(e) => {
76///         println!("Queue created: {}", e.queue_name);
77///     }
78///     PgmqNotifyEvent::MessageReady(e) => {
79///         println!("Message ready: {}", e.msg_id);
80///     }
81///     PgmqNotifyEvent::MessageWithPayload(e) => {
82///         println!("Message with payload: {}", e.msg_id);
83///     }
84///     PgmqNotifyEvent::BatchReady(e) => {
85///         println!("Batch ready: {} messages", e.message_count);
86///     }
87/// }
88/// ```
89#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
90#[serde(tag = "event_type", rename_all = "snake_case")]
91pub enum PgmqNotifyEvent {
92    /// Queue was created
93    QueueCreated(QueueCreatedEvent),
94    /// Message is ready for processing in a queue (signal only, requires fetch)
95    /// Used for large messages (>= 7KB) that don't fit in pg_notify payload
96    MessageReady(MessageReadyEvent),
97    /// Message with full payload included (TAS-133)
98    /// Used for small messages (< 7KB) - enables direct processing without fetch
99    MessageWithPayload(MessageWithPayloadEvent),
100    /// Batch of messages are ready for processing in a queue
101    BatchReady(BatchReadyEvent),
102}
103
104/// Event emitted when a new PGMQ queue is created
105///
106/// This event is triggered when a new queue is created in the PGMQ system.
107/// It includes the queue name, extracted namespace, and creation timestamp.
108///
109/// # Examples
110///
111/// ```rust
112/// use tasker_pgmq::QueueCreatedEvent;
113/// use chrono::Utc;
114/// use std::collections::HashMap;
115///
116/// let event = QueueCreatedEvent {
117///     queue_name: "orders_queue".to_string(),
118///     namespace: "orders".to_string(),
119///     created_at: Utc::now(),
120///     metadata: HashMap::new(),
121/// };
122///
123/// assert_eq!(event.queue_name, "orders_queue");
124/// assert_eq!(event.namespace, "orders");
125/// ```
126#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
127pub struct QueueCreatedEvent {
128    /// Name of the queue that was created
129    pub queue_name: String,
130    /// Extracted namespace from the queue name
131    pub namespace: String,
132    /// When the queue was created
133    pub created_at: DateTime<Utc>,
134    /// Optional metadata about the queue
135    #[serde(default)]
136    pub metadata: HashMap<String, String>,
137}
138
139/// Event emitted when a message is ready for processing
140///
141/// This event is triggered when a message is enqueued in PGMQ and becomes
142/// available for processing by workers. It provides the message ID and queue
143/// information needed to claim and process the message.
144///
145/// # Examples
146///
147/// ```rust
148/// use tasker_pgmq::MessageReadyEvent;
149/// use chrono::Utc;
150/// use std::collections::HashMap;
151///
152/// let event = MessageReadyEvent {
153///     msg_id: 42,
154///     queue_name: "tasks_queue".to_string(),
155///     namespace: "tasks".to_string(),
156///     ready_at: Utc::now(),
157///     metadata: HashMap::new(),
158///     visibility_timeout_seconds: Some(30),
159/// };
160///
161/// assert_eq!(event.msg_id, 42);
162/// assert_eq!(event.queue_name, "tasks_queue");
163/// assert_eq!(event.namespace, "tasks");
164/// ```
165#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
166pub struct MessageReadyEvent {
167    /// ID of the message that's ready
168    pub msg_id: i64,
169    /// Queue where the message is available
170    pub queue_name: String,
171    /// Extracted namespace from the queue name
172    pub namespace: String,
173    /// When the message became ready
174    pub ready_at: DateTime<Utc>,
175    /// Optional message metadata (limited by payload size)
176    #[serde(default)]
177    pub metadata: HashMap<String, String>,
178    /// Visibility timeout if applicable
179    pub visibility_timeout_seconds: Option<i32>,
180}
181
182impl PgmqNotifyEvent {
183    /// Get the namespace for any event type
184    #[must_use]
185    pub fn namespace(&self) -> &str {
186        match self {
187            PgmqNotifyEvent::QueueCreated(event) => &event.namespace,
188            PgmqNotifyEvent::MessageReady(event) => &event.namespace,
189            PgmqNotifyEvent::MessageWithPayload(event) => &event.namespace,
190            PgmqNotifyEvent::BatchReady(event) => &event.namespace,
191        }
192    }
193
194    /// Get the queue name for any event type
195    #[must_use]
196    pub fn queue_name(&self) -> &str {
197        match self {
198            PgmqNotifyEvent::QueueCreated(event) => &event.queue_name,
199            PgmqNotifyEvent::MessageReady(event) => &event.queue_name,
200            PgmqNotifyEvent::MessageWithPayload(event) => &event.queue_name,
201            PgmqNotifyEvent::BatchReady(event) => &event.queue_name,
202        }
203    }
204
205    /// Get the timestamp for any event type
206    #[must_use]
207    pub fn timestamp(&self) -> DateTime<Utc> {
208        match self {
209            PgmqNotifyEvent::QueueCreated(event) => event.created_at,
210            PgmqNotifyEvent::MessageReady(event) => event.ready_at,
211            PgmqNotifyEvent::MessageWithPayload(event) => event.ready_at,
212            PgmqNotifyEvent::BatchReady(event) => event.ready_at,
213        }
214    }
215
216    /// Get metadata for any event type
217    ///
218    /// Note: `MessageWithPayload` events don't have metadata, returns empty HashMap
219    #[must_use]
220    pub fn metadata(&self) -> &HashMap<String, String> {
221        // Static empty map for variants without metadata
222        static EMPTY: std::sync::OnceLock<HashMap<String, String>> = std::sync::OnceLock::new();
223        match self {
224            PgmqNotifyEvent::QueueCreated(event) => &event.metadata,
225            PgmqNotifyEvent::MessageReady(event) => &event.metadata,
226            PgmqNotifyEvent::MessageWithPayload(_) => EMPTY.get_or_init(HashMap::new),
227            PgmqNotifyEvent::BatchReady(event) => &event.metadata,
228        }
229    }
230
231    /// Check if event matches a specific namespace
232    #[must_use]
233    pub fn matches_namespace(&self, namespace: &str) -> bool {
234        self.namespace() == namespace
235    }
236
237    /// Get the event type as a string
238    #[must_use]
239    pub fn event_type(&self) -> &'static str {
240        match self {
241            PgmqNotifyEvent::QueueCreated(_) => "queue_created",
242            PgmqNotifyEvent::MessageReady(_) => "message_ready",
243            PgmqNotifyEvent::MessageWithPayload(_) => "message_with_payload",
244            PgmqNotifyEvent::BatchReady(_) => "batch_ready",
245        }
246    }
247
248    /// Get the message ID if this event is message-related
249    ///
250    /// Returns `Some(msg_id)` for `MessageReady` and `MessageWithPayload` events,
251    /// `None` for queue creation and batch events.
252    #[must_use]
253    pub fn msg_id(&self) -> Option<i64> {
254        match self {
255            PgmqNotifyEvent::MessageReady(event) => Some(event.msg_id),
256            PgmqNotifyEvent::MessageWithPayload(event) => Some(event.msg_id),
257            PgmqNotifyEvent::QueueCreated(_) | PgmqNotifyEvent::BatchReady(_) => None,
258        }
259    }
260
261    /// Check if this event includes the full message payload (TAS-133)
262    ///
263    /// Returns `true` for `MessageWithPayload` events where the message
264    /// can be processed directly without a separate fetch.
265    #[must_use]
266    pub fn has_payload(&self) -> bool {
267        matches!(self, PgmqNotifyEvent::MessageWithPayload(_))
268    }
269
270    /// Get the message payload if available (TAS-133)
271    ///
272    /// Returns `Some(&Value)` for `MessageWithPayload` events,
273    /// `None` for all other event types.
274    #[must_use]
275    pub fn payload(&self) -> Option<&serde_json::Value> {
276        match self {
277            PgmqNotifyEvent::MessageWithPayload(event) => Some(&event.message),
278            _ => None,
279        }
280    }
281}
282
283impl QueueCreatedEvent {
284    /// Create a new queue created event
285    pub fn new<S: Into<String>>(queue_name: S, namespace: S) -> Self {
286        Self {
287            queue_name: queue_name.into(),
288            namespace: namespace.into(),
289            created_at: Utc::now(),
290            metadata: HashMap::new(),
291        }
292    }
293
294    /// Create with custom timestamp
295    pub fn with_timestamp<S: Into<String>>(
296        queue_name: S,
297        namespace: S,
298        created_at: DateTime<Utc>,
299    ) -> Self {
300        Self {
301            queue_name: queue_name.into(),
302            namespace: namespace.into(),
303            created_at,
304            metadata: HashMap::new(),
305        }
306    }
307
308    /// Add metadata to the event
309    #[must_use]
310    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
311        self.metadata = metadata;
312        self
313    }
314
315    /// Add a single metadata entry
316    pub fn add_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
317        self.metadata.insert(key.into(), value.into());
318        self
319    }
320}
321
322impl MessageReadyEvent {
323    /// Create a new message ready event
324    pub fn new<S: Into<String>>(msg_id: i64, queue_name: S, namespace: S) -> Self {
325        Self {
326            msg_id,
327            queue_name: queue_name.into(),
328            namespace: namespace.into(),
329            ready_at: Utc::now(),
330            metadata: HashMap::new(),
331            visibility_timeout_seconds: None,
332        }
333    }
334
335    /// Create with custom timestamp
336    pub fn with_timestamp<S: Into<String>>(
337        msg_id: i64,
338        queue_name: S,
339        namespace: S,
340        ready_at: DateTime<Utc>,
341    ) -> Self {
342        Self {
343            msg_id,
344            queue_name: queue_name.into(),
345            namespace: namespace.into(),
346            ready_at,
347            metadata: HashMap::new(),
348            visibility_timeout_seconds: None,
349        }
350    }
351
352    /// Add metadata to the event
353    #[must_use]
354    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
355        self.metadata = metadata;
356        self
357    }
358
359    /// Add a single metadata entry
360    pub fn add_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
361        self.metadata.insert(key.into(), value.into());
362        self
363    }
364
365    /// Set visibility timeout
366    #[must_use]
367    pub fn with_visibility_timeout(mut self, timeout_seconds: i32) -> Self {
368        self.visibility_timeout_seconds = Some(timeout_seconds);
369        self
370    }
371}
372
373/// Event emitted when a message is ready with full payload included (TAS-133)
374///
375/// This event is triggered for messages smaller than 7KB, where the full payload
376/// can fit within pg_notify's ~8KB limit. This enables RabbitMQ-style direct
377/// processing without a separate database fetch operation.
378///
379/// For larger messages, [`MessageReadyEvent`] is used instead (signal-only).
380///
381/// # Benefits
382///
383/// - **Reduced latency**: No separate fetch required
384/// - **Unified consumer code**: Same processing pattern as RabbitMQ
385/// - **Direct ack**: Can delete message by msg_id after processing
386///
387/// # Examples
388///
389/// ```rust
390/// use tasker_pgmq::MessageWithPayloadEvent;
391/// use chrono::Utc;
392///
393/// let event = MessageWithPayloadEvent {
394///     msg_id: 42,
395///     queue_name: "tasks_queue".to_string(),
396///     namespace: "tasks".to_string(),
397///     message: serde_json::json!({"task": "process", "data": [1, 2, 3]}),
398///     ready_at: Utc::now(),
399///     delay_seconds: 0,
400/// };
401///
402/// // Process message directly without fetching
403/// let task_data = &event.message["task"];
404/// assert_eq!(task_data, "process");
405/// ```
406#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
407pub struct MessageWithPayloadEvent {
408    /// ID of the message (for ack/delete after processing)
409    pub msg_id: i64,
410    /// Queue where the message is stored
411    pub queue_name: String,
412    /// Extracted namespace from the queue name
413    pub namespace: String,
414    /// The full message payload (included because size < 7KB)
415    pub message: serde_json::Value,
416    /// When the message became ready
417    pub ready_at: DateTime<Utc>,
418    /// Delay in seconds before message became visible (if any)
419    #[serde(default)]
420    pub delay_seconds: i32,
421}
422
423impl MessageWithPayloadEvent {
424    /// Create a new message with payload event
425    pub fn new<S: Into<String>>(
426        msg_id: i64,
427        queue_name: S,
428        namespace: S,
429        message: serde_json::Value,
430    ) -> Self {
431        Self {
432            msg_id,
433            queue_name: queue_name.into(),
434            namespace: namespace.into(),
435            message,
436            ready_at: Utc::now(),
437            delay_seconds: 0,
438        }
439    }
440
441    /// Create with custom timestamp
442    pub fn with_timestamp<S: Into<String>>(
443        msg_id: i64,
444        queue_name: S,
445        namespace: S,
446        message: serde_json::Value,
447        ready_at: DateTime<Utc>,
448    ) -> Self {
449        Self {
450            msg_id,
451            queue_name: queue_name.into(),
452            namespace: namespace.into(),
453            message,
454            ready_at,
455            delay_seconds: 0,
456        }
457    }
458
459    /// Set delay seconds
460    #[must_use]
461    pub fn with_delay_seconds(mut self, delay_seconds: i32) -> Self {
462        self.delay_seconds = delay_seconds;
463        self
464    }
465}
466
467/// Event emitted when a batch of messages is ready for processing
468///
469/// This event is triggered when multiple messages are enqueued in PGMQ via batch
470/// operations and become available for processing. It provides the message IDs and
471/// queue information needed to claim and process the batch.
472///
473/// # Examples
474///
475/// ```rust
476/// use tasker_pgmq::BatchReadyEvent;
477/// use chrono::Utc;
478/// use std::collections::HashMap;
479///
480/// let event = BatchReadyEvent {
481///     msg_ids: vec![1, 2, 3],
482///     queue_name: "tasks_queue".to_string(),
483///     namespace: "tasks".to_string(),
484///     message_count: 3,
485///     ready_at: Utc::now(),
486///     metadata: HashMap::new(),
487///     delay_seconds: 0,
488/// };
489///
490/// assert_eq!(event.msg_ids, vec![1, 2, 3]);
491/// assert_eq!(event.message_count, 3);
492/// ```
493#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
494pub struct BatchReadyEvent {
495    /// IDs of the messages in the batch
496    pub msg_ids: Vec<i64>,
497    /// Queue where the messages are available
498    pub queue_name: String,
499    /// Extracted namespace from the queue name
500    pub namespace: String,
501    /// Number of messages in the batch
502    pub message_count: i64,
503    /// When the batch became ready
504    pub ready_at: DateTime<Utc>,
505    /// Optional message metadata (limited by payload size)
506    #[serde(default)]
507    pub metadata: HashMap<String, String>,
508    /// Delay in seconds before messages become visible
509    pub delay_seconds: i32,
510}
511
512impl BatchReadyEvent {
513    /// Create a new batch ready event
514    pub fn new<S: Into<String>>(msg_ids: Vec<i64>, queue_name: S, namespace: S) -> Self {
515        let message_count = msg_ids.len() as i64;
516        Self {
517            msg_ids,
518            queue_name: queue_name.into(),
519            namespace: namespace.into(),
520            message_count,
521            ready_at: Utc::now(),
522            metadata: HashMap::new(),
523            delay_seconds: 0,
524        }
525    }
526
527    /// Create with custom timestamp
528    pub fn with_timestamp<S: Into<String>>(
529        msg_ids: Vec<i64>,
530        queue_name: S,
531        namespace: S,
532        ready_at: DateTime<Utc>,
533    ) -> Self {
534        let message_count = msg_ids.len() as i64;
535        Self {
536            msg_ids,
537            queue_name: queue_name.into(),
538            namespace: namespace.into(),
539            message_count,
540            ready_at,
541            metadata: HashMap::new(),
542            delay_seconds: 0,
543        }
544    }
545
546    /// Add metadata to the event
547    #[must_use]
548    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
549        self.metadata = metadata;
550        self
551    }
552
553    /// Add a single metadata entry
554    pub fn add_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
555        self.metadata.insert(key.into(), value.into());
556        self
557    }
558
559    /// Set delay seconds
560    #[must_use]
561    pub fn with_delay_seconds(mut self, delay_seconds: i32) -> Self {
562        self.delay_seconds = delay_seconds;
563        self
564    }
565}
566
567#[cfg(test)]
568mod tests {
569    use super::*;
570
571    #[test]
572    fn test_queue_created_event() {
573        let event = QueueCreatedEvent::new("orders_queue", "orders");
574        assert_eq!(event.queue_name, "orders_queue");
575        assert_eq!(event.namespace, "orders");
576        assert!(event.metadata.is_empty());
577    }
578
579    #[test]
580    fn test_message_ready_event() {
581        let event = MessageReadyEvent::new(123, "inventory_queue", "inventory");
582        assert_eq!(event.msg_id, 123);
583        assert_eq!(event.queue_name, "inventory_queue");
584        assert_eq!(event.namespace, "inventory");
585        assert!(event.metadata.is_empty());
586        assert_eq!(event.visibility_timeout_seconds, None);
587    }
588
589    #[test]
590    fn test_event_common_methods() {
591        let queue_event =
592            PgmqNotifyEvent::QueueCreated(QueueCreatedEvent::new("test_queue", "test"));
593        let message_event =
594            PgmqNotifyEvent::MessageReady(MessageReadyEvent::new(456, "test_queue", "test"));
595
596        assert_eq!(queue_event.namespace(), "test");
597        assert_eq!(queue_event.queue_name(), "test_queue");
598        assert_eq!(queue_event.event_type(), "queue_created");
599        assert!(queue_event.matches_namespace("test"));
600        assert!(!queue_event.matches_namespace("other"));
601
602        assert_eq!(message_event.namespace(), "test");
603        assert_eq!(message_event.queue_name(), "test_queue");
604        assert_eq!(message_event.event_type(), "message_ready");
605        assert!(message_event.matches_namespace("test"));
606    }
607
608    #[test]
609    fn test_event_serialization() {
610        let event = PgmqNotifyEvent::QueueCreated(
611            QueueCreatedEvent::new("orders_queue", "orders").add_metadata("created_by", "system"),
612        );
613
614        let json = serde_json::to_string(&event).unwrap();
615        let deserialized: PgmqNotifyEvent = serde_json::from_str(&json).unwrap();
616
617        assert_eq!(event, deserialized);
618    }
619
620    #[test]
621    fn test_metadata_builder() {
622        let mut metadata = HashMap::new();
623        metadata.insert("key1".to_string(), "value1".to_string());
624        metadata.insert("key2".to_string(), "value2".to_string());
625
626        let event = QueueCreatedEvent::new("test_queue", "test")
627            .with_metadata(metadata)
628            .add_metadata("single_key", "single_value");
629
630        assert_eq!(
631            event.metadata.get("single_key"),
632            Some(&"single_value".to_string())
633        );
634        assert_eq!(event.metadata.get("key1"), Some(&"value1".to_string()));
635        assert_eq!(event.metadata.get("key2"), Some(&"value2".to_string()));
636    }
637}