second_music_system/
query.rs1use std::{
7 cell::UnsafeCell,
8 fmt::{Debug, Display, Formatter, Result as FmtResult},
9 future::Future,
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc,
13 },
14 task::Poll,
15};
16
17use futures::task::AtomicWaker;
18
19struct Inner<T: Send + Sized> {
20 ready: AtomicBool,
21 waker: AtomicWaker,
22 response: UnsafeCell<Option<T>>,
23}
24
25unsafe impl<T: Send + Sync> Sync for Inner<T> {}
26
27pub struct Response<T: Send + Sized>(Arc<Inner<T>>);
30pub struct Responder<T: Send + Sized>(Arc<Inner<T>>);
33
34impl<T: Send + Sized> Debug for Responder<T> {
35 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
36 write!(f, "query::Responder")
37 }
38}
39
40impl<T: Send + Sized> Debug for Response<T> {
43 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
44 write!(f, "query::Response(")?;
45 if self.0.ready.load(Ordering::Relaxed) {
46 write!(f, "ready")?;
47 } else {
48 write!(f, "pending...")?;
49 }
50 write!(f, ")")
51 }
52}
53
54impl<T: Send + Sized> Drop for Responder<T> {
55 fn drop(&mut self) {
56 self.0.ready.store(true, Ordering::Release)
57 }
58}
59
60pub fn make<T: Send + Sized>() -> (Responder<T>, Response<T>) {
63 let inner = Arc::new(Inner {
64 ready: AtomicBool::new(false),
65 waker: AtomicWaker::new(),
66 response: UnsafeCell::new(None),
67 });
68 (Responder(inner.clone()), Response(inner))
69}
70
71impl<T: Send + Sized> Responder<T> {
72 pub fn respond(self, value: T) {
73 unsafe { self.0.response.get().write(Some(value)) };
74 self.0.ready.store(true, Ordering::Release);
75 self.0.waker.wake();
76 }
77}
78
79impl<T: Send + Sized> Response<T> {
80 pub fn poll(&self) -> bool {
83 self.0.ready.load(Ordering::Relaxed)
84 }
85 pub fn get(&mut self) -> Option<&T> {
89 match self.try_get() {
90 Ok(x) => Some(x),
91 Err(TryGetError::NotReady) => None,
92 Err(e) => panic!("{e}"),
93 }
94 }
95 pub fn take(&mut self) -> Option<T> {
99 match self.try_take() {
100 Ok(x) => Some(x),
101 Err(TryGetError::NotReady) => None,
102 Err(e) => panic!("{e}"),
103 }
104 }
105 pub fn try_get(&mut self) -> Result<&T, TryGetError> {
108 if self.0.ready.load(Ordering::Acquire) {
109 match unsafe { self.0.response.get().as_ref().unwrap().as_ref() } {
110 Some(x) => Ok(x),
111 None => Err(TryGetError::MissingResponse),
112 }
113 } else {
114 Err(TryGetError::NotReady)
115 }
116 }
117 pub fn try_take(&mut self) -> Result<T, TryTakeError> {
122 if self.0.ready.load(Ordering::Acquire) {
123 match unsafe { self.0.response.get().as_mut().unwrap().take() } {
124 Some(x) => Ok(x),
125 None => Err(TryTakeError::MissingResponse),
126 }
127 } else {
128 Err(TryTakeError::NotReady)
129 }
130 }
131}
132
/// Reasons a response value could not be obtained from a `Response`.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryGetError {
    /// The ready flag has not been set yet.
    NotReady,
    /// The ready flag is set but the slot holds no value: either the
    /// responder was dropped without responding, or the value was already
    /// taken.
    MissingResponse,
}

/// Error type of `try_take`; identical to [`TryGetError`].
pub type TryTakeError = TryGetError;

impl Display for TryGetError {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        let message = match self {
            TryGetError::NotReady => "response not ready yet",
            TryGetError::MissingResponse => "response ready but missing (responder dropped unspent, or .take()/.try_take()/.await performed more than once)",
        };
        f.write_str(message)
    }
}

// Lets the error be used with `?` into `Box<dyn Error>` and with
// error-reporting libraries, as expected of a public error type.
impl std::error::Error for TryGetError {}
148
149impl<T: Send + Sized> Future for Response<T> {
150 type Output = T;
151 fn poll(
152 mut self: std::pin::Pin<&mut Self>,
153 cx: &mut std::task::Context<'_>,
154 ) -> Poll<T> {
155 match self.take() {
156 Some(x) => Poll::Ready(x),
157 None => {
158 self.0.waker.register(cx.waker());
159 match self.take() {
160 Some(x) => Poll::Ready(x),
161 None => Poll::Pending,
162 }
163 }
164 }
165 }
166}