1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
use futures::{try_ready, Future, Poll}; use postgres_protocol::message::frontend; use state_machine_future::{transition, RentToOwn, StateMachineFuture}; use tokio_io::io::{self, Flush, WriteAll}; use tokio_io::{AsyncRead, AsyncWrite}; use crate::config::SslMode; use crate::error::Error; use crate::proto::{MaybeTlsStream, TlsFuture}; use crate::TlsConnect; #[derive(StateMachineFuture)] pub enum CancelQueryRaw<S, T> where S: AsyncRead + AsyncWrite, T: TlsConnect<S>, { #[state_machine_future(start, transitions(SendingCancel))] Start { future: TlsFuture<S, T>, process_id: i32, secret_key: i32, }, #[state_machine_future(transitions(FlushingCancel))] SendingCancel { future: WriteAll<MaybeTlsStream<S, T::Stream>, Vec<u8>>, }, #[state_machine_future(transitions(Finished))] FlushingCancel { future: Flush<MaybeTlsStream<S, T::Stream>>, }, #[state_machine_future(ready)] Finished(()), #[state_machine_future(error)] Failed(Error), } impl<S, T> PollCancelQueryRaw<S, T> for CancelQueryRaw<S, T> where S: AsyncRead + AsyncWrite, T: TlsConnect<S>, { fn poll_start<'a>(state: &'a mut RentToOwn<'a, Start<S, T>>) -> Poll<AfterStart<S, T>, Error> { let (stream, _) = try_ready!(state.future.poll()); let mut buf = vec![]; frontend::cancel_request(state.process_id, state.secret_key, &mut buf); transition!(SendingCancel { future: io::write_all(stream, buf), }) } fn poll_sending_cancel<'a>( state: &'a mut RentToOwn<'a, SendingCancel<S, T>>, ) -> Poll<AfterSendingCancel<S, T>, Error> { let (stream, _) = try_ready_closed!(state.future.poll()); transition!(FlushingCancel { future: io::flush(stream), }) } fn poll_flushing_cancel<'a>( state: &'a mut RentToOwn<'a, FlushingCancel<S, T>>, ) -> Poll<AfterFlushingCancel, Error> { try_ready_closed!(state.future.poll()); transition!(Finished(())) } } impl<S, T> CancelQueryRawFuture<S, T> where S: AsyncRead + AsyncWrite, T: TlsConnect<S>, { pub fn new( stream: S, mode: SslMode, tls: T, process_id: i32, secret_key: i32, ) -> CancelQueryRawFuture<S, T> { CancelQueryRaw::start(TlsFuture::new(stream, mode, tls), process_id, secret_key) } }