Skip to main content

qail_pg/driver/
cancel.rs

1//! Query cancellation methods for PostgreSQL connection.
2
3use super::{CANCEL_REQUEST_CODE, PgConnection, PgResult};
4use tokio::io::AsyncWriteExt;
5use tokio::net::TcpStream;
6
7/// A token that can be used to cancel a running query.
8/// This token is safe to send across threads and does not borrow the connection.
9#[derive(Debug, Clone)]
10pub struct CancelToken {
11    pub(crate) host: String,
12    pub(crate) port: u16,
13    pub(crate) process_id: i32,
14    pub(crate) secret_key: i32,
15}
16
17impl CancelToken {
18    /// Attempt to cancel the ongoing query.
19    /// This opens a new TCP connection and sends a CancelRequest message.
20    pub async fn cancel_query(&self) -> PgResult<()> {
21        PgConnection::cancel_query(&self.host, self.port, self.process_id, self.secret_key).await
22    }
23}
24
25impl PgConnection {
26    /// Get the cancel key for this connection.
27    pub fn get_cancel_key(&self) -> (i32, i32) {
28        (self.process_id, self.secret_key)
29    }
30
31    /// Cancel a running query on a PostgreSQL backend.
32    /// This opens a new TCP connection and sends a CancelRequest message.
33    /// The original connection continues running but the query is interrupted.
34    pub async fn cancel_query(
35        host: &str,
36        port: u16,
37        process_id: i32,
38        secret_key: i32,
39    ) -> PgResult<()> {
40        // Open new connection just for cancel
41        let addr = format!("{}:{}", host, port);
42        let mut stream = TcpStream::connect(&addr).await?;
43
44        // Send CancelRequest message:
45        // Length (16) + CancelRequest code (80877102) + process_id + secret_key
46        let mut buf = [0u8; 16];
47        buf[0..4].copy_from_slice(&16i32.to_be_bytes()); // Length
48        buf[4..8].copy_from_slice(&CANCEL_REQUEST_CODE.to_be_bytes());
49        buf[8..12].copy_from_slice(&process_id.to_be_bytes());
50        buf[12..16].copy_from_slice(&secret_key.to_be_bytes());
51
52        stream.write_all(&buf).await?;
53
54        // Server will close connection after receiving cancel request
55        Ok(())
56    }
57}