1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use futures::{try_ready, Future, Poll};
use postgres_protocol::message::frontend;
use state_machine_future::{transition, RentToOwn, StateMachineFuture};
use tokio_io::io::{self, Flush, WriteAll};
use tokio_io::{AsyncRead, AsyncWrite};

use crate::config::SslMode;
use crate::error::Error;
use crate::proto::{MaybeTlsStream, TlsFuture};
use crate::TlsConnect;

#[derive(StateMachineFuture)]
pub enum CancelQueryRaw<S, T>
where
    S: AsyncRead + AsyncWrite,
    T: TlsConnect<S>,
{
    #[state_machine_future(start, transitions(SendingCancel))]
    Start {
        future: TlsFuture<S, T>,
        process_id: i32,
        secret_key: i32,
    },
    #[state_machine_future(transitions(FlushingCancel))]
    SendingCancel {
        future: WriteAll<MaybeTlsStream<S, T::Stream>, Vec<u8>>,
    },
    #[state_machine_future(transitions(Finished))]
    FlushingCancel {
        future: Flush<MaybeTlsStream<S, T::Stream>>,
    },
    #[state_machine_future(ready)]
    Finished(()),
    #[state_machine_future(error)]
    Failed(Error),
}

impl<S, T> PollCancelQueryRaw<S, T> for CancelQueryRaw<S, T>
where
    S: AsyncRead + AsyncWrite,
    T: TlsConnect<S>,
{
    fn poll_start<'a>(state: &'a mut RentToOwn<'a, Start<S, T>>) -> Poll<AfterStart<S, T>, Error> {
        let (stream, _) = try_ready!(state.future.poll());

        let mut buf = vec![];
        frontend::cancel_request(state.process_id, state.secret_key, &mut buf);

        transition!(SendingCancel {
            future: io::write_all(stream, buf),
        })
    }

    fn poll_sending_cancel<'a>(
        state: &'a mut RentToOwn<'a, SendingCancel<S, T>>,
    ) -> Poll<AfterSendingCancel<S, T>, Error> {
        let (stream, _) = try_ready_closed!(state.future.poll());

        transition!(FlushingCancel {
            future: io::flush(stream),
        })
    }

    fn poll_flushing_cancel<'a>(
        state: &'a mut RentToOwn<'a, FlushingCancel<S, T>>,
    ) -> Poll<AfterFlushingCancel, Error> {
        try_ready_closed!(state.future.poll());
        transition!(Finished(()))
    }
}

impl<S, T> CancelQueryRawFuture<S, T>
where
    S: AsyncRead + AsyncWrite,
    T: TlsConnect<S>,
{
    pub fn new(
        stream: S,
        mode: SslMode,
        tls: T,
        process_id: i32,
        secret_key: i32,
    ) -> CancelQueryRawFuture<S, T> {
        CancelQueryRaw::start(TlsFuture::new(stream, mode, tls), process_id, secret_key)
    }
}