1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
// Buttplug Rust Source Code File - See https://buttplug.io for more info. // // Copyright 2016-2020 Nonpolynomial Labs LLC. All rights reserved. // // Licensed under the BSD 3-Clause license. See LICENSE file in the project root // for full license information. //! Buttplug futures utilities. Mostly used for building message futures in the //! client, used to wait on responses from the server. use core::pin::Pin; use futures::{ future::Future, task::{Context, Poll, Waker}, }; use std::sync::{Arc, Mutex, MutexGuard}; /// Struct used for facilitating resolving futures across contexts. /// /// Since ButtplugFuture is [Pinned][Pin], we can't just go passing it around /// tasks or threads. This struct is therefore used to get replies from other /// contexts while letting the future stay pinned. It holds the reply to the /// future, as well as a [Waker] for waking up the future when the reply is set. #[derive(Debug, Clone)] pub struct ButtplugFutureState<T> { reply: Option<T>, waker: Option<Waker>, } // For some reason, deriving default above doesn't work, but doing an explicit // derive here does work. impl<T> Default for ButtplugFutureState<T> { fn default() -> Self { ButtplugFutureState::<T> { reply: None, waker: None, } } } impl<T> ButtplugFutureState<T> { /// Sets the response for the future, firing the waker. /// /// When a response is received from whatever we're waiting on, this function /// takes the response, updates the state struct, and calls [Waker::wake] so /// that the corresponding future can finish. /// /// # Panics /// /// If the reply is set twice, the library will panic. We have no way of /// resolving two replies to the same future, so this is considered a /// catastrophic error. pub fn set_reply(&mut self, reply: T) { if self.reply.is_some() { panic!("set_reply_msg called multiple times on the same future."); } self.reply = Some(reply); if self.waker.is_some() { self.waker.take().unwrap().wake(); } } } /// Shared [ButtplugFutureState] type. /// /// [ButtplugFutureState] is made to be shared across tasks, and we'll never /// know if those tasks are running on single or multithreaded executors. /// /// # Panics and notes on setting replies /// /// The lock for a [ButtplugFutureState] should only ever be taken when the /// reply is being set (which the `set_reply` method does internally), and there /// should never be a point where the reply is set twice (See the panic /// documentation for [ButtplugFutureState]). In order to make sure we never /// block, we always lock using try_lock with .expect(). If try_lock fails, this /// means we're already in a double reply situation, and therefore we'll panic /// on the .expect(). Any panic from this should be considered a library error /// and reported as a bug. #[derive(Debug)] pub struct ButtplugFutureStateShared<T> { /// The internal state of the future. When `set_reply` is run, we fill this in /// with the value we want the related future to resolve with. state: Arc<Mutex<ButtplugFutureState<T>>>, } unsafe impl<T> Sync for ButtplugFutureStateShared<T> { } unsafe impl<T> Send for ButtplugFutureStateShared<T> { } impl<T> ButtplugFutureStateShared<T> { pub fn new(state: ButtplugFutureState<T>) -> Self { Self { state: Arc::new(Mutex::new(state)), } } /// Locks and returns a [MutexGuard]. /// /// See [ButtplugFutureStateShared] struct documentation for more info on /// locking. /// /// # Visibility /// /// The only thing that needs to read the reply from a future is our poll /// method, in this module. Everything else should just be setting replies, /// and can use set_reply accordingly. pub(super) fn lock(&self) -> MutexGuard<'_, ButtplugFutureState<T>> { // There should only ever be lock contention if we're polling while // settings, which should rarely if ever happen. self.state.lock().unwrap() } /// Locks immediately and sets the reply for the internal waker, or panics if /// lock is held. /// /// See [ButtplugFutureStateShared] struct documentation for more info on /// locking. pub fn set_reply(&self, reply: T) { self.lock().set_reply(reply); } } impl<T> Default for ButtplugFutureStateShared<T> { fn default() -> Self { Self { state: Arc::new(Mutex::new(ButtplugFutureState::<T>::default())), } } } impl<T> Clone for ButtplugFutureStateShared<T> { fn clone(&self) -> Self { Self { state: self.state.clone(), } } } /// [Future] implementation for long operations in Buttplug. /// /// This is a convenience struct, used for handling indeterminately long /// operations, like Buttplug's request/reply communications between the client /// and server. It allows us to say what type we expect back, then hold a waker /// that we can pass around as needed. #[derive(Debug)] pub struct ButtplugFuture<T> { /// State that holds the waker for the future, and the reply (once set). /// /// ## Notes /// /// This needs to be an [Arc]<[Mutex]<T>> in order to make it mutable under /// pinning when dealing with being a future. There is a chance we could do /// this as a [Pin::get_unchecked_mut] borrow, which would be way faster, but /// that's dicey and hasn't been proven as needed for speed yet. waker_state: ButtplugFutureStateShared<T>, } // TODO Should we implement drop on this? // // It'd be nice if the future would yell if its dropping and the waker didn't // fire? Otherwise it seems like we could have quiet deadlocks. impl<T> Default for ButtplugFuture<T> { fn default() -> Self { ButtplugFuture::<T> { waker_state: ButtplugFutureStateShared::<T>::default(), } } } impl<T> ButtplugFuture<T> { /// Returns a clone of the state, used for moving the state across contexts /// (tasks/threads/etc...). pub fn get_state_clone(&self) -> ButtplugFutureStateShared<T> { self.waker_state.clone() } } impl<T> Future for ButtplugFuture<T> { type Output = T; /// Wakes up when the Output type reply has been set in the /// [ButtplugFutureStateShared]. fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { // This is the only place lock_now_or_panic should be called, since we're // reading the value. let mut waker_state = self.waker_state.lock(); if waker_state.reply.is_some() { let msg = waker_state.reply.take().unwrap(); Poll::Ready(msg) } else { waker_state.waker = Some(cx.waker().clone()); Poll::Pending } } }