remoc/robj/
handle.rs

1//! Remote handle to an object.
2//!
3//! Handles provide the ability to send a shared reference to a local object to
4//! a remote endpoint.
5//! The remote endpoint cannot access the referenced object remotely but it can send back
6//! the handle (or a clone of it), which can then be dereferenced locally.
7//!
8//! Handles can be cloned and they are reference counted.
9//! If all handles to the object are dropped, it is dropped automatically.
10//!
11//! # Usage
12//!
13//! [Create a handle](Handle::new) to an object and send it to a remote endpoint, for example
14//! over a channel from the [rch](crate::rch) module.
15//! When you receive a handle that was created locally from the remote endpoint, use [Handle::as_ref]
16//! to access the object it references.
17//! Calling [Handle::as_ref] on a handle that was not created locally will result in an error.
18//!
19//! # Security
20//!
//! When sending a handle, a UUID is generated, associated with the object and sent to the
//! remote endpoint.
//! Since the object itself is never sent, there is no risk that private data within the object
//! may be accessed remotely.
25//!
26//! The UUID is associated with the [channel multiplexer](crate::chmux) over which the
27//! handle was sent to the remote endpoint and a received handle can only access the object
28//! if it is received over the same channel multiplexer connection.
29//! Thus, even if the UUID is eavesdropped during transmission, another remote endpoint connected
30//! via a different channel multiplexer connection will not be able to access the object
31//! by constructing and sending back a handle with the eavesdropped UUID.
32//!
33//! # Example
34//!
35//! In the following example the server creates a handle and sends it to the client.
36//! The client tries to dereference a handle but receives an error because the handle
37//! was not created locally.
38//! The client then sends back the handle to the server.
39//! The server can successfully dereference the received handle.
40//!
41//! ```
42//! use remoc::prelude::*;
43//! use remoc::robj::handle::{Handle, HandleError};
44//!
45//! // This would be run on the client.
46//! async fn client(
47//!     mut tx: rch::base::Sender<Handle<String>>,
48//!     mut rx: rch::base::Receiver<Handle<String>>,
49//! ) {
50//!     let handle = rx.recv().await.unwrap().unwrap();
51//!     assert!(matches!(handle.as_ref().await, Err(HandleError::Unknown)));
52//!     tx.send(handle).await.unwrap();
53//! }
54//!
55//! // This would be run on the server.
56//! async fn server(
57//!     mut tx: rch::base::Sender<Handle<String>>,
58//!     mut rx: rch::base::Receiver<Handle<String>>,
59//! ) {
60//!     let data = "private data".to_string();
61//!     let handle = Handle::new(data);
62//!     assert_eq!(*handle.as_ref().await.unwrap(), "private data".to_string());
63//!
64//!     tx.send(handle).await.unwrap();
65//!
66//!     let handle = rx.recv().await.unwrap().unwrap();
67//!     assert_eq!(*handle.as_ref().await.unwrap(), "private data".to_string());
68//! }
69//! # tokio_test::block_on(remoc::doctest::client_server_bidir(client, server));
70//! ```
71
72use serde::{Deserialize, Serialize};
73use std::{
74    fmt,
75    marker::PhantomData,
76    mem,
77    ops::{Deref, DerefMut},
78    sync::Arc,
79};
80use tokio::sync::{OwnedRwLockMappedWriteGuard, OwnedRwLockReadGuard, OwnedRwLockWriteGuard};
81use tracing::Instrument;
82use uuid::Uuid;
83
84use crate::{
85    chmux::{AnyBox, AnyEntry},
86    codec, exec,
87    rch::{
88        base::{PortDeserializer, PortSerializer},
89        mpsc,
90    },
91};
92
/// An error that occurred while getting the value of a handle.
///
/// Returned by [`Handle::into_inner`], [`Handle::as_ref`] and [`Handle::as_mut`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum HandleError {
    /// The value of the handle is not stored locally or it has already
    /// been taken.
    Unknown,
    /// The value of the handle is of another type.
    ///
    /// The payload is the debug representation of the type id of the value
    /// that is actually stored.
    MismatchedType(String),
}
102
103impl fmt::Display for HandleError {
104    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
105        match self {
106            HandleError::Unknown => write!(f, "unknown, taken or non-local handle"),
107            HandleError::MismatchedType(ty) => write!(f, "mismatched handle type: {ty}"),
108        }
109    }
110}
111
// `HandleError` derives `Debug` and implements `Display`; the default `Error` methods are used.
impl std::error::Error for HandleError {}
113
/// Provider for a handle.
///
/// Dropping the provider causes all corresponding handles to become
/// invalid and the underlying object to be dropped.
///
/// Call [`Provider::keep`] to hand control over the object's lifetime back
/// to the handles themselves.
pub struct Provider {
    /// Sending half of the "keep" flag that is observed through the handle.
    ///
    /// Wrapped in an `Option` so that [`Provider::keep`] can take it out of `self`.
    keep_tx: Option<tokio::sync::watch::Sender<bool>>,
}
121
122impl fmt::Debug for Provider {
123    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124        f.debug_struct("Provider").finish()
125    }
126}
127
128impl Provider {
129    /// Keeps the provider alive until the handle is dropped.
130    pub fn keep(mut self) {
131        let _ = self.keep_tx.take().unwrap().send(true);
132    }
133
134    /// Waits until the handle provider can be safely dropped.
135    ///
136    /// This is the case when the handle is dropped.
137    pub async fn done(&mut self) {
138        self.keep_tx.as_mut().unwrap().closed().await
139    }
140}
141
// NOTE(review): the body is intentionally empty. Presumably the impl exists so that
// `Provider` is a `Drop` type (fields cannot be moved out by destructuring) — confirm
// before removing it.
impl Drop for Provider {
    fn drop(&mut self) {
        // empty
    }
}
147
/// Handle state.
///
/// Records whether the referenced value is reachable locally and carries the
/// data needed to serialize the handle.
#[derive(Clone, Default)]
enum State<Codec> {
    /// Empty (for dropping).
    ///
    /// This is the value left behind when the state is moved out of a handle
    /// with `mem::take`.
    #[default]
    Empty,
    /// Value has been created locally.
    LocalCreated {
        /// Reference to value.
        entry: AnyEntry,
        /// Keep notification.
        ///
        /// Becomes `true` when [`Provider::keep`] is called.
        keep_rx: tokio::sync::watch::Receiver<bool>,
    },
    /// Value is stored locally and handle has been received from
    /// a remote endpoint.
    LocalReceived {
        /// Reference to value.
        entry: AnyEntry,
        /// Id in local handle storage.
        id: Uuid,
        /// Dropped notification.
        ///
        /// Passed along unchanged when the handle is serialized again.
        dropped_tx: mpsc::Sender<(), Codec, 1>,
    },
    /// Value is stored on a remote endpoint.
    Remote {
        /// Id in remote handle storage.
        id: Uuid,
        /// Dropped notification.
        ///
        /// Passed along unchanged when the handle is serialized again.
        dropped_tx: mpsc::Sender<(), Codec, 1>,
    },
}
179
180impl<Codec> fmt::Debug for State<Codec> {
181    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
182        match self {
183            Self::Empty => write!(f, "Empty"),
184            Self::LocalCreated { .. } => write!(f, "LocalCreated"),
185            Self::LocalReceived { id, .. } => f.debug_struct("LocalReceived").field("id", id).finish(),
186            Self::Remote { id, .. } => f.debug_struct("Remote").field("id", id).finish(),
187        }
188    }
189}
190
/// A handle to a value that is possibly stored on a remote endpoint.
///
/// If the value is stored locally, the handle can be used to obtain the value
/// or a (mutable) reference to it.
///
/// See [module-level documentation](self) for details.
#[derive(Clone)]
pub struct Handle<T, Codec = codec::Default> {
    /// Where the value lives and what is needed to transport the handle.
    state: State<Codec>,
    /// Marker for the type the handle claims to reference; no `T` is stored here.
    _data: PhantomData<T>,
}
202
203impl<T, Codec> fmt::Debug for Handle<T, Codec> {
204    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
205        write!(f, "{:?}", &self.state)
206    }
207}
208
209impl<T, Codec> Handle<T, Codec>
210where
211    T: Send + Sync + 'static,
212    Codec: codec::Codec,
213{
214    /// Creates a new handle for the value.
215    ///
216    /// There is *no* requirement on `T` to be [remote sendable](crate::RemoteSend), since
217    /// it is never send to a remote endpoint.
218    pub fn new(value: T) -> Self {
219        let (handle, provider) = Self::provided(value);
220        provider.keep();
221        handle
222    }
223
224    /// Creates a new handle for the value and returns it together
225    /// with its provider.
226    ///
227    /// This allows you to drop the object without relying upon all handles being
228    /// dropped, possibly by a remote endpoint.
229    /// This is especially useful when you connect to untrusted remote endpoints
230    /// that could try to obtain and keep a large number of handles to
231    /// perform a denial of service attack by exhausting your memory.
232    pub fn provided(value: T) -> (Self, Provider) {
233        let (keep_tx, keep_rx) = tokio::sync::watch::channel(false);
234
235        let handle = Self {
236            state: State::LocalCreated {
237                entry: Arc::new(tokio::sync::RwLock::new(Some(Box::new(value)))),
238                keep_rx,
239            },
240            _data: PhantomData,
241        };
242        let provider = Provider { keep_tx: Some(keep_tx) };
243
244        (handle, provider)
245    }
246
247    /// Takes the value of the handle and returns it, if it is stored locally.
248    ///
249    /// The handle and all its clones become invalid.
250    ///
251    /// This blocks until all existing read and write reference have been released.
252    pub async fn into_inner(mut self) -> Result<T, HandleError> {
253        let entry = match mem::take(&mut self.state) {
254            State::LocalCreated { entry, .. } => entry,
255            State::LocalReceived { entry, .. } => entry,
256            _ => return Err(HandleError::Unknown),
257        };
258
259        let mut entry = entry.write().await;
260        match entry.take() {
261            Some(any) => match any.downcast::<T>() {
262                Ok(value) => Ok(*value),
263                Err(any) => Err(HandleError::MismatchedType(format!("{:?}", (*any).type_id()))),
264            },
265            None => Err(HandleError::Unknown),
266        }
267    }
268
269    /// Returns a reference to the value of the handle, if it is stored locally.
270    ///
271    /// This blocks until all existing write reference have been released.
272    pub async fn as_ref(&self) -> Result<Ref<T>, HandleError> {
273        let entry = match &self.state {
274            State::LocalCreated { entry, .. } | State::LocalReceived { entry, .. } => entry.clone(),
275            _ => return Err(HandleError::Unknown),
276        };
277
278        let entry = entry.read_owned().await;
279        match &*entry {
280            Some(any) => {
281                if !any.is::<T>() {
282                    return Err(HandleError::MismatchedType(format!("{:?}", (**any).type_id())));
283                }
284                let value_ref = OwnedRwLockReadGuard::map(entry, |entry| {
285                    entry.as_ref().unwrap().downcast_ref::<T>().unwrap()
286                });
287                Ok(Ref(value_ref))
288            }
289            None => Err(HandleError::Unknown),
290        }
291    }
292
293    /// Returns a mutable reference to the value of the handle, if it is stored locally.
294    ///
295    /// This blocks until all existing read and write reference have been released.
296    pub async fn as_mut(&mut self) -> Result<RefMut<T>, HandleError> {
297        let entry = match &self.state {
298            State::LocalCreated { entry, .. } | State::LocalReceived { entry, .. } => entry.clone(),
299            _ => return Err(HandleError::Unknown),
300        };
301
302        let entry = entry.write_owned().await;
303        match &*entry {
304            Some(any) => {
305                if !any.is::<T>() {
306                    return Err(HandleError::MismatchedType(format!("{:?}", (**any).type_id())));
307                }
308                let value_ref = OwnedRwLockWriteGuard::map(entry, |entry| {
309                    entry.as_mut().unwrap().downcast_mut::<T>().unwrap()
310                });
311                Ok(RefMut(value_ref))
312            }
313            None => Err(HandleError::Unknown),
314        }
315    }
316
317    /// Change the data type of the handle.
318    ///
319    /// Before the handle can be dereferenced the type must be changed back to the original
320    /// type, otherwise a [HandleError::MismatchedType] error will occur.
321    ///
322    /// This is useful when you need to send a handle of a private type to a remote endpoint.
323    /// You can do so by creating a public, empty proxy struct and sending handles of this
324    /// type to remote endpoints.
325    pub fn cast<TNew>(self) -> Handle<TNew, Codec> {
326        Handle { state: self.state.clone(), _data: PhantomData }
327    }
328}
329
// NOTE(review): the body is intentionally empty. Because of this impl the state cannot be
// moved out of a handle by destructuring, which is why `into_inner` uses `mem::take` —
// confirm the intent before removing it.
impl<T, Codec> Drop for Handle<T, Codec> {
    fn drop(&mut self) {
        // empty
    }
}
335
/// Handle in transport.
///
/// This is the representation that is actually serialized and deserialized
/// in place of a [`Handle`]; it never contains the referenced value.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "Codec: codec::Codec"))]
#[serde(bound(deserialize = "Codec: codec::Codec"))]
pub(crate) struct TransportedHandle<T, Codec> {
    /// Handle id.
    id: Uuid,
    /// Dropped notification.
    dropped_tx: mpsc::Sender<(), Codec, 1>,
    /// Data type.
    data: PhantomData<T>,
    /// Codec
    codec: PhantomData<Codec>,
}
350
impl<T, Codec> Serialize for Handle<T, Codec>
where
    Codec: codec::Codec,
{
    /// Serializes this handle for sending over a chmux channel.
    ///
    /// Only an id and a dropped-notification sender are transmitted, never the value.
    ///
    /// For a locally created handle the value's entry is registered in the handle
    /// storage obtained from `PortSerializer` under a fresh id, and a background task
    /// is spawned that removes the entry again once it is no longer needed.
    /// Handles that were received before are re-sent with their existing id and sender.
    ///
    /// # Errors
    ///
    /// Fails if `PortSerializer::storage` fails or if the underlying serializer fails.
    ///
    /// # Panics
    ///
    /// Panics if the handle is in the `Empty` state.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let (id, dropped_tx) = match self.state.clone() {
            State::LocalCreated { entry, mut keep_rx, .. } => {
                // Register the value so that a handle sent back later can find it by id.
                let handle_storage = PortSerializer::storage()?;
                let id = handle_storage.insert(entry.clone());

                // The sender travels with the handle, the receiver stays with the cleanup task.
                let (dropped_tx, dropped_rx) = mpsc::channel(1);
                let dropped_tx = dropped_tx.set_buffer::<1>();
                let mut dropped_rx = dropped_rx.set_buffer::<1>();

                // Cleanup task: waits until the storage entry may be removed.
                exec::spawn(
                    async move {
                        loop {
                            if *keep_rx.borrow_and_update() {
                                // Provider asked to be kept: only the dropped notification
                                // ends the wait.
                                // NOTE(review): presumably `recv` completes once every
                                // `dropped_tx` is gone — confirm against the mpsc channel.
                                let _ = dropped_rx.recv().await;
                                break;
                            } else {
                                tokio::select! {
                                    // Poll the keep flag before the dropped notification.
                                    biased;
                                    res = keep_rx.changed() => {
                                        // An error with the flag still `false` means the provider
                                        // went away without calling `keep`. In every other case
                                        // loop again and re-evaluate the flag.
                                        if !*keep_rx.borrow_and_update() && res.is_err() {
                                            break;
                                        }
                                    },
                                    _ = dropped_rx.recv() => break,
                                }
                            }
                        }

                        handle_storage.remove(id);
                    }
                    .in_current_span(),
                );

                (id, dropped_tx)
            }
            // Already registered elsewhere: forward the existing id and sender unchanged.
            State::LocalReceived { id, dropped_tx, .. } | State::Remote { id, dropped_tx } => (id, dropped_tx),
            State::Empty => unreachable!("state is only empty when dropping"),
        };

        let transported = TransportedHandle::<T, Codec> { id, dropped_tx, data: PhantomData, codec: PhantomData };

        transported.serialize(serializer)
    }
}
404
405impl<'de, T, Codec> Deserialize<'de> for Handle<T, Codec>
406where
407    Codec: codec::Codec,
408{
409    /// Deserializes this handle after it has been received over a chmux channel.
410    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
411    where
412        D: serde::Deserializer<'de>,
413    {
414        let TransportedHandle { id, dropped_tx, .. } = TransportedHandle::<T, Codec>::deserialize(deserializer)?;
415
416        let handle_storage = PortDeserializer::storage()?;
417        let state = match handle_storage.remove(id) {
418            Some(entry) => State::LocalReceived { entry, id, dropped_tx },
419            None => State::Remote { id, dropped_tx },
420        };
421
422        Ok(Self { state, _data: PhantomData })
423    }
424}
425
/// An owned reference to the value of a handle.
///
/// Wraps a read guard on the value's lock, so the lock stays read-locked
/// for as long as this reference exists.
pub struct Ref<T>(OwnedRwLockReadGuard<Option<AnyBox>, T>);
428
429impl<T> Deref for Ref<T> {
430    type Target = T;
431
432    fn deref(&self) -> &Self::Target {
433        &self.0
434    }
435}
436
437impl<T> fmt::Debug for Ref<T>
438where
439    T: fmt::Debug,
440{
441    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
442        write!(f, "{:?}", &**self)
443    }
444}
445
/// An owned mutable reference to the value of a handle.
///
/// Wraps a write guard on the value's lock, so the lock stays write-locked
/// for as long as this reference exists.
pub struct RefMut<T>(OwnedRwLockMappedWriteGuard<Option<AnyBox>, T>);
448
449impl<T> Deref for RefMut<T> {
450    type Target = T;
451
452    fn deref(&self) -> &Self::Target {
453        &self.0
454    }
455}
456
457impl<T> DerefMut for RefMut<T> {
458    fn deref_mut(&mut self) -> &mut Self::Target {
459        &mut self.0
460    }
461}
462
463impl<T> fmt::Debug for RefMut<T>
464where
465    T: fmt::Debug,
466{
467    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
468        write!(f, "{:?}", &**self)
469    }
470}