use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::ready;
use tokio::task::JoinHandle;
use crate::client::SftpClient;
impl SftpClient {
    /// Begins stopping the client and returns a future that resolves once the
    /// request processor task has been joined.
    ///
    /// Building the returned future already clears `commands`, so
    /// [`SftpClient::is_stopped`] reports `true` as soon as this method
    /// returns, even before the future is polled.
    ///
    /// If the future is dropped before it completes, the pending join handle
    /// is handed back to the client so that a later `stop` (or the client's
    /// `Drop`) can wait for it again.
    pub fn stop(&mut self) -> impl Future<Output = ()> + Drop + Send + Sync + '_ {
        SftpClientStopping::new(self)
    }

    /// Returns `true` when the client no longer holds its `commands` value.
    ///
    /// This only reflects that stopping has been requested; it does not tell
    /// whether the request processor task has finished.
    pub fn is_stopped(&self) -> bool {
        self.commands.is_none()
    }
}
impl Drop for SftpClient {
    /// Stops the client on drop and, when there is a request processor handle
    /// left to join, blocks the current thread until it has finished.
    fn drop(&mut self) {
        let stopping = SftpClientStopping::new(self);

        // Nothing to join: either the handle was already consumed or it is
        // still shared with another owner.
        if stopping.is_stopped() {
            return;
        }

        // NOTE(review): this blocks the dropping thread. If that thread is the
        // one that must drive the request processor (e.g. a single-threaded
        // async runtime), this may never return - confirm against callers.
        futures::executor::block_on(stopping);
    }
}
/// Future that waits for the client's request processor task to finish.
///
/// It is what `SftpClient::stop` returns and what `SftpClient`'s `Drop`
/// implementation blocks on.
pub(super) struct SftpClientStopping<'a> {
/// Client being stopped; kept so the join handle can be given back to it if
/// this value is dropped before the task has been joined.
client: &'a mut SftpClient,
/// Join handle that still has to be awaited. `None` when there is nothing to
/// wait for or once the task has been joined.
request_processor: Option<JoinHandle<()>>,
}
impl<'a> SftpClientStopping<'a> {
    /// Starts stopping `client` and builds the future that joins its request
    /// processor.
    ///
    /// Clears `client.commands` and takes the shared join handle out of the
    /// client. The handle is only kept for awaiting when this was the last
    /// reference to it (`Arc::into_inner` succeeded); otherwise the returned
    /// value is already in the stopped state.
    pub(super) fn new(client: &'a mut SftpClient) -> SftpClientStopping<'_> {
        client.commands = None;

        // `None` both when the client had no handle and when the handle is
        // still shared with another owner.
        let request_processor = client.request_processor.take().and_then(Arc::into_inner);
        if request_processor.is_some() {
            log::trace!("Waiting for client to stop");
        }

        SftpClientStopping {
            client,
            request_processor,
        }
    }

    /// Returns `true` when there is no join handle left to wait for.
    pub(super) fn is_stopped(&self) -> bool {
        self.request_processor.is_none()
    }
}
impl Future for SftpClientStopping<'_> {
type Output = ();
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
if let Some(request_processor) = &mut self.request_processor {
_ = ready!(Pin::new(request_processor).poll(cx));
}
self.request_processor = None;
std::task::Poll::Ready(())
}
}
impl Drop for SftpClientStopping<'_> {
    /// Gives an unjoined handle back to the client.
    ///
    /// When this value is dropped before the task has been joined, the handle
    /// is wrapped in a fresh `Arc` and stored on the client again so it can be
    /// awaited later.
    fn drop(&mut self) {
        let Some(pending) = self.request_processor.take() else {
            return;
        };
        self.client.request_processor = Some(Arc::new(pending));
    }
}