capnp_rpc/
reconnect.rs

1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::rc::Rc;
4
5use capnp::capability::{FromClientHook, Promise};
6use capnp::private::capability::{ClientHook, RequestHook};
7use futures::TryFutureExt;
8
9/// Trait implemented by the reconnecting client to set new connection out-of-band.
10///
11/// When using [`auto_reconnect`] or [`lazy_auto_reconnect`] it is not always optimal
12/// to wait for a call to fail with [`Disconnected`](capnp::ErrorKind::Disconnected)
13/// before replacing the client that is wrapped with a new fresh one.
14///
15/// Sometimes we know by other means that a client has gone away. It could be that we
16/// have clients that automatically sends us a new capability when it reconnects to us.
17///
18/// For these situations you can use the implementation of this trait that you get from
19/// [`auto_reconnect`] or [`lazy_auto_reconnect`] to manually set the target of the
20/// wrapped client.
21///
22/// # Example
23///
24/// ```ignore
25/// // The reconnecting client that automatically calls connect
26/// let (foo_client, set_target) = auto_reconnect(|| {
27///     Ok(new_future_client(connect()))
28/// })?;
29///
30/// // do work with foo_client
31/// ...
32///
33/// // We become aware that the client has gone so reconnect manually
34/// set_target.set_target(new_future_client(connect()));
35///
36/// // do more work with foo_client
37/// ...
38/// ```
39pub trait SetTarget<C> {
40    /// Adds a new reference to this implementation of SetTarget.
41    ///
42    /// This is mostly to get around that `Clone` requires `Sized` and so you need this
43    /// trick to get a copy of the `Box<dyn SetTarget<C>>` you got from making the
44    /// reconnecting client.
45    fn add_ref(&self) -> Box<dyn SetTarget<C>>;
46
47    /// Sets the target client of the reconnecting client that this trait implementation is
48    /// for.
49    fn set_target(&self, target: C);
50}
51
52impl<C> Clone for Box<dyn SetTarget<C>> {
53    fn clone(&self) -> Self {
54        self.add_ref()
55    }
56}
57
58struct ClientInner<F, C> {
59    connect: F,
60    current: Option<Box<dyn ClientHook>>,
61    generation: usize,
62    marker: PhantomData<C>,
63}
64
65impl<F, C> ClientInner<F, C>
66where
67    F: FnMut() -> capnp::Result<C>,
68    F: 'static,
69    C: FromClientHook,
70{
71    fn get_current(&mut self) -> Box<dyn ClientHook> {
72        if let Some(hook) = self.current.as_ref() {
73            hook.add_ref()
74        } else {
75            let hook = match (self.connect)() {
76                Ok(hook) => hook.into_client_hook(),
77                Err(err) => crate::broken::new_cap(err),
78            };
79            self.current = Some(hook.add_ref());
80            hook
81        }
82    }
83}
84
85struct Client<F, C> {
86    inner: Rc<RefCell<ClientInner<F, C>>>,
87}
88
89impl<F, C> Client<F, C>
90where
91    F: FnMut() -> capnp::Result<C>,
92    F: 'static,
93    C: FromClientHook,
94    C: 'static,
95{
96    pub fn new(connect: F) -> Client<F, C> {
97        Client {
98            inner: Rc::new(RefCell::new(ClientInner {
99                connect,
100                generation: 0,
101                current: None,
102                marker: PhantomData,
103            })),
104        }
105    }
106
107    pub fn get_current(&self) -> Box<dyn ClientHook> {
108        self.inner.borrow_mut().get_current()
109    }
110
111    fn wrap<T: 'static>(&self, promise: Promise<T, capnp::Error>) -> Promise<T, capnp::Error> {
112        let c = self.clone();
113        let generation = self.inner.borrow().generation;
114        Promise::from_future(promise.map_err(move |err| {
115            if err.kind == capnp::ErrorKind::Disconnected
116                && generation == c.inner.borrow().generation
117            {
118                let mut inner = c.inner.borrow_mut();
119                inner.generation = generation + 1;
120                match (inner.connect)() {
121                    Ok(hook) => inner.current = Some(hook.into_client_hook()),
122                    Err(err) => inner.current = Some(crate::broken::new_cap(err)),
123                }
124            }
125            err
126        }))
127    }
128}
129
130impl<F: 'static, C> SetTarget<C> for Client<F, C>
131where
132    F: 'static,
133    C: FromClientHook,
134    C: 'static,
135{
136    fn add_ref(&self) -> Box<dyn SetTarget<C>> {
137        Box::new(self.clone())
138    }
139
140    fn set_target(&self, target: C) {
141        self.inner.borrow_mut().current = Some(target.into_client_hook());
142    }
143}
144
145impl<F, C> Clone for Client<F, C> {
146    fn clone(&self) -> Self {
147        Self {
148            inner: self.inner.clone(),
149        }
150    }
151}
152
153impl<F, C> ClientHook for Client<F, C>
154where
155    F: FnMut() -> capnp::Result<C>,
156    F: 'static,
157    C: FromClientHook,
158    C: 'static,
159{
160    fn add_ref(&self) -> Box<dyn ClientHook> {
161        Box::new(self.clone())
162    }
163
164    fn new_call(
165        &self,
166        interface_id: u64,
167        method_id: u16,
168        size_hint: Option<capnp::MessageSize>,
169    ) -> capnp::capability::Request<capnp::any_pointer::Owned, capnp::any_pointer::Owned> {
170        let result = self
171            .get_current()
172            .new_call(interface_id, method_id, size_hint);
173        let hook = Request::new(self.clone(), result.hook);
174        capnp::capability::Request::new(Box::new(hook))
175    }
176
177    fn call(
178        &self,
179        interface_id: u64,
180        method_id: u16,
181        params: Box<dyn capnp::private::capability::ParamsHook>,
182        results: Box<dyn capnp::private::capability::ResultsHook>,
183    ) -> Promise<(), capnp::Error> {
184        let result = self
185            .get_current()
186            .call(interface_id, method_id, params, results);
187        self.wrap(result)
188    }
189
190    fn get_brand(&self) -> usize {
191        0
192    }
193
194    fn get_ptr(&self) -> usize {
195        (self.inner.as_ref()) as *const _ as usize
196    }
197
198    fn get_resolved(&self) -> Option<Box<dyn ClientHook>> {
199        None
200    }
201
202    fn when_more_resolved(&self) -> Option<Promise<Box<dyn ClientHook>, capnp::Error>> {
203        None
204    }
205
206    fn when_resolved(&self) -> Promise<(), capnp::Error> {
207        Promise::ok(())
208    }
209}
210
211struct Request<F, C> {
212    parent: Client<F, C>,
213    inner: Box<dyn RequestHook>,
214}
215
216impl<F, C> Request<F, C> {
217    fn new(parent: Client<F, C>, inner: Box<dyn RequestHook>) -> Request<F, C> {
218        Request { parent, inner }
219    }
220}
221
222impl<F, C> RequestHook for Request<F, C>
223where
224    F: FnMut() -> capnp::Result<C>,
225    F: 'static,
226    C: FromClientHook,
227    C: 'static,
228{
229    fn get(&mut self) -> capnp::any_pointer::Builder<'_> {
230        self.inner.get()
231    }
232
233    fn get_brand(&self) -> usize {
234        0
235    }
236
237    fn send(self: Box<Self>) -> capnp::capability::RemotePromise<capnp::any_pointer::Owned> {
238        let parent = self.parent;
239        let mut result = self.inner.send();
240        result.promise = parent.wrap(result.promise);
241        result
242    }
243
244    fn send_streaming(self: Box<Self>) -> Promise<(), capnp::Error> {
245        todo!()
246    }
247
248    fn tail_send(
249        self: Box<Self>,
250    ) -> Option<(
251        u32,
252        Promise<(), capnp::Error>,
253        Box<dyn capnp::private::capability::PipelineHook>,
254    )> {
255        todo!()
256    }
257}
258
259/// Creates a new client that reconnects when getting [`ErrorKind::Disconnected`](capnp::ErrorKind::Disconnected) errors.
260///
261/// Usually when you get a [`Disconnected`](capnp::ErrorKind::Disconnected) error response from calling a method on a capability
262/// it means the end of that capability for good. And so you can't call methods on that
263/// capability any more.
264///
265/// When you have a way of getting the capability back: Be it from a bootstrap or because
266/// the capability is persistent this method can help you wrap that reconnection logic into a client
267/// that automatically runs the logic whenever a method call returns [`Disconnected`](capnp::ErrorKind::Disconnected).
268///
269/// The way it works is that you provide a closure that returns a fresh client or a permanent error and
270/// you get a new connected client and a [`SetTarget`] interface that you can optionally use to prematurely
271/// replace the client.
272///
273/// There is one caveat though: The original request that got a [`Disconnected`](capnp::ErrorKind::Disconnected)
274/// will still get that response. It is up to the caller to retry the call if relevant. `auto_reconnect`` only
275/// deals with the calls that come after.
276///
277/// # Example
278///
279/// ```capnp
280/// # Cap'n Proto schema
281/// interface Foo {
282///     identity @0 (x: UInt32) -> (y: UInt32);
283/// }
284/// ```
285///
286/// ```ignore
287/// // A simple bootstrapped tcp connection to remote.example.com
288/// async fn connect() -> capnp::Result<foo_client::Client> {
289///     let stream = tokio::net::TcpStream::connect(&"remote.example.com:3001").await?;
290///     stream.set_nodelay(true)?;
291///     let (reader, writer) = tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
292///
293///     let network = Box::new(twoparty::VatNetwork::new(
294///         futures::io::BufReader::new(reader),
295///         futures::io::BufWriter::new(writer),
296///         rpc_twoparty_capnp::Side::Client,
297///         Default::default(),
298///     ));
299///
300///     let mut rpc_system = RpcSystem::new(network, None);
301///     let foo_client: foo_client::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
302///     tokio::task::spawn_local(rpc_system);
303///     Ok(foo_client)
304/// }
305/// // The reconnecting client that automatically calls connect
306/// let (foo_client, _) = auto_reconnect(|| {
307///     // By using new_future_client we delay any calls until we have a new connection.
308///     Ok(new_future_client(connect()))
309/// })?;
310/// // Calling Foo like normally.
311/// let mut request = foo_client.identity_request();
312/// request.get().set_x(123);
313/// let promise = request.send().promise.and_then(|response| {
314///     println!("results = {}", response.get()?.get_y());
315///     Ok(())
316/// });
317/// ```
318pub fn auto_reconnect<F, C>(mut connect: F) -> capnp::Result<(C, Box<dyn SetTarget<C>>)>
319where
320    F: FnMut() -> capnp::Result<C>,
321    F: 'static,
322    C: FromClientHook,
323    C: 'static,
324{
325    let current = connect()?;
326    let c = Client::new(connect);
327    c.set_target(current);
328    let hook: Box<dyn ClientHook> = Box::new(c.clone());
329    Ok((FromClientHook::new(hook), Box::new(c)))
330}
331
332/// Creates a new client that lazily connect and also reconnects when getting [`ErrorKind::Disconnected`](capnp::ErrorKind::Disconnected) errors.
333///
334/// For explanation of how this functions see: [`auto_reconnect`]
335///
336/// The main difference between [`auto_reconnect`] and this function is that while [`auto_reconnect`] will call
337/// the closure immediately to get an inner client to wrap, this function starts out disconnected and only calls
338/// the closure to get the actual client when the capability is first used.
339pub fn lazy_auto_reconnect<F, C>(connect: F) -> (C, Box<dyn SetTarget<C>>)
340where
341    F: FnMut() -> capnp::Result<C>,
342    F: 'static,
343    C: FromClientHook,
344    C: 'static,
345{
346    let c: Client<F, C> = Client::new(connect);
347    let hook: Box<dyn ClientHook> = Box::new(c.clone());
348    (FromClientHook::new(hook), Box::new(c))
349}