1use async_trait::async_trait;
4use sqlx::PgPool;
5use tracing::{debug, error, instrument, warn};
6
7use crate::config::PgmqNotifyConfig;
8use crate::error::{PgmqNotifyError, Result};
9use crate::events::{
10 BatchReadyEvent, MessageReadyEvent, MessageWithPayloadEvent, PgmqNotifyEvent, QueueCreatedEvent,
11};
12
13#[async_trait]
15pub trait PgmqNotifyEmitter: Send + Sync {
16 async fn emit_queue_created(&self, event: QueueCreatedEvent) -> Result<()>;
18
19 async fn emit_message_ready(&self, event: MessageReadyEvent) -> Result<()>;
21
22 async fn emit_message_with_payload(&self, event: MessageWithPayloadEvent) -> Result<()>;
24
25 async fn emit_batch_ready(&self, event: BatchReadyEvent) -> Result<()>;
27
28 async fn emit_event(&self, event: PgmqNotifyEvent) -> Result<()>;
30
31 async fn is_healthy(&self) -> bool;
33}
34
35pub struct DbEmitter {
37 pool: PgPool,
38 config: PgmqNotifyConfig,
39}
40
41impl std::fmt::Debug for DbEmitter {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 f.debug_struct("DbEmitter")
44 .field("config", &self.config)
45 .field("pool", &"PgPool")
46 .finish()
47 }
48}
49
50impl DbEmitter {
51 pub fn new(pool: PgPool, config: PgmqNotifyConfig) -> Result<Self> {
53 config.validate()?;
54 Ok(Self { pool, config })
55 }
56
57 #[must_use]
59 pub fn config(&self) -> &PgmqNotifyConfig {
60 &self.config
61 }
62
63 fn build_payload(&self, event: &PgmqNotifyEvent) -> Result<String> {
65 let payload = if self.config.include_metadata {
66 serde_json::to_string(event)?
67 } else {
68 let minimal_event = match event {
70 PgmqNotifyEvent::QueueCreated(e) => {
71 PgmqNotifyEvent::QueueCreated(QueueCreatedEvent {
72 queue_name: e.queue_name.clone(),
73 namespace: e.namespace.clone(),
74 created_at: e.created_at,
75 metadata: std::collections::HashMap::new(),
76 })
77 }
78 PgmqNotifyEvent::MessageReady(e) => {
79 PgmqNotifyEvent::MessageReady(MessageReadyEvent {
80 msg_id: e.msg_id,
81 queue_name: e.queue_name.clone(),
82 namespace: e.namespace.clone(),
83 ready_at: e.ready_at,
84 metadata: std::collections::HashMap::new(),
85 visibility_timeout_seconds: e.visibility_timeout_seconds,
86 })
87 }
88 PgmqNotifyEvent::BatchReady(e) => PgmqNotifyEvent::BatchReady(BatchReadyEvent {
89 msg_ids: e.msg_ids.clone(),
90 queue_name: e.queue_name.clone(),
91 namespace: e.namespace.clone(),
92 message_count: e.message_count,
93 ready_at: e.ready_at,
94 metadata: std::collections::HashMap::new(),
95 delay_seconds: e.delay_seconds,
96 }),
97 PgmqNotifyEvent::MessageWithPayload(e) => {
99 PgmqNotifyEvent::MessageWithPayload(e.clone())
100 }
101 };
102 serde_json::to_string(&minimal_event)?
103 };
104
105 if payload.len() > self.config.max_payload_size {
107 return Err(PgmqNotifyError::config(format!(
108 "Payload size {} exceeds limit {}",
109 payload.len(),
110 self.config.max_payload_size
111 )));
112 }
113
114 Ok(payload)
115 }
116
117 #[instrument(skip(self, payload), fields(channel = %channel))]
119 async fn notify_channel(&self, channel: &str, payload: &str) -> Result<()> {
120 debug!("Sending notification to channel: {}", channel);
121
122 let sql = format!("NOTIFY {}, $1", channel);
123 sqlx::query(&sql)
124 .bind(payload)
125 .execute(&self.pool)
126 .await
127 .map_err(|e| {
128 error!("Failed to send notification to channel {}: {}", channel, e);
129 PgmqNotifyError::Database(e)
130 })?;
131
132 debug!("Successfully sent notification to channel: {}", channel);
133 Ok(())
134 }
135}
136
137#[async_trait]
138impl PgmqNotifyEmitter for DbEmitter {
139 #[instrument(skip(self, event), fields(queue = %event.queue_name, namespace = %event.namespace))]
140 async fn emit_queue_created(&self, event: QueueCreatedEvent) -> Result<()> {
141 let pgmq_event = PgmqNotifyEvent::QueueCreated(event);
142 let payload = self.build_payload(&pgmq_event)?;
143 let channel = self.config.queue_created_channel()?;
144
145 self.notify_channel(&channel, &payload).await
146 }
147
148 #[instrument(skip(self, event), fields(msg_id = %event.msg_id, queue = %event.queue_name, namespace = %event.namespace))]
149 async fn emit_message_ready(&self, event: MessageReadyEvent) -> Result<()> {
150 let namespace = event.namespace.clone();
151 let pgmq_event = PgmqNotifyEvent::MessageReady(event);
152 let payload = self.build_payload(&pgmq_event)?;
153
154 let namespace_channel = self.config.message_ready_channel(&namespace)?;
156 let global_channel = self.config.global_message_ready_channel()?;
157
158 self.notify_channel(&namespace_channel, &payload).await?;
160
161 if namespace_channel != global_channel {
163 self.notify_channel(&global_channel, &payload).await?;
164 }
165
166 Ok(())
167 }
168
169 #[instrument(skip(self, event), fields(msg_count = %event.message_count, queue = %event.queue_name, namespace = %event.namespace))]
170 async fn emit_batch_ready(&self, event: BatchReadyEvent) -> Result<()> {
171 let namespace = event.namespace.clone();
172 let pgmq_event = PgmqNotifyEvent::BatchReady(event);
173 let payload = self.build_payload(&pgmq_event)?;
174
175 let namespace_channel = self.config.message_ready_channel(&namespace)?;
177 let global_channel = self.config.global_message_ready_channel()?;
178
179 self.notify_channel(&namespace_channel, &payload).await?;
181
182 if namespace_channel != global_channel {
184 self.notify_channel(&global_channel, &payload).await?;
185 }
186
187 Ok(())
188 }
189
190 #[instrument(skip(self, event), fields(msg_id = %event.msg_id, queue = %event.queue_name, namespace = %event.namespace))]
191 async fn emit_message_with_payload(&self, event: MessageWithPayloadEvent) -> Result<()> {
192 let namespace = event.namespace.clone();
193 let pgmq_event = PgmqNotifyEvent::MessageWithPayload(event);
194 let payload = self.build_payload(&pgmq_event)?;
195
196 let namespace_channel = self.config.message_ready_channel(&namespace)?;
198 let global_channel = self.config.global_message_ready_channel()?;
199
200 self.notify_channel(&namespace_channel, &payload).await?;
202
203 if namespace_channel != global_channel {
205 self.notify_channel(&global_channel, &payload).await?;
206 }
207
208 Ok(())
209 }
210
211 async fn emit_event(&self, event: PgmqNotifyEvent) -> Result<()> {
212 match event {
213 PgmqNotifyEvent::QueueCreated(e) => self.emit_queue_created(e).await,
214 PgmqNotifyEvent::MessageReady(e) => self.emit_message_ready(e).await,
215 PgmqNotifyEvent::MessageWithPayload(e) => self.emit_message_with_payload(e).await,
216 PgmqNotifyEvent::BatchReady(e) => self.emit_batch_ready(e).await,
217 }
218 }
219
220 async fn is_healthy(&self) -> bool {
221 match self.pool.acquire().await {
223 Ok(_) => true,
224 Err(e) => {
225 warn!("Database emitter health check failed: {}", e);
226 false
227 }
228 }
229 }
230}
231
232#[derive(Debug, Clone)]
234pub struct NoopEmitter {
235 config: PgmqNotifyConfig,
236}
237
238impl NoopEmitter {
239 pub fn new(config: PgmqNotifyConfig) -> Result<Self> {
241 config.validate()?;
242 Ok(Self { config })
243 }
244
245 #[must_use]
247 pub fn config(&self) -> &PgmqNotifyConfig {
248 &self.config
249 }
250}
251
252#[async_trait]
253impl PgmqNotifyEmitter for NoopEmitter {
254 async fn emit_queue_created(&self, event: QueueCreatedEvent) -> Result<()> {
255 debug!(
256 "NoopEmitter: Would emit queue created event for {}",
257 event.queue_name
258 );
259 Ok(())
260 }
261
262 async fn emit_message_ready(&self, event: MessageReadyEvent) -> Result<()> {
263 debug!(
264 "NoopEmitter: Would emit message ready event for msg {} in queue {}",
265 event.msg_id, event.queue_name
266 );
267 Ok(())
268 }
269
270 async fn emit_batch_ready(&self, event: BatchReadyEvent) -> Result<()> {
271 debug!(
272 "NoopEmitter: Would emit batch ready event for {} messages in queue {}",
273 event.message_count, event.queue_name
274 );
275 Ok(())
276 }
277
278 async fn emit_message_with_payload(&self, event: MessageWithPayloadEvent) -> Result<()> {
279 debug!(
280 "NoopEmitter: Would emit message with payload event for msg {} in queue {}",
281 event.msg_id, event.queue_name
282 );
283 Ok(())
284 }
285
286 async fn emit_event(&self, event: PgmqNotifyEvent) -> Result<()> {
287 debug!("NoopEmitter: Would emit event: {:?}", event.event_type());
288 Ok(())
289 }
290
291 async fn is_healthy(&self) -> bool {
292 true }
294}
295
/// Stateless namespace for the emitter constructor helpers `database` and
/// `noop`.
#[derive(Debug)]
pub struct EmitterFactory;
299
300impl EmitterFactory {
301 #[expect(
303 dead_code,
304 reason = "emitter factory API - module restricted per TAS-187"
305 )]
306 pub fn database(pool: PgPool, config: PgmqNotifyConfig) -> Result<DbEmitter> {
307 DbEmitter::new(pool, config)
308 }
309
310 #[allow(
312 dead_code,
313 reason = "emitter factory API - module restricted per TAS-187"
314 )]
315 pub fn noop(config: PgmqNotifyConfig) -> Result<NoopEmitter> {
316 NoopEmitter::new(config)
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// The no-op emitter accepts events and reports itself healthy.
    #[tokio::test]
    async fn test_noop_emitter() {
        let emitter = NoopEmitter::new(PgmqNotifyConfig::default()).unwrap();

        let created = emitter
            .emit_queue_created(QueueCreatedEvent::new("test_queue", "test"))
            .await;
        assert!(created.is_ok());

        let ready = emitter
            .emit_message_ready(MessageReadyEvent::new(123, "test_queue", "test"))
            .await;
        assert!(ready.is_ok());

        assert!(emitter.is_healthy().await);
    }

    /// The default configuration passes validation.
    #[test]
    fn test_payload_building() {
        assert!(PgmqNotifyConfig::default().validate().is_ok());
    }

    /// An event carrying 1000 metadata entries serializes to more than 7800 bytes.
    #[test]
    fn test_payload_size_limit() {
        let large_metadata: HashMap<String, String> = (0..1000)
            .map(|i| (format!("key_{}", i), "x".repeat(100)))
            .collect();

        let pgmq_event = PgmqNotifyEvent::QueueCreated(
            QueueCreatedEvent::new("test_queue", "test").with_metadata(large_metadata),
        );
        let serialized = serde_json::to_string(&pgmq_event).unwrap();

        assert!(serialized.len() > 7800);
    }

    /// The factory builds a no-op emitter that reports itself healthy.
    #[tokio::test]
    async fn test_emitter_factory() {
        let noop = EmitterFactory::noop(PgmqNotifyConfig::default()).unwrap();
        assert!(noop.is_healthy().await);
    }

    /// Channel names are derived from the configured prefix.
    #[test]
    fn test_channel_naming() {
        let config = PgmqNotifyConfig::new().with_channels_prefix("test_app");

        let queue_created = config.queue_created_channel().unwrap();
        assert_eq!(queue_created, "test_app.pgmq_queue_created");

        let namespaced = config.message_ready_channel("orders").unwrap();
        assert_eq!(namespaced, "test_app.pgmq_message_ready.orders");

        let global = config.global_message_ready_channel().unwrap();
        assert_eq!(global, "test_app.pgmq_message_ready");
    }
}