qail_pg/driver/notification.rs
1//! LISTEN/NOTIFY support for PostgreSQL connections.
2//!
3//! PostgreSQL sends `NotificationResponse` messages asynchronously when
4//! a channel the connection is LISTENing on receives a NOTIFY.
5//!
6//! This module provides:
7//! - `Notification` struct — channel name + payload
8//! - `listen()` / `unlisten()` — subscribe/unsubscribe to channels
9//! - `poll_notifications()` — drain buffered notifications (non-blocking)
10//! - `recv_notification()` — block-wait for the next notification
11
12use super::{PgConnection, PgResult};
13use crate::protocol::PgEncoder;
14use tokio::io::AsyncWriteExt;
15
/// One asynchronous message delivered through PostgreSQL LISTEN/NOTIFY.
///
/// Values of this type are produced by `PgConnection::recv_notification()`
/// and `PgConnection::poll_notifications()`.
#[derive(Clone, Debug)]
pub struct Notification {
    /// PID of the backend process that issued the NOTIFY.
    pub process_id: i32,
    /// Name of the channel the notification was raised on.
    pub channel: String,
    /// Payload attached to the notification; may be an empty string.
    pub payload: String,
}
26
27impl PgConnection {
28 /// Subscribe to a notification channel.
29 ///
30 /// ```ignore
31 /// conn.listen("price_calendar_changed").await?;
32 /// ```
33 pub async fn listen(&mut self, channel: &str) -> PgResult<()> {
34 // Channel names are identifiers, quote them to prevent injection
35 let sql = format!("LISTEN \"{}\"", channel.replace('"', "\"\""));
36 self.execute_simple(&sql).await
37 }
38
39 /// Unsubscribe from a notification channel.
40 pub async fn unlisten(&mut self, channel: &str) -> PgResult<()> {
41 let sql = format!("UNLISTEN \"{}\"", channel.replace('"', "\"\""));
42 self.execute_simple(&sql).await
43 }
44
45 /// Unsubscribe from all notification channels.
46 pub async fn unlisten_all(&mut self) -> PgResult<()> {
47 self.execute_simple("UNLISTEN *").await
48 }
49
50 /// Drain all buffered notifications without blocking.
51 ///
52 /// Notifications arrive asynchronously from PostgreSQL and are buffered
53 /// whenever `recv()` encounters a `NotificationResponse`. This method
54 /// returns all currently buffered notifications.
55 pub fn poll_notifications(&mut self) -> Vec<Notification> {
56 self.notifications.drain(..).collect()
57 }
58
59 /// Wait for the next notification, blocking until one arrives.
60 ///
61 /// Unlike `recv()`, this does NOT use the 30-second Slowloris timeout
62 /// guard. LISTEN connections idle for long periods — that's normal,
63 /// not a DoS attack.
64 ///
65 /// Useful for a dedicated LISTEN connection in a background task.
66 pub async fn recv_notification(&mut self) -> PgResult<Notification> {
67 use crate::protocol::BackendMessage;
68 use tokio::io::AsyncReadExt;
69
70 // Return buffered notification immediately if available
71 if let Some(n) = self.notifications.pop_front() {
72 return Ok(n);
73 }
74
75 // Send empty query to flush any pending notifications from server
76 let bytes = PgEncoder::encode_query_string("");
77 self.stream.write_all(&bytes).await?;
78
79 // Read messages — use recv() for the initial empty query response
80 // (which completes quickly), then switch to no-timeout reads
81 let mut got_ready = false;
82 loop {
83 // Try to decode from the existing buffer first
84 if self.buffer.len() >= 5 {
85 let msg_len = u32::from_be_bytes([
86 self.buffer[1],
87 self.buffer[2],
88 self.buffer[3],
89 self.buffer[4],
90 ]) as usize;
91
92 if self.buffer.len() > msg_len {
93 let msg_bytes = self.buffer.split_to(msg_len + 1);
94 let (msg, _) = BackendMessage::decode(&msg_bytes)
95 .map_err(super::PgError::Protocol)?;
96
97 match msg {
98 BackendMessage::NotificationResponse { process_id, channel, payload } => {
99 return Ok(Notification { process_id, channel, payload });
100 }
101 BackendMessage::EmptyQueryResponse => continue,
102 BackendMessage::ReadyForQuery(_) => {
103 got_ready = true;
104 // Check buffer for notifications that arrived with this batch
105 if let Some(n) = self.notifications.pop_front() {
106 return Ok(n);
107 }
108 continue;
109 }
110 _ => continue,
111 }
112 }
113 }
114
115 // Read from socket — use tokio read (no timeout!) if we've
116 // already gotten ReadyForQuery (now we're just waiting for NOTIFY)
117 if self.buffer.capacity() - self.buffer.len() < 65536 {
118 self.buffer.reserve(131072);
119 }
120
121 if got_ready {
122 // No timeout — LISTEN connections idle for hours, that's fine
123 let n = self.stream.read_buf(&mut self.buffer).await
124 .map_err(|e| super::PgError::Connection(format!("Read error: {e}")))?;
125 if n == 0 {
126 return Err(super::PgError::Connection("Connection closed".to_string()));
127 }
128 } else {
129 // Initial flush — use the normal timeout to avoid hanging
130 // if the server is unresponsive during the empty query
131 let n = self.read_with_timeout().await?;
132 if n == 0 {
133 return Err(super::PgError::Connection("Connection closed".to_string()));
134 }
135 }
136 }
137 }
138}