bollard 0.14.0

An asynchronous Docker daemon API
Documentation
#![cfg(windows)]

use hyper::client::connect::Connected;
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeClient};
use tokio::time;

use std::ffi::OsStr;
use std::future::Future;
use std::io;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use winapi::shared::winerror;

use crate::docker::ClientType;
use crate::uri::Uri;

pin_project! {
    pub struct NamedPipeStream {
        #[pin]
        io: NamedPipeClient,
    }
}

impl NamedPipeStream {
    pub async fn connect<A>(addr: A) -> Result<NamedPipeStream, io::Error>
    where
        A: AsRef<Path> + AsRef<OsStr>,
    {
        let opts = ClientOptions::new();

        let client = loop {
            match opts.open(&addr) {
                Ok(client) => break client,
                Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
                Err(e) => return Err(e),
            };

            time::sleep(Duration::from_millis(50)).await;
        };

        Ok(NamedPipeStream { io: client })
    }
}

impl AsyncRead for NamedPipeStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        Pin::new(&mut self.io).poll_read(cx, buf)
    }
}

impl AsyncWrite for NamedPipeStream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut self.io).poll_write(cx, buf)
    }

    fn poll_write_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut self.io).poll_write_vectored(cx, bufs)
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_flush(cx)
    }
}

#[derive(Clone, Copy, Debug)]
pub struct NamedPipeConnector;

impl hyper::service::Service<hyper::Uri> for NamedPipeConnector {
    type Response = NamedPipeStream;
    type Error = io::Error;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, destination: hyper::Uri) -> Self::Future {
        let fut = async move {
            match destination.scheme() {
                Some(scheme) if scheme == NAMED_PIPE_SCHEME => Ok(()),
                _ => Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("Invalid scheme {:?}", destination.scheme()),
                )),
            }?;

            match Uri::socket_path_dest(&destination, &ClientType::NamedPipe) {
                Some(path) => Ok(NamedPipeStream::connect(&path).await?),

                None => Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("Invalid uri {:?}", destination),
                )),
            }
        };

        Box::pin(fut)
    }
}

impl hyper::client::connect::Connection for NamedPipeStream {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

const NAMED_PIPE_SCHEME: &str = "net.pipe";