Skip to main content

pg_queue/
listen.rs

1use crate::errors::{PgQueueError, Result};
2use sqlx::postgres::PgListener;
3use sqlx::PgPool;
4use std::time::Duration;
5use tokio::time::timeout;
6
/// A notification received from PostgreSQL LISTEN.
///
/// Both fields are owned `String`s, so a value can be cloned, compared with
/// `==`, and stored in hashed collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Notification {
    /// Name of the channel the notification was delivered on.
    pub channel: String,
    /// Payload string carried by the notification.
    pub payload: String,
}
13
14/// Service for listening to PostgreSQL NOTIFY messages
15pub struct ListenerService {
16    listener: PgListener,
17}
18
19impl ListenerService {
20    /// Create a new listener service connected to the database
21    pub async fn new(pool: &PgPool) -> Result<Self> {
22        let listener = PgListener::connect_with(pool).await?;
23        Ok(Self { listener })
24    }
25
26    /// Subscribe to a channel
27    pub async fn listen(&mut self, channel: &str) -> Result<()> {
28        self.listener.listen(channel).await?;
29        Ok(())
30    }
31
32    /// Subscribe to multiple channels
33    pub async fn listen_all(&mut self, channels: &[&str]) -> Result<()> {
34        self.listener.listen_all(channels.iter().copied()).await?;
35        Ok(())
36    }
37
38    /// Unsubscribe from a channel
39    pub async fn unlisten(&mut self, channel: &str) -> Result<()> {
40        self.listener.unlisten(channel).await?;
41        Ok(())
42    }
43
44    /// Unsubscribe from all channels
45    pub async fn unlisten_all(&mut self) -> Result<()> {
46        self.listener.unlisten_all().await?;
47        Ok(())
48    }
49
50    /// Wait for the next notification (blocking)
51    pub async fn recv(&mut self) -> Result<Notification> {
52        let notification = self.listener.recv().await?;
53        Ok(Notification {
54            channel: notification.channel().to_string(),
55            payload: notification.payload().to_string(),
56        })
57    }
58
59    /// Wait for a notification with timeout
60    pub async fn recv_timeout(&mut self, duration: Duration) -> Result<Option<Notification>> {
61        match timeout(duration, self.listener.recv()).await {
62            Ok(Ok(notification)) => Ok(Some(Notification {
63                channel: notification.channel().to_string(),
64                payload: notification.payload().to_string(),
65            })),
66            Ok(Err(e)) => Err(PgQueueError::Database(e)),
67            Err(_) => Ok(None), // Timeout
68        }
69    }
70
71    /// Try to receive without blocking (with zero timeout)
72    pub async fn try_recv(&mut self) -> Option<Notification> {
73        self.recv_timeout(Duration::from_millis(0))
74            .await
75            .ok()
76            .flatten()
77    }
78}
79
#[cfg(test)]
mod tests {
    use super::*;

    /// A `Notification` built from literals exposes the same channel and
    /// payload through its public fields.
    #[test]
    fn test_notification_struct() {
        let expected_channel = "my_channel";
        let expected_payload = r#"{"key": "value"}"#;

        let notification = Notification {
            channel: String::from(expected_channel),
            payload: String::from(expected_payload),
        };

        assert_eq!(notification.channel, expected_channel);
        assert_eq!(notification.payload, expected_payload);
    }
}
94}