second_music_system/
query.rs

1//! This module is used to asynchronously answer simple queries across thread
2//! boundaries. A query is a question that can only be answered by one thread,
3//! and only be answered to one thread. In essence, this is a one-shot
4//! single-producer-single-consumer channel.
5
6use std::{
7    cell::UnsafeCell,
8    fmt::{Debug, Display, Formatter, Result as FmtResult},
9    future::Future,
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc,
13    },
14    task::Poll,
15};
16
17use futures::task::AtomicWaker;
18
19struct Inner<T: Send + Sized> {
20    ready: AtomicBool,
21    waker: AtomicWaker,
22    response: UnsafeCell<Option<T>>,
23}
24
25unsafe impl<T: Send + Sync> Sync for Inner<T> {}
26
27/// The response to a query.
28/// [See module-level documentation for more info.](index.html)
29pub struct Response<T: Send + Sized>(Arc<Inner<T>>);
30/// The means with which to respond to a query.
31/// [See module-level documentation for more info.](index.html)
32pub struct Responder<T: Send + Sized>(Arc<Inner<T>>);
33
34impl<T: Send + Sized> Debug for Responder<T> {
35    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
36        write!(f, "query::Responder")
37    }
38}
39
40// specialization is unstable (#31844) so we can't have a specialized version
41// for T: Debug
42impl<T: Send + Sized> Debug for Response<T> {
43    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
44        write!(f, "query::Response(")?;
45        if self.0.ready.load(Ordering::Relaxed) {
46            write!(f, "ready")?;
47        } else {
48            write!(f, "pending...")?;
49        }
50        write!(f, ")")
51    }
52}
53
54impl<T: Send + Sized> Drop for Responder<T> {
55    fn drop(&mut self) {
56        self.0.ready.store(true, Ordering::Release)
57    }
58}
59
60/// Initiates a query.
61/// [See module-level documentation for more info.](index.html)
62pub fn make<T: Send + Sized>() -> (Responder<T>, Response<T>) {
63    let inner = Arc::new(Inner {
64        ready: AtomicBool::new(false),
65        waker: AtomicWaker::new(),
66        response: UnsafeCell::new(None),
67    });
68    (Responder(inner.clone()), Response(inner))
69}
70
71impl<T: Send + Sized> Responder<T> {
72    pub fn respond(self, value: T) {
73        unsafe { self.0.response.get().write(Some(value)) };
74        self.0.ready.store(true, Ordering::Release);
75        self.0.waker.wake();
76    }
77}
78
79impl<T: Send + Sized> Response<T> {
80    /// Checks if the response is ready yet. If this function returns true,
81    /// `get` and `take` will only return `Some` or panic, never return `None`.
82    pub fn poll(&self) -> bool {
83        self.0.ready.load(Ordering::Relaxed)
84    }
85    /// If the response is ready, returns `Some(&response)`. If it's not ready
86    /// yet, returns `None`. If the responder was dropped without sending a
87    /// response, or the response has arrived but was already taken, panics.
88    pub fn get(&mut self) -> Option<&T> {
89        match self.try_get() {
90            Ok(x) => Some(x),
91            Err(TryGetError::NotReady) => None,
92            Err(e) => panic!("{e}"),
93        }
94    }
95    /// If the response is ready, returns `Some(response)`. If it's not ready
96    /// yet, returns `None`. If the responder was dropped without sending a
97    /// response, or the response has arrived but was already taken, panics.
98    pub fn take(&mut self) -> Option<T> {
99        match self.try_take() {
100            Ok(x) => Some(x),
101            Err(TryGetError::NotReady) => None,
102            Err(e) => panic!("{e}"),
103        }
104    }
105    /// If the response is ready, returns `Ok(&response)`. In any other
106    /// circumstances, returns a `TryGetError` variant.
107    pub fn try_get(&mut self) -> Result<&T, TryGetError> {
108        if self.0.ready.load(Ordering::Acquire) {
109            match unsafe { self.0.response.get().as_ref().unwrap().as_ref() } {
110                Some(x) => Ok(x),
111                None => Err(TryGetError::MissingResponse),
112            }
113        } else {
114            Err(TryGetError::NotReady)
115        }
116    }
117    /// If the response is ready, returns `Ok(response)`. In any other
118    /// circumstances, returns a `TryTakeError` variant. It is a logic error to
119    /// do anything with this `Response` after a successful `take` or
120    /// `try_take`.
121    pub fn try_take(&mut self) -> Result<T, TryTakeError> {
122        if self.0.ready.load(Ordering::Acquire) {
123            match unsafe { self.0.response.get().as_mut().unwrap().take() } {
124                Some(x) => Ok(x),
125                None => Err(TryTakeError::MissingResponse),
126            }
127        } else {
128            Err(TryTakeError::NotReady)
129        }
130    }
131}
132
133#[derive(PartialEq, Eq, Clone, Copy, Debug)]
134pub enum TryGetError {
135    NotReady,
136    MissingResponse,
137}
138pub type TryTakeError = TryGetError;
139
140impl Display for TryGetError {
141    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
142        match self {
143            TryGetError::NotReady => write!(f, "response not ready yet"),
144            TryGetError::MissingResponse => write!(f, "response ready but missing (responder dropped unspent, or .take()/.try_take()/.await performed more than once)"),
145        }
146    }
147}
148
149impl<T: Send + Sized> Future for Response<T> {
150    type Output = T;
151    fn poll(
152        mut self: std::pin::Pin<&mut Self>,
153        cx: &mut std::task::Context<'_>,
154    ) -> Poll<T> {
155        match self.take() {
156            Some(x) => Poll::Ready(x),
157            None => {
158                self.0.waker.register(cx.waker());
159                match self.take() {
160                    Some(x) => Poll::Ready(x),
161                    None => Poll::Pending,
162                }
163            }
164        }
165    }
166}