// tasker_pgmq/emitter.rs

//! Event emitters for PGMQ notifications
2
3use async_trait::async_trait;
4use sqlx::PgPool;
5use tracing::{debug, error, instrument, warn};
6
7use crate::config::PgmqNotifyConfig;
8use crate::error::{PgmqNotifyError, Result};
9use crate::events::{
10    BatchReadyEvent, MessageReadyEvent, MessageWithPayloadEvent, PgmqNotifyEvent, QueueCreatedEvent,
11};
12
13/// Trait for emitting PGMQ notifications
14#[async_trait]
15pub trait PgmqNotifyEmitter: Send + Sync {
16    /// Emit a queue created event
17    async fn emit_queue_created(&self, event: QueueCreatedEvent) -> Result<()>;
18
19    /// Emit a message ready event (signal only, for large messages)
20    async fn emit_message_ready(&self, event: MessageReadyEvent) -> Result<()>;
21
22    /// Emit a message with payload event (TAS-133, for small messages)
23    async fn emit_message_with_payload(&self, event: MessageWithPayloadEvent) -> Result<()>;
24
25    /// Emit a batch ready event
26    async fn emit_batch_ready(&self, event: BatchReadyEvent) -> Result<()>;
27
28    /// Emit a generic PGMQ event
29    async fn emit_event(&self, event: PgmqNotifyEvent) -> Result<()>;
30
31    /// Check if emitter is healthy and can send notifications
32    async fn is_healthy(&self) -> bool;
33}
34
35/// Database-backed emitter using `PostgreSQL` NOTIFY
36pub struct DbEmitter {
37    pool: PgPool,
38    config: PgmqNotifyConfig,
39}
40
41impl std::fmt::Debug for DbEmitter {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        f.debug_struct("DbEmitter")
44            .field("config", &self.config)
45            .field("pool", &"PgPool")
46            .finish()
47    }
48}
49
50impl DbEmitter {
51    /// Create a new database emitter
52    pub fn new(pool: PgPool, config: PgmqNotifyConfig) -> Result<Self> {
53        config.validate()?;
54        Ok(Self { pool, config })
55    }
56
57    /// Get the configuration
58    #[must_use]
59    pub fn config(&self) -> &PgmqNotifyConfig {
60        &self.config
61    }
62
63    /// Build notification payload with size validation
64    fn build_payload(&self, event: &PgmqNotifyEvent) -> Result<String> {
65        let payload = if self.config.include_metadata {
66            serde_json::to_string(event)?
67        } else {
68            // Create a minimal payload without metadata
69            let minimal_event = match event {
70                PgmqNotifyEvent::QueueCreated(e) => {
71                    PgmqNotifyEvent::QueueCreated(QueueCreatedEvent {
72                        queue_name: e.queue_name.clone(),
73                        namespace: e.namespace.clone(),
74                        created_at: e.created_at,
75                        metadata: std::collections::HashMap::new(),
76                    })
77                }
78                PgmqNotifyEvent::MessageReady(e) => {
79                    PgmqNotifyEvent::MessageReady(MessageReadyEvent {
80                        msg_id: e.msg_id,
81                        queue_name: e.queue_name.clone(),
82                        namespace: e.namespace.clone(),
83                        ready_at: e.ready_at,
84                        metadata: std::collections::HashMap::new(),
85                        visibility_timeout_seconds: e.visibility_timeout_seconds,
86                    })
87                }
88                PgmqNotifyEvent::BatchReady(e) => PgmqNotifyEvent::BatchReady(BatchReadyEvent {
89                    msg_ids: e.msg_ids.clone(),
90                    queue_name: e.queue_name.clone(),
91                    namespace: e.namespace.clone(),
92                    message_count: e.message_count,
93                    ready_at: e.ready_at,
94                    metadata: std::collections::HashMap::new(),
95                    delay_seconds: e.delay_seconds,
96                }),
97                // MessageWithPayload doesn't have metadata to strip, pass through as-is
98                PgmqNotifyEvent::MessageWithPayload(e) => {
99                    PgmqNotifyEvent::MessageWithPayload(e.clone())
100                }
101            };
102            serde_json::to_string(&minimal_event)?
103        };
104
105        // Check payload size limit
106        if payload.len() > self.config.max_payload_size {
107            return Err(PgmqNotifyError::config(format!(
108                "Payload size {} exceeds limit {}",
109                payload.len(),
110                self.config.max_payload_size
111            )));
112        }
113
114        Ok(payload)
115    }
116
117    /// Send notification to `PostgreSQL` channel
118    #[instrument(skip(self, payload), fields(channel = %channel))]
119    async fn notify_channel(&self, channel: &str, payload: &str) -> Result<()> {
120        debug!("Sending notification to channel: {}", channel);
121
122        let sql = format!("NOTIFY {}, $1", channel);
123        sqlx::query(&sql)
124            .bind(payload)
125            .execute(&self.pool)
126            .await
127            .map_err(|e| {
128                error!("Failed to send notification to channel {}: {}", channel, e);
129                PgmqNotifyError::Database(e)
130            })?;
131
132        debug!("Successfully sent notification to channel: {}", channel);
133        Ok(())
134    }
135}
136
137#[async_trait]
138impl PgmqNotifyEmitter for DbEmitter {
139    #[instrument(skip(self, event), fields(queue = %event.queue_name, namespace = %event.namespace))]
140    async fn emit_queue_created(&self, event: QueueCreatedEvent) -> Result<()> {
141        let pgmq_event = PgmqNotifyEvent::QueueCreated(event);
142        let payload = self.build_payload(&pgmq_event)?;
143        let channel = self.config.queue_created_channel()?;
144
145        self.notify_channel(&channel, &payload).await
146    }
147
148    #[instrument(skip(self, event), fields(msg_id = %event.msg_id, queue = %event.queue_name, namespace = %event.namespace))]
149    async fn emit_message_ready(&self, event: MessageReadyEvent) -> Result<()> {
150        let namespace = event.namespace.clone();
151        let pgmq_event = PgmqNotifyEvent::MessageReady(event);
152        let payload = self.build_payload(&pgmq_event)?;
153
154        // Send to both namespace-specific and global channels
155        let namespace_channel = self.config.message_ready_channel(&namespace)?;
156        let global_channel = self.config.global_message_ready_channel()?;
157
158        // Send to namespace-specific channel first
159        self.notify_channel(&namespace_channel, &payload).await?;
160
161        // Also send to global channel for listeners monitoring all namespaces
162        if namespace_channel != global_channel {
163            self.notify_channel(&global_channel, &payload).await?;
164        }
165
166        Ok(())
167    }
168
169    #[instrument(skip(self, event), fields(msg_count = %event.message_count, queue = %event.queue_name, namespace = %event.namespace))]
170    async fn emit_batch_ready(&self, event: BatchReadyEvent) -> Result<()> {
171        let namespace = event.namespace.clone();
172        let pgmq_event = PgmqNotifyEvent::BatchReady(event);
173        let payload = self.build_payload(&pgmq_event)?;
174
175        // Send to both namespace-specific and global channels (same pattern as MessageReady)
176        let namespace_channel = self.config.message_ready_channel(&namespace)?;
177        let global_channel = self.config.global_message_ready_channel()?;
178
179        // Send to namespace-specific channel first
180        self.notify_channel(&namespace_channel, &payload).await?;
181
182        // Also send to global channel for listeners monitoring all namespaces
183        if namespace_channel != global_channel {
184            self.notify_channel(&global_channel, &payload).await?;
185        }
186
187        Ok(())
188    }
189
190    #[instrument(skip(self, event), fields(msg_id = %event.msg_id, queue = %event.queue_name, namespace = %event.namespace))]
191    async fn emit_message_with_payload(&self, event: MessageWithPayloadEvent) -> Result<()> {
192        let namespace = event.namespace.clone();
193        let pgmq_event = PgmqNotifyEvent::MessageWithPayload(event);
194        let payload = self.build_payload(&pgmq_event)?;
195
196        // Send to both namespace-specific and global channels (same pattern as MessageReady)
197        let namespace_channel = self.config.message_ready_channel(&namespace)?;
198        let global_channel = self.config.global_message_ready_channel()?;
199
200        // Send to namespace-specific channel first
201        self.notify_channel(&namespace_channel, &payload).await?;
202
203        // Also send to global channel for listeners monitoring all namespaces
204        if namespace_channel != global_channel {
205            self.notify_channel(&global_channel, &payload).await?;
206        }
207
208        Ok(())
209    }
210
211    async fn emit_event(&self, event: PgmqNotifyEvent) -> Result<()> {
212        match event {
213            PgmqNotifyEvent::QueueCreated(e) => self.emit_queue_created(e).await,
214            PgmqNotifyEvent::MessageReady(e) => self.emit_message_ready(e).await,
215            PgmqNotifyEvent::MessageWithPayload(e) => self.emit_message_with_payload(e).await,
216            PgmqNotifyEvent::BatchReady(e) => self.emit_batch_ready(e).await,
217        }
218    }
219
220    async fn is_healthy(&self) -> bool {
221        // Simple health check - try to get a connection from the pool
222        match self.pool.acquire().await {
223            Ok(_) => true,
224            Err(e) => {
225                warn!("Database emitter health check failed: {}", e);
226                false
227            }
228        }
229    }
230}
231
232/// No-operation emitter for testing and disabled scenarios
233#[derive(Debug, Clone)]
234pub struct NoopEmitter {
235    config: PgmqNotifyConfig,
236}
237
238impl NoopEmitter {
239    /// Create a new no-op emitter
240    pub fn new(config: PgmqNotifyConfig) -> Result<Self> {
241        config.validate()?;
242        Ok(Self { config })
243    }
244
245    /// Get the configuration
246    #[must_use]
247    pub fn config(&self) -> &PgmqNotifyConfig {
248        &self.config
249    }
250}
251
252#[async_trait]
253impl PgmqNotifyEmitter for NoopEmitter {
254    async fn emit_queue_created(&self, event: QueueCreatedEvent) -> Result<()> {
255        debug!(
256            "NoopEmitter: Would emit queue created event for {}",
257            event.queue_name
258        );
259        Ok(())
260    }
261
262    async fn emit_message_ready(&self, event: MessageReadyEvent) -> Result<()> {
263        debug!(
264            "NoopEmitter: Would emit message ready event for msg {} in queue {}",
265            event.msg_id, event.queue_name
266        );
267        Ok(())
268    }
269
270    async fn emit_batch_ready(&self, event: BatchReadyEvent) -> Result<()> {
271        debug!(
272            "NoopEmitter: Would emit batch ready event for {} messages in queue {}",
273            event.message_count, event.queue_name
274        );
275        Ok(())
276    }
277
278    async fn emit_message_with_payload(&self, event: MessageWithPayloadEvent) -> Result<()> {
279        debug!(
280            "NoopEmitter: Would emit message with payload event for msg {} in queue {}",
281            event.msg_id, event.queue_name
282        );
283        Ok(())
284    }
285
286    async fn emit_event(&self, event: PgmqNotifyEvent) -> Result<()> {
287        debug!("NoopEmitter: Would emit event: {:?}", event.event_type());
288        Ok(())
289    }
290
291    async fn is_healthy(&self) -> bool {
292        true // No-op emitter is always healthy
293    }
294}
295
296/// Factory for creating emitters based on configuration
297#[derive(Debug)]
298pub struct EmitterFactory;
299
300impl EmitterFactory {
301    /// Create a database emitter
302    #[expect(
303        dead_code,
304        reason = "emitter factory API - module restricted per TAS-187"
305    )]
306    pub fn database(pool: PgPool, config: PgmqNotifyConfig) -> Result<DbEmitter> {
307        DbEmitter::new(pool, config)
308    }
309
310    /// Create a no-op emitter
311    #[allow(
312        dead_code,
313        reason = "emitter factory API - module restricted per TAS-187"
314    )]
315    pub fn noop(config: PgmqNotifyConfig) -> Result<NoopEmitter> {
316        NoopEmitter::new(config)
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Emit calls on the no-op emitter succeed and it reports healthy.
    #[tokio::test]
    async fn test_noop_emitter() {
        let emitter = NoopEmitter::new(PgmqNotifyConfig::default()).unwrap();

        let created = QueueCreatedEvent::new("test_queue", "test");
        let ready = MessageReadyEvent::new(123, "test_queue", "test");

        let created_result = emitter.emit_queue_created(created).await;
        assert!(created_result.is_ok());

        let ready_result = emitter.emit_message_ready(ready).await;
        assert!(ready_result.is_ok());

        assert!(emitter.is_healthy().await);
    }

    /// Only checks that the default configuration validates; no database is involved.
    #[test]
    fn test_payload_building() {
        let validation = PgmqNotifyConfig::default().validate();
        assert!(validation.is_ok());
    }

    /// An event with 1000 metadata entries of 100 bytes each serializes to more
    /// than 7800 bytes.
    #[test]
    fn test_payload_size_limit() {
        let large_metadata: HashMap<String, String> = (0..1000)
            .map(|i| (format!("key_{}", i), "x".repeat(100)))
            .collect();

        let pgmq_event = PgmqNotifyEvent::QueueCreated(
            QueueCreatedEvent::new("test_queue", "test").with_metadata(large_metadata),
        );
        let serialized_len = serde_json::to_string(&pgmq_event).unwrap().len();

        // This should exceed the default 7800 byte limit
        assert!(serialized_len > 7800);
    }

    /// The factory's no-op constructor yields a healthy emitter.
    #[tokio::test]
    async fn test_emitter_factory() {
        let noop = EmitterFactory::noop(PgmqNotifyConfig::default()).unwrap();
        assert!(noop.is_healthy().await);
    }

    /// Channel names are built from the configured prefix.
    #[test]
    fn test_channel_naming() {
        let config = PgmqNotifyConfig::new().with_channels_prefix("test_app");

        let queue_created = config.queue_created_channel().unwrap();
        assert_eq!(queue_created, "test_app.pgmq_queue_created");

        let namespaced = config.message_ready_channel("orders").unwrap();
        assert_eq!(namespaced, "test_app.pgmq_message_ready.orders");

        let global = config.global_message_ready_channel().unwrap();
        assert_eq!(global, "test_app.pgmq_message_ready");
    }
}