tinylfu_cached/cache/command/
acknowledgement.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::task::{Context, Poll, Waker};
6use parking_lot::Mutex;
7use crate::cache::command::{CommandStatus, RejectionReason};
8
9/// The execution of every write operation is returned a `CommandAcknowledgement` wrapped inside [`crate::cache::command::command_executor::CommandSendResult`].
10/// `CommandAcknowledgement` provides a handle to the clients to perform `.await` to get the command status.
11///
12/// ```
13/// use tinylfu_cached::cache::cached::CacheD;
14/// use tinylfu_cached::cache::command::CommandStatus;
15/// use tinylfu_cached::cache::config::ConfigBuilder;
16/// #[tokio::main]
17///  async fn main() {
18///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
19///     let status = cached.put("topic", "microservices").unwrap().handle().await;
20///     assert_eq!(CommandStatus::Accepted, status);
21///     let value = cached.get(&"topic");
22///     assert_eq!(Some("microservices"), value);
23/// }
24/// ```
25pub struct CommandAcknowledgement {
26    handle: CommandAcknowledgementHandle,
27}
28
29/// CommandAcknowledgementHandle implements [`std::future::Future`] and returns a [`crate::cache::command::CommandStatus`]
30///
31/// The initial status in the `CommandAcknowledgementHandle` is `CommandStatus::Pending`
32///
33/// The status gets updated when the command is executed by the `crate::cache::command::command_executor::CommandExecutor`.
34pub struct CommandAcknowledgementHandle {
35    done: AtomicBool,
36    status: Arc<Mutex<CommandStatus>>,
37    waker_state: Arc<Mutex<WakerState>>,
38}
39
/// Storage for the waker registered by the most recent `poll` of a `CommandAcknowledgementHandle`.
///
/// `waker` is `None` until the handle is polled for the first time.
pub(crate) struct WakerState {
    waker: Option<Waker>,
}
43
44/// CommandAcknowledgement provides a `handle()` method  that returns a reference to the `CommandAcknowledgementHandle`
45impl CommandAcknowledgement {
46    pub(crate) fn new() -> Arc<CommandAcknowledgement> {
47        Arc::new(
48            CommandAcknowledgement {
49                handle: CommandAcknowledgementHandle {
50                    done: AtomicBool::new(false),
51                    status: Arc::new(Mutex::new(CommandStatus::Pending)),
52                    waker_state: Arc::new(Mutex::new(WakerState {
53                        waker: None
54                    })),
55                },
56            }
57        )
58    }
59    pub(crate) fn accepted() -> Arc<CommandAcknowledgement> {
60        Arc::new(
61            CommandAcknowledgement {
62                handle: CommandAcknowledgementHandle {
63                    done: AtomicBool::new(true),
64                    status: Arc::new(Mutex::new(CommandStatus::Accepted)),
65                    waker_state: Arc::new(Mutex::new(WakerState {
66                        waker: None
67                    })),
68                },
69            }
70        )
71    }
72    pub(crate) fn rejected(reason: RejectionReason) -> Arc<CommandAcknowledgement> {
73        Arc::new(
74            CommandAcknowledgement {
75                handle: CommandAcknowledgementHandle {
76                    done: AtomicBool::new(true),
77                    status: Arc::new(Mutex::new(CommandStatus::Rejected(reason))),
78                    waker_state: Arc::new(Mutex::new(WakerState {
79                        waker: None
80                    })),
81                },
82            }
83        )
84    }
85
86    /// Invokes the `done()` method of `CommandAcknowledgementHandle` which changes the `CommandStatus`
87    pub(crate) fn done(&self, status: CommandStatus) {
88        self.handle.done(status);
89    }
90
91    pub fn handle(&self) -> &CommandAcknowledgementHandle {
92        &self.handle
93    }
94}
95
96impl CommandAcknowledgementHandle {
97    /// Marks the flag to indicate that the command execution is done and changes the `CommandStatus`
98    pub(crate) fn done(&self, status: CommandStatus) {
99        self.done.store(true, Ordering::Release);
100        *self.status.lock() = status;
101        if let Some(waker) = &self.waker_state.lock().waker {
102            waker.wake_by_ref();
103        }
104    }
105}
106
107/// Future implementation for CommandAcknowledgementHandle.
108/// The future is complete when the the command is executed by the `crate::cache::command::command_executor::CommandExecutor`.
109/// The completion of future returns [`CommandStatus`].
110impl Future for &CommandAcknowledgementHandle {
111    type Output = CommandStatus;
112
113    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
114        let mut guard = self.waker_state.lock();
115        match guard.waker.as_ref() {
116            Some(waker) => {
117                if !waker.will_wake(context.waker()) {
118                    guard.waker = Some(context.waker().clone());
119                }
120            }
121            None => {
122                guard.waker = Some(context.waker().clone());
123            }
124        }
125        if self.done.load(Ordering::Acquire) {
126            return Poll::Ready(*self.status.lock());
127        }
128        Poll::Pending
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::CommandAcknowledgement;
    use crate::cache::command::{CommandStatus, RejectionReason};

    /// A pending acknowledgement resolves to the status passed to `done()` from another task.
    #[tokio::test]
    async fn acknowledge() {
        let acknowledgement = CommandAcknowledgement::new();
        let completer = acknowledgement.clone();
        tokio::spawn(async move {
            completer.done(CommandStatus::Accepted);
        });

        let status = acknowledgement.handle().await;
        assert_eq!(CommandStatus::Accepted, status);
    }

    /// An acknowledgement created through `accepted()` resolves to `CommandStatus::Accepted`.
    #[tokio::test]
    async fn accepted() {
        let acknowledgement = CommandAcknowledgement::accepted();
        let status = acknowledgement.handle().await;
        assert_eq!(CommandStatus::Accepted, status);
    }

    /// An acknowledgement created through `rejected()` resolves to `CommandStatus::Rejected` with the same reason.
    #[tokio::test]
    async fn rejected() {
        let acknowledgement = CommandAcknowledgement::rejected(RejectionReason::KeyAlreadyExists);
        let status = acknowledgement.handle().await;
        assert_eq!(CommandStatus::Rejected(RejectionReason::KeyAlreadyExists), status);
    }
}