capnp/
capability.rs

1// Copyright (c) 2013-2015 Sandstorm Development Group, Inc. and contributors
2// Licensed under the MIT License:
3//
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20// THE SOFTWARE.
21
22//! Hooks for the RPC system.
23//!
24//! Roughly corresponds to capability.h in the C++ implementation.
25
26#[cfg(feature = "alloc")]
27use core::future::Future;
28#[cfg(feature = "alloc")]
29use core::marker::PhantomData;
30#[cfg(feature = "rpc_try")]
31use core::ops::Try;
32#[cfg(feature = "alloc")]
33use core::pin::Pin;
34#[cfg(feature = "alloc")]
35use core::task::Poll;
36
37use crate::any_pointer;
38#[cfg(feature = "alloc")]
39use crate::private::capability::{ClientHook, ParamsHook, RequestHook, ResponseHook, ResultsHook};
40#[cfg(feature = "alloc")]
41use crate::traits::{Owned, Pipelined};
42#[cfg(feature = "alloc")]
43use crate::{Error, MessageSize};
44
/// An asynchronous computation that eventually produces either a value of type `T`
/// or an error of type `E`. Dropping the promise cancels the computation.
#[cfg(feature = "alloc")]
#[must_use = "futures do nothing unless polled"]
pub struct Promise<T, E> {
    /// Current state of the promise.
    inner: PromiseInner<T, E>,
}

/// Internal state of a `Promise`.
#[cfg(feature = "alloc")]
enum PromiseInner<T, E> {
    /// The result is already known; it is handed out on the first poll.
    Immediate(Result<T, E>),
    /// The result is produced by polling a heap-allocated, pinned future.
    Deferred(Pin<alloc::boxed::Box<dyn Future<Output = Result<T, E>> + 'static>>),
    /// An immediate result has already been taken out; nothing is left to yield.
    Empty,
}

// Unconditionally mark the state as `Unpin` so that `Promise<T, E>` is `Unpin`
// no matter whether `T` and `E` are.
#[cfg(feature = "alloc")]
impl<T, E> Unpin for PromiseInner<T, E> {}
63
#[cfg(feature = "alloc")]
impl<T, E> Promise<T, E> {
    /// Creates a promise that is already resolved successfully with `value`.
    pub fn ok(value: T) -> Self {
        let inner = PromiseInner::Immediate(Ok(value));
        Self { inner }
    }

    /// Creates a promise that is already resolved with the failure `error`.
    pub fn err(error: E) -> Self {
        let inner = PromiseInner::Immediate(Err(error));
        Self { inner }
    }

    /// Creates a promise whose result comes from driving the future `f` to completion.
    ///
    /// The future is moved into a pinned heap allocation, so it must be `'static`.
    pub fn from_future<F>(f: F) -> Self
    where
        F: Future<Output = Result<T, E>> + 'static,
    {
        Self {
            inner: PromiseInner::Deferred(alloc::boxed::Box::pin(f)),
        }
    }
}
87
#[cfg(feature = "alloc")]
impl<T, E> Future for Promise<T, E> {
    type Output = Result<T, E>;

    /// Polls the promise.
    ///
    /// An immediate result is returned right away and the state becomes empty; a deferred
    /// promise forwards the poll to its boxed future.
    ///
    /// # Panics
    ///
    /// Panics if polled again after an immediate result has already been taken.
    fn poll(self: Pin<&mut Self>, cx: &mut ::core::task::Context) -> Poll<Self::Output> {
        // `Promise` is `Unpin`, so the pin can be unwrapped into a plain mutable reference.
        let this = self.get_mut();
        match this.inner {
            PromiseInner::Deferred(ref mut fut) => fut.as_mut().poll(cx),
            PromiseInner::Immediate(_) => {
                // Move the stored result out, leaving `Empty` behind.
                match core::mem::replace(&mut this.inner, PromiseInner::Empty) {
                    PromiseInner::Immediate(result) => Poll::Ready(result),
                    _ => unreachable!(),
                }
            }
            PromiseInner::Empty => panic!("Promise polled after done."),
        }
    }
}
105
/// `Try` implementation for promises carrying a `capnp` error (needs the `rpc_try` feature).
///
/// Only `from_output` is functional; `branch` is a stub.
// NOTE(review): presumably this impl exists only so that the `FromResidual` impl below it
// can let `?` on a `Result` early-return from functions that return a `Promise` — confirm.
#[cfg(feature = "alloc")]
#[cfg(feature = "rpc_try")]
impl<T> Try for Promise<T, Error> {
    type Output = Self;
    type Residual = Result<core::convert::Infallible, Error>;

    /// Returns `output` unchanged.
    fn from_output(output: Self::Output) -> Self {
        output
    }

    /// Not implemented: calling this always panics.
    fn branch(self) -> core::ops::ControlFlow<Self::Residual, Self::Output> {
        unimplemented!()
    }
}
120
/// Lets a residual `Result<Infallible, Error>` be turned into a failed `Promise`
/// (needs the `rpc_try` feature).
#[cfg(feature = "alloc")]
#[cfg(feature = "rpc_try")]
impl<T> core::ops::FromResidual for Promise<T, crate::Error> {
    /// Converts the error carried by `residual` into an already-failed promise.
    fn from_residual(residual: <Self as Try>::Residual) -> Self {
        match residual {
            // `Infallible` has no values, so this arm can never run; the empty match
            // lets the compiler prove that instead of leaving a runtime panic stub.
            Ok(never) => match never {},
            Err(e) => Self::err(e),
        }
    }
}
131
/// The pending result of a method call: a promise for the response together with a
/// pipeline value of the result type.
#[cfg(feature = "alloc")]
#[must_use]
pub struct RemotePromise<Results>
where
    Results: Pipelined + Owned + 'static,
{
    /// Resolves to the call's `Response`, or to an error.
    pub promise: Promise<Response<Results>, Error>,
    /// The `Results::Pipeline` value associated with this call.
    // NOTE(review): presumably usable for pipelined calls before `promise` resolves — confirm.
    pub pipeline: Results::Pipeline,
}
142
/// The client-side view of a method call's response.
#[cfg(feature = "alloc")]
pub struct Response<Results> {
    /// Zero-sized marker recording the result type.
    pub marker: PhantomData<Results>,
    /// Hook object that provides access to the response content.
    pub hook: alloc::boxed::Box<dyn ResponseHook>,
}

#[cfg(feature = "alloc")]
impl<Results> Response<Results>
where
    Results: Pipelined + Owned,
{
    /// Wraps `hook` in a typed response.
    pub fn new(hook: alloc::boxed::Box<dyn ResponseHook>) -> Self {
        Self {
            hook,
            marker: PhantomData,
        }
    }

    /// Reads the response content as a `Results::Reader`.
    ///
    /// # Errors
    ///
    /// Returns an error if the hook cannot provide the content or if the content
    /// cannot be interpreted as `Results`.
    pub fn get(&self) -> crate::Result<Results::Reader<'_>> {
        let typeless = self.hook.get()?;
        typeless.get_as()
    }
}
165
/// A method call that has been created but not yet sent.
#[cfg(feature = "alloc")]
pub struct Request<Params, Results> {
    /// Zero-sized marker recording the parameter and result types.
    pub marker: PhantomData<(Params, Results)>,
    /// Hook object that holds the request and knows how to send it.
    pub hook: alloc::boxed::Box<dyn RequestHook>,
}

#[cfg(feature = "alloc")]
impl<Params, Results> Request<Params, Results>
where
    Params: Owned,
{
    /// Wraps `hook` in a typed request.
    pub fn new(hook: alloc::boxed::Box<dyn RequestHook>) -> Self {
        Self {
            marker: PhantomData,
            hook,
        }
    }

    /// Returns a builder for filling in the call's parameters.
    ///
    /// # Panics
    ///
    /// Panics if the hook's content cannot be obtained as a `Params::Builder`.
    pub fn get(&mut self) -> Params::Builder<'_> {
        self.hook.get().get_as().unwrap()
    }

    /// Sets the call's parameters from the reader `from`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `set_as` reports.
    pub fn set(&mut self, from: Params::Reader<'_>) -> crate::Result<()> {
        self.hook.get().set_as(from)
    }
}
193
#[cfg(feature = "alloc")]
impl<Params, Results> Request<Params, Results>
where
    Results: Pipelined + Owned + 'static + Unpin,
    <Results as Pipelined>::Pipeline: FromTypelessPipeline,
{
    /// Sends the request, consuming it.
    ///
    /// The hook produces a typeless promise and pipeline; both are re-wrapped here so that
    /// the caller gets a `RemotePromise` typed with `Results`.
    pub fn send(self) -> RemotePromise<Results> {
        let RemotePromise {
            promise: typeless_promise,
            pipeline: typeless_pipeline,
            ..
        } = self.hook.send();

        // Re-tag the response with the static result type once the typeless one arrives;
        // errors pass through unchanged.
        let promise = Promise::from_future(async move {
            typeless_promise.await.map(|typeless| Response {
                hook: typeless.hook,
                marker: PhantomData,
            })
        });
        let pipeline = FromTypelessPipeline::new(typeless_pipeline);

        RemotePromise { promise, pipeline }
    }
}
216
/// A streaming method call that has been created but not yet sent.
///
/// Unlike `Request`, sending it yields only a completion promise, not a typed response.
#[cfg(feature = "alloc")]
pub struct StreamingRequest<Params> {
    /// Zero-sized marker recording the parameter type.
    pub marker: PhantomData<Params>,
    /// Hook object that holds the request and knows how to send it.
    pub hook: alloc::boxed::Box<dyn RequestHook>,
}

#[cfg(feature = "alloc")]
impl<Params> StreamingRequest<Params>
where
    Params: Owned,
{
    /// Returns a builder for filling in the call's parameters.
    ///
    /// # Panics
    ///
    /// Panics if the hook's content cannot be obtained as a `Params::Builder`.
    pub fn get(&mut self) -> Params::Builder<'_> {
        self.hook.get().get_as().unwrap()
    }

    /// Sends the request as a streaming call, consuming it, and returns a promise
    /// that resolves to `()` or to an error.
    pub fn send(self) -> Promise<(), Error> {
        self.hook.send_streaming()
    }
}
237
/// The server-side view of the parameter values passed to a method call.
#[cfg(feature = "alloc")]
pub struct Params<T> {
    /// Zero-sized marker recording the parameter type.
    pub marker: PhantomData<T>,
    /// Hook object that provides access to the parameter content.
    pub hook: alloc::boxed::Box<dyn ParamsHook>,
}

#[cfg(feature = "alloc")]
impl<T> Params<T> {
    /// Wraps `hook` in a typed parameter holder.
    pub fn new(hook: alloc::boxed::Box<dyn ParamsHook>) -> Self {
        Self {
            hook,
            marker: PhantomData,
        }
    }

    /// Reads the parameters as a `T::Reader`.
    ///
    /// # Errors
    ///
    /// Returns an error if the hook cannot provide the content or if the content
    /// cannot be interpreted as `T`.
    pub fn get(&self) -> crate::Result<T::Reader<'_>>
    where
        T: Owned,
    {
        let typeless = self.hook.get()?;
        typeless.get_as()
    }
}
260
/// The return values of a method; the method body writes them in place.
#[cfg(feature = "alloc")]
pub struct Results<T> {
    /// Zero-sized marker recording the result type.
    pub marker: PhantomData<T>,
    /// Hook object that provides access to the result content.
    pub hook: alloc::boxed::Box<dyn ResultsHook>,
}

#[cfg(feature = "alloc")]
impl<T> Results<T>
where
    T: Owned,
{
    /// Wraps `hook` in a typed results holder.
    pub fn new(hook: alloc::boxed::Box<dyn ResultsHook>) -> Self {
        Self {
            hook,
            marker: PhantomData,
        }
    }

    /// Returns a builder for filling in the results.
    ///
    /// # Panics
    ///
    /// Panics if the hook fails to provide its content, or if that content cannot be
    /// obtained as a `T::Builder`.
    pub fn get(&mut self) -> T::Builder<'_> {
        self.hook.get().unwrap().get_as().unwrap()
    }

    /// Sets the results from the reader `other`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `set_as` reports.
    ///
    /// # Panics
    ///
    /// Panics if the hook fails to provide its content.
    pub fn set(&mut self, other: T::Reader<'_>) -> crate::Result<()> {
        self.hook.get().unwrap().set_as(other)
    }

    /// Signals that every capability in this `Results` has been filled in, so pipelined
    /// calls may start using those capabilities right away. (Normally pipelined calls
    /// stay queued until the initial call has completed.)
    pub fn set_pipeline(&mut self) -> crate::Result<()> {
        self.hook.set_pipeline()
    }
}
295
296pub trait FromTypelessPipeline {
297    fn new(typeless: any_pointer::Pipeline) -> Self;
298}
299
/// Implemented (through code generation) by every user-defined capability client type.
#[cfg(feature = "alloc")]
pub trait FromClientHook {
    /// Creates a new client that wraps `hook`.
    fn new(hook: alloc::boxed::Box<dyn ClientHook>) -> Self;

    /// Consumes the client and returns its underlying client hook.
    fn into_client_hook(self) -> alloc::boxed::Box<dyn ClientHook>;

    /// Borrows the underlying client hook.
    fn as_client_hook(&self) -> &dyn ClientHook;

    /// Converts `self` into another `FromClientHook` type by re-wrapping its hook.
    ///
    /// The conversion itself always succeeds. If the underlying capability does not really
    /// implement `T`'s interface, method calls on the result fail with "unimplemented" errors.
    fn cast_to<T>(self) -> T
    where
        Self: Sized,
        T: FromClientHook,
    {
        T::new(self.into_client_hook())
    }
}
322
/// A client that carries no static interface type.
#[cfg(feature = "alloc")]
pub struct Client {
    /// Hook object through which calls are made.
    pub hook: alloc::boxed::Box<dyn ClientHook>,
}

#[cfg(feature = "alloc")]
impl Client {
    /// Wraps `hook` in an untyped client.
    pub fn new(hook: alloc::boxed::Box<dyn ClientHook>) -> Self {
        Self { hook }
    }

    /// Starts a call to method `method_id` of interface `interface_id` and returns the
    /// not-yet-sent request, tagged with the caller-chosen `Params` and `Results` types.
    /// `size_hint` is passed through to the hook.
    pub fn new_call<Params, Results>(
        &self,
        interface_id: u64,
        method_id: u16,
        size_hint: Option<MessageSize>,
    ) -> Request<Params, Results> {
        // Only the hook of the typeless request is kept; the marker is re-created
        // for the requested types.
        let Request { hook, .. } = self.hook.new_call(interface_id, method_id, size_hint);
        Request {
            hook,
            marker: PhantomData,
        }
    }

    /// Like `new_call`, but wraps the request hook in a `StreamingRequest`.
    pub fn new_streaming_call<Params>(
        &self,
        interface_id: u64,
        method_id: u16,
        size_hint: Option<MessageSize>,
    ) -> StreamingRequest<Params> {
        let Request { hook, .. } = self.hook.new_call(interface_id, method_id, size_hint);
        StreamingRequest {
            hook,
            marker: PhantomData,
        }
    }

    /// When the capability is really just a promise, the returned promise resolves once the
    /// capability has settled on its final destination, or propagates the exception if the
    /// capability promise is rejected. This mostly helps with error checking when no calls
    /// are being made. Waiting on it before making calls is unnecessary: if the capability
    /// fails to resolve, the call results carry the error.
    pub fn when_resolved(&self) -> Promise<(), Error> {
        self.hook.when_resolved()
    }
}
370
/// The value returned by `Server::dispatch_call()`.
#[cfg(feature = "alloc")]
pub struct DispatchCallResult {
    /// Promise that resolves when the call has completed.
    pub promise: Promise<(), Error>,

    /// True when the method was declared as `-> stream;`. If such a call throws an
    /// exception, then all future calls on the capability will throw the same exception.
    pub is_streaming: bool,
}

#[cfg(feature = "alloc")]
impl DispatchCallResult {
    /// Bundles a completion promise with the flag saying whether the call is streaming.
    pub fn new(promise: Promise<(), Error>, is_streaming: bool) -> Self {
        Self {
            is_streaming,
            promise,
        }
    }
}
392
/// A server that carries no static interface type.
#[cfg(feature = "alloc")]
pub trait Server {
    /// Handles a call to method `method_id` of interface `interface_id`.
    ///
    /// `params` holds the call's untyped parameters and `results` is the untyped
    /// results object for the call. The returned `DispatchCallResult` carries the
    /// completion promise and the streaming flag.
    fn dispatch_call(
        &mut self,
        interface_id: u64,
        method_id: u16,
        params: Params<any_pointer::Owned>,
        results: Results<any_pointer::Owned>,
    ) -> DispatchCallResult;
}
404
/// Tracks the relationship between a generated `Server` trait and its `Client` struct.
#[cfg(feature = "alloc")]
pub trait FromServer<S>: FromClientHook {
    /// The dispatcher type; implemented by the generated `ServerDispatch` struct.
    /// It dereferences mutably to the wrapped server `S`.
    type Dispatch: Server + 'static + core::ops::DerefMut<Target = S>;

    /// Wraps the server `s` in its dispatcher.
    fn from_server(s: S) -> Self::Dispatch;
}
413
/// Returns the "resolved" version of the capability `cap`. This is useful, for example,
/// to pre-resolve the argument passed to
/// `capnp_rpc::CapabilityServerSet::get_local_server_of_resolved()`.
///
/// The function waits for the capability's `when_resolved()` promise and then follows
/// `get_resolved()` until no further hook is reported.
#[cfg(feature = "alloc")]
pub async fn get_resolved_cap<C: FromClientHook>(cap: C) -> C {
    let mut current = cap.into_client_hook();
    // The outcome of the wait is deliberately ignored; only its completion matters here.
    let _ = current.when_resolved().await;
    // Keep replacing the hook with its resolution until there is none left.
    while let Some(next) = current.get_resolved() {
        current = next;
    }
    C::new(current)
}