sequoia-ipc 0.29.0

Interprocess communication infrastructure for Sequoia
Documentation
//! Assuan RPC support.

#![warn(missing_docs)]

use std::cmp;
use std::io::Write;
use std::mem;
use std::path::Path;
use std::pin::Pin;
use std::task::{Poll, Context};

use lalrpop_util::ParseError;

use futures::{Future, Stream, StreamExt};
use tokio::io::{BufReader, ReadHalf, WriteHalf};
use tokio::io::{AsyncRead, AsyncWriteExt};

use crate::openpgp;

use crate::Error;
use crate::Result;

mod lexer;
mod socket;
use socket::IpcStream;

// Maximum line length of the reference implementation.
const MAX_LINE_LENGTH: usize = 1000;

// Load the generated code.
lalrpop_util::lalrpop_mod!(
    #[allow(clippy::all)]
    #[allow(missing_docs, unused_parens)]
    grammar,
    "/assuan/grammar.rs"
);

/// A connection to an Assuan server.
///
/// Commands may be issued using [`Connection::send`].  Note that the
/// command is sent lazily, i.e. it is only sent if you poll for the
/// responses.
///
/// [`Connection::send`]: Client::send()
///
/// `Client` implements [`Stream`] to return all server responses
/// until the first [`Response::Ok`], [`Response::Error`], or
/// [`Response::Inquire`].
///
/// [`Stream`]: #impl-Stream
///
/// [`Response::Ok`] and [`Response::Error`] indicate success and
/// failure.  [`Response::Inquire`] means that the server requires
/// more information to complete the request.  This information may be
/// provided using [`Connection::data()`], or the operation may be
/// canceled using [`Connection::cancel()`].
///
/// [`Connection::data()`]: Client::data()
/// [`Connection::cancel()`]: Client::cancel()
pub struct Client {
    r: BufReader<ReadHalf<IpcStream>>, // xxx: abstract over
    buffer: Vec<u8>,
    done: bool,
    w: WriteState,
}
assert_send_and_sync!(Client);

enum WriteState {
    Ready(WriteHalf<IpcStream>),
    Sending(Pin<Box<dyn Future<Output = Result<WriteHalf<IpcStream>>>
                    + Send + Sync>>),
    Transitioning,
    Dead,
}
assert_send_and_sync!(WriteState);

/// Percent-escapes the given string.
pub fn escape<S: AsRef<str>>(s: S) -> String {
    let mut r = String::with_capacity(s.as_ref().len());
    for c in s.as_ref().chars() {
        match c {
            '%' => r.push_str("%25"),
            ' ' => r.push('+'),
            n if n.is_ascii() && (n as u8) < 32 =>
                r.push_str(&format!("%{:02X}", n as u8)),
            _ => r.push(c),
        }
    }
    r
}

impl Client {
    /// Connects to the server.
    pub async fn connect<P>(path: P) -> Result<Client> where P: AsRef<Path> {
        let connection = socket::sock_connect(path)?;
        Ok(ConnectionFuture::new(connection).await?)
    }

    /// Lazily sends a command to the server.
    ///
    /// For the command to be actually executed, stream the responses
    /// using this objects [`Stream`] implementation.
    ///
    /// [`Stream`]: #impl-Stream
    ///
    /// The response stream ends in either a [`Response::Ok`],
    /// [`Response::Error`], or [`Response::Inquire`].  `Ok` and
    /// `Error` indicate success and failure of the current operation.
    /// `Inquire` means that the server requires more information to
    /// complete the request.  This information may be provided using
    /// [`Connection::data()`], or the operation may be canceled using
    /// [`Connection::cancel()`].
    ///
    /// [`Response::Ok`]: super::assuan::Response::Ok
    /// [`Response::Error`]: super::assuan::Response::Error
    /// [`Response::Inquire`]: super::assuan::Response::Inquire
    /// [`Connection::data()`]: Client::data()
    /// [`Connection::cancel()`]: Client::cancel()
    ///
    /// Note: `command` is passed as-is.  Control characters, like
    /// `%`, must be %-escaped using [`escape`].
    pub fn send<'a, C: 'a>(&'a mut self, command: C) -> Result<()>
        where C: AsRef<[u8]>
    {
        if let WriteState::Sending(_) = self.w {
            return Err(openpgp::Error::InvalidOperation(
                "Busy, poll responses first".into()).into());
        }

        self.w =
            match mem::replace(&mut self.w, WriteState::Transitioning)
        {
            WriteState::Ready(mut sink) => {
                let command = command.as_ref();
                let mut c = command.to_vec();
                if ! c.ends_with(b"\n") {
                    c.push(0x0a);
                }
                WriteState::Sending(Box::pin(async move {
                    sink.write_all(&c).await?;
                    Ok(sink)
                }))
            },
            _ => unreachable!(),
        };

        Ok(())
    }

    /// Lazily cancels a pending operation.
    ///
    /// For the command to be actually executed, stream the responses
    /// using this objects [`Stream`] implementation.
    ///
    /// [`Stream`]: #impl-Stream
    pub fn cancel(&mut self) -> Result<()> {
        self.send("CAN")
    }

    /// Lazily sends data in response to an inquire.
    ///
    /// For the command to be actually executed, stream the responses
    /// using this objects [`Stream`] implementation.
    ///
    /// [`Stream`]: #impl-Stream
    ///
    /// The response stream ends in either a [`Response::Ok`],
    /// [`Response::Error`], or another [`Response::Inquire`].  `Ok`
    /// and `Error` indicate success and failure of the original
    /// operation that lead to the current inquiry.
    ///
    /// [`Response::Ok`]: super::assuan::Response::Ok
    /// [`Response::Error`]: super::assuan::Response::Error
    /// [`Response::Inquire`]: super::assuan::Response::Inquire
    pub fn data<'a, C: 'a>(&'a mut self, data: C) -> Result<()>
        where C: AsRef<[u8]>
    {
        let mut data = data.as_ref();
        let mut request = Vec::with_capacity(data.len());
        while ! data.is_empty() {
            if !request.is_empty() {
                request.push(0x0a);
            }
            write!(&mut request, "D ").unwrap();
            let mut line_len = 2;
            while ! data.is_empty() && line_len < MAX_LINE_LENGTH - 3 {
                let c = data[0];
                data = &data[1..];
                match c as char {
                    '%' | '\n' | '\r' => {
                        line_len += 3;
                        write!(&mut request, "%{:02X}", c).unwrap();
                    },
                    _ => {
                        line_len += 1;
                        request.push(c);
                    },
                }
            }
        }
        write!(&mut request, "\nEND").unwrap();
        self.send(request)
    }
}

/// A future that will resolve to a `Client`.
struct ConnectionFuture(Option<Client>);

impl ConnectionFuture {
    fn new(c: IpcStream) -> Self {
        let (r, w) = tokio::io::split(c);
        let buffer = Vec::with_capacity(MAX_LINE_LENGTH);
        Self(Some(Client {
            r: BufReader::new(r), buffer, done: false,
            w: WriteState::Ready(w)
        }))
    }
}

impl Future for ConnectionFuture {
    type Output = Result<Client>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Consume the initial message from the server.
        let client: &mut Client = self.0.as_mut().expect("future polled after completion");
        let mut responses = client.by_ref().collect::<Vec<_>>();

        match Pin::new(&mut responses).poll(cx) {
            Poll::Ready(response) => {
                Poll::Ready(match response.iter().last() {
                    Some(Ok(Response::Ok { .. })) =>
                        Ok(self.0.take().unwrap()),
                    Some(Ok(Response::Error { code, message })) =>
                        Err(Error::HandshakeFailed(
                            format!("Error {}: {:?}", code, message)).into()),
                    l @ Some(_) =>
                        Err(Error::HandshakeFailed(
                            format!("Unexpected server response: {:?}", l)
                        ).into()),
                    None => // XXX does that happen?
                        Err(Error::HandshakeFailed(
                            "No data received from server".into()).into()),
                })
            },
            Poll::Pending => Poll::Pending,
        }
    }
}

impl Stream for Client {
    type Item = Result<Response>;

    /// Attempt to pull out the next value of this stream, returning
    /// None if the stream is finished.
    ///
    /// Note: It _is_ safe to call this again after the stream
    /// finished, i.e. returned `Ready(None)`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // First, handle sending of the command.
        match self.w {
            WriteState::Ready(_) =>
                (),  // Nothing to do, poll for responses below.
            WriteState::Sending(_) => {
                self.w = if let WriteState::Sending(mut f) =
                    mem::replace(&mut self.w, WriteState::Transitioning)
                {
                    match f.as_mut().poll(cx) {
                        Poll::Ready(Ok(sink)) => WriteState::Ready(sink),
                        Poll::Pending => WriteState::Sending(f),
                        Poll::Ready(Err(e)) => {
                            self.w = WriteState::Dead;
                            return Poll::Ready(Some(Err(e)));
                        },
                    }
                } else {
                    unreachable!()
                };
            },
            WriteState::Transitioning =>
                unreachable!(),
            WriteState::Dead =>
                (),  // Nothing left to do, poll for responses below.
        }

        // Recheck if we are still sending the command.
        if let WriteState::Sending(_) = self.w {
            return Poll::Pending;
        }

        // Check if the previous response was one of ok, error, or
        // inquire.
        if self.done {
            // If so, we signal end of stream here.
            self.done = false;
            return Poll::Ready(None);
        }

        // The compiler is not smart enough to figure out disjoint borrows
        // through Pin via DerefMut (which wholly borrows `self`), so unwrap it
        let Self { buffer, done, r, .. } = Pin::into_inner(self);
        let mut reader = Pin::new(r);
        loop {
            // Try to yield a line from the buffer.  For that, try to
            // find linebreaks.
            if let Some(p) = buffer.iter().position(|&b| b == 0x0a) {
                let line: Vec<u8> = buffer.drain(..p+1).collect();
                // xxx: rtrim linebreak even more? crlf maybe?
                let r = Response::parse(&line[..line.len()-1])?;
                // If this response is one of ok, error, or inquire,
                // we want to surrender control to the client next
                // time she asks for an item.
                *done = r.is_done();
                return Poll::Ready(Some(Ok(r)));
            }

            // No more linebreaks in the buffer.  We need to get more.
            // First, get a new read buffer.
            // Later, append the read data to the Client's buffer

            let mut vec = vec![0u8; MAX_LINE_LENGTH];
            let mut read_buf = tokio::io::ReadBuf::new(&mut vec);

            match reader.as_mut().poll_read(cx, &mut read_buf)? {
                Poll::Ready(()) => {
                    if read_buf.filled().is_empty() {
                        // End of stream.
                        return Poll::Ready(None)
                    } else {
                        buffer.extend_from_slice(read_buf.filled());
                        continue;
                    }
                },

                Poll::Pending => {
                    return Poll::Pending;
                },
            }
        }
    }
}

/// Server response.
#[derive(Debug, PartialEq)]
pub enum Response {
    /// Operation successful.
    Ok {
        /// Optional human-readable message.
        message: Option<String>,
    },
    /// An error occurred.
    Error {
        /// Error code.
        ///
        /// This code is defined in `libgpg-error`.
        code: usize,
        /// Optional human-readable message.
        message: Option<String>,
    },
    /// Information about the ongoing operation.
    Status {
        /// Indicates what the status message is about.
        keyword: String,
        /// Human-readable message.
        message: String,
    },
    /// A comment for debugging purposes.
    Comment {
        /// Human-readable message.
        message: String,
    },
    /// Raw data returned to the client.
    Data {
        /// A chunk of raw data.
        ///
        /// Consecutive `Data` responses must be joined.
        partial: Vec<u8>,
    },
    /// Request for information from the client.
    Inquire {
        /// The subject of the inquiry.
        keyword: String,
        /// Optional parameters.
        parameters: Option<Vec<u8>>,
    },
}

impl Response {
    /// Parses the given response.
    pub fn parse(b: &[u8]) -> Result<Response> {
        match self::grammar::ResponseParser::new().parse(lexer::Lexer::new(b)) {
            Ok(r) => Ok(r),
            Err(err) => {
                let mut msg = Vec::new();
                writeln!(&mut msg, "Parsing: {:?}: {:?}", b, err)?;
                if let ParseError::UnrecognizedToken {
                    token: (start, _, end), ..
                } = err
                {
                    writeln!(&mut msg, "Context:")?;
                    let chars = b.iter().enumerate()
                        .filter_map(|(i, c)| {
                            if cmp::max(8, start) - 8 <= i
                                && i <= end + 8
                            {
                                Some((i, c))
                            } else {
                                None
                            }
                        });
                    for (i, c) in chars {
                        writeln!(&mut msg, "{} {} {}: {:?}",
                                 if i == start { "*" } else { " " },
                                 i,
                                 *c as char,
                                 c)?;
                    }
                }
                Err(anyhow::anyhow!(
                    String::from_utf8_lossy(&msg).to_string()))
            },
        }
    }

    /// Returns true if this message indicates success.
    pub fn is_ok(&self) -> bool {
        matches!(self, Response::Ok { .. } )
    }

    /// Returns true if this message indicates an error.
    pub fn is_err(&self) -> bool {
        matches!(self, Response::Error { .. })
    }

    /// Returns true if this message is an inquiry.
    pub fn is_inquire(&self) -> bool {
        matches!(self, Response::Inquire { .. })
    }

    /// Returns true if this response concludes the server's response.
    pub fn is_done(&self) -> bool {
        // All server responses end in either OK or ERR.
        self.is_ok() || self.is_err()
        // However, the server may inquire more
        // information.  We also surrender control to the
        // caller by yielding the responses we have seen
        // so far, and allow her to respond to the
        // inquiry.
            || self.is_inquire()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn basics() {
        assert_eq!(
            Response::parse(b"OK Pleased to meet you, process 7745")
                .unwrap(),
            Response::Ok {
                message: Some("Pleased to meet you, process 7745".into()),
            });
        assert_eq!(
            Response::parse(b"ERR 67109139 Unknown IPC command <GPG Agent>")
                .unwrap(),
            Response::Error {
                code: 67109139,
                message :Some("Unknown IPC command <GPG Agent>".into()),
            });

        let status =
          b"S KEYINFO 151BCDB0C293927B7E36660BE47F28DA8729BD19 D - - - C - - -";
        assert_eq!(
            Response::parse(status).unwrap(),
            Response::Status {
                keyword: "KEYINFO".into(),
                message:
                    "151BCDB0C293927B7E36660BE47F28DA8729BD19 D - - - C - - -"
                    .into(),
            });

        assert_eq!(
            Response::parse(b"D (7:sig-val(3:rsa(1:s1:%25%0D)))")
                .unwrap(),
            Response::Data {
                partial: b"(7:sig-val(3:rsa(1:s1:%\x0d)))".to_vec(),
            });

        assert_eq!(
            Response::parse(b"INQUIRE CIPHERTEXT")
                .unwrap(),
            Response::Inquire {
                keyword: "CIPHERTEXT".into(),
                parameters: None,
            });
    }
}