1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// Buttplug Rust Source Code File - See https://buttplug.io for more info.

//

// Copyright 2016-2020 Nonpolynomial Labs LLC. All rights reserved.

//

// Licensed under the BSD 3-Clause license. See LICENSE file in the project root

// for full license information.


//! Buttplug futures utilities. Mostly used for building message futures in the

//! client, used to wait on responses from the server.


use core::pin::Pin;
use futures::{
  future::Future,
  task::{Context, Poll, Waker},
};
use std::sync::{Arc, Mutex, MutexGuard};

/// Struct used for facilitating resolving futures across contexts.
///
/// Since ButtplugFuture is [Pinned][Pin], we can't just go passing it around
/// tasks or threads. This struct is therefore used to get replies from other
/// contexts while letting the future stay pinned. It holds the reply to the
/// future, as well as a [Waker] for waking up the future when the reply is set.
///
/// Both fields start out as `None` when the state is built through its
/// [Default] implementation.
#[derive(Debug, Clone)]
pub struct ButtplugFutureState<T> {
  /// Value the future resolves with. Written by `set_reply`, and moved back
  /// out by the future's `poll` once it sees it.
  reply: Option<T>,
  /// Waker stored by the future's `poll` when no reply was available yet.
  /// `set_reply` removes it and calls [Waker::wake] on it.
  waker: Option<Waker>,
}

// For some reason, deriving default above doesn't work, but doing an explicit

// derive here does work.

impl<T> Default for ButtplugFutureState<T> {
  fn default() -> Self {
    ButtplugFutureState::<T> {
      reply: None,
      waker: None,
    }
  }
}

impl<T> ButtplugFutureState<T> {
  /// Sets the response for the future, firing the waker.
  ///
  /// When a response is received from whatever we're waiting on, this function
  /// takes the response and stores it in the state struct. If a [Waker] is
  /// currently stored, it is removed from the state and [Waker::wake] is called
  /// on it so that the corresponding future can finish. If no waker is stored,
  /// the reply is simply kept until the state is read.
  ///
  /// # Panics
  ///
  /// Panics if a reply is already stored in the state. We have no way of
  /// resolving two replies to the same future, so this is considered a
  /// catastrophic error.
  pub fn set_reply(&mut self, reply: T) {
    assert!(
      self.reply.is_none(),
      "set_reply_msg called multiple times on the same future."
    );

    self.reply = Some(reply);

    // `take` leaves `None` behind, so a stored waker is fired at most once.
    if let Some(waker) = self.waker.take() {
      waker.wake();
    }
  }
}

/// Shared [ButtplugFutureState] type.
///
/// [ButtplugFutureState] is made to be shared across tasks, and we'll never
/// know if those tasks are running on single or multithreaded executors, so
/// the state is kept behind an [Arc]<[Mutex]<_>>.
///
/// # Panics and notes on setting replies
///
/// The lock is taken in two places in this module: when a reply is being set
/// (which the `set_reply` method does internally) and when the future is
/// polled. Both go through a blocking [Mutex::lock] followed by `unwrap()`,
/// so a caller waits for the lock rather than failing on contention, and
/// panics if the mutex has been poisoned.
///
/// There should never be a point where the reply is set twice (see the panic
/// documentation for [ButtplugFutureState::set_reply]). That panic is raised
/// while the lock is held, which poisons the mutex, so later attempts to lock
/// the same state will panic as well. Any panic from this should be considered
/// a library error and reported as a bug.
#[derive(Debug)]
pub struct ButtplugFutureStateShared<T> {
  /// The internal state of the future. When `set_reply` is run, we fill this in
  /// with the value we want the related future to resolve with.
  state: Arc<Mutex<ButtplugFutureState<T>>>,
}

// SAFETY: the only field is an `Arc<Mutex<ButtplugFutureState<T>>>`, and every
// access to the contained `T` goes through that mutex. Handing the handle to
// another thread (or sharing a reference to it) can move the `T` to that
// thread, which is only sound when `T: Send`. The bound below enforces exactly
// that; without it a non-`Send` payload such as `Rc` could cross threads.
unsafe impl<T: Send> Sync for ButtplugFutureStateShared<T> {}
unsafe impl<T: Send> Send for ButtplugFutureStateShared<T> {}

impl<T> ButtplugFutureStateShared<T> {
  /// Wraps an already constructed [ButtplugFutureState] so it can be shared.
  ///
  /// The state is moved into a new [Mutex] behind a new [Arc].
  pub fn new(state: ButtplugFutureState<T>) -> Self {
    let state = Arc::new(Mutex::new(state));
    Self { state }
  }

  /// Acquires the mutex around the inner state and returns its [MutexGuard].
  ///
  /// This waits until the lock is available. Contention should only ever
  /// occur if the future is polled while a reply is being set, which should
  /// rarely if ever happen.
  ///
  /// # Panics
  ///
  /// Panics if the mutex is poisoned, i.e. an earlier holder panicked while
  /// it held the lock.
  ///
  /// # Visibility
  ///
  /// The only thing that needs to read the reply from a future is our poll
  /// method, in this module. Everything else should just be setting replies,
  /// and can use set_reply accordingly.
  pub(super) fn lock(&self) -> MutexGuard<'_, ButtplugFutureState<T>> {
    let lock_result = self.state.lock();
    lock_result.unwrap()
  }

  /// Locks the shared state and stores `reply` in it, waking the future.
  ///
  /// The lock is held for as long as the inner
  /// [ButtplugFutureState::set_reply] runs.
  ///
  /// # Panics
  ///
  /// Panics if the mutex is poisoned, or if the inner state already holds a
  /// reply.
  pub fn set_reply(&self, reply: T) {
    let mut guard = self.lock();
    guard.set_reply(reply);
  }
}

impl<T> Default for ButtplugFutureStateShared<T> {
  fn default() -> Self {
    Self {
      state: Arc::new(Mutex::new(ButtplugFutureState::<T>::default())),
    }
  }
}

// Implemented by hand so that cloning the handle never requires `T: Clone`;
// only the `Arc` is cloned, not the state behind it.
impl<T> Clone for ButtplugFutureStateShared<T> {
  /// Returns another handle that refers to the same underlying state.
  fn clone(&self) -> Self {
    let state = Arc::clone(&self.state);
    Self { state }
  }
}

/// [Future] implementation for long operations in Buttplug.
///
/// This is a convenience struct, used for handling indeterminately long
/// operations, like Buttplug's request/reply communications between the client
/// and server. It allows us to say what type we expect back, then hold a waker
/// that we can pass around as needed.
///
/// The future resolves with a `T` once a reply has been stored in its shared
/// state; `get_state_clone` hands out a handle to that state.
#[derive(Debug)]
pub struct ButtplugFuture<T> {
  /// State that holds the waker for the future, and the reply (once set).
  ///
  /// ## Notes
  ///
  /// This needs to be an [Arc]<[Mutex]<T>> in order to make it mutable under
  /// pinning when dealing with being a future. There is a chance we could do
  /// this as a [Pin::get_unchecked_mut] borrow, which would be way faster, but
  /// that's dicey and hasn't been proven as needed for speed yet.
  waker_state: ButtplugFutureStateShared<T>,
}

// TODO Should we implement Drop on this?
//
// It'd be nice if the future would yell when it is dropped before its waker
// ever fired. Otherwise it seems like we could have quiet deadlocks.


impl<T> Default for ButtplugFuture<T> {
  /// Creates a future backed by a default [ButtplugFutureStateShared].
  fn default() -> Self {
    let waker_state = ButtplugFutureStateShared::default();
    Self { waker_state }
  }
}

impl<T> ButtplugFuture<T> {
  /// Returns a clone of the state, used for moving the state across contexts

  /// (tasks/threads/etc...).

  pub fn get_state_clone(&self) -> ButtplugFutureStateShared<T> {
    self.waker_state.clone()
  }
}

impl<T> Future for ButtplugFuture<T> {
  type Output = T;

  /// Resolves once the Output type reply has been set in the
  /// [ButtplugFutureStateShared].
  ///
  /// If a reply is stored, it is moved out of the shared state and returned
  /// as [Poll::Ready]. Otherwise a clone of the current task's waker is stored
  /// in the shared state, replacing any waker stored by an earlier poll, and
  /// [Poll::Pending] is returned.
  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
    // Polling is the one place in this module that reads the reply, so it
    // takes the lock directly instead of going through `set_reply`.
    let mut state = self.waker_state.lock();
    match state.reply.take() {
      Some(reply) => Poll::Ready(reply),
      None => {
        // Remember who to wake when the reply shows up.
        state.waker = Some(cx.waker().clone());
        Poll::Pending
      }
    }
  }
}