capnp_rpc/
lib.rs

1// Copyright (c) 2013-2017 Sandstorm Development Group, Inc. and contributors
2//
3// Permission is hereby granted, free of charge, to any person obtaining a copy
4// of this software and associated documentation files (the "Software"), to deal
5// in the Software without restriction, including without limitation the rights
6// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7// copies of the Software, and to permit persons to whom the Software is
8// furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19// THE SOFTWARE.
20
21//! An implementation of the [Cap'n Proto remote procedure call](https://capnproto.org/rpc.html)
22//! protocol. Includes all [Level 1](https://capnproto.org/rpc.html#protocol-features) features.
23//!
24//! # Example
25//!
26//! ```capnp
27//! # Cap'n Proto schema
28//! interface Foo {
29//!     identity @0 (x: UInt32) -> (y: UInt32);
30//! }
31//! ```
32//!
33//! ```ignore
34//! // Rust server defining an implementation of Foo.
35//! struct FooImpl;
36//! impl foo::Server for FooImpl {
37//!     async fn identity(
38//!         self: Rc<Self>,
39//!         params: foo::IdentityParams,
40//!         mut results: foo::IdentityResults
41//!     ) -> Result<(), ::capnp::Error> {
42//!         let x = params.get()?.get_x();
43//!         results.get().set_y(x);
44//!         Ok(())
45//!     }
46//! }
47//! ```
48//!
49//! ```ignore
50//! // Rust client calling a remote implementation of Foo.
51//! let mut request = foo_client.identity_request();
52//! request.get().set_x(123);
53//! let promise = request.send().promise.and_then(|response| {
54//!     println!("results = {}", response.get()?.get_y());
55//!     Ok(())
56//! });
57//! ```
58//!
59//! For a more complete example, see <https://github.com/capnproto/capnproto-rust/tree/master/capnp-rpc/examples/calculator>
60
61use capnp::capability::Promise;
62use capnp::private::capability::ClientHook;
63use capnp::Error;
64use futures::channel::oneshot;
65use futures::{Future, FutureExt, TryFutureExt};
66use std::cell::RefCell;
67use std::pin::Pin;
68use std::rc::{Rc, Weak};
69use std::task::{Context, Poll};
70
71pub use crate::rpc::Disconnector;
72use crate::task_set::TaskSet;
73
74pub use crate::reconnect::{auto_reconnect, lazy_auto_reconnect, SetTarget};
75
76/// Code generated from
77/// [rpc.capnp](https://github.com/capnproto/capnproto/blob/master/c%2B%2B/src/capnp/rpc.capnp).
78pub mod rpc_capnp;
79
80/// Code generated from
81/// [rpc-twoparty.capnp](https://github.com/capnproto/capnproto/blob/master/c%2B%2B/src/capnp/rpc-twoparty.capnp).
82pub mod rpc_twoparty_capnp;
83
/// Like [`try!()`], but for functions that return a [`Promise<T, E>`] rather than a
/// [`Result<T, E>`].
///
/// Evaluates to the `Ok` payload of the given `Result<T, E>`. When the result is `Err(e)`, the
/// enclosing function returns immediately with `Promise::err(..)`, where the error has first
/// been converted through [`From::from`].
#[macro_export]
macro_rules! pry {
    ($result:expr) => {
        match $result {
            ::std::result::Result::Err(error) => {
                return ::capnp::capability::Promise::err(::std::convert::From::from(error))
            }
            ::std::result::Result::Ok(value) => value,
        }
    };
}
99
100mod attach;
101mod broken;
102mod flow_control;
103mod local;
104mod queued;
105mod reconnect;
106mod rpc;
107mod sender_queue;
108mod split;
109mod task_set;
110pub mod twoparty;
111
112use capnp::message;
113
114/// A message to be sent by a [`VatNetwork`].
115pub trait OutgoingMessage {
116    /// Gets the message body, which the caller may fill in any way it wants.
117    ///
118    /// The standard RPC implementation initializes it as a Message as defined
119    /// in `schema/rpc.capnp`.
120    fn get_body(&mut self) -> ::capnp::Result<::capnp::any_pointer::Builder<'_>>;
121
122    /// Same as `get_body()`, but returns the corresponding reader type.
123    fn get_body_as_reader(&self) -> ::capnp::Result<::capnp::any_pointer::Reader<'_>>;
124
125    /// Sends the message. Returns a promise that resolves once the send has completed.
126    /// Dropping the returned promise does *not* cancel the send.
127    fn send(
128        self: Box<Self>,
129    ) -> (
130        Promise<(), Error>,
131        Rc<message::Builder<message::HeapAllocator>>,
132    );
133
134    /// Takes the inner message out of `self`.
135    fn take(self: Box<Self>) -> ::capnp::message::Builder<::capnp::message::HeapAllocator>;
136
137    /// Gets the total size of the message, for flow control purposes. Although the caller
138    /// could also call get_body().target_size(), doing that would walk the message tree,
139    /// whereas typical implementations can compute the size more cheaply by summing
140    /// segment sizes.
141    fn size_in_words(&self) -> usize;
142}
143
144/// A message received from a [`VatNetwork`].
145pub trait IncomingMessage {
146    /// Gets the message body, to be interpreted by the caller.
147    ///
148    /// The standard RPC implementation interprets it as a Message as defined
149    /// in `schema/rpc.capnp`.
150    fn get_body(&self) -> ::capnp::Result<::capnp::any_pointer::Reader<'_>>;
151}
152
153/// A two-way RPC connection.
154///
155/// A connection can be created by [`VatNetwork::connect()`].
156pub trait Connection<VatId> {
157    /// Returns the connected vat's authenticated VatId.  It is the VatNetwork's
158    /// responsibility to authenticate this, so that the caller can be assured
159    /// that they are really talking to the identified vat and not an imposter.
160    fn get_peer_vat_id(&self) -> VatId;
161
162    /// Allocates a new message to be sent on this connection.
163    ///
164    /// If `first_segment_word_size` is non-zero, it should be treated as a
165    /// hint suggesting how large to make the first segment.  This is entirely
166    /// a hint and the connection may adjust it up or down.  If it is zero,
167    /// the connection should choose the size itself.
168    fn new_outgoing_message(&mut self, first_segment_word_size: u32) -> Box<dyn OutgoingMessage>;
169
170    /// Waits for a message to be received and returns it.  If the read stream cleanly terminates,
171    /// returns None. If any other problem occurs, returns an Error.
172    fn receive_incoming_message(&mut self) -> Promise<Option<Box<dyn IncomingMessage>>, Error>;
173
174    /// Constructs a flow controller for a new stream on this connection.
175    ///
176    /// Returns (fc, p), where fc is the new flow controller and p is a promise
177    /// that must be polled in order to drive the flow controller.
178    fn new_stream(&mut self) -> (Box<dyn FlowController>, Promise<(), Error>) {
179        let (fc, f) = crate::flow_control::FixedWindowFlowController::new(
180            crate::flow_control::DEFAULT_WINDOW_SIZE,
181        );
182        (Box::new(fc), f)
183    }
184
185    /// Waits until all outgoing messages have been sent, then shuts down the outgoing stream. The
186    /// returned promise resolves after shutdown is complete.
187    fn shutdown(&mut self, result: ::capnp::Result<()>) -> Promise<(), Error>;
188}
189
190/// Tracks a particular RPC stream in order to implement a flow control algorithm.
191pub trait FlowController {
192    fn send(
193        &mut self,
194        message: Box<dyn OutgoingMessage>,
195        ack: Promise<(), Error>,
196    ) -> Promise<(), Error>;
197    fn wait_all_acked(&mut self) -> Promise<(), Error>;
198}
199
200/// Network facility between vats, it determines how to form connections between
201/// vats.
202///
203/// ## Vat
204///
205/// Cap'n Proto RPC operates between vats, where a "vat" is some sort of host of
206/// objects.  Typically one Cap'n Proto process (in the Unix sense) is one vat.
207pub trait VatNetwork<VatId> {
208    /// Connects to `host_id`.
209    ///
210    /// Returns None if `host_id` refers to the local vat.
211    fn connect(&mut self, host_id: VatId) -> Option<Box<dyn Connection<VatId>>>;
212
213    /// Waits for the next incoming connection and return it.
214    fn accept(&mut self) -> Promise<Box<dyn Connection<VatId>>, ::capnp::Error>;
215
216    /// A promise that cannot be resolved until the shutdown.
217    fn drive_until_shutdown(&mut self) -> Promise<(), Error>;
218}
219
220/// A portal to objects available on the network.
221///
222/// The RPC implementation sits on top of an implementation of [`VatNetwork`], which
223/// determines how to form connections between vats. The RPC implementation determines
224/// how to use such connections to manage object references and make method calls.
225///
226/// At the moment, this is all rather more general than it needs to be, because the only
227/// implementation of `VatNetwork` is [`twoparty::VatNetwork`]. However, eventually we
228/// will need to have more sophisticated `VatNetwork` implementations, in order to support
229/// [level 3](https://capnproto.org/rpc.html#protocol-features) features.
230///
231/// An `RpcSystem` is a non-`Send`able `Future` and needs to be driven by a task
232/// executor. A common way accomplish that is to pass the `RpcSystem` to
233/// `tokio::task::spawn_local()`.
234#[must_use = "futures do nothing unless polled"]
235pub struct RpcSystem<VatId>
236where
237    VatId: 'static,
238{
239    network: Box<dyn crate::VatNetwork<VatId>>,
240
241    bootstrap_cap: Box<dyn ClientHook>,
242
243    // XXX To handle three or more party networks, this should be a map from connection pointers
244    // to connection states.
245    connection_state: Rc<RefCell<Option<Rc<rpc::ConnectionState<VatId>>>>>,
246
247    tasks: TaskSet<Error>,
248    handle: crate::task_set::TaskSetHandle<Error>,
249}
250
251impl<VatId> RpcSystem<VatId> {
252    /// Constructs a new `RpcSystem` with the given network and bootstrap capability.
253    pub fn new(
254        mut network: Box<dyn crate::VatNetwork<VatId>>,
255        bootstrap: Option<::capnp::capability::Client>,
256    ) -> Self {
257        let bootstrap_cap = match bootstrap {
258            Some(cap) => cap.hook,
259            None => broken::new_cap(Error::failed("no bootstrap capability".to_string())),
260        };
261        let (mut handle, tasks) = TaskSet::new(Box::new(SystemTaskReaper));
262
263        let mut handle1 = handle.clone();
264        handle.add(network.drive_until_shutdown().then(move |r| {
265            let r = match r {
266                Ok(()) => Ok(()),
267                Err(e) => {
268                    if e.kind != ::capnp::ErrorKind::Disconnected {
269                        // Don't report disconnects as an error.
270                        Err(e)
271                    } else {
272                        Ok(())
273                    }
274                }
275            };
276
277            handle1.terminate(r);
278            Promise::ok(())
279        }));
280
281        let mut result = Self {
282            network,
283            bootstrap_cap,
284            connection_state: Rc::new(RefCell::new(None)),
285
286            tasks,
287            handle: handle.clone(),
288        };
289
290        let accept_loop = result.accept_loop();
291        handle.add(accept_loop);
292        result
293    }
294
295    /// Connects to the given vat and returns its bootstrap interface, returns
296    /// a client that can be used to invoke the bootstrap interface.
297    pub fn bootstrap<T>(&mut self, vat_id: VatId) -> T
298    where
299        T: ::capnp::capability::FromClientHook,
300    {
301        let Some(connection) = self.network.connect(vat_id) else {
302            return T::new(self.bootstrap_cap.clone());
303        };
304        let connection_state = Self::get_connection_state(
305            &self.connection_state,
306            self.bootstrap_cap.clone(),
307            connection,
308            self.handle.clone(),
309        );
310
311        let hook = rpc::ConnectionState::bootstrap(&connection_state);
312        T::new(hook)
313    }
314
315    // not really a loop, because it doesn't need to be for the two party case
316    fn accept_loop(&mut self) -> Promise<(), Error> {
317        let connection_state_ref = self.connection_state.clone();
318        let bootstrap_cap = self.bootstrap_cap.clone();
319        let handle = self.handle.clone();
320        Promise::from_future(self.network.accept().map_ok(move |connection| {
321            Self::get_connection_state(&connection_state_ref, bootstrap_cap, connection, handle);
322        }))
323    }
324
325    // If `connection_state_ref` is not already populated, populates it with a new
326    // `ConnectionState` built from a local bootstrap capability and `connection`,
327    // spawning any background tasks onto `handle`. Returns the resulting value
328    // held in `connection_state_ref`.
329    fn get_connection_state(
330        connection_state_ref: &Rc<RefCell<Option<Rc<rpc::ConnectionState<VatId>>>>>,
331        bootstrap_cap: Box<dyn ClientHook>,
332        connection: Box<dyn crate::Connection<VatId>>,
333        mut handle: crate::task_set::TaskSetHandle<Error>,
334    ) -> Rc<rpc::ConnectionState<VatId>> {
335        // TODO this needs to be updated once we allow more general VatNetworks.
336        let (tasks, result) = match *connection_state_ref.borrow() {
337            Some(ref connection_state) => {
338                // return early.
339                return connection_state.clone();
340            }
341            None => {
342                let (on_disconnect_fulfiller, on_disconnect_promise) =
343                    oneshot::channel::<Promise<(), Error>>();
344                let connection_state_ref1 = connection_state_ref.clone();
345                handle.add(on_disconnect_promise.then(move |shutdown_promise| {
346                    *connection_state_ref1.borrow_mut() = None;
347                    match shutdown_promise {
348                        Ok(s) => s,
349                        Err(e) => Promise::err(Error::failed(format!("{e}"))),
350                    }
351                }));
352                rpc::ConnectionState::new(bootstrap_cap, connection, on_disconnect_fulfiller)
353            }
354        };
355        *connection_state_ref.borrow_mut() = Some(result.clone());
356        handle.add(tasks);
357        result
358    }
359
360    /// Returns a `Disconnector` future that can be run to cleanly close the connection to this `RpcSystem`'s network.
361    /// You should get the `Disconnector` before you spawn the `RpcSystem`.
362    pub fn get_disconnector(&self) -> rpc::Disconnector<VatId> {
363        rpc::Disconnector::new(self.connection_state.clone())
364    }
365}
366
367impl<VatId> Future for RpcSystem<VatId>
368where
369    VatId: 'static,
370{
371    type Output = Result<(), Error>;
372    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
373        Pin::new(&mut self.tasks).poll(cx)
374    }
375}
376
377/// Creates a new local RPC client of type `C` out of an object that implements a server trait `S`.
378pub fn new_client<C, S>(s: S) -> C
379where
380    C: capnp::capability::FromServer<S>,
381{
382    new_client_from_rc(Rc::new(s))
383}
384
385/// Variant of `new_client` that works on an `Rc<S>`.
386pub fn new_client_from_rc<C, S>(s: Rc<S>) -> C
387where
388    C: capnp::capability::FromServer<S>,
389{
390    capnp::capability::FromClientHook::new(Box::new(local::Client::new(
391        <C as capnp::capability::FromServer<S>>::from_server(s),
392    )))
393}
394
395/// Collection of unwrappable capabilities.
396///
397/// Allows a server to recognize its own capabilities when passed back to it, and obtain the
398/// underlying Server objects associated with them. Holds only weak references to Server objects
399/// allowing Server objects to be dropped when dropped by the remote client. Call the `gc` method
400/// to reclaim memory used for Server objects that have been dropped.
401pub struct CapabilityServerSet<S, C>
402where
403    C: capnp::capability::FromServer<S>,
404{
405    caps: std::collections::HashMap<usize, Weak<S>>,
406    marker: std::marker::PhantomData<C>,
407}
408
409impl<S, C> Default for CapabilityServerSet<S, C>
410where
411    C: capnp::capability::FromServer<S>,
412{
413    fn default() -> Self {
414        Self {
415            caps: std::default::Default::default(),
416            marker: std::marker::PhantomData,
417        }
418    }
419}
420
421impl<S, C> CapabilityServerSet<S, C>
422where
423    C: capnp::capability::FromServer<S>,
424{
425    pub fn new() -> Self {
426        Self::default()
427    }
428
429    /// Adds a new capability to the set and returns a client backed by it.
430    pub fn new_client(&mut self, s: S) -> C {
431        self.new_client_from_rc(Rc::new(s))
432    }
433
434    /// Variant of `new_client` that works on an `Rc<S>`.
435    pub fn new_client_from_rc(&mut self, rc: Rc<S>) -> C {
436        let weak = Rc::downgrade(&rc);
437        let ptr = Rc::as_ptr(&rc) as usize;
438        self.caps.insert(ptr, weak);
439
440        let dispatch = <C as capnp::capability::FromServer<S>>::from_server(rc);
441        capnp::capability::FromClientHook::new(Box::new(local::Client::new(dispatch)))
442    }
443
444    /// Looks up a capability and returns its underlying server object, if found.
445    /// Fully resolves the capability before looking it up.
446    pub async fn get_local_server(&self, client: &C) -> Option<Rc<S>>
447    where
448        C: capnp::capability::FromClientHook,
449    {
450        let resolved: C = capnp::capability::get_resolved_cap(
451            capnp::capability::FromClientHook::new(client.as_client_hook().add_ref()),
452        )
453        .await;
454        let hook = resolved.into_client_hook();
455        let ptr = hook.get_ptr();
456        self.caps.get(&ptr).and_then(|c| c.upgrade())
457    }
458
459    /// Looks up a capability and returns its underlying server object, if found.
460    /// Does *not* attempt to resolve the capability first, so you will usually want
461    /// to call `get_resolved_cap()` before calling this. The advantage of this method
462    /// over `get_local_server()` is that this one is synchronous and borrows `self`
463    /// over a shorter span (which can be very important if `self` is inside a `RefCell`).
464    pub fn get_local_server_of_resolved(&self, client: &C) -> Option<Rc<S>>
465    where
466        C: capnp::capability::FromClientHook,
467    {
468        let hook = client.as_client_hook();
469        let ptr = hook.get_ptr();
470        self.caps.get(&ptr).and_then(|c| c.upgrade())
471    }
472
473    /// Reclaim memory used for Server objects that no longer exist.
474    pub fn gc(&mut self) {
475        self.caps.retain(|_, c| c.strong_count() > 0);
476    }
477}
478
479/// Creates a `Client` from a future that resolves to a `Client`.
480///
481/// Any calls that arrive before the resolution are accumulated in a queue.
482pub fn new_future_client<T>(
483    client_future: impl ::futures::Future<Output = Result<T, Error>> + 'static,
484) -> T
485where
486    T: ::capnp::capability::FromClientHook,
487{
488    let mut queued_client = crate::queued::Client::new(None);
489    let weak_client = Rc::downgrade(&queued_client.inner);
490
491    queued_client.drive(client_future.then(move |r| {
492        if let Some(queued_inner) = weak_client.upgrade() {
493            crate::queued::ClientInner::resolve(&queued_inner, r.map(|c| c.into_client_hook()));
494        }
495        Promise::ok(())
496    }));
497
498    T::new(Box::new(queued_client))
499}
500
501struct SystemTaskReaper;
502impl crate::task_set::TaskReaper<Error> for SystemTaskReaper {
503    fn task_failed(&mut self, error: Error) {
504        println!("ERROR: {error}");
505    }
506}
507
508pub struct ImbuedMessageBuilder<A>
509where
510    A: ::capnp::message::Allocator,
511{
512    builder: ::capnp::message::Builder<A>,
513    cap_table: Vec<Option<Box<dyn ::capnp::private::capability::ClientHook>>>,
514}
515
516impl<A> ImbuedMessageBuilder<A>
517where
518    A: ::capnp::message::Allocator,
519{
520    pub fn new(allocator: A) -> Self {
521        Self {
522            builder: ::capnp::message::Builder::new(allocator),
523            cap_table: Vec::new(),
524        }
525    }
526
527    pub fn get_root<'a, T>(&'a mut self) -> ::capnp::Result<T>
528    where
529        T: ::capnp::traits::FromPointerBuilder<'a>,
530    {
531        use capnp::traits::ImbueMut;
532        let mut root: ::capnp::any_pointer::Builder = self.builder.get_root()?;
533        root.imbue_mut(&mut self.cap_table);
534        root.get_as()
535    }
536
537    pub fn set_root<T: ::capnp::traits::Owned>(
538        &mut self,
539        value: impl ::capnp::traits::SetterInput<T>,
540    ) -> ::capnp::Result<()> {
541        use capnp::traits::ImbueMut;
542        let mut root: ::capnp::any_pointer::Builder = self.builder.get_root()?;
543        root.imbue_mut(&mut self.cap_table);
544        root.set_as(value)
545    }
546}
547
548fn canceled_to_error(_e: futures::channel::oneshot::Canceled) -> Error {
549    Error::failed("oneshot was canceled".to_string())
550}