1use crate::errors::{PgQueueError, Result};
2use sqlx::postgres::PgListener;
3use sqlx::PgPool;
4use std::time::Duration;
5use tokio::time::timeout;
6
7#[derive(Debug, Clone)]
9pub struct Notification {
10 pub channel: String,
11 pub payload: String,
12}
13
14pub struct ListenerService {
16 listener: PgListener,
17}
18
19impl ListenerService {
20 pub async fn new(pool: &PgPool) -> Result<Self> {
22 let listener = PgListener::connect_with(pool).await?;
23 Ok(Self { listener })
24 }
25
26 pub async fn listen(&mut self, channel: &str) -> Result<()> {
28 self.listener.listen(channel).await?;
29 Ok(())
30 }
31
32 pub async fn listen_all(&mut self, channels: &[&str]) -> Result<()> {
34 self.listener.listen_all(channels.iter().copied()).await?;
35 Ok(())
36 }
37
38 pub async fn unlisten(&mut self, channel: &str) -> Result<()> {
40 self.listener.unlisten(channel).await?;
41 Ok(())
42 }
43
44 pub async fn unlisten_all(&mut self) -> Result<()> {
46 self.listener.unlisten_all().await?;
47 Ok(())
48 }
49
50 pub async fn recv(&mut self) -> Result<Notification> {
52 let notification = self.listener.recv().await?;
53 Ok(Notification {
54 channel: notification.channel().to_string(),
55 payload: notification.payload().to_string(),
56 })
57 }
58
59 pub async fn recv_timeout(&mut self, duration: Duration) -> Result<Option<Notification>> {
61 match timeout(duration, self.listener.recv()).await {
62 Ok(Ok(notification)) => Ok(Some(Notification {
63 channel: notification.channel().to_string(),
64 payload: notification.payload().to_string(),
65 })),
66 Ok(Err(e)) => Err(PgQueueError::Database(e)),
67 Err(_) => Ok(None), }
69 }
70
71 pub async fn try_recv(&mut self) -> Option<Notification> {
73 self.recv_timeout(Duration::from_millis(0))
74 .await
75 .ok()
76 .flatten()
77 }
78}
79
80#[cfg(test)]
81mod tests {
82 use super::*;
83
84 #[test]
85 fn test_notification_struct() {
86 let notification = Notification {
87 channel: "my_channel".to_string(),
88 payload: r#"{"key": "value"}"#.to_string(),
89 };
90
91 assert_eq!(notification.channel, "my_channel");
92 assert_eq!(notification.payload, r#"{"key": "value"}"#);
93 }
94}