tinylfu_cached/cache/command/
acknowledgement.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::task::{Context, Poll, Waker};
6use parking_lot::Mutex;
7use crate::cache::command::{CommandStatus, RejectionReason};
8
9pub struct CommandAcknowledgement {
26 handle: CommandAcknowledgementHandle,
27}
28
29pub struct CommandAcknowledgementHandle {
35 done: AtomicBool,
36 status: Arc<Mutex<CommandStatus>>,
37 waker_state: Arc<Mutex<WakerState>>,
38}
39
/// Slot for the waker of the task that is waiting on a command's acknowledgement.
pub(crate) struct WakerState {
    /// `None` until a task registers its waker.
    waker: Option<Waker>,
}
43
44impl CommandAcknowledgement {
46 pub(crate) fn new() -> Arc<CommandAcknowledgement> {
47 Arc::new(
48 CommandAcknowledgement {
49 handle: CommandAcknowledgementHandle {
50 done: AtomicBool::new(false),
51 status: Arc::new(Mutex::new(CommandStatus::Pending)),
52 waker_state: Arc::new(Mutex::new(WakerState {
53 waker: None
54 })),
55 },
56 }
57 )
58 }
59 pub(crate) fn accepted() -> Arc<CommandAcknowledgement> {
60 Arc::new(
61 CommandAcknowledgement {
62 handle: CommandAcknowledgementHandle {
63 done: AtomicBool::new(true),
64 status: Arc::new(Mutex::new(CommandStatus::Accepted)),
65 waker_state: Arc::new(Mutex::new(WakerState {
66 waker: None
67 })),
68 },
69 }
70 )
71 }
72 pub(crate) fn rejected(reason: RejectionReason) -> Arc<CommandAcknowledgement> {
73 Arc::new(
74 CommandAcknowledgement {
75 handle: CommandAcknowledgementHandle {
76 done: AtomicBool::new(true),
77 status: Arc::new(Mutex::new(CommandStatus::Rejected(reason))),
78 waker_state: Arc::new(Mutex::new(WakerState {
79 waker: None
80 })),
81 },
82 }
83 )
84 }
85
86 pub(crate) fn done(&self, status: CommandStatus) {
88 self.handle.done(status);
89 }
90
91 pub fn handle(&self) -> &CommandAcknowledgementHandle {
92 &self.handle
93 }
94}
95
96impl CommandAcknowledgementHandle {
97 pub(crate) fn done(&self, status: CommandStatus) {
99 self.done.store(true, Ordering::Release);
100 *self.status.lock() = status;
101 if let Some(waker) = &self.waker_state.lock().waker {
102 waker.wake_by_ref();
103 }
104 }
105}
106
107impl Future for &CommandAcknowledgementHandle {
111 type Output = CommandStatus;
112
113 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
114 let mut guard = self.waker_state.lock();
115 match guard.waker.as_ref() {
116 Some(waker) => {
117 if !waker.will_wake(context.waker()) {
118 guard.waker = Some(context.waker().clone());
119 }
120 }
121 None => {
122 guard.waker = Some(context.waker().clone());
123 }
124 }
125 if self.done.load(Ordering::Acquire) {
126 return Poll::Ready(*self.status.lock());
127 }
128 Poll::Pending
129 }
130}
131
#[cfg(test)]
mod tests {
    use crate::cache::command::acknowledgement::CommandAcknowledgement;
    use crate::cache::command::{CommandStatus, RejectionReason};

    /// A pending acknowledgement resolves once another task marks it as done.
    #[tokio::test]
    async fn acknowledge() {
        let pending = CommandAcknowledgement::new();
        let completer = pending.clone();
        tokio::spawn(async move {
            completer.done(CommandStatus::Accepted);
        });

        let status = pending.handle().await;
        assert_eq!(CommandStatus::Accepted, status);
    }

    /// An acknowledgement created as accepted resolves to `Accepted`.
    #[tokio::test]
    async fn accepted() {
        let already_accepted = CommandAcknowledgement::accepted();
        let status = already_accepted.handle().await;
        assert_eq!(CommandStatus::Accepted, status);
    }

    /// An acknowledgement created as rejected resolves to `Rejected` with its reason.
    #[tokio::test]
    async fn rejected() {
        let already_rejected =
            CommandAcknowledgement::rejected(RejectionReason::KeyAlreadyExists);
        let status = already_rejected.handle().await;
        assert_eq!(
            CommandStatus::Rejected(RejectionReason::KeyAlreadyExists),
            status
        );
    }
}