1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
//! Remote handle to an object.
//!
//! Handles provide the ability to send a shared reference to a local object to
//! a remote endpoint.
//! The remote endpoint cannot access the referenced object remotely but it can send back
//! the handle (or a clone of it), which can then be dereferenced locally.
//!
//! Handles can be cloned and they are reference counted.
//! If all handles to the object are dropped, it is dropped automatically.
//!
//! # Usage
//!
//! [Create a handle](Handle::new) to an object and send it to a remote endpoint, for example
//! over a channel from the [rch](crate::rch) module.
//! When you receive a handle that was created locally from the remote endpoint, use [Handle::as_ref]
//! to access the object it references.
//! Calling [Handle::as_ref] on a handle that was not created locally will result in an error.
//!
//! # Security
//!
//! When sending a handle an UUID is generated, associated with the object and send to the
//! remote endpoint.
//! Since the object itself is never send, there is no risk that private data within the object
//! may be accessed remotely.
//!
//! The UUID is associated with the [channel multiplexer](crate::chmux) over which the
//! handle was sent to the remote endpoint and a received handle can only access the object
//! if it is received over the same channel multiplexer connection.
//! Thus, even if the UUID is eavesdropped during transmission, another remote endpoint connected
//! via a different channel multiplexer connection will not be able to access the object
//! by constructing and sending back a handle with the eavesdropped UUID.
//!
//! # Example
//!
//! In the following example the server creates a handle and sends it to the client.
//! The client tries to dereference a handle but receives an error because the handle
//! was not created locally.
//! The client then sends back the handle to the server.
//! The server can successfully dereference the received handle.
//!
//! ```
//! use remoc::prelude::*;
//! use remoc::robj::handle::{Handle, HandleError};
//!
//! // This would be run on the client.
//! async fn client(
//!     mut tx: rch::base::Sender<Handle<String>>,
//!     mut rx: rch::base::Receiver<Handle<String>>,
//! ) {
//!     let handle = rx.recv().await.unwrap().unwrap();
//!     assert!(matches!(handle.as_ref().await, Err(HandleError::Unknown)));
//!     tx.send(handle).await.unwrap();
//! }
//!
//! // This would be run on the server.
//! async fn server(
//!     mut tx: rch::base::Sender<Handle<String>>,
//!     mut rx: rch::base::Receiver<Handle<String>>,
//! ) {
//!     let data = "private data".to_string();
//!     let handle = Handle::new(data);
//!     assert_eq!(*handle.as_ref().await.unwrap(), "private data".to_string());
//!
//!     tx.send(handle).await.unwrap();
//!
//!     let handle = rx.recv().await.unwrap().unwrap();
//!     assert_eq!(*handle.as_ref().await.unwrap(), "private data".to_string());
//! }
//! # tokio_test::block_on(remoc::doctest::client_server_bidir(client, server));
//! ```

use serde::{Deserialize, Serialize};
use std::{
    fmt,
    marker::PhantomData,
    mem,
    ops::{Deref, DerefMut},
    sync::Arc,
};
use tokio::sync::{OwnedRwLockMappedWriteGuard, OwnedRwLockReadGuard, OwnedRwLockWriteGuard};
use uuid::Uuid;

use crate::{
    chmux::{AnyBox, AnyEntry},
    codec,
    rch::{
        base::{PortDeserializer, PortSerializer},
        mpsc,
    },
};

/// An error during getting the value of a handle.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum HandleError {
    /// The value of the handle is not stored locally or it has already
    /// been taken.
    Unknown,
    /// The values of the handle is of another type.
    MismatchedType(String),
}

impl fmt::Display for HandleError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            HandleError::Unknown => write!(f, "unknown, taken or non-local handle"),
            HandleError::MismatchedType(ty) => write!(f, "mismatched handle type: {ty}"),
        }
    }
}

impl std::error::Error for HandleError {}

/// Provider for a handle.
///
/// Dropping the provider causes all corresponding handles to become
/// invalid and the underlying object to be dropped.
pub struct Provider {
    keep_tx: Option<tokio::sync::watch::Sender<bool>>,
}

impl fmt::Debug for Provider {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Provider").finish()
    }
}

impl Provider {
    /// Keeps the provider alive until the handle is dropped.
    pub fn keep(mut self) {
        let _ = self.keep_tx.take().unwrap().send(true);
    }

    /// Waits until the handle provider can be safely dropped.
    ///
    /// This is the case when the handle is dropped.
    pub async fn done(&mut self) {
        self.keep_tx.as_mut().unwrap().closed().await
    }
}

impl Drop for Provider {
    fn drop(&mut self) {
        // empty
    }
}

/// Handle state.
#[derive(Clone)]
enum State<Codec> {
    /// Empty (for dropping).
    Empty,
    /// Value has been created locally.
    LocalCreated {
        /// Reference to value.
        entry: AnyEntry,
        /// Keep notification.
        keep_rx: tokio::sync::watch::Receiver<bool>,
    },
    /// Value is stored locally and handle has been received from
    /// a remote endpoint.
    LocalReceived {
        /// Reference to value.
        entry: AnyEntry,
        /// Id in local handle storage.
        id: Uuid,
        /// Dropped notification.
        dropped_tx: mpsc::Sender<(), Codec, 1>,
    },
    /// Value is stored on a remote endpoint.
    Remote {
        /// Id in remote handle storage.
        id: Uuid,
        /// Dropped notification.
        dropped_tx: mpsc::Sender<(), Codec, 1>,
    },
}

impl<Codec> Default for State<Codec> {
    fn default() -> Self {
        Self::Empty
    }
}

impl<Codec> fmt::Debug for State<Codec> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Empty => write!(f, "EmptyHandle"),
            Self::LocalCreated { .. } => write!(f, "LocalCreatedHandle"),
            Self::LocalReceived { id, .. } => f.debug_struct("LocalReceivedHandle").field("id", id).finish(),
            Self::Remote { id, .. } => f.debug_struct("RemoteHandle").field("id", id).finish(),
        }
    }
}

/// A handle to a value that is possibly stored on a remote endpoint.
///
/// If the value is stored locally, the handle can be used to obtain the value
/// or a (mutable) reference to it.
///
/// See [module-level documentation](self) for details.
#[derive(Clone)]
pub struct Handle<T, Codec = codec::Default> {
    state: State<Codec>,
    _data: PhantomData<T>,
}

impl<T, Codec> fmt::Debug for Handle<T, Codec> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{:?}", &self.state)
    }
}

impl<T, Codec> Handle<T, Codec>
where
    T: Send + Sync + 'static,
    Codec: codec::Codec,
{
    /// Creates a new handle for the value.
    ///
    /// There is *no* requirement on `T` to be [remote sendable](crate::RemoteSend), since
    /// it is never send to a remote endpoint.
    pub fn new(value: T) -> Self {
        let (handle, provider) = Self::provided(value);
        provider.keep();
        handle
    }

    /// Creates a new handle for the value and returns it together
    /// with its provider.
    ///
    /// This allows you to drop the object without relying upon all handles being
    /// dropped, possibly by a remote endpoint.
    /// This is especially useful when you connect to untrusted remote endpoints
    /// that could try to obtain and keep a large number of handles to
    /// perform a denial of service attack by exhausting your memory.
    pub fn provided(value: T) -> (Self, Provider) {
        let (keep_tx, keep_rx) = tokio::sync::watch::channel(false);

        let handle = Self {
            state: State::LocalCreated {
                entry: Arc::new(tokio::sync::RwLock::new(Some(Box::new(value)))),
                keep_rx,
            },
            _data: PhantomData,
        };
        let provider = Provider { keep_tx: Some(keep_tx) };

        (handle, provider)
    }

    /// Takes the value of the handle and returns it, if it is stored locally.
    ///
    /// The handle and all its clones become invalid.
    ///
    /// This blocks until all existing read and write reference have been released.
    #[inline]
    pub async fn into_inner(mut self) -> Result<T, HandleError> {
        let entry = match mem::take(&mut self.state) {
            State::LocalCreated { entry, .. } => entry,
            State::LocalReceived { entry, .. } => entry,
            _ => return Err(HandleError::Unknown),
        };

        let mut entry = entry.write().await;
        match entry.take() {
            Some(any) => match any.downcast::<T>() {
                Ok(value) => Ok(*value),
                Err(any) => Err(HandleError::MismatchedType(format!("{:?}", (*any).type_id()))),
            },
            None => Err(HandleError::Unknown),
        }
    }

    /// Returns a reference to the value of the handle, if it is stored locally.
    ///
    /// This blocks until all existing write reference have been released.
    #[inline]
    pub async fn as_ref(&self) -> Result<Ref<T>, HandleError> {
        let entry = match &self.state {
            State::LocalCreated { entry, .. } | State::LocalReceived { entry, .. } => entry.clone(),
            _ => return Err(HandleError::Unknown),
        };

        let entry = entry.read_owned().await;
        match &*entry {
            Some(any) => {
                if !any.is::<T>() {
                    return Err(HandleError::MismatchedType(format!("{:?}", (**any).type_id())));
                }
                let value_ref = OwnedRwLockReadGuard::map(entry, |entry| {
                    entry.as_ref().unwrap().downcast_ref::<T>().unwrap()
                });
                Ok(Ref(value_ref))
            }
            None => Err(HandleError::Unknown),
        }
    }

    /// Returns a mutable reference to the value of the handle, if it is stored locally.
    ///
    /// This blocks until all existing read and write reference have been released.
    #[inline]
    pub async fn as_mut(&mut self) -> Result<RefMut<T>, HandleError> {
        let entry = match &self.state {
            State::LocalCreated { entry, .. } | State::LocalReceived { entry, .. } => entry.clone(),
            _ => return Err(HandleError::Unknown),
        };

        let entry = entry.write_owned().await;
        match &*entry {
            Some(any) => {
                if !any.is::<T>() {
                    return Err(HandleError::MismatchedType(format!("{:?}", (**any).type_id())));
                }
                let value_ref = OwnedRwLockWriteGuard::map(entry, |entry| {
                    entry.as_mut().unwrap().downcast_mut::<T>().unwrap()
                });
                Ok(RefMut(value_ref))
            }
            None => Err(HandleError::Unknown),
        }
    }

    /// Change the data type of the handle.
    ///
    /// Before the handle can be dereferenced the type must be changed back to the original
    /// type, otherwise a [HandleError::MismatchedType] error will occur.
    ///
    /// This is useful when you need to send a handle of a private type to a remote endpoint.
    /// You can do so by creating a public, empty proxy struct and sending handles of this
    /// type to remote endpoints.
    pub fn cast<TNew>(self) -> Handle<TNew, Codec> {
        Handle { state: self.state.clone(), _data: PhantomData }
    }
}

impl<T, Codec> Drop for Handle<T, Codec> {
    fn drop(&mut self) {
        // empty
    }
}

/// Handle in transport.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "Codec: codec::Codec"))]
#[serde(bound(deserialize = "Codec: codec::Codec"))]
pub(crate) struct TransportedHandle<T, Codec> {
    /// Handle id.
    id: Uuid,
    /// Dropped notification.
    dropped_tx: mpsc::Sender<(), Codec, 1>,
    /// Data type.
    data: PhantomData<T>,
    /// Codec
    codec: PhantomData<Codec>,
}

impl<T, Codec> Serialize for Handle<T, Codec>
where
    Codec: codec::Codec,
{
    /// Serializes this handle for sending over a chmux channel.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let (id, dropped_tx) = match self.state.clone() {
            State::LocalCreated { entry, mut keep_rx, .. } => {
                let handle_storage = PortSerializer::storage()?;
                let id = handle_storage.insert(entry.clone());

                let (dropped_tx, dropped_rx) = mpsc::channel(1);
                let dropped_tx = dropped_tx.set_buffer::<1>();
                let mut dropped_rx = dropped_rx.set_buffer::<1>();

                tokio::spawn(async move {
                    loop {
                        if *keep_rx.borrow_and_update() {
                            let _ = dropped_rx.recv().await;
                        } else {
                            tokio::select! {
                                biased;
                                res = keep_rx.changed() => {
                                    if res.is_err() {
                                        break;
                                    }
                                },
                                _ = dropped_rx.recv() => (),
                            }
                        }
                    }

                    handle_storage.remove(id);
                });

                (id, dropped_tx)
            }
            State::LocalReceived { id, dropped_tx, .. } | State::Remote { id, dropped_tx } => (id, dropped_tx),
            State::Empty => unreachable!("state is only empty when dropping"),
        };

        let transported = TransportedHandle::<T, Codec> { id, dropped_tx, data: PhantomData, codec: PhantomData };

        transported.serialize(serializer)
    }
}

impl<'de, T, Codec> Deserialize<'de> for Handle<T, Codec>
where
    Codec: codec::Codec,
{
    /// Deserializes this handle after it has been received over a chmux channel.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let TransportedHandle { id, dropped_tx, .. } = TransportedHandle::<T, Codec>::deserialize(deserializer)?;

        let handle_storage = PortDeserializer::storage()?;
        let state = match handle_storage.remove(id) {
            Some(entry) => State::LocalReceived { entry, id, dropped_tx },
            None => State::Remote { id, dropped_tx },
        };

        Ok(Self { state, _data: PhantomData })
    }
}

/// An owned reference to the value of a handle.
pub struct Ref<T>(OwnedRwLockReadGuard<Option<AnyBox>, T>);

impl<T> Deref for Ref<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> fmt::Debug for Ref<T>
where
    T: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{:?}", &**self)
    }
}

/// An owned mutable reference to the value of a handle.
pub struct RefMut<T>(OwnedRwLockMappedWriteGuard<Option<AnyBox>, T>);

impl<T> Deref for RefMut<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> DerefMut for RefMut<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<T> fmt::Debug for RefMut<T>
where
    T: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{:?}", &**self)
    }
}