qail_pg/driver/
cancel.rs

1//! Query cancellation methods for PostgreSQL connection.
2
3use super::{CANCEL_REQUEST_CODE, PgConnection, PgResult};
4use tokio::io::AsyncWriteExt;
5use tokio::net::TcpStream;
6
/// Handle for cancelling a query that is running on a PostgreSQL backend.
///
/// The token owns its own copy of the server address and of the backend key
/// (process id plus secret key). It therefore does not borrow the connection
/// and can be cloned and sent across threads.
#[derive(Clone, Debug)]
pub struct CancelToken {
    /// Host used when opening the cancel connection.
    pub(crate) host: String,
    /// TCP port used when opening the cancel connection.
    pub(crate) port: u16,
    /// Process id placed in the CancelRequest message.
    pub(crate) process_id: i32,
    /// Secret key placed in the CancelRequest message alongside `process_id`.
    pub(crate) secret_key: i32,
}
16
17impl CancelToken {
18    /// Attempt to cancel the ongoing query.
19    /// This opens a new TCP connection and sends a CancelRequest message.
20    pub async fn cancel_query(&self) -> PgResult<()> {
21        PgConnection::cancel_query(
22            &self.host,
23            self.port,
24            self.process_id,
25            self.secret_key
26        ).await
27    }
28}
29
30impl PgConnection {
31    /// Get the cancel key for this connection.
32    pub fn get_cancel_key(&self) -> (i32, i32) {
33        (self.process_id, self.secret_key)
34    }
35
36    /// Cancel a running query on a PostgreSQL backend.
37    /// This opens a new TCP connection and sends a CancelRequest message.
38    /// The original connection continues running but the query is interrupted.
39    pub async fn cancel_query(
40        host: &str,
41        port: u16,
42        process_id: i32,
43        secret_key: i32,
44    ) -> PgResult<()> {
45        // Open new connection just for cancel
46        let addr = format!("{}:{}", host, port);
47        let mut stream = TcpStream::connect(&addr).await?;
48
49        // Send CancelRequest message:
50        // Length (16) + CancelRequest code (80877102) + process_id + secret_key
51        let mut buf = [0u8; 16];
52        buf[0..4].copy_from_slice(&16i32.to_be_bytes()); // Length
53        buf[4..8].copy_from_slice(&CANCEL_REQUEST_CODE.to_be_bytes());
54        buf[8..12].copy_from_slice(&process_id.to_be_bytes());
55        buf[12..16].copy_from_slice(&secret_key.to_be_bytes());
56
57        stream.write_all(&buf).await?;
58
59        // Server will close connection after receiving cancel request
60        Ok(())
61    }
62}