sequoia_gpg_agent/assuan.rs
1//! Assuan RPC support.
2
3#![warn(missing_docs)]
4
5use std::cmp;
6use std::fmt;
7use std::io::Write;
8use std::mem;
9use std::path::Path;
10use std::pin::Pin;
11use std::task::{Poll, Context};
12
13use lalrpop_util::ParseError;
14
15use futures::{Future, Stream, StreamExt};
16use tokio::io::{BufReader, ReadHalf, WriteHalf};
17use tokio::io::{AsyncRead, AsyncWriteExt};
18
19use crate::openpgp;
20use openpgp::crypto::mem::Protected;
21
22use crate::Result;
23
24mod lexer;
25mod socket;
26use socket::IpcStream;
27
// Upper bound on the length of a single protocol line.  This mirrors
// the limit used by the reference implementation.
const MAX_LINE_LENGTH: usize = 1000;
30
31// Load the generated code.
32lalrpop_util::lalrpop_mod!(
33 #[allow(clippy::all)]
34 #[allow(missing_docs, unused_parens)]
35 grammar,
36 "/assuan/grammar.rs"
37);
38
39#[derive(thiserror::Error, Debug)]
40/// Errors returned from the Assuan routines.
41#[non_exhaustive]
42pub enum Error {
43 /// Handshake failed.
44 #[error("Handshake failed: {0}")]
45 HandshakeFailed(String),
46
47 /// The caller violated the protocol.
48 #[error("Invalid operation: {0}")]
49 InvalidOperation(String),
50
51 /// The remote party violated the protocol.
52 #[error("Protocol violation: {0}")]
53 ProtocolError(String),
54
55 /// The remote operation failed.
56 #[error("Operation failed: {0}")]
57 OperationFailed(String),
58}
59
60/// A connection to an Assuan server.
61///
62/// Commands may be issued using [`Connection::send`]. Note that the
63/// command is sent lazily, i.e. it is only sent if you poll for the
64/// responses.
65///
66/// [`Connection::send`]: Client::send()
67///
68/// `Client` implements [`Stream`] to return all server responses
69/// until the first [`Response::Ok`], [`Response::Error`], or
70/// [`Response::Inquire`].
71///
72/// [`Stream`]: #impl-Stream
73///
74/// [`Response::Ok`] and [`Response::Error`] indicate success and
75/// failure. [`Response::Inquire`] means that the server requires
76/// more information to complete the request. This information may be
77/// provided using [`Connection::data()`], or the operation may be
78/// canceled using [`Connection::cancel()`].
79///
80/// [`Connection::data()`]: Client::data()
81/// [`Connection::cancel()`]: Client::cancel()
82pub struct Client {
83 r: BufReader<ReadHalf<IpcStream>>, // xxx: abstract over
84 buffer: Vec<u8>,
85 done: bool,
86 w: WriteState,
87 trace_send: Option<Box<dyn Fn(&[u8]) + Send + Sync>>,
88 trace_receive: Option<Box<dyn Fn(&[u8]) + Send + Sync>>,
89}
90assert_send_and_sync!(Client);
91
92impl fmt::Debug for Client {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 f.debug_struct("Client")
95 .field("r", &self.r)
96 .field("buffer", &self.buffer)
97 .field("done", &self.done)
98 .field("w", &self.w)
99 .finish()
100 }
101}
102enum WriteState {
103 Ready(WriteHalf<IpcStream>),
104 Sending(Pin<Box<dyn Future<Output = Result<WriteHalf<IpcStream>>>
105 + Send + Sync>>),
106 Transitioning,
107 Dead,
108}
109assert_send_and_sync!(WriteState);
110
111impl std::fmt::Debug for WriteState {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>)
113 -> std::result::Result<(), std::fmt::Error>
114 {
115 use WriteState::*;
116 match self {
117 Ready(_) => write!(f, "WriteState::Ready"),
118 Sending(_) => write!(f, "WriteState::Sending"),
119 Transitioning => write!(f, "WriteState::Transitioning"),
120 Dead => write!(f, "WriteState::Dead"),
121 }
122 }
123}
124
/// Percent-escapes the given string.
///
/// The following transformations are applied:
///
///   - `%` becomes `%25`,
///   - `+` becomes `%2B`,
///   - a space becomes `+`,
///   - ASCII control characters below 0x20 become `%XX`.
///
/// All other characters are passed through unchanged.
///
/// A literal `+` has to be escaped because an unescaped `+` stands
/// for a space; otherwise the two could not be told apart when the
/// string is decoded.
pub fn escape<S: AsRef<str>>(s: S) -> String {
    use std::fmt::Write as _;

    let s = s.as_ref();
    let mut r = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '%' => r.push_str("%25"),
            '+' => r.push_str("%2B"),
            ' ' => r.push('+'),
            n if (n as u32) < 32 =>
                write!(r, "%{:02X}", n as u32)
                    .expect("writing to a String cannot fail"),
            _ => r.push(c),
        }
    }
    r
}
139
140impl Client {
141 /// Connects to the server.
142 pub async fn connect<P>(path: P) -> Result<Client> where P: AsRef<Path> {
143 let connection = socket::sock_connect(path)?;
144 Ok(ConnectionFuture::new(connection).await?)
145 }
146
147 /// Lazily sends a command to the server.
148 ///
149 /// For the command to be actually executed, stream the responses
150 /// using this objects [`Stream`] implementation.
151 ///
152 /// Note: It is very important to poll the client object until it
153 /// returns `None`. Otherwise, the server and client will lose
154 /// synchronization, and requests and responses will no longer be
155 /// correctly associated.
156 ///
157 /// [`Stream`]: #impl-Stream
158 ///
159 /// The response stream ends in either a [`Response::Ok`],
160 /// [`Response::Error`], or [`Response::Inquire`]. `Ok` and
161 /// `Error` indicate success and failure of the current operation.
162 /// `Inquire` means that the server requires more information to
163 /// complete the request. This information may be provided using
164 /// [`Connection::data()`], or the operation may be canceled using
165 /// [`Connection::cancel()`].
166 ///
167 /// [`Response::Ok`]: super::assuan::Response::Ok
168 /// [`Response::Error`]: super::assuan::Response::Error
169 /// [`Response::Inquire`]: super::assuan::Response::Inquire
170 /// [`Connection::data()`]: Client::data()
171 /// [`Connection::cancel()`]: Client::cancel()
172 ///
173 /// Note: `command` is passed as-is. Control characters, like
174 /// `%`, must be %-escaped using [`escape`].
175 pub fn send<'a, C>(&'a mut self, command: C) -> Result<()>
176 where C: AsRef<[u8]> + 'a
177 {
178 if let WriteState::Sending(_) = self.w {
179 return Err(Error::InvalidOperation(
180 "Busy, poll responses first".into()).into());
181 }
182
183 self.w =
184 match mem::replace(&mut self.w, WriteState::Transitioning)
185 {
186 WriteState::Ready(mut sink) => {
187 let command = command.as_ref();
188 let mut c = command.to_vec();
189 if ! c.ends_with(b"\n") {
190 c.push(0x0a);
191 }
192 if let Some(t) = self.trace_send.as_ref() {
193 t(&c);
194 }
195 WriteState::Sending(Box::pin(async move {
196 sink.write_all(&c).await?;
197 Ok(sink)
198 }))
199 },
200 WriteState::Dead => {
201 // We're still dead.
202 self.w = WriteState::Dead;
203 return Err(Error::OperationFailed(
204 "Connection dropped".into()).into());
205 }
206 s => panic!("Client state machine desynchronized with servers: \
207 in {:?}, should be in WriteState::Ready", s),
208 };
209
210 Ok(())
211 }
212
213 /// Sends a simple command to the server and returns the response.
214 ///
215 /// This method can only be used with simple commands, i.e. those
216 /// which do not require handling inquiries from the server. To
217 /// send complex commands, use [`Client::send`] and handle the
218 /// inquiries.
219 pub async fn send_simple<C>(&mut self, cmd: C) -> Result<Protected>
220 where
221 C: AsRef<str>,
222 {
223 self.send(cmd.as_ref())?;
224 let mut data = Vec::new();
225 while let Some(response) = self.next().await {
226 match response? {
227 Response::Data { partial } => {
228 // Securely erase partial.
229 let partial = Protected::from(partial);
230 data.extend_from_slice(&partial);
231 },
232 Response::Ok { .. }
233 | Response::Comment { .. }
234 | Response::Status { .. } =>
235 (), // Ignore.
236 Response::Error { ref message, .. } =>
237 return operation_failed(self, message).await,
238 response =>
239 return protocol_error(&response),
240 }
241 }
242
243 Ok(data.into())
244 }
245
246 /// Lazily cancels a pending operation.
247 ///
248 /// For the command to be actually executed, stream the responses
249 /// using this objects [`Stream`] implementation.
250 ///
251 /// [`Stream`]: #impl-Stream
252 pub fn cancel(&mut self) -> Result<()> {
253 self.send("CAN")
254 }
255
256 /// Lazily sends data in response to an inquire.
257 ///
258 /// For the command to be actually executed, stream the responses
259 /// using this objects [`Stream`] implementation.
260 ///
261 /// [`Stream`]: #impl-Stream
262 ///
263 /// The response stream ends in either a [`Response::Ok`],
264 /// [`Response::Error`], or another [`Response::Inquire`]. `Ok`
265 /// and `Error` indicate success and failure of the original
266 /// operation that lead to the current inquiry.
267 ///
268 /// [`Response::Ok`]: super::assuan::Response::Ok
269 /// [`Response::Error`]: super::assuan::Response::Error
270 /// [`Response::Inquire`]: super::assuan::Response::Inquire
271 pub fn data<'a, C>(&'a mut self, data: C) -> Result<()>
272 where C: AsRef<[u8]> + 'a
273 {
274 let mut data = data.as_ref();
275 let mut request = Vec::with_capacity(data.len());
276 while ! data.is_empty() {
277 if !request.is_empty() {
278 request.push(0x0a);
279 }
280 write!(&mut request, "D ").unwrap();
281 let mut line_len = 2;
282 while ! data.is_empty() && line_len < MAX_LINE_LENGTH - 3 {
283 let c = data[0];
284 data = &data[1..];
285 match c as char {
286 '%' | '\n' | '\r' => {
287 line_len += 3;
288 write!(&mut request, "%{:02X}", c).unwrap();
289 },
290 _ => {
291 line_len += 1;
292 request.push(c);
293 },
294 }
295 }
296 }
297 write!(&mut request, "\nEND").unwrap();
298 self.send(request)
299 }
300
301 /// Start tracing the data that is sent to the server.
302 ///
303 /// Note: if a tracing function is already registered, this
304 /// replaces it.
305 pub fn trace_data_sent(&mut self, fun: Box<dyn Fn(&[u8]) + Send + Sync>)
306 {
307 self.trace_send = Some(fun);
308 }
309
310 /// Start tracing the data that is received from the server.
311 ///
312 /// Note: if a tracing function is already registered, this
313 /// replaces it.
314 pub fn trace_data_received(&mut self, fun: Box<dyn Fn(&[u8]) + Send + Sync>)
315 {
316 self.trace_receive = Some(fun);
317 }
318}
319
320/// Returns a convenient Err value for use in the state machines.
321///
322/// This function must only be called after the assuan server returns
323/// an ERR. message is the error message returned from the server.
324/// This function first checks that the server hasn't sent anything
325/// else, which would be a protocol violation. If that is not the
326/// case, it turns the message into an Err.
327pub(crate) async fn operation_failed<T>(agent: &mut Client,
328 message: &Option<String>)
329 -> Result<T>
330{
331 if let Some(response) = agent.next().await {
332 protocol_error(&response?)
333 } else {
334 Err(Error::OperationFailed(
335 message.as_ref().map(|e| e.to_string())
336 .unwrap_or_else(|| "Unknown reason".into()))
337 .into())
338 }
339}
340
341/// Returns a convenient Err value for use in the state machines.
342pub(crate) fn protocol_error<T>(response: &Response) -> Result<T> {
343 Err(Error::ProtocolError(
344 format!("Got unexpected response {:?}", response))
345 .into())
346}
347
348/// A future that will resolve to a `Client`.
349struct ConnectionFuture(Option<Client>);
350
351impl ConnectionFuture {
352 fn new(c: IpcStream) -> Self {
353 let (r, w) = tokio::io::split(c);
354 let buffer = Vec::with_capacity(MAX_LINE_LENGTH);
355 Self(Some(Client {
356 r: BufReader::new(r), buffer, done: false,
357 w: WriteState::Ready(w),
358 trace_send: None,
359 trace_receive: None,
360 }))
361 }
362}
363
364impl Future for ConnectionFuture {
365 type Output = Result<Client>;
366
367 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
368 // Consume the initial message from the server.
369 let client: &mut Client = self.0.as_mut().expect("future polled after completion");
370 let mut responses = client.by_ref().collect::<Vec<_>>();
371
372 match Pin::new(&mut responses).poll(cx) {
373 Poll::Ready(response) => {
374 Poll::Ready(match response.iter().last() {
375 Some(Ok(Response::Ok { .. })) =>
376 Ok(self.0.take().unwrap()),
377 Some(Ok(Response::Error { code, message })) =>
378 Err(Error::HandshakeFailed(
379 format!("Error {}: {:?}", code, message)).into()),
380 l @ Some(_) =>
381 Err(Error::HandshakeFailed(
382 format!("Unexpected server response: {:?}", l)
383 ).into()),
384 None => // XXX does that happen?
385 Err(Error::HandshakeFailed(
386 "No data received from server".into()).into()),
387 })
388 },
389 Poll::Pending => Poll::Pending,
390 }
391 }
392}
393
394impl Stream for Client {
395 type Item = Result<Response>;
396
397 /// Attempt to pull out the next value of this stream, returning
398 /// None if the stream is finished.
399 ///
400 /// Note: It _is_ safe to call this again after the stream
401 /// finished, i.e. returned `Ready(None)`.
402 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
403 // First, handle sending of the command.
404 match self.w {
405 WriteState::Ready(_) =>
406 (), // Nothing to do, poll for responses below.
407 WriteState::Sending(_) => {
408 self.w = if let WriteState::Sending(mut f) =
409 mem::replace(&mut self.w, WriteState::Transitioning)
410 {
411 match f.as_mut().poll(cx) {
412 Poll::Ready(Ok(sink)) => WriteState::Ready(sink),
413 Poll::Pending => WriteState::Sending(f),
414 Poll::Ready(Err(e)) => {
415 self.w = WriteState::Dead;
416 return Poll::Ready(Some(Err(e)));
417 },
418 }
419 } else {
420 unreachable!()
421 };
422 },
423 WriteState::Transitioning =>
424 unreachable!(),
425 WriteState::Dead =>
426 (), // Nothing left to do, poll for responses below.
427 }
428
429 // Recheck if we are still sending the command.
430 if let WriteState::Sending(_) = self.w {
431 return Poll::Pending;
432 }
433
434 // Check if the previous response was one of ok, error, or
435 // inquire.
436 if self.done {
437 // If so, we signal end of stream here.
438 self.done = false;
439 return Poll::Ready(None);
440 }
441
442 // The compiler is not smart enough to figure out disjoint borrows
443 // through Pin via DerefMut (which wholly borrows `self`), so unwrap it
444 let Self { buffer, done, r, trace_receive, .. } = Pin::into_inner(self);
445 let mut reader = Pin::new(r);
446 loop {
447 // Try to yield a line from the buffer. For that, try to
448 // find linebreaks.
449 if let Some(p) = buffer.iter().position(|&b| b == 0x0a) {
450 let line: Vec<u8> = buffer.drain(..p+1).collect();
451 // xxx: rtrim linebreak even more? crlf maybe?
452 if let Some(t) = trace_receive {
453 t(&line[..line.len()-1]);
454 }
455 let r = Response::parse(&line[..line.len()-1])?;
456 // If this response is one of ok, error, or inquire,
457 // we want to surrender control to the client next
458 // time she asks for an item.
459 *done = r.is_done();
460 return Poll::Ready(Some(Ok(r)));
461 }
462
463 // No more linebreaks in the buffer. We need to get more.
464 // First, get a new read buffer.
465 // Later, append the read data to the Client's buffer
466
467 let mut vec = vec![0u8; MAX_LINE_LENGTH];
468 let mut read_buf = tokio::io::ReadBuf::new(&mut vec);
469
470 match reader.as_mut().poll_read(cx, &mut read_buf)? {
471 Poll::Ready(()) => {
472 if read_buf.filled().is_empty() {
473 // End of stream.
474 return Poll::Ready(None)
475 } else {
476 buffer.extend_from_slice(read_buf.filled());
477 continue;
478 }
479 },
480
481 Poll::Pending => {
482 return Poll::Pending;
483 },
484 }
485 }
486 }
487}
488
/// A single response line received from the server.
#[derive(Debug, PartialEq)]
pub enum Response {
    /// The operation succeeded.
    Ok {
        /// Human-readable message, if any.
        message: Option<String>,
    },
    /// The operation failed.
    Error {
        /// Error code.
        ///
        /// This code is defined in `libgpg-error`.
        code: usize,
        /// Human-readable message, if any.
        message: Option<String>,
    },
    /// Information about the ongoing operation.
    Status {
        /// What the status message is about.
        keyword: String,
        /// Human-readable message.
        message: String,
    },
    /// A comment for debugging purposes.
    Comment {
        /// Human-readable message.
        message: String,
    },
    /// Raw data returned to the client.
    Data {
        /// A chunk of raw data.
        ///
        /// Consecutive `Data` responses must be joined.
        partial: Vec<u8>,
    },
    /// A request for information from the client.
    Inquire {
        /// The subject of the inquiry.
        keyword: String,
        /// Parameters, if any.
        parameters: Option<Vec<u8>>,
    },
}
533
534impl Response {
535 /// Parses the given response.
536 pub fn parse(b: &[u8]) -> Result<Response> {
537 match self::grammar::ResponseParser::new().parse(lexer::Lexer::new(b)) {
538 Ok(r) => Ok(r),
539 Err(err) => {
540 let mut msg = Vec::new();
541 writeln!(&mut msg, "Parsing: {:?}: {:?}", b, err)?;
542 if let ParseError::UnrecognizedToken {
543 token: (start, _, end), ..
544 } = err
545 {
546 writeln!(&mut msg, "Context:")?;
547 let chars = b.iter().enumerate()
548 .filter_map(|(i, c)| {
549 if cmp::max(8, start) - 8 <= i
550 && i <= end + 8
551 {
552 Some((i, c))
553 } else {
554 None
555 }
556 });
557 for (i, c) in chars {
558 writeln!(&mut msg, "{} {} {}: {:?}",
559 if i == start { "*" } else { " " },
560 i,
561 *c as char,
562 c)?;
563 }
564 }
565 Err(anyhow::anyhow!(
566 String::from_utf8_lossy(&msg).to_string()).into())
567 },
568 }
569 }
570
571 /// Returns true if this message indicates success.
572 pub fn is_ok(&self) -> bool {
573 matches!(self, Response::Ok { .. } )
574 }
575
576 /// Returns true if this message indicates an error.
577 pub fn is_err(&self) -> bool {
578 matches!(self, Response::Error { .. })
579 }
580
581 /// Returns true if this message is an inquiry.
582 pub fn is_inquire(&self) -> bool {
583 matches!(self, Response::Inquire { .. })
584 }
585
586 /// Returns true if this response concludes the server's response.
587 pub fn is_done(&self) -> bool {
588 // All server responses end in either OK or ERR.
589 self.is_ok() || self.is_err()
590 // However, the server may inquire more
591 // information. We also surrender control to the
592 // caller by yielding the responses we have seen
593 // so far, and allow her to respond to the
594 // inquiry.
595 || self.is_inquire()
596 }
597}
598
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `line`, panicking if it is not a valid response.
    fn parsed(line: &[u8]) -> Response {
        Response::parse(line).unwrap()
    }

    /// Checks that one example of each common response kind parses.
    #[test]
    fn basics() {
        // Success with a message.
        assert_eq!(
            parsed(b"OK Pleased to meet you, process 7745"),
            Response::Ok {
                message: Some("Pleased to meet you, process 7745".into()),
            });

        // Error with code and message.
        assert_eq!(
            parsed(b"ERR 67109139 Unknown IPC command <GPG Agent>"),
            Response::Error {
                code: 67109139,
                message: Some("Unknown IPC command <GPG Agent>".into()),
            });

        // Status line: the keyword is split off the message.
        let status =
            b"S KEYINFO 151BCDB0C293927B7E36660BE47F28DA8729BD19 D - - - C - - -";
        assert_eq!(
            parsed(status),
            Response::Status {
                keyword: "KEYINFO".into(),
                message:
                    "151BCDB0C293927B7E36660BE47F28DA8729BD19 D - - - C - - -"
                    .into(),
            });

        // Data line: %-escapes are decoded.
        assert_eq!(
            parsed(b"D (7:sig-val(3:rsa(1:s1:%25%0D)))"),
            Response::Data {
                partial: b"(7:sig-val(3:rsa(1:s1:%\x0d)))".to_vec(),
            });

        // Inquiry without parameters.
        assert_eq!(
            parsed(b"INQUIRE CIPHERTEXT"),
            Response::Inquire {
                keyword: "CIPHERTEXT".into(),
                parameters: None,
            });
    }
}
645}