1use std::ops::Add;
2use std::time::{Duration, SystemTime};
3
4use futures::{Future, FutureExt};
5use serde::{Deserialize, Serialize};
6
7use super::{
8 BuilderState, Command, KeyCheck, KeyOperation, KeyStatus, KeyValue, Output, PendingValue,
9 Timestamp,
10};
11use crate::keyvalue::{AsyncKeyValue, SetCommand, Value};
12use crate::Error;
13
14#[must_use = "the key-value operation is not performed until execute() is called"]
16pub struct Builder<'a, KeyValue, V> {
17 kv: &'a KeyValue,
18 namespace: Option<String>,
19 key: String,
20 value: PendingValue<'a, V>,
21 expiration: Option<Timestamp>,
22 keep_existing_expiration: bool,
23 check: Option<KeyCheck>,
24}
25
26impl<'a, K, V> Builder<'a, K, V>
27where
28 K: KeyValue,
29 V: Serialize + Send + Sync,
30{
31 pub(crate) const fn new(
32 kv: &'a K,
33 namespace: Option<String>,
34 key: String,
35 value: PendingValue<'a, V>,
36 ) -> Self {
37 Self {
38 key,
39 value,
40 kv,
41 namespace,
42 expiration: None,
43 keep_existing_expiration: false,
44 check: None,
45 }
46 }
47
48 pub fn expire_in(mut self, duration: Duration) -> Self {
50 self.expiration = Some(Timestamp::from(SystemTime::now().add(duration)));
52 self
53 }
54
55 pub fn expire_at(mut self, time: SystemTime) -> Self {
57 self.expiration = Some(Timestamp::from(time));
59 self
60 }
61
62 pub const fn keep_existing_expiration(mut self) -> Self {
64 self.keep_existing_expiration = true;
65 self
66 }
67
68 pub const fn only_if_exists(mut self) -> Self {
70 self.check = Some(KeyCheck::OnlyIfPresent);
71 self
72 }
73
74 pub const fn only_if_vacant(mut self) -> Self {
76 self.check = Some(KeyCheck::OnlyIfVacant);
77 self
78 }
79
80 #[allow(clippy::missing_panics_doc)]
83 pub fn returning_previous(self) -> Result<Option<Value>, Error> {
84 let Self {
85 kv,
86 namespace,
87 key,
88 value,
89 expiration,
90 keep_existing_expiration,
91 check,
92 } = self;
93
94 let result = kv.execute_key_operation(KeyOperation {
95 namespace,
96 key,
97 command: Command::Set(SetCommand {
98 value: value.prepare()?,
99 expiration,
100 keep_existing_expiration,
101 check,
102 return_previous_value: true,
103 }),
104 })?;
105 match result {
106 Output::Value(value) => Ok(value),
107 Output::Status(KeyStatus::NotChanged) => Ok(None),
108 Output::Status(_) => unreachable!("Unexpected output from Set"),
109 }
110 }
111
112 #[allow(clippy::missing_panics_doc)]
115 pub fn returning_previous_as<OtherV: for<'de> Deserialize<'de>>(
116 self,
117 ) -> Result<Option<OtherV>, Error> {
118 self.returning_previous()?
119 .map(|value| value.deserialize())
120 .transpose()
121 }
122
123 pub fn execute(self) -> Result<KeyStatus, Error> {
125 let Self {
126 kv,
127 namespace,
128 key,
129 value,
130 expiration,
131 keep_existing_expiration,
132 check,
133 } = self;
134 let result = kv.execute_key_operation(KeyOperation {
135 namespace,
136 key,
137 command: Command::Set(SetCommand {
138 value: value.prepare()?,
139 expiration,
140 keep_existing_expiration,
141 check,
142 return_previous_value: false,
143 }),
144 })?;
145 if let Output::Status(status) = result {
146 Ok(status)
147 } else {
148 unreachable!("Unexpected output from Set")
149 }
150 }
151}
152
153#[must_use = "futures do nothing unless you `.await` or poll them"]
155pub struct AsyncBuilder<'a, KeyValue, V> {
156 state: BuilderState<'a, Options<'a, KeyValue, V>, Result<KeyStatus, Error>>,
157}
158
159struct Options<'a, KeyValue, V> {
160 kv: &'a KeyValue,
161 namespace: Option<String>,
162 key: String,
163 value: PendingValue<'a, V>,
164 expiration: Option<Timestamp>,
165 keep_existing_expiration: bool,
166 check: Option<KeyCheck>,
167}
168
169impl<'a, K, V> AsyncBuilder<'a, K, V>
170where
171 K: AsyncKeyValue,
172 V: Serialize + Send + Sync,
173{
174 pub(crate) const fn new(
175 kv: &'a K,
176 namespace: Option<String>,
177 key: String,
178 value: PendingValue<'a, V>,
179 ) -> Self {
180 Self {
181 state: BuilderState::Pending(Some(Options {
182 key,
183 value,
184 kv,
185 namespace,
186 expiration: None,
187 keep_existing_expiration: false,
188 check: None,
189 })),
190 }
191 }
192
193 fn options(&mut self) -> &mut Options<'a, K, V> {
194 if let BuilderState::Pending(Some(options)) = &mut self.state {
195 options
196 } else {
197 panic!("Attempted to use after retrieving the result")
198 }
199 }
200
201 pub fn expire_in(mut self, duration: Duration) -> Self {
203 self.options().expiration = Some(Timestamp::from(SystemTime::now().add(duration)));
205 self
206 }
207
208 pub fn expire_at(mut self, time: SystemTime) -> Self {
210 self.options().expiration = Some(Timestamp::from(time));
212 self
213 }
214
215 pub fn keep_existing_expiration(mut self) -> Self {
217 self.options().keep_existing_expiration = true;
218 self
219 }
220
221 pub fn only_if_exists(mut self) -> Self {
223 self.options().check = Some(KeyCheck::OnlyIfPresent);
224 self
225 }
226
227 pub fn only_if_vacant(mut self) -> Self {
229 self.options().check = Some(KeyCheck::OnlyIfVacant);
230 self
231 }
232
233 #[allow(clippy::missing_panics_doc)]
236 pub async fn returning_previous(self) -> Result<Option<Value>, Error> {
237 if let BuilderState::Pending(Some(builder)) = self.state {
238 let Options {
239 kv,
240 namespace,
241 key,
242 value,
243 expiration,
244 keep_existing_expiration,
245 check,
246 } = builder;
247
248 let result = kv
249 .execute_key_operation(KeyOperation {
250 namespace,
251 key,
252 command: Command::Set(SetCommand {
253 value: value.prepare()?,
254 expiration,
255 keep_existing_expiration,
256 check,
257 return_previous_value: true,
258 }),
259 })
260 .await?;
261 match result {
262 Output::Value(value) => Ok(value),
263 Output::Status(KeyStatus::NotChanged) => Ok(None),
264 Output::Status(_) => unreachable!("Unexpected output from Set"),
265 }
266 } else {
267 panic!("Using future after it's been executed")
268 }
269 }
270
271 #[allow(clippy::missing_panics_doc)]
274 pub async fn returning_previous_as<OtherV: for<'de> Deserialize<'de>>(
275 self,
276 ) -> Result<Option<OtherV>, Error> {
277 self.returning_previous()
278 .await?
279 .map(|value| value.deserialize())
280 .transpose()
281 }
282}
283
284impl<'a, K, V> Future for AsyncBuilder<'a, K, V>
285where
286 K: AsyncKeyValue,
287 V: Serialize + Send + Sync,
288{
289 type Output = Result<KeyStatus, Error>;
290
291 fn poll(
292 mut self: std::pin::Pin<&mut Self>,
293 cx: &mut std::task::Context<'_>,
294 ) -> std::task::Poll<Self::Output> {
295 match &mut self.state {
296 BuilderState::Executing(future) => future.as_mut().poll(cx),
297 BuilderState::Pending(builder) => {
298 let Options {
299 kv,
300 namespace,
301 key,
302 value,
303 expiration,
304 keep_existing_expiration,
305 check,
306 } = builder.take().expect("expected builder to have options");
307 let future = async move {
308 let result = kv
309 .execute_key_operation(KeyOperation {
310 namespace,
311 key,
312 command: Command::Set(SetCommand {
313 value: value.prepare()?,
314 expiration,
315 keep_existing_expiration,
316 check,
317 return_previous_value: false,
318 }),
319 })
320 .await?;
321 if let Output::Status(status) = result {
322 Ok(status)
323 } else {
324 unreachable!("Unexpected output from Set")
325 }
326 }
327 .boxed();
328
329 self.state = BuilderState::Executing(future);
330 self.poll(cx)
331 }
332 }
333 }
334}