pg_wired/cancel.rs
1//! Query cancellation via CancelRequest.
2//!
3//! PostgreSQL cancellation works by opening a NEW TCP connection to the server
4//! and sending a 16-byte CancelRequest message containing the backend PID and
5//! secret key from the original connection's BackendKeyData.
6
7use bytes::{BufMut, BytesMut};
8use tokio::io::AsyncWriteExt;
9use tokio::net::TcpStream;
10
11use crate::error::PgWireError;
12
13/// A token that can cancel a running query on a specific backend.
14/// Cloneable and Send, can be passed to another task or stored for timeout handling.
15///
16/// The backend secret is held internally and is not exposed through any
17/// accessor; obtain a token from an `AsyncConn` / `WireConn` rather than
18/// constructing one with raw fields.
19#[derive(Debug, Clone)]
20pub struct CancelToken {
21 addr: String,
22 pid: i32,
23 secret: i32,
24}
25
26impl CancelToken {
27 /// Construct a cancel token from raw parts. Intended for callers that
28 /// persisted the three values across processes; most users should get a
29 /// token from `AsyncConn::cancel_token()` instead.
30 pub fn new(addr: String, pid: i32, secret: i32) -> Self {
31 Self { addr, pid, secret }
32 }
33
34 /// Server address this token targets.
35 pub fn addr(&self) -> &str {
36 &self.addr
37 }
38
39 /// Backend process ID this token cancels.
40 pub fn pid(&self) -> i32 {
41 self.pid
42 }
43
44 /// Send a cancel request to PostgreSQL.
45 ///
46 /// Opens a new TCP connection, sends the 16-byte CancelRequest, and closes.
47 /// The server will attempt to cancel the current query on the target backend.
48 ///
49 /// Note: cancellation is best-effort. The query may complete before the cancel
50 /// arrives, or the server may not be able to cancel it immediately.
51 ///
52 /// Times out after 5 seconds to prevent hanging if the server is unreachable.
53 pub async fn cancel(&self) -> Result<(), PgWireError> {
54 const CANCEL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
55
56 let result = tokio::time::timeout(CANCEL_TIMEOUT, async {
57 let mut stream = TcpStream::connect(&self.addr).await?;
58
59 // CancelRequest: no message type tag.
60 // Length(i32) = 16, RequestCode(i32) = 80877102, PID(i32), Secret(i32)
61 let mut buf = BytesMut::with_capacity(16);
62 buf.put_i32(16); // total message length
63 buf.put_i32(80877102); // cancel request code (1234 << 16 | 5678)
64 buf.put_i32(self.pid);
65 buf.put_i32(self.secret);
66
67 stream.write_all(&buf).await?;
68 stream.shutdown().await?;
69 Ok::<(), PgWireError>(())
70 })
71 .await;
72
73 match result {
74 Ok(inner) => inner,
75 Err(_elapsed) => Err(PgWireError::Protocol(
76 "cancel request timed out after 5 seconds".into(),
77 )),
78 }
79 }
80}