sequoia_gpg_agent/
assuan.rs

1//! Assuan RPC support.
2
3#![warn(missing_docs)]
4
5use std::cmp;
6use std::fmt;
7use std::io::Write;
8use std::mem;
9use std::path::Path;
10use std::pin::Pin;
11use std::task::{Poll, Context};
12
13use lalrpop_util::ParseError;
14
15use futures::{Future, Stream, StreamExt};
16use tokio::io::{BufReader, ReadHalf, WriteHalf};
17use tokio::io::{AsyncRead, AsyncWriteExt};
18
19use crate::openpgp;
20use openpgp::crypto::mem::Protected;
21
22use crate::Result;
23
24mod lexer;
25mod socket;
26use socket::IpcStream;
27
28// Maximum line length of the reference implementation.
29const MAX_LINE_LENGTH: usize = 1000;
30
31// Load the generated code.
32lalrpop_util::lalrpop_mod!(
33    #[allow(clippy::all)]
34    #[allow(missing_docs, unused_parens)]
35    grammar,
36    "/assuan/grammar.rs"
37);
38
39#[derive(thiserror::Error, Debug)]
40/// Errors returned from the Assuan routines.
41#[non_exhaustive]
42pub enum Error {
43    /// Handshake failed.
44    #[error("Handshake failed: {0}")]
45    HandshakeFailed(String),
46
47    /// The caller violated the protocol.
48    #[error("Invalid operation: {0}")]
49    InvalidOperation(String),
50
51    /// The remote party violated the protocol.
52    #[error("Protocol violation: {0}")]
53    ProtocolError(String),
54
55    /// The remote operation failed.
56    #[error("Operation failed: {0}")]
57    OperationFailed(String),
58}
59
60/// A connection to an Assuan server.
61///
62/// Commands may be issued using [`Connection::send`].  Note that the
63/// command is sent lazily, i.e. it is only sent if you poll for the
64/// responses.
65///
66/// [`Connection::send`]: Client::send()
67///
68/// `Client` implements [`Stream`] to return all server responses
69/// until the first [`Response::Ok`], [`Response::Error`], or
70/// [`Response::Inquire`].
71///
72/// [`Stream`]: #impl-Stream
73///
74/// [`Response::Ok`] and [`Response::Error`] indicate success and
75/// failure.  [`Response::Inquire`] means that the server requires
76/// more information to complete the request.  This information may be
77/// provided using [`Connection::data()`], or the operation may be
78/// canceled using [`Connection::cancel()`].
79///
80/// [`Connection::data()`]: Client::data()
81/// [`Connection::cancel()`]: Client::cancel()
82pub struct Client {
83    r: BufReader<ReadHalf<IpcStream>>, // xxx: abstract over
84    buffer: Vec<u8>,
85    done: bool,
86    w: WriteState,
87    trace_send: Option<Box<dyn Fn(&[u8]) + Send + Sync>>,
88    trace_receive: Option<Box<dyn Fn(&[u8]) + Send + Sync>>,
89}
90assert_send_and_sync!(Client);
91
92impl fmt::Debug for Client {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        f.debug_struct("Client")
95         .field("r", &self.r)
96         .field("buffer", &self.buffer)
97         .field("done", &self.done)
98         .field("w", &self.w)
99         .finish()
100    }
101}
102enum WriteState {
103    Ready(WriteHalf<IpcStream>),
104    Sending(Pin<Box<dyn Future<Output = Result<WriteHalf<IpcStream>>>
105                    + Send + Sync>>),
106    Transitioning,
107    Dead,
108}
109assert_send_and_sync!(WriteState);
110
111impl std::fmt::Debug for WriteState {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>)
113        -> std::result::Result<(), std::fmt::Error>
114    {
115        use WriteState::*;
116        match self {
117            Ready(_) => write!(f, "WriteState::Ready"),
118            Sending(_) => write!(f, "WriteState::Sending"),
119            Transitioning => write!(f, "WriteState::Transitioning"),
120            Dead => write!(f, "WriteState::Dead"),
121        }
122    }
123}
124
125/// Percent-escapes the given string.
126pub fn escape<S: AsRef<str>>(s: S) -> String {
127    let mut r = String::with_capacity(s.as_ref().len());
128    for c in s.as_ref().chars() {
129        match c {
130            '%' => r.push_str("%25"),
131            ' ' => r.push('+'),
132            n if n.is_ascii() && (n as u8) < 32 =>
133                r.push_str(&format!("%{:02X}", n as u8)),
134            _ => r.push(c),
135        }
136    }
137    r
138}
139
140impl Client {
141    /// Connects to the server.
142    pub async fn connect<P>(path: P) -> Result<Client> where P: AsRef<Path> {
143        let connection = socket::sock_connect(path)?;
144        Ok(ConnectionFuture::new(connection).await?)
145    }
146
147    /// Lazily sends a command to the server.
148    ///
149    /// For the command to be actually executed, stream the responses
150    /// using this objects [`Stream`] implementation.
151    ///
152    /// Note: It is very important to poll the client object until it
153    /// returns `None`.  Otherwise, the server and client will lose
154    /// synchronization, and requests and responses will no longer be
155    /// correctly associated.
156    ///
157    /// [`Stream`]: #impl-Stream
158    ///
159    /// The response stream ends in either a [`Response::Ok`],
160    /// [`Response::Error`], or [`Response::Inquire`].  `Ok` and
161    /// `Error` indicate success and failure of the current operation.
162    /// `Inquire` means that the server requires more information to
163    /// complete the request.  This information may be provided using
164    /// [`Connection::data()`], or the operation may be canceled using
165    /// [`Connection::cancel()`].
166    ///
167    /// [`Response::Ok`]: super::assuan::Response::Ok
168    /// [`Response::Error`]: super::assuan::Response::Error
169    /// [`Response::Inquire`]: super::assuan::Response::Inquire
170    /// [`Connection::data()`]: Client::data()
171    /// [`Connection::cancel()`]: Client::cancel()
172    ///
173    /// Note: `command` is passed as-is.  Control characters, like
174    /// `%`, must be %-escaped using [`escape`].
175    pub fn send<'a, C>(&'a mut self, command: C) -> Result<()>
176        where C: AsRef<[u8]> + 'a
177    {
178        if let WriteState::Sending(_) = self.w {
179            return Err(Error::InvalidOperation(
180                "Busy, poll responses first".into()).into());
181        }
182
183        self.w =
184            match mem::replace(&mut self.w, WriteState::Transitioning)
185        {
186            WriteState::Ready(mut sink) => {
187                let command = command.as_ref();
188                let mut c = command.to_vec();
189                if ! c.ends_with(b"\n") {
190                    c.push(0x0a);
191                }
192                if let Some(t) = self.trace_send.as_ref() {
193                    t(&c);
194                }
195                WriteState::Sending(Box::pin(async move {
196                    sink.write_all(&c).await?;
197                    Ok(sink)
198                }))
199            },
200            WriteState::Dead => {
201                // We're still dead.
202                self.w = WriteState::Dead;
203                return Err(Error::OperationFailed(
204                    "Connection dropped".into()).into());
205            }
206            s => panic!("Client state machine desynchronized with servers: \
207                         in {:?}, should be in WriteState::Ready", s),
208        };
209
210        Ok(())
211    }
212
213    /// Sends a simple command to the server and returns the response.
214    ///
215    /// This method can only be used with simple commands, i.e. those
216    /// which do not require handling inquiries from the server.  To
217    /// send complex commands, use [`Client::send`] and handle the
218    /// inquiries.
219    pub async fn send_simple<C>(&mut self, cmd: C) -> Result<Protected>
220    where
221        C: AsRef<str>,
222    {
223        self.send(cmd.as_ref())?;
224        let mut data = Vec::new();
225        while let Some(response) = self.next().await {
226            match response? {
227                Response::Data { partial } => {
228                    // Securely erase partial.
229                    let partial = Protected::from(partial);
230                    data.extend_from_slice(&partial);
231                },
232                Response::Ok { .. }
233                | Response::Comment { .. }
234                | Response::Status { .. } =>
235                    (), // Ignore.
236                Response::Error { ref message, .. } =>
237                    return operation_failed(self, message).await,
238                response =>
239                    return protocol_error(&response),
240            }
241        }
242
243        Ok(data.into())
244    }
245
246    /// Lazily cancels a pending operation.
247    ///
248    /// For the command to be actually executed, stream the responses
249    /// using this objects [`Stream`] implementation.
250    ///
251    /// [`Stream`]: #impl-Stream
252    pub fn cancel(&mut self) -> Result<()> {
253        self.send("CAN")
254    }
255
256    /// Lazily sends data in response to an inquire.
257    ///
258    /// For the command to be actually executed, stream the responses
259    /// using this objects [`Stream`] implementation.
260    ///
261    /// [`Stream`]: #impl-Stream
262    ///
263    /// The response stream ends in either a [`Response::Ok`],
264    /// [`Response::Error`], or another [`Response::Inquire`].  `Ok`
265    /// and `Error` indicate success and failure of the original
266    /// operation that lead to the current inquiry.
267    ///
268    /// [`Response::Ok`]: super::assuan::Response::Ok
269    /// [`Response::Error`]: super::assuan::Response::Error
270    /// [`Response::Inquire`]: super::assuan::Response::Inquire
271    pub fn data<'a, C>(&'a mut self, data: C) -> Result<()>
272        where C: AsRef<[u8]> + 'a
273    {
274        let mut data = data.as_ref();
275        let mut request = Vec::with_capacity(data.len());
276        while ! data.is_empty() {
277            if !request.is_empty() {
278                request.push(0x0a);
279            }
280            write!(&mut request, "D ").unwrap();
281            let mut line_len = 2;
282            while ! data.is_empty() && line_len < MAX_LINE_LENGTH - 3 {
283                let c = data[0];
284                data = &data[1..];
285                match c as char {
286                    '%' | '\n' | '\r' => {
287                        line_len += 3;
288                        write!(&mut request, "%{:02X}", c).unwrap();
289                    },
290                    _ => {
291                        line_len += 1;
292                        request.push(c);
293                    },
294                }
295            }
296        }
297        write!(&mut request, "\nEND").unwrap();
298        self.send(request)
299    }
300
301    /// Start tracing the data that is sent to the server.
302    ///
303    /// Note: if a tracing function is already registered, this
304    /// replaces it.
305    pub fn trace_data_sent(&mut self, fun: Box<dyn Fn(&[u8]) + Send + Sync>)
306    {
307        self.trace_send = Some(fun);
308    }
309
310    /// Start tracing the data that is received from the server.
311    ///
312    /// Note: if a tracing function is already registered, this
313    /// replaces it.
314    pub fn trace_data_received(&mut self, fun: Box<dyn Fn(&[u8]) + Send + Sync>)
315    {
316        self.trace_receive = Some(fun);
317    }
318}
319
320/// Returns a convenient Err value for use in the state machines.
321///
322/// This function must only be called after the assuan server returns
323/// an ERR.  message is the error message returned from the server.
324/// This function first checks that the server hasn't sent anything
325/// else, which would be a protocol violation.  If that is not the
326/// case, it turns the message into an Err.
327pub(crate) async fn operation_failed<T>(agent: &mut Client,
328                                        message: &Option<String>)
329                                        -> Result<T>
330{
331    if let Some(response) = agent.next().await {
332        protocol_error(&response?)
333    } else {
334        Err(Error::OperationFailed(
335            message.as_ref().map(|e| e.to_string())
336                .unwrap_or_else(|| "Unknown reason".into()))
337            .into())
338    }
339}
340
341/// Returns a convenient Err value for use in the state machines.
342pub(crate) fn protocol_error<T>(response: &Response) -> Result<T> {
343    Err(Error::ProtocolError(
344        format!("Got unexpected response {:?}", response))
345        .into())
346}
347
348/// A future that will resolve to a `Client`.
349struct ConnectionFuture(Option<Client>);
350
351impl ConnectionFuture {
352    fn new(c: IpcStream) -> Self {
353        let (r, w) = tokio::io::split(c);
354        let buffer = Vec::with_capacity(MAX_LINE_LENGTH);
355        Self(Some(Client {
356            r: BufReader::new(r), buffer, done: false,
357            w: WriteState::Ready(w),
358            trace_send: None,
359            trace_receive: None,
360        }))
361    }
362}
363
364impl Future for ConnectionFuture {
365    type Output = Result<Client>;
366
367    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
368        // Consume the initial message from the server.
369        let client: &mut Client = self.0.as_mut().expect("future polled after completion");
370        let mut responses = client.by_ref().collect::<Vec<_>>();
371
372        match Pin::new(&mut responses).poll(cx) {
373            Poll::Ready(response) => {
374                Poll::Ready(match response.iter().last() {
375                    Some(Ok(Response::Ok { .. })) =>
376                        Ok(self.0.take().unwrap()),
377                    Some(Ok(Response::Error { code, message })) =>
378                        Err(Error::HandshakeFailed(
379                            format!("Error {}: {:?}", code, message)).into()),
380                    l @ Some(_) =>
381                        Err(Error::HandshakeFailed(
382                            format!("Unexpected server response: {:?}", l)
383                        ).into()),
384                    None => // XXX does that happen?
385                        Err(Error::HandshakeFailed(
386                            "No data received from server".into()).into()),
387                })
388            },
389            Poll::Pending => Poll::Pending,
390        }
391    }
392}
393
394impl Stream for Client {
395    type Item = Result<Response>;
396
397    /// Attempt to pull out the next value of this stream, returning
398    /// None if the stream is finished.
399    ///
400    /// Note: It _is_ safe to call this again after the stream
401    /// finished, i.e. returned `Ready(None)`.
402    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
403        // First, handle sending of the command.
404        match self.w {
405            WriteState::Ready(_) =>
406                (),  // Nothing to do, poll for responses below.
407            WriteState::Sending(_) => {
408                self.w = if let WriteState::Sending(mut f) =
409                    mem::replace(&mut self.w, WriteState::Transitioning)
410                {
411                    match f.as_mut().poll(cx) {
412                        Poll::Ready(Ok(sink)) => WriteState::Ready(sink),
413                        Poll::Pending => WriteState::Sending(f),
414                        Poll::Ready(Err(e)) => {
415                            self.w = WriteState::Dead;
416                            return Poll::Ready(Some(Err(e)));
417                        },
418                    }
419                } else {
420                    unreachable!()
421                };
422            },
423            WriteState::Transitioning =>
424                unreachable!(),
425            WriteState::Dead =>
426                (),  // Nothing left to do, poll for responses below.
427        }
428
429        // Recheck if we are still sending the command.
430        if let WriteState::Sending(_) = self.w {
431            return Poll::Pending;
432        }
433
434        // Check if the previous response was one of ok, error, or
435        // inquire.
436        if self.done {
437            // If so, we signal end of stream here.
438            self.done = false;
439            return Poll::Ready(None);
440        }
441
442        // The compiler is not smart enough to figure out disjoint borrows
443        // through Pin via DerefMut (which wholly borrows `self`), so unwrap it
444        let Self { buffer, done, r, trace_receive, .. } = Pin::into_inner(self);
445        let mut reader = Pin::new(r);
446        loop {
447            // Try to yield a line from the buffer.  For that, try to
448            // find linebreaks.
449            if let Some(p) = buffer.iter().position(|&b| b == 0x0a) {
450                let line: Vec<u8> = buffer.drain(..p+1).collect();
451                // xxx: rtrim linebreak even more? crlf maybe?
452                if let Some(t) = trace_receive {
453                    t(&line[..line.len()-1]);
454                }
455                let r = Response::parse(&line[..line.len()-1])?;
456                // If this response is one of ok, error, or inquire,
457                // we want to surrender control to the client next
458                // time she asks for an item.
459                *done = r.is_done();
460                return Poll::Ready(Some(Ok(r)));
461            }
462
463            // No more linebreaks in the buffer.  We need to get more.
464            // First, get a new read buffer.
465            // Later, append the read data to the Client's buffer
466
467            let mut vec = vec![0u8; MAX_LINE_LENGTH];
468            let mut read_buf = tokio::io::ReadBuf::new(&mut vec);
469
470            match reader.as_mut().poll_read(cx, &mut read_buf)? {
471                Poll::Ready(()) => {
472                    if read_buf.filled().is_empty() {
473                        // End of stream.
474                        return Poll::Ready(None)
475                    } else {
476                        buffer.extend_from_slice(read_buf.filled());
477                        continue;
478                    }
479                },
480
481                Poll::Pending => {
482                    return Poll::Pending;
483                },
484            }
485        }
486    }
487}
488
489/// Server response.
490#[derive(Debug, PartialEq)]
491pub enum Response {
492    /// Operation successful.
493    Ok {
494        /// Optional human-readable message.
495        message: Option<String>,
496    },
497    /// An error occurred.
498    Error {
499        /// Error code.
500        ///
501        /// This code is defined in `libgpg-error`.
502        code: usize,
503        /// Optional human-readable message.
504        message: Option<String>,
505    },
506    /// Information about the ongoing operation.
507    Status {
508        /// Indicates what the status message is about.
509        keyword: String,
510        /// Human-readable message.
511        message: String,
512    },
513    /// A comment for debugging purposes.
514    Comment {
515        /// Human-readable message.
516        message: String,
517    },
518    /// Raw data returned to the client.
519    Data {
520        /// A chunk of raw data.
521        ///
522        /// Consecutive `Data` responses must be joined.
523        partial: Vec<u8>,
524    },
525    /// Request for information from the client.
526    Inquire {
527        /// The subject of the inquiry.
528        keyword: String,
529        /// Optional parameters.
530        parameters: Option<Vec<u8>>,
531    },
532}
533
534impl Response {
535    /// Parses the given response.
536    pub fn parse(b: &[u8]) -> Result<Response> {
537        match self::grammar::ResponseParser::new().parse(lexer::Lexer::new(b)) {
538            Ok(r) => Ok(r),
539            Err(err) => {
540                let mut msg = Vec::new();
541                writeln!(&mut msg, "Parsing: {:?}: {:?}", b, err)?;
542                if let ParseError::UnrecognizedToken {
543                    token: (start, _, end), ..
544                } = err
545                {
546                    writeln!(&mut msg, "Context:")?;
547                    let chars = b.iter().enumerate()
548                        .filter_map(|(i, c)| {
549                            if cmp::max(8, start) - 8 <= i
550                                && i <= end + 8
551                            {
552                                Some((i, c))
553                            } else {
554                                None
555                            }
556                        });
557                    for (i, c) in chars {
558                        writeln!(&mut msg, "{} {} {}: {:?}",
559                                 if i == start { "*" } else { " " },
560                                 i,
561                                 *c as char,
562                                 c)?;
563                    }
564                }
565                Err(anyhow::anyhow!(
566                    String::from_utf8_lossy(&msg).to_string()).into())
567            },
568        }
569    }
570
571    /// Returns true if this message indicates success.
572    pub fn is_ok(&self) -> bool {
573        matches!(self, Response::Ok { .. } )
574    }
575
576    /// Returns true if this message indicates an error.
577    pub fn is_err(&self) -> bool {
578        matches!(self, Response::Error { .. })
579    }
580
581    /// Returns true if this message is an inquiry.
582    pub fn is_inquire(&self) -> bool {
583        matches!(self, Response::Inquire { .. })
584    }
585
586    /// Returns true if this response concludes the server's response.
587    pub fn is_done(&self) -> bool {
588        // All server responses end in either OK or ERR.
589        self.is_ok() || self.is_err()
590        // However, the server may inquire more
591        // information.  We also surrender control to the
592        // caller by yielding the responses we have seen
593        // so far, and allow her to respond to the
594        // inquiry.
595            || self.is_inquire()
596    }
597}
598
599#[cfg(test)]
600mod tests {
601    use super::*;
602
603    #[test]
604    fn basics() {
605        assert_eq!(
606            Response::parse(b"OK Pleased to meet you, process 7745")
607                .unwrap(),
608            Response::Ok {
609                message: Some("Pleased to meet you, process 7745".into()),
610            });
611        assert_eq!(
612            Response::parse(b"ERR 67109139 Unknown IPC command <GPG Agent>")
613                .unwrap(),
614            Response::Error {
615                code: 67109139,
616                message :Some("Unknown IPC command <GPG Agent>".into()),
617            });
618
619        let status =
620          b"S KEYINFO 151BCDB0C293927B7E36660BE47F28DA8729BD19 D - - - C - - -";
621        assert_eq!(
622            Response::parse(status).unwrap(),
623            Response::Status {
624                keyword: "KEYINFO".into(),
625                message:
626                    "151BCDB0C293927B7E36660BE47F28DA8729BD19 D - - - C - - -"
627                    .into(),
628            });
629
630        assert_eq!(
631            Response::parse(b"D (7:sig-val(3:rsa(1:s1:%25%0D)))")
632                .unwrap(),
633            Response::Data {
634                partial: b"(7:sig-val(3:rsa(1:s1:%\x0d)))".to_vec(),
635            });
636
637        assert_eq!(
638            Response::parse(b"INQUIRE CIPHERTEXT")
639                .unwrap(),
640            Response::Inquire {
641                keyword: "CIPHERTEXT".into(),
642                parameters: None,
643            });
644    }
645}