remoc/robj/handle.rs
1//! Remote handle to an object.
2//!
3//! Handles provide the ability to send a shared reference to a local object to
4//! a remote endpoint.
5//! The remote endpoint cannot access the referenced object remotely but it can send back
6//! the handle (or a clone of it), which can then be dereferenced locally.
7//!
8//! Handles can be cloned and they are reference counted.
9//! If all handles to the object are dropped, it is dropped automatically.
10//!
11//! # Usage
12//!
13//! [Create a handle](Handle::new) to an object and send it to a remote endpoint, for example
14//! over a channel from the [rch](crate::rch) module.
15//! When you receive a handle that was created locally from the remote endpoint, use [Handle::as_ref]
16//! to access the object it references.
17//! Calling [Handle::as_ref] on a handle that was not created locally will result in an error.
18//!
19//! # Security
20//!
21//! When sending a handle an UUID is generated, associated with the object and send to the
22//! remote endpoint.
23//! Since the object itself is never send, there is no risk that private data within the object
24//! may be accessed remotely.
25//!
26//! The UUID is associated with the [channel multiplexer](crate::chmux) over which the
27//! handle was sent to the remote endpoint and a received handle can only access the object
28//! if it is received over the same channel multiplexer connection.
29//! Thus, even if the UUID is eavesdropped during transmission, another remote endpoint connected
30//! via a different channel multiplexer connection will not be able to access the object
31//! by constructing and sending back a handle with the eavesdropped UUID.
32//!
33//! # Example
34//!
35//! In the following example the server creates a handle and sends it to the client.
36//! The client tries to dereference a handle but receives an error because the handle
37//! was not created locally.
38//! The client then sends back the handle to the server.
39//! The server can successfully dereference the received handle.
40//!
41//! ```
42//! use remoc::prelude::*;
43//! use remoc::robj::handle::{Handle, HandleError};
44//!
45//! // This would be run on the client.
46//! async fn client(
47//! mut tx: rch::base::Sender<Handle<String>>,
48//! mut rx: rch::base::Receiver<Handle<String>>,
49//! ) {
50//! let handle = rx.recv().await.unwrap().unwrap();
51//! assert!(matches!(handle.as_ref().await, Err(HandleError::Unknown)));
52//! tx.send(handle).await.unwrap();
53//! }
54//!
55//! // This would be run on the server.
56//! async fn server(
57//! mut tx: rch::base::Sender<Handle<String>>,
58//! mut rx: rch::base::Receiver<Handle<String>>,
59//! ) {
60//! let data = "private data".to_string();
61//! let handle = Handle::new(data);
62//! assert_eq!(*handle.as_ref().await.unwrap(), "private data".to_string());
63//!
64//! tx.send(handle).await.unwrap();
65//!
66//! let handle = rx.recv().await.unwrap().unwrap();
67//! assert_eq!(*handle.as_ref().await.unwrap(), "private data".to_string());
68//! }
69//! # tokio_test::block_on(remoc::doctest::client_server_bidir(client, server));
70//! ```
71
72use serde::{Deserialize, Serialize};
73use std::{
74 fmt,
75 marker::PhantomData,
76 mem,
77 ops::{Deref, DerefMut},
78 sync::Arc,
79};
80use tokio::sync::{OwnedRwLockMappedWriteGuard, OwnedRwLockReadGuard, OwnedRwLockWriteGuard};
81use tracing::Instrument;
82use uuid::Uuid;
83
84use crate::{
85 chmux::{AnyBox, AnyEntry},
86 codec, exec,
87 rch::{
88 base::{PortDeserializer, PortSerializer},
89 mpsc,
90 },
91};
92
93/// An error during getting the value of a handle.
94#[derive(Clone, Debug, Serialize, Deserialize)]
95pub enum HandleError {
96 /// The value of the handle is not stored locally or it has already
97 /// been taken.
98 Unknown,
99 /// The values of the handle is of another type.
100 MismatchedType(String),
101}
102
103impl fmt::Display for HandleError {
104 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
105 match self {
106 HandleError::Unknown => write!(f, "unknown, taken or non-local handle"),
107 HandleError::MismatchedType(ty) => write!(f, "mismatched handle type: {ty}"),
108 }
109 }
110}
111
112impl std::error::Error for HandleError {}
113
114/// Provider for a handle.
115///
116/// Dropping the provider causes all corresponding handles to become
117/// invalid and the underlying object to be dropped.
118pub struct Provider {
119 keep_tx: Option<tokio::sync::watch::Sender<bool>>,
120}
121
122impl fmt::Debug for Provider {
123 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124 f.debug_struct("Provider").finish()
125 }
126}
127
128impl Provider {
129 /// Keeps the provider alive until the handle is dropped.
130 pub fn keep(mut self) {
131 let _ = self.keep_tx.take().unwrap().send(true);
132 }
133
134 /// Waits until the handle provider can be safely dropped.
135 ///
136 /// This is the case when the handle is dropped.
137 pub async fn done(&mut self) {
138 self.keep_tx.as_mut().unwrap().closed().await
139 }
140}
141
142impl Drop for Provider {
143 fn drop(&mut self) {
144 // empty
145 }
146}
147
148/// Handle state.
149#[derive(Clone, Default)]
150enum State<Codec> {
151 /// Empty (for dropping).
152 #[default]
153 Empty,
154 /// Value has been created locally.
155 LocalCreated {
156 /// Reference to value.
157 entry: AnyEntry,
158 /// Keep notification.
159 keep_rx: tokio::sync::watch::Receiver<bool>,
160 },
161 /// Value is stored locally and handle has been received from
162 /// a remote endpoint.
163 LocalReceived {
164 /// Reference to value.
165 entry: AnyEntry,
166 /// Id in local handle storage.
167 id: Uuid,
168 /// Dropped notification.
169 dropped_tx: mpsc::Sender<(), Codec, 1>,
170 },
171 /// Value is stored on a remote endpoint.
172 Remote {
173 /// Id in remote handle storage.
174 id: Uuid,
175 /// Dropped notification.
176 dropped_tx: mpsc::Sender<(), Codec, 1>,
177 },
178}
179
180impl<Codec> fmt::Debug for State<Codec> {
181 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
182 match self {
183 Self::Empty => write!(f, "Empty"),
184 Self::LocalCreated { .. } => write!(f, "LocalCreated"),
185 Self::LocalReceived { id, .. } => f.debug_struct("LocalReceived").field("id", id).finish(),
186 Self::Remote { id, .. } => f.debug_struct("Remote").field("id", id).finish(),
187 }
188 }
189}
190
191/// A handle to a value that is possibly stored on a remote endpoint.
192///
193/// If the value is stored locally, the handle can be used to obtain the value
194/// or a (mutable) reference to it.
195///
196/// See [module-level documentation](self) for details.
197#[derive(Clone)]
198pub struct Handle<T, Codec = codec::Default> {
199 state: State<Codec>,
200 _data: PhantomData<T>,
201}
202
203impl<T, Codec> fmt::Debug for Handle<T, Codec> {
204 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
205 write!(f, "{:?}", &self.state)
206 }
207}
208
209impl<T, Codec> Handle<T, Codec>
210where
211 T: Send + Sync + 'static,
212 Codec: codec::Codec,
213{
214 /// Creates a new handle for the value.
215 ///
216 /// There is *no* requirement on `T` to be [remote sendable](crate::RemoteSend), since
217 /// it is never send to a remote endpoint.
218 pub fn new(value: T) -> Self {
219 let (handle, provider) = Self::provided(value);
220 provider.keep();
221 handle
222 }
223
224 /// Creates a new handle for the value and returns it together
225 /// with its provider.
226 ///
227 /// This allows you to drop the object without relying upon all handles being
228 /// dropped, possibly by a remote endpoint.
229 /// This is especially useful when you connect to untrusted remote endpoints
230 /// that could try to obtain and keep a large number of handles to
231 /// perform a denial of service attack by exhausting your memory.
232 pub fn provided(value: T) -> (Self, Provider) {
233 let (keep_tx, keep_rx) = tokio::sync::watch::channel(false);
234
235 let handle = Self {
236 state: State::LocalCreated {
237 entry: Arc::new(tokio::sync::RwLock::new(Some(Box::new(value)))),
238 keep_rx,
239 },
240 _data: PhantomData,
241 };
242 let provider = Provider { keep_tx: Some(keep_tx) };
243
244 (handle, provider)
245 }
246
247 /// Takes the value of the handle and returns it, if it is stored locally.
248 ///
249 /// The handle and all its clones become invalid.
250 ///
251 /// This blocks until all existing read and write reference have been released.
252 pub async fn into_inner(mut self) -> Result<T, HandleError> {
253 let entry = match mem::take(&mut self.state) {
254 State::LocalCreated { entry, .. } => entry,
255 State::LocalReceived { entry, .. } => entry,
256 _ => return Err(HandleError::Unknown),
257 };
258
259 let mut entry = entry.write().await;
260 match entry.take() {
261 Some(any) => match any.downcast::<T>() {
262 Ok(value) => Ok(*value),
263 Err(any) => Err(HandleError::MismatchedType(format!("{:?}", (*any).type_id()))),
264 },
265 None => Err(HandleError::Unknown),
266 }
267 }
268
269 /// Returns a reference to the value of the handle, if it is stored locally.
270 ///
271 /// This blocks until all existing write reference have been released.
272 pub async fn as_ref(&self) -> Result<Ref<T>, HandleError> {
273 let entry = match &self.state {
274 State::LocalCreated { entry, .. } | State::LocalReceived { entry, .. } => entry.clone(),
275 _ => return Err(HandleError::Unknown),
276 };
277
278 let entry = entry.read_owned().await;
279 match &*entry {
280 Some(any) => {
281 if !any.is::<T>() {
282 return Err(HandleError::MismatchedType(format!("{:?}", (**any).type_id())));
283 }
284 let value_ref = OwnedRwLockReadGuard::map(entry, |entry| {
285 entry.as_ref().unwrap().downcast_ref::<T>().unwrap()
286 });
287 Ok(Ref(value_ref))
288 }
289 None => Err(HandleError::Unknown),
290 }
291 }
292
293 /// Returns a mutable reference to the value of the handle, if it is stored locally.
294 ///
295 /// This blocks until all existing read and write reference have been released.
296 pub async fn as_mut(&mut self) -> Result<RefMut<T>, HandleError> {
297 let entry = match &self.state {
298 State::LocalCreated { entry, .. } | State::LocalReceived { entry, .. } => entry.clone(),
299 _ => return Err(HandleError::Unknown),
300 };
301
302 let entry = entry.write_owned().await;
303 match &*entry {
304 Some(any) => {
305 if !any.is::<T>() {
306 return Err(HandleError::MismatchedType(format!("{:?}", (**any).type_id())));
307 }
308 let value_ref = OwnedRwLockWriteGuard::map(entry, |entry| {
309 entry.as_mut().unwrap().downcast_mut::<T>().unwrap()
310 });
311 Ok(RefMut(value_ref))
312 }
313 None => Err(HandleError::Unknown),
314 }
315 }
316
317 /// Change the data type of the handle.
318 ///
319 /// Before the handle can be dereferenced the type must be changed back to the original
320 /// type, otherwise a [HandleError::MismatchedType] error will occur.
321 ///
322 /// This is useful when you need to send a handle of a private type to a remote endpoint.
323 /// You can do so by creating a public, empty proxy struct and sending handles of this
324 /// type to remote endpoints.
325 pub fn cast<TNew>(self) -> Handle<TNew, Codec> {
326 Handle { state: self.state.clone(), _data: PhantomData }
327 }
328}
329
330impl<T, Codec> Drop for Handle<T, Codec> {
331 fn drop(&mut self) {
332 // empty
333 }
334}
335
336/// Handle in transport.
337#[derive(Debug, Serialize, Deserialize)]
338#[serde(bound(serialize = "Codec: codec::Codec"))]
339#[serde(bound(deserialize = "Codec: codec::Codec"))]
340pub(crate) struct TransportedHandle<T, Codec> {
341 /// Handle id.
342 id: Uuid,
343 /// Dropped notification.
344 dropped_tx: mpsc::Sender<(), Codec, 1>,
345 /// Data type.
346 data: PhantomData<T>,
347 /// Codec
348 codec: PhantomData<Codec>,
349}
350
351impl<T, Codec> Serialize for Handle<T, Codec>
352where
353 Codec: codec::Codec,
354{
355 /// Serializes this handle for sending over a chmux channel.
356 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
357 where
358 S: serde::Serializer,
359 {
360 let (id, dropped_tx) = match self.state.clone() {
361 State::LocalCreated { entry, mut keep_rx, .. } => {
362 let handle_storage = PortSerializer::storage()?;
363 let id = handle_storage.insert(entry.clone());
364
365 let (dropped_tx, dropped_rx) = mpsc::channel(1);
366 let dropped_tx = dropped_tx.set_buffer::<1>();
367 let mut dropped_rx = dropped_rx.set_buffer::<1>();
368
369 exec::spawn(
370 async move {
371 loop {
372 if *keep_rx.borrow_and_update() {
373 let _ = dropped_rx.recv().await;
374 break;
375 } else {
376 tokio::select! {
377 biased;
378 res = keep_rx.changed() => {
379 if !*keep_rx.borrow_and_update() && res.is_err() {
380 break;
381 }
382 },
383 _ = dropped_rx.recv() => break,
384 }
385 }
386 }
387
388 handle_storage.remove(id);
389 }
390 .in_current_span(),
391 );
392
393 (id, dropped_tx)
394 }
395 State::LocalReceived { id, dropped_tx, .. } | State::Remote { id, dropped_tx } => (id, dropped_tx),
396 State::Empty => unreachable!("state is only empty when dropping"),
397 };
398
399 let transported = TransportedHandle::<T, Codec> { id, dropped_tx, data: PhantomData, codec: PhantomData };
400
401 transported.serialize(serializer)
402 }
403}
404
405impl<'de, T, Codec> Deserialize<'de> for Handle<T, Codec>
406where
407 Codec: codec::Codec,
408{
409 /// Deserializes this handle after it has been received over a chmux channel.
410 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
411 where
412 D: serde::Deserializer<'de>,
413 {
414 let TransportedHandle { id, dropped_tx, .. } = TransportedHandle::<T, Codec>::deserialize(deserializer)?;
415
416 let handle_storage = PortDeserializer::storage()?;
417 let state = match handle_storage.remove(id) {
418 Some(entry) => State::LocalReceived { entry, id, dropped_tx },
419 None => State::Remote { id, dropped_tx },
420 };
421
422 Ok(Self { state, _data: PhantomData })
423 }
424}
425
426/// An owned reference to the value of a handle.
427pub struct Ref<T>(OwnedRwLockReadGuard<Option<AnyBox>, T>);
428
429impl<T> Deref for Ref<T> {
430 type Target = T;
431
432 fn deref(&self) -> &Self::Target {
433 &self.0
434 }
435}
436
437impl<T> fmt::Debug for Ref<T>
438where
439 T: fmt::Debug,
440{
441 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
442 write!(f, "{:?}", &**self)
443 }
444}
445
446/// An owned mutable reference to the value of a handle.
447pub struct RefMut<T>(OwnedRwLockMappedWriteGuard<Option<AnyBox>, T>);
448
449impl<T> Deref for RefMut<T> {
450 type Target = T;
451
452 fn deref(&self) -> &Self::Target {
453 &self.0
454 }
455}
456
457impl<T> DerefMut for RefMut<T> {
458 fn deref_mut(&mut self) -> &mut Self::Target {
459 &mut self.0
460 }
461}
462
463impl<T> fmt::Debug for RefMut<T>
464where
465 T: fmt::Debug,
466{
467 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
468 write!(f, "{:?}", &**self)
469 }
470}