Skip to main content

pg_wired/
cancel.rs

1//! Query cancellation via CancelRequest.
2//!
3//! PostgreSQL cancellation works by opening a NEW TCP connection to the server
4//! and sending a 16-byte CancelRequest message containing the backend PID and
5//! secret key from the original connection's BackendKeyData.
6
7use bytes::{BufMut, BytesMut};
8use tokio::io::AsyncWriteExt;
9use tokio::net::TcpStream;
10
11use crate::error::PgWireError;
12
/// A token that can cancel a running query on a specific backend.
///
/// The token is `Clone` and holds only owned data (`String` and `i32`), so it
/// is `Send` and can be passed to another task or stored for timeout handling.
///
/// The backend secret is held internally and is not exposed through any
/// accessor; obtain a token from an `AsyncConn` / `WireConn` rather than
/// constructing one with raw fields. `Debug` is implemented by hand so that
/// the secret is never rendered into logs, traces, or panic messages.
#[derive(Clone)]
pub struct CancelToken {
    /// Server address used as the connect target when sending the cancel.
    addr: String,
    /// Backend process ID placed in the CancelRequest.
    pid: i32,
    /// Backend secret key placed in the CancelRequest; never printed.
    secret: i32,
}

impl std::fmt::Debug for CancelToken {
    /// Formats the token with `addr` and `pid` visible and the secret redacted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CancelToken")
            .field("addr", &self.addr)
            .field("pid", &self.pid)
            // `format_args!` renders the placeholder without string quotes.
            .field("secret", &format_args!("<redacted>"))
            .finish()
    }
}
25
26impl CancelToken {
27    /// Construct a cancel token from raw parts. Intended for callers that
28    /// persisted the three values across processes; most users should get a
29    /// token from `AsyncConn::cancel_token()` instead.
30    pub fn new(addr: String, pid: i32, secret: i32) -> Self {
31        Self { addr, pid, secret }
32    }
33
34    /// Server address this token targets.
35    pub fn addr(&self) -> &str {
36        &self.addr
37    }
38
39    /// Backend process ID this token cancels.
40    pub fn pid(&self) -> i32 {
41        self.pid
42    }
43
44    /// Send a cancel request to PostgreSQL.
45    ///
46    /// Opens a new TCP connection, sends the 16-byte CancelRequest, and closes.
47    /// The server will attempt to cancel the current query on the target backend.
48    ///
49    /// Note: cancellation is best-effort. The query may complete before the cancel
50    /// arrives, or the server may not be able to cancel it immediately.
51    ///
52    /// Times out after 5 seconds to prevent hanging if the server is unreachable.
53    pub async fn cancel(&self) -> Result<(), PgWireError> {
54        const CANCEL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
55
56        let result = tokio::time::timeout(CANCEL_TIMEOUT, async {
57            let mut stream = TcpStream::connect(&self.addr).await?;
58
59            // CancelRequest: no message type tag.
60            // Length(i32) = 16, RequestCode(i32) = 80877102, PID(i32), Secret(i32)
61            let mut buf = BytesMut::with_capacity(16);
62            buf.put_i32(16); // total message length
63            buf.put_i32(80877102); // cancel request code (1234 << 16 | 5678)
64            buf.put_i32(self.pid);
65            buf.put_i32(self.secret);
66
67            stream.write_all(&buf).await?;
68            stream.shutdown().await?;
69            Ok::<(), PgWireError>(())
70        })
71        .await;
72
73        match result {
74            Ok(inner) => inner,
75            Err(_elapsed) => Err(PgWireError::Protocol(
76                "cancel request timed out after 5 seconds".into(),
77            )),
78        }
79    }
80}