capnp_rpc/reconnect.rs
1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::rc::Rc;
4
5use capnp::capability::{FromClientHook, Promise};
6use capnp::private::capability::{ClientHook, RequestHook};
7use futures::TryFutureExt;
8
/// Handle for replacing the target of a reconnecting client out-of-band.
///
/// A client created with [`auto_reconnect`] or [`lazy_auto_reconnect`] normally swaps in
/// a fresh target only after a call has failed with
/// [`Disconnected`](capnp::ErrorKind::Disconnected). Waiting for that failure is not
/// always optimal.
///
/// Sometimes we know by other means that the wrapped client has gone away. For example,
/// a peer may automatically send us a new capability whenever it reconnects to us.
///
/// In those situations, use the implementation of this trait that is returned by
/// [`auto_reconnect`] or [`lazy_auto_reconnect`] to set the target of the wrapped client
/// manually.
///
/// # Example
///
/// ```ignore
/// // The reconnecting client that automatically calls connect
/// let (foo_client, set_target) = auto_reconnect(|| {
///     Ok(new_future_client(connect()))
/// })?;
///
/// // do work with foo_client
/// ...
///
/// // We become aware that the client has gone so reconnect manually
/// set_target.set_target(new_future_client(connect()));
///
/// // do more work with foo_client
/// ...
/// ```
pub trait SetTarget<C> {
    /// Returns a new boxed reference to this `SetTarget` implementation.
    ///
    /// `Clone` requires `Sized`, so it cannot be a supertrait of an object-safe trait.
    /// This method is the workaround that makes it possible to copy the
    /// `Box<dyn SetTarget<C>>` you received when creating the reconnecting client.
    fn add_ref(&self) -> Box<dyn SetTarget<C>>;

    /// Sets the target client of the reconnecting client that this implementation
    /// belongs to.
    fn set_target(&self, target: C);
}
51
52impl<C> Clone for Box<dyn SetTarget<C>> {
53 fn clone(&self) -> Self {
54 self.add_ref()
55 }
56}
57
58struct ClientInner<F, C> {
59 connect: F,
60 current: Option<Box<dyn ClientHook>>,
61 generation: usize,
62 marker: PhantomData<C>,
63}
64
65impl<F, C> ClientInner<F, C>
66where
67 F: FnMut() -> capnp::Result<C>,
68 F: 'static,
69 C: FromClientHook,
70{
71 fn get_current(&mut self) -> Box<dyn ClientHook> {
72 if let Some(hook) = self.current.as_ref() {
73 hook.add_ref()
74 } else {
75 let hook = match (self.connect)() {
76 Ok(hook) => hook.into_client_hook(),
77 Err(err) => crate::broken::new_cap(err),
78 };
79 self.current = Some(hook.add_ref());
80 hook
81 }
82 }
83}
84
85struct Client<F, C> {
86 inner: Rc<RefCell<ClientInner<F, C>>>,
87}
88
89impl<F, C> Client<F, C>
90where
91 F: FnMut() -> capnp::Result<C>,
92 F: 'static,
93 C: FromClientHook,
94 C: 'static,
95{
96 pub fn new(connect: F) -> Client<F, C> {
97 Client {
98 inner: Rc::new(RefCell::new(ClientInner {
99 connect,
100 generation: 0,
101 current: None,
102 marker: PhantomData,
103 })),
104 }
105 }
106
107 pub fn get_current(&self) -> Box<dyn ClientHook> {
108 self.inner.borrow_mut().get_current()
109 }
110
111 fn wrap<T: 'static>(&self, promise: Promise<T, capnp::Error>) -> Promise<T, capnp::Error> {
112 let c = self.clone();
113 let generation = self.inner.borrow().generation;
114 Promise::from_future(promise.map_err(move |err| {
115 if err.kind == capnp::ErrorKind::Disconnected
116 && generation == c.inner.borrow().generation
117 {
118 let mut inner = c.inner.borrow_mut();
119 inner.generation = generation + 1;
120 match (inner.connect)() {
121 Ok(hook) => inner.current = Some(hook.into_client_hook()),
122 Err(err) => inner.current = Some(crate::broken::new_cap(err)),
123 }
124 }
125 err
126 }))
127 }
128}
129
130impl<F: 'static, C> SetTarget<C> for Client<F, C>
131where
132 F: 'static,
133 C: FromClientHook,
134 C: 'static,
135{
136 fn add_ref(&self) -> Box<dyn SetTarget<C>> {
137 Box::new(self.clone())
138 }
139
140 fn set_target(&self, target: C) {
141 self.inner.borrow_mut().current = Some(target.into_client_hook());
142 }
143}
144
145impl<F, C> Clone for Client<F, C> {
146 fn clone(&self) -> Self {
147 Self {
148 inner: self.inner.clone(),
149 }
150 }
151}
152
153impl<F, C> ClientHook for Client<F, C>
154where
155 F: FnMut() -> capnp::Result<C>,
156 F: 'static,
157 C: FromClientHook,
158 C: 'static,
159{
160 fn add_ref(&self) -> Box<dyn ClientHook> {
161 Box::new(self.clone())
162 }
163
164 fn new_call(
165 &self,
166 interface_id: u64,
167 method_id: u16,
168 size_hint: Option<capnp::MessageSize>,
169 ) -> capnp::capability::Request<capnp::any_pointer::Owned, capnp::any_pointer::Owned> {
170 let result = self
171 .get_current()
172 .new_call(interface_id, method_id, size_hint);
173 let hook = Request::new(self.clone(), result.hook);
174 capnp::capability::Request::new(Box::new(hook))
175 }
176
177 fn call(
178 &self,
179 interface_id: u64,
180 method_id: u16,
181 params: Box<dyn capnp::private::capability::ParamsHook>,
182 results: Box<dyn capnp::private::capability::ResultsHook>,
183 ) -> Promise<(), capnp::Error> {
184 let result = self
185 .get_current()
186 .call(interface_id, method_id, params, results);
187 self.wrap(result)
188 }
189
190 fn get_brand(&self) -> usize {
191 0
192 }
193
194 fn get_ptr(&self) -> usize {
195 (self.inner.as_ref()) as *const _ as usize
196 }
197
198 fn get_resolved(&self) -> Option<Box<dyn ClientHook>> {
199 None
200 }
201
202 fn when_more_resolved(&self) -> Option<Promise<Box<dyn ClientHook>, capnp::Error>> {
203 None
204 }
205
206 fn when_resolved(&self) -> Promise<(), capnp::Error> {
207 Promise::ok(())
208 }
209}
210
211struct Request<F, C> {
212 parent: Client<F, C>,
213 inner: Box<dyn RequestHook>,
214}
215
216impl<F, C> Request<F, C> {
217 fn new(parent: Client<F, C>, inner: Box<dyn RequestHook>) -> Request<F, C> {
218 Request { parent, inner }
219 }
220}
221
222impl<F, C> RequestHook for Request<F, C>
223where
224 F: FnMut() -> capnp::Result<C>,
225 F: 'static,
226 C: FromClientHook,
227 C: 'static,
228{
229 fn get(&mut self) -> capnp::any_pointer::Builder<'_> {
230 self.inner.get()
231 }
232
233 fn get_brand(&self) -> usize {
234 0
235 }
236
237 fn send(self: Box<Self>) -> capnp::capability::RemotePromise<capnp::any_pointer::Owned> {
238 let parent = self.parent;
239 let mut result = self.inner.send();
240 result.promise = parent.wrap(result.promise);
241 result
242 }
243
244 fn send_streaming(self: Box<Self>) -> Promise<(), capnp::Error> {
245 todo!()
246 }
247
248 fn tail_send(
249 self: Box<Self>,
250 ) -> Option<(
251 u32,
252 Promise<(), capnp::Error>,
253 Box<dyn capnp::private::capability::PipelineHook>,
254 )> {
255 todo!()
256 }
257}
258
259/// Creates a new client that reconnects when getting [`ErrorKind::Disconnected`](capnp::ErrorKind::Disconnected) errors.
260///
261/// Usually when you get a [`Disconnected`](capnp::ErrorKind::Disconnected) error response from calling a method on a capability
262/// it means the end of that capability for good. And so you can't call methods on that
263/// capability any more.
264///
265/// When you have a way of getting the capability back: Be it from a bootstrap or because
266/// the capability is persistent this method can help you wrap that reconnection logic into a client
267/// that automatically runs the logic whenever a method call returns [`Disconnected`](capnp::ErrorKind::Disconnected).
268///
269/// The way it works is that you provide a closure that returns a fresh client or a permanent error and
270/// you get a new connected client and a [`SetTarget`] interface that you can optionally use to prematurely
271/// replace the client.
272///
273/// There is one caveat though: The original request that got a [`Disconnected`](capnp::ErrorKind::Disconnected)
274/// will still get that response. It is up to the caller to retry the call if relevant. `auto_reconnect`` only
275/// deals with the calls that come after.
276///
277/// # Example
278///
279/// ```capnp
280/// # Cap'n Proto schema
281/// interface Foo {
282/// identity @0 (x: UInt32) -> (y: UInt32);
283/// }
284/// ```
285///
286/// ```ignore
287/// // A simple bootstrapped tcp connection to remote.example.com
288/// async fn connect() -> capnp::Result<foo_client::Client> {
289/// let stream = tokio::net::TcpStream::connect(&"remote.example.com:3001").await?;
290/// stream.set_nodelay(true)?;
291/// let (reader, writer) = tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
292///
293/// let network = Box::new(twoparty::VatNetwork::new(
294/// futures::io::BufReader::new(reader),
295/// futures::io::BufWriter::new(writer),
296/// rpc_twoparty_capnp::Side::Client,
297/// Default::default(),
298/// ));
299///
300/// let mut rpc_system = RpcSystem::new(network, None);
301/// let foo_client: foo_client::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
302/// tokio::task::spawn_local(rpc_system);
303/// Ok(foo_client)
304/// }
305/// // The reconnecting client that automatically calls connect
306/// let (foo_client, _) = auto_reconnect(|| {
307/// // By using new_future_client we delay any calls until we have a new connection.
308/// Ok(new_future_client(connect()))
309/// })?;
310/// // Calling Foo like normally.
311/// let mut request = foo_client.identity_request();
312/// request.get().set_x(123);
313/// let promise = request.send().promise.and_then(|response| {
314/// println!("results = {}", response.get()?.get_y());
315/// Ok(())
316/// });
317/// ```
318pub fn auto_reconnect<F, C>(mut connect: F) -> capnp::Result<(C, Box<dyn SetTarget<C>>)>
319where
320 F: FnMut() -> capnp::Result<C>,
321 F: 'static,
322 C: FromClientHook,
323 C: 'static,
324{
325 let current = connect()?;
326 let c = Client::new(connect);
327 c.set_target(current);
328 let hook: Box<dyn ClientHook> = Box::new(c.clone());
329 Ok((FromClientHook::new(hook), Box::new(c)))
330}
331
332/// Creates a new client that lazily connect and also reconnects when getting [`ErrorKind::Disconnected`](capnp::ErrorKind::Disconnected) errors.
333///
334/// For explanation of how this functions see: [`auto_reconnect`]
335///
336/// The main difference between [`auto_reconnect`] and this function is that while [`auto_reconnect`] will call
337/// the closure immediately to get an inner client to wrap, this function starts out disconnected and only calls
338/// the closure to get the actual client when the capability is first used.
339pub fn lazy_auto_reconnect<F, C>(connect: F) -> (C, Box<dyn SetTarget<C>>)
340where
341 F: FnMut() -> capnp::Result<C>,
342 F: 'static,
343 C: FromClientHook,
344 C: 'static,
345{
346 let c: Client<F, C> = Client::new(connect);
347 let hook: Box<dyn ClientHook> = Box::new(c.clone());
348 (FromClientHook::new(hook), Box::new(c))
349}