Skip to main content

sequoia_ipc/
lib.rs

1//! IPC mechanisms for Sequoia.
2//!
3//! This crate implements IPC mechanisms to communicate with Sequoia
4//! services.
5//!
6//! # Rationale
7//!
8//! Sequoia makes use of background services e.g. for managing and
9//! updating public keys.
10//!
11//! # Design
12//!
//! We use the filesystem as a namespace to discover services.  Every
//! service has a file called its rendezvous point.  Access to this
//! file is serialized using file locking.  The file contains a socket
//! address and a cookie that we use to connect to the server and
//! authenticate ourselves.  If the file does not exist, is malformed,
//! or does not point to a usable server, we start a new one on demand.
19//!
20//! This design mimics Unix sockets, but works on Windows too.
21//!
22//! # External vs internal servers
23//!
//! These servers can be either in external processes, or co-located
//! within the current process.  We first try to start an external
//! process, and fall back to starting a thread if that fails.
27//!
28//! Using an external process is the preferred option.  It allows us
29//! to continuously update the keys in the keystore, for example.  It
30//! also means that we do not spawn a thread in your process, which is
31//! frowned upon for various reasons.
32//!
33//! Please see [`IPCPolicy`] for more information.
34
35#![doc(html_favicon_url = "https://docs.sequoia-pgp.org/favicon.png")]
36#![doc(html_logo_url = "https://docs.sequoia-pgp.org/logo.svg")]
37#![warn(missing_docs)]
38
39use std::fs;
40use std::io::{self, Read, Seek, Write};
41use std::net::{Ipv4Addr, SocketAddr, TcpStream, TcpListener};
42use std::path::Path;
43use std::path::PathBuf;
44use std::thread::JoinHandle;
45
46use anyhow::anyhow;
47use anyhow::Context as _;
48
49use fs2::FileExt;
50
51use capnp_rpc::{RpcSystem, twoparty};
52use capnp_rpc::rpc_twoparty_capnp::Side;
53pub use capnp_rpc as capnp_rpc;
54
55#[cfg(unix)]
56use std::os::unix::{io::{IntoRawFd, FromRawFd}, fs::OpenOptionsExt};
57#[cfg(windows)]
58use std::os::windows::io::{AsRawSocket, IntoRawSocket, FromRawSocket};
59#[cfg(windows)]
60use winapi::um::winsock2;
61
62use std::process::{Command, Stdio};
63use std::thread;
64
65use sequoia_openpgp as openpgp;
66use openpgp::crypto::mem::Encrypted;
67use openpgp::crypto::mem::Protected;
68use openpgp::crypto::random;
69
70#[macro_use] mod macros;
71pub mod keybox;
72mod keygrip;
73pub use self::keygrip::Keygrip;
74pub mod sexp;
75mod core;
76pub use crate::core::{Config, Context, IPCPolicy};
77
78#[cfg(test)]
79mod tests;
80
81/// Servers need to implement this trait.
82pub trait Handler {
83    /// Called on every connection.
84    fn handle(&self,
85              network: capnp_rpc::twoparty::VatNetwork<tokio_util::compat::Compat<tokio::net::tcp::OwnedReadHalf>>)
86              -> RpcSystem<Side>;
87}
88
89/// A factory for handlers.
90pub type HandlerFactory = fn(
91    descriptor: Descriptor,
92    local: &tokio::task::LocalSet
93) -> Result<Box<dyn Handler>>;
94
95/// A descriptor is used to connect to a service.
96#[derive(Clone)]
97pub struct Descriptor {
98    ctx: core::Context,
99    rendezvous: PathBuf,
100    executable: PathBuf,
101    factory: HandlerFactory,
102}
103
104impl std::fmt::Debug for Descriptor {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("Descriptor")
107            .field("rendezvous", &self.rendezvous)
108            .field("executable", &self.executable)
109            .finish()
110    }
111}
112
113impl Descriptor {
114    /// Create a descriptor given its rendez-vous point, the path to
115    /// the servers executable file, and a handler factory.
116    pub fn new(ctx: &core::Context, rendezvous: PathBuf,
117               executable: PathBuf, factory: HandlerFactory)
118               -> Self {
119        Descriptor {
120            ctx: ctx.clone(),
121            rendezvous,
122            executable,
123            factory,
124        }
125    }
126
127    /// Returns the context.
128    pub fn context(&self) -> &core::Context {
129        &self.ctx
130    }
131
132    /// Returns the rendez-vous point.
133    pub fn rendez_vous(&self) -> &Path {
134        &self.rendezvous
135    }
136
137    /// Connects to a descriptor, starting the server if necessary.
138    ///
139    /// # Panic
140    /// This will panic if called outside of the Tokio runtime context. See
141    /// See [`Handle::enter`] for more details.
142    ///
143    /// [`Handle::enter`]: tokio::runtime::Handle::enter()
144    pub fn connect(&self) -> Result<RpcSystem<Side>> {
145        self.connect_with_policy(*self.ctx.ipc_policy())
146    }
147
148    /// Connects to a descriptor, starting the server if necessary.
149    ///
150    /// This function does not use the context's IPC policy, but uses
151    /// the given one.
152    ///
153    /// # Panic
154    /// This will panic if called outside of the Tokio runtime context. See
155    /// See [`Handle::enter`] for more details.
156    ///
157    /// [`Handle::enter`]: tokio::runtime::Handle::enter()
158    pub fn connect_with_policy(&self, policy: core::IPCPolicy)
159                   -> Result<RpcSystem<Side>> {
160        let do_connect = |cookie: Cookie, mut s: TcpStream| {
161            cookie.send(&mut s)?;
162
163            /* Tokioize.  */
164            s.set_nonblocking(true)?;
165            let stream = tokio::net::TcpStream::from_std(s)?;
166            stream.set_nodelay(true)?;
167
168            let (reader, writer) = stream.into_split();
169            use tokio_util::compat::TokioAsyncReadCompatExt;
170            use tokio_util::compat::TokioAsyncWriteCompatExt;
171            let (reader, writer) = (reader.compat(), writer.compat_write());
172
173            let network =
174                Box::new(twoparty::VatNetwork::new(reader, writer,
175                                                   Side::Client,
176                                                   Default::default()));
177
178            Ok(RpcSystem::new(network, None))
179        };
180
181        fs::create_dir_all(self.ctx.home())?;
182
183        let mut file = CookieFile::open(&self.rendezvous)?;
184
185        if let Some((cookie, rest)) = file.read()? {
186            let stream = String::from_utf8(rest).map_err(drop)
187                .and_then(|rest| rest.parse::<SocketAddr>().map_err(drop))
188                .and_then(|addr| TcpStream::connect(addr).map_err(drop));
189
190            if let Ok(s) = stream {
191                do_connect(cookie, s)
192            } else {
193                /* Failed to connect.  Invalidate the cookie and try again.  */
194                file.clear()?;
195                drop(file);
196                self.connect()
197            }
198        } else {
199            let cookie = Cookie::new()?;
200
201            let (addr, external, _join_handle) = match policy {
202                core::IPCPolicy::Internal => self.start(false)?,
203                core::IPCPolicy::External => self.start(true)?,
204                core::IPCPolicy::Robust => self.start(true)
205                    .or_else(|_| self.start(false))?
206            };
207
208            /* XXX: It'd be nice not to waste this connection.  */
209            cookie.send(&mut TcpStream::connect(addr)?)?;
210
211            if external {
212                /* Write connection information to file.  */
213                file.write(&cookie, format!("{}", addr).as_bytes())?;
214            }
215            drop(file);
216
217            do_connect(cookie, TcpStream::connect(addr)?)
218        }
219    }
220
221    /// Start the service, either as an external process or as a
222    /// thread.
223    fn start(&self, external: bool)
224        -> Result<(SocketAddr, bool, Option<JoinHandle<Result<()>>>)>
225    {
226        let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).unwrap();
227        let addr = listener.local_addr()?;
228
229        /* Start the server, connect to it, and send the cookie.  */
230        let join_handle: Option<JoinHandle<Result<()>>> = if external {
231            self.fork(listener)?;
232            None
233        } else {
234            Some(self.spawn(listener)?)
235        };
236
237        Ok((addr, external, join_handle))
238    }
239
240    fn fork(&self, listener: TcpListener) -> Result<()> {
241        let mut cmd = new_background_command(&self.executable);
242        cmd
243            .arg("--home")
244            .arg(self.ctx.home())
245            .arg("--lib")
246            .arg(self.ctx.lib())
247            .arg("--ephemeral")
248            .arg(self.ctx.ephemeral().to_string())
249            .arg("--socket").arg("0")
250            .stdout(Stdio::null())
251            .stderr(Stdio::null());
252
253        platform! {
254            unix => {
255                // Pass the listening TCP socket as child stdin.
256                cmd.stdin(unsafe { Stdio::from_raw_fd(listener.into_raw_fd()) });
257            },
258            windows => {
259                // Sockets for `TcpListener` are not inheritable by default, so
260                // let's make them so, since we'll pass them to a child process.
261                unsafe {
262                    match winapi::um::handleapi::SetHandleInformation(
263                        listener.as_raw_socket() as _,
264                        winapi::um::winbase::HANDLE_FLAG_INHERIT,
265                        winapi::um::winbase::HANDLE_FLAG_INHERIT,
266                    ) {
267                        0 => Err(std::io::Error::last_os_error()),
268                        _ => Ok(())
269                    }?
270                };
271                // We can't pass the socket to stdin directly on Windows, since
272                // non-overlapped (blocking) I/O handles can be redirected there.
273                // We use Tokio (async I/O), so we just pass it via env var rather
274                // than establishing a separate channel to pass the socket through.
275                cmd.env("SOCKET", format!("{}", listener.into_raw_socket()));
276            }
277        }
278
279        cmd.spawn()?;
280        Ok(())
281    }
282
283    fn spawn(&self, l: TcpListener) -> Result<JoinHandle<Result<()>>> {
284        let descriptor = self.clone();
285        let join_handle = thread::spawn(move || -> Result<()> {
286            Server::new(descriptor)
287                .with_context(|| "Failed to spawn server".to_string())?
288                .serve_listener(l)
289                .with_context(|| "Failed to spawn server".to_string())?;
290            Ok(())
291        });
292
293        Ok(join_handle)
294    }
295
296    /// Turn this process into a server.
297    ///
298    /// This checks if a server is running.  If not, it turns the
299    /// current process into a server.
300    ///
301    /// This function is for servers trying to start themselves.
302    /// Normally, servers are started by clients on demand.  A client
303    /// should never call this function.
304    pub fn bootstrap(&mut self) -> Result<Option<JoinHandle<Result<()>>>> {
305        let mut file = CookieFile::open(&self.rendezvous)?;
306
307        // Try to connect to the server.  If it is already running,
308        // we're done.
309        if let Some((cookie, rest)) = file.read()? {
310            if let Ok(addr) = String::from_utf8(rest).map_err(drop)
311                .and_then(|rest| rest.parse::<SocketAddr>().map_err(drop))
312            {
313                let stream = TcpStream::connect(&addr).map_err(drop);
314
315                if let Ok(mut s) = stream {
316                    if let Ok(()) = cookie.send(&mut s) {
317                        // There's already a server running.
318                        return Ok(None);
319                    }
320                }
321            }
322        }
323
324        // Create a new cookie.
325        let cookie = Cookie::new()?;
326
327        // Start an *internal* server.
328        let (addr, _external, join_handle) = self.start(false)?;
329        let join_handle = join_handle
330            .expect("start returns the join handle for in-process servers");
331
332        file.write(&cookie, format!("{}", addr).as_bytes())?;
333        // Release the lock.
334        drop(file);
335
336        // Send the cookie to the server.
337        let mut s = TcpStream::connect(addr)?;
338        cookie.send(&mut s)?;
339
340        Ok(Some(join_handle))
341    }
342}
343
344/// A server.
345pub struct Server {
346    runtime: tokio::runtime::Runtime,
347    descriptor: Descriptor,
348}
349
350impl Server {
351    /// Creates a new server for the descriptor.
352    pub fn new(descriptor: Descriptor) -> Result<Self> {
353        Ok(Server {
354            runtime: tokio::runtime::Runtime::new()?,
355            descriptor,
356        })
357    }
358
359    /// Creates a Context from `env::args()`.
360    pub fn context() -> Result<core::Context> {
361        use std::env::args;
362        let args: Vec<String> = args().collect();
363
364        if args.len() != 7 || args[1] != "--home"
365            || args[3] != "--lib" || args[5] != "--ephemeral" {
366                return Err(anyhow!(
367                    "Usage: {} --home <HOMEDIR> --lib <LIBDIR> \
368                     --ephemeral true|false", args[0]));
369            }
370
371        let mut cfg = core::Context::configure()
372            .home(&args[2]).lib(&args[4]);
373
374        if let Ok(ephemeral) = args[6].parse() {
375            if ephemeral {
376                cfg.set_ephemeral();
377            }
378        } else {
379            return Err(anyhow!(
380                "Expected 'true' or 'false' for --ephemeral, got: {}",
381                args[6]));
382        }
383
384        cfg.build()
385    }
386
387    /// Turns this process into a server.
388    ///
389    /// External servers must call this early on.
390    ///
391    /// On Linux expects 'stdin' to be a listening TCP socket.
392    /// On Windows this expects `SOCKET` env var to be set to a listening socket
393    /// of the Windows Sockets API `SOCKET` value.
394    pub fn serve(&mut self) -> Result<()> {
395        let listener = platform! {
396            unix => unsafe { TcpListener::from_raw_fd(0) },
397            windows => {
398                let socket = std::env::var("SOCKET")?.parse()?;
399                unsafe { TcpListener::from_raw_socket(socket) }
400            }
401        };
402        self.serve_listener(listener)
403    }
404
405    fn serve_listener(&mut self, l: TcpListener) -> Result<()> {
406        // The protocol is:
407        //
408        // - The first client exclusively locks the cookie file.
409        //
410        // - The client allocates a TCP socket, and generates a
411        //   cookie.
412        //
413        // - The client starts the server, and passes the listener to
414        //   it.
415        //
416        // - The client connects to the server via the socket, and
417        //   sends it the cookie.
418        //
419        // - The client drops the connection and unlocks the cookie
420        //   file thereby allowing other clients to connect.
421        //
422        // - The server waits for the cookie on the first connection.
423        //
424        // - The server starts serving clients.
425        //
426        // Note: this initial connection cannot (currently) be used
427        // for executing RPCs; the server closes it immediately after
428        // receiving the cookie.
429
430        // The first client sends us the cookie.
431        let cookie = {
432            let mut i = l.accept()?;
433            Cookie::receive(&mut i.0)?
434        };
435
436        /* Tokioize.  */
437        let local = tokio::task::LocalSet::new();
438        let handler = (self.descriptor.factory)(self.descriptor.clone(), &local)?;
439
440        let server = async move {
441            l.set_nonblocking(true)?;
442            let socket = tokio::net::TcpListener::from_std(l).unwrap();
443
444            loop {
445                let (mut socket, _) = socket.accept().await?;
446
447                let _ = socket.set_nodelay(true);
448                let received_cookie = match Cookie::receive_async(&mut socket).await {
449                    Err(_) => continue, // XXX: Log the error?
450                    Ok(received_cookie) => received_cookie,
451                };
452                if received_cookie != cookie {
453                    continue;   // XXX: Log the error?
454                }
455
456                let (reader, writer) = socket.into_split();
457
458                use tokio_util::compat::TokioAsyncReadCompatExt;
459                use tokio_util::compat::TokioAsyncWriteCompatExt;
460                let (reader, writer) = (reader.compat(), writer.compat_write());
461
462                let network =
463                    twoparty::VatNetwork::new(reader, writer,
464                                            Side::Server, Default::default());
465
466                let rpc_system = handler.handle(network);
467                let _ = tokio::task::spawn_local(rpc_system).await;
468            }
469        };
470
471        local.block_on(&self.runtime, server)
472    }
473}
474
475/// Cookies are used to authenticate clients.
476struct Cookie(Encrypted);
477
478impl Cookie {
479    const SIZE: usize = 32;
480
481    /// Make a new cookie.
482    fn new() -> Result<Self> {
483        let mut c = Protected::new(Cookie::SIZE);
484        random(&mut c)
485            .context("Generating authentication token")?;
486        Ok(Cookie(Encrypted::new(c)?))
487    }
488
489    /// Make a new cookie from a slice.
490    fn from(buf: &[u8]) -> Result<Option<Self>> {
491        if buf.len() == Cookie::SIZE {
492            let c = Protected::from(buf);
493            Ok(Some(Cookie(Encrypted::new(c)?)))
494        } else {
495            Ok(None)
496        }
497    }
498
499    /// Given a vector starting with a cookie, extract it and return
500    /// the rest.
501    fn extract(mut buf: Vec<u8>) -> Result<Option<(Self, Vec<u8>)>> {
502        if buf.len() >= Cookie::SIZE {
503            let r = buf.split_off(Cookie::SIZE);
504            Ok(Some((Cookie(Encrypted::new(Protected::from(buf))?), r)))
505        } else {
506            Ok(None)
507        }
508    }
509
510    /// Read a cookie from 'from'.
511    fn receive<R: Read>(from: &mut R) -> Result<Self> {
512        let mut buf = Protected::new(Cookie::SIZE);
513        from.read_exact(&mut buf)?;
514        Ok(Cookie(Encrypted::new(buf)?))
515    }
516
517    /// Asynchronously read a cookie from 'socket'.
518    async fn receive_async(socket: &mut tokio::net::TcpStream) -> io::Result<Cookie> {
519        use tokio::io::AsyncReadExt;
520
521        let mut buf = vec![0; Cookie::SIZE];
522        socket.read_exact(&mut buf).await?;
523        Ok(Cookie::from(&buf)
524           .map_err(|err| {
525               std::io::Error::new(std::io::ErrorKind::Other, err)
526           })?
527           .expect("enough bytes read"))
528    }
529
530
531    /// Write a cookie to 'to'.
532    fn send<W: Write>(&self, to: &mut W) -> io::Result<()> {
533        self.0.map(|cookie| to.write_all(cookie))
534    }
535}
536
537impl PartialEq for Cookie {
538    fn eq(&self, other: &Cookie) -> bool {
539        self.0.map(|a| {
540            other.0.map(|b| {
541                a == b
542            })
543        })
544    }
545}
546
/// An open, exclusively locked cookie file.
struct CookieFile {
    /// Where the file lives; used in error messages.
    path: PathBuf,
    /// The open handle that carries the lock.
    file: fs::File,
}
552
553impl CookieFile {
554    /// Opens the specified cookie.
555    ///
556    /// The file is opened, and immediately locked.  (The lock is
557    /// dropped when the file is closed.)
558    fn open(path: &Path) -> Result<CookieFile> {
559        if let Some(parent) = path.parent() {
560            fs::create_dir_all(parent)
561                .with_context(|| format!("Creating {}", parent.display()))?;
562        }
563
564        let mut file = fs::OpenOptions::new();
565        file
566            .read(true)
567            .write(true)
568            .create(true);
569        #[cfg(unix)]
570        file.mode(0o600);
571        let file = file.open(path)
572            .with_context(|| format!("Opening {}", path.display()))?;
573        file.lock_exclusive()
574            .with_context(|| format!("Locking {}", path.display()))?;
575
576        Ok(Self {
577            path: path.to_path_buf(),
578            file,
579        })
580    }
581
582    /// Reads the cookie file.
583    ///
584    /// If the file contains a cookie, returns it and any other data.
585    ///
586    /// Returns `None` if the file does not contain a cookie.
587    fn read(&mut self) -> Result<Option<(Cookie, Vec<u8>)>> {
588        let mut content = vec![];
589        self.file.read_to_end(&mut content)
590            .with_context(|| format!("Opening {}", self.path.display()))?;
591        Ok(Cookie::extract(content)?)
592    }
593
594    /// Writes the specified cookie to the cookie file followed by the
595    /// specified data.
596    ///
597    /// The contents of the cookie file are replaced.
598    fn write(&mut self, cookie: &Cookie, data: &[u8]) -> Result<()> {
599        self.file.rewind()
600            .with_context(|| format!("Rewinding {}", self.path.display()))?;
601        self.file.set_len(0)
602            .with_context(|| format!("Truncating {}", self.path.display()))?;
603        cookie.0.map(|cookie| {
604            self.file.write_all(cookie)
605                .with_context(|| format!("Updating {}", self.path.display()))
606        })?;
607        self.file.write_all(data)
608            .with_context(|| format!("Updating {}", self.path.display()))?;
609
610        Ok(())
611    }
612
613    /// Clears the cookie file.
614    ///
615    /// The cookie file is truncated.
616    fn clear(&mut self) -> Result<()> {
617        self.file.set_len(0)
618            .with_context(|| format!("Truncating {}", self.path.display()))?;
619        Ok(())
620    }
621}
622
623#[derive(thiserror::Error, Debug)]
624/// Errors returned from the network routines.
625pub enum Error {
626    /// Connection closed unexpectedly.
627    #[error("Connection closed unexpectedly.")]
628    ConnectionClosed(Vec<u8>),
629}
630
631/// Result type specialization.
632pub type Result<T> = ::std::result::Result<T, anyhow::Error>;
633
// Global initialization and cleanup of the Windows Sockets API (WSA) module.
// NOTE: This has to be top-level in order for `ctor::{ctor, dtor}` to work.
#[cfg(windows)]
use std::sync::atomic::{AtomicBool, Ordering};

/// Flag written by `wsa_startup` and consulted by `wsa_cleanup` to
/// decide whether `WSACleanup` is called.
#[cfg(windows)]
static WSA_INITED: AtomicBool = AtomicBool::new(false);
640
/// Initializes the Windows Sockets API when the program starts.
///
/// Requests Winsock version 2.2 and records in `WSA_INITED` whether
/// the initialization succeeded.
#[cfg(windows)]
#[ctor::ctor]
fn wsa_startup() {
    // SAFETY: `WSADATA` is a plain C structure for which the all-zero
    // bit pattern is valid, and `WSAStartup` only writes to it.
    let ret = unsafe {
        let mut data: winsock2::WSADATA = std::mem::zeroed();
        winsock2::WSAStartup(
            0x202, // version 2.2
            &mut data,
        )
    };

    // `WSAStartup` returns zero on success and an error code
    // otherwise.
    WSA_INITED.store(ret == 0, Ordering::SeqCst);
}
652
/// Calls `WSACleanup` at program exit if `WSA_INITED` is set.
///
/// The return value of `WSACleanup` is ignored.
#[cfg(windows)]
#[ctor::dtor]
fn wsa_cleanup() {
    if !WSA_INITED.load(Ordering::SeqCst) {
        return;
    }

    unsafe {
        let _ = winsock2::WSACleanup();
    }
}
660
/// Builds a [`Command`] for `program` that is suitable for running in
/// the background.
///
/// On Windows the `CREATE_NO_WINDOW` creation flag is set; on other
/// platforms the command is returned as created.
pub(crate) fn new_background_command<S>(program: S) -> Command
where
    S: AsRef<std::ffi::OsStr>,
{
    // Only the Windows branch below mutates the command.
    #[allow(unused_mut)]
    let mut background = Command::new(program);

    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;

        // see https://docs.microsoft.com/en-us/windows/win32/procthread/process-creation-flags
        const CREATE_NO_WINDOW: u32 = 0x08000000;
        background.creation_flags(CREATE_NO_WINDOW);
    }

    background
}