bonsaidb_core/keyvalue/implementation/
set.rs

1use std::ops::Add;
2use std::time::{Duration, SystemTime};
3
4use futures::{Future, FutureExt};
5use serde::{Deserialize, Serialize};
6
7use super::{
8    BuilderState, Command, KeyCheck, KeyOperation, KeyStatus, KeyValue, Output, PendingValue,
9    Timestamp,
10};
11use crate::keyvalue::{AsyncKeyValue, SetCommand, Value};
12use crate::Error;
13
14/// Builder for a [`Command::Set`] key-value operation.
15#[must_use = "the key-value operation is not performed until execute() is called"]
16pub struct Builder<'a, KeyValue, V> {
17    kv: &'a KeyValue,
18    namespace: Option<String>,
19    key: String,
20    value: PendingValue<'a, V>,
21    expiration: Option<Timestamp>,
22    keep_existing_expiration: bool,
23    check: Option<KeyCheck>,
24}
25
26impl<'a, K, V> Builder<'a, K, V>
27where
28    K: KeyValue,
29    V: Serialize + Send + Sync,
30{
31    pub(crate) const fn new(
32        kv: &'a K,
33        namespace: Option<String>,
34        key: String,
35        value: PendingValue<'a, V>,
36    ) -> Self {
37        Self {
38            key,
39            value,
40            kv,
41            namespace,
42            expiration: None,
43            keep_existing_expiration: false,
44            check: None,
45        }
46    }
47
48    /// Set this key to expire after `duration` from now.
49    pub fn expire_in(mut self, duration: Duration) -> Self {
50        // TODO consider using checked_add here and making it return an error.
51        self.expiration = Some(Timestamp::from(SystemTime::now().add(duration)));
52        self
53    }
54
55    /// Set this key to expire at the provided `time`.
56    pub fn expire_at(mut self, time: SystemTime) -> Self {
57        // TODO consider using checked_add here and making it return an error.
58        self.expiration = Some(Timestamp::from(time));
59        self
60    }
61
62    /// If the key already exists, do not update the currently set expiration.
63    pub const fn keep_existing_expiration(mut self) -> Self {
64        self.keep_existing_expiration = true;
65        self
66    }
67
68    /// Only set the value if this key already exists.
69    pub const fn only_if_exists(mut self) -> Self {
70        self.check = Some(KeyCheck::OnlyIfPresent);
71        self
72    }
73
74    /// Only set the value if this key isn't present.
75    pub const fn only_if_vacant(mut self) -> Self {
76        self.check = Some(KeyCheck::OnlyIfVacant);
77        self
78    }
79
80    /// Executes the Set operation, requesting the previous value be returned.
81    /// If no change is made, None will be returned.
82    #[allow(clippy::missing_panics_doc)]
83    pub fn returning_previous(self) -> Result<Option<Value>, Error> {
84        let Self {
85            kv,
86            namespace,
87            key,
88            value,
89            expiration,
90            keep_existing_expiration,
91            check,
92        } = self;
93
94        let result = kv.execute_key_operation(KeyOperation {
95            namespace,
96            key,
97            command: Command::Set(SetCommand {
98                value: value.prepare()?,
99                expiration,
100                keep_existing_expiration,
101                check,
102                return_previous_value: true,
103            }),
104        })?;
105        match result {
106            Output::Value(value) => Ok(value),
107            Output::Status(KeyStatus::NotChanged) => Ok(None),
108            Output::Status(_) => unreachable!("Unexpected output from Set"),
109        }
110    }
111
112    /// Executes the Set operation, requesting the previous value be returned.
113    /// If no change is made, None will be returned.
114    #[allow(clippy::missing_panics_doc)]
115    pub fn returning_previous_as<OtherV: for<'de> Deserialize<'de>>(
116        self,
117    ) -> Result<Option<OtherV>, Error> {
118        self.returning_previous()?
119            .map(|value| value.deserialize())
120            .transpose()
121    }
122
123    /// Executes the operation using the configured options.
124    pub fn execute(self) -> Result<KeyStatus, Error> {
125        let Self {
126            kv,
127            namespace,
128            key,
129            value,
130            expiration,
131            keep_existing_expiration,
132            check,
133        } = self;
134        let result = kv.execute_key_operation(KeyOperation {
135            namespace,
136            key,
137            command: Command::Set(SetCommand {
138                value: value.prepare()?,
139                expiration,
140                keep_existing_expiration,
141                check,
142                return_previous_value: false,
143            }),
144        })?;
145        if let Output::Status(status) = result {
146            Ok(status)
147        } else {
148            unreachable!("Unexpected output from Set")
149        }
150    }
151}
152
153/// Builder for a [`Command::Set`] key-value operation.
154#[must_use = "futures do nothing unless you `.await` or poll them"]
155pub struct AsyncBuilder<'a, KeyValue, V> {
156    state: BuilderState<'a, Options<'a, KeyValue, V>, Result<KeyStatus, Error>>,
157}
158
159struct Options<'a, KeyValue, V> {
160    kv: &'a KeyValue,
161    namespace: Option<String>,
162    key: String,
163    value: PendingValue<'a, V>,
164    expiration: Option<Timestamp>,
165    keep_existing_expiration: bool,
166    check: Option<KeyCheck>,
167}
168
169impl<'a, K, V> AsyncBuilder<'a, K, V>
170where
171    K: AsyncKeyValue,
172    V: Serialize + Send + Sync,
173{
174    pub(crate) const fn new(
175        kv: &'a K,
176        namespace: Option<String>,
177        key: String,
178        value: PendingValue<'a, V>,
179    ) -> Self {
180        Self {
181            state: BuilderState::Pending(Some(Options {
182                key,
183                value,
184                kv,
185                namespace,
186                expiration: None,
187                keep_existing_expiration: false,
188                check: None,
189            })),
190        }
191    }
192
193    fn options(&mut self) -> &mut Options<'a, K, V> {
194        if let BuilderState::Pending(Some(options)) = &mut self.state {
195            options
196        } else {
197            panic!("Attempted to use after retrieving the result")
198        }
199    }
200
201    /// Set this key to expire after `duration` from now.
202    pub fn expire_in(mut self, duration: Duration) -> Self {
203        // TODO consider using checked_add here and making it return an error.
204        self.options().expiration = Some(Timestamp::from(SystemTime::now().add(duration)));
205        self
206    }
207
208    /// Set this key to expire at the provided `time`.
209    pub fn expire_at(mut self, time: SystemTime) -> Self {
210        // TODO consider using checked_add here and making it return an error.
211        self.options().expiration = Some(Timestamp::from(time));
212        self
213    }
214
215    /// If the key already exists, do not update the currently set expiration.
216    pub fn keep_existing_expiration(mut self) -> Self {
217        self.options().keep_existing_expiration = true;
218        self
219    }
220
221    /// Only set the value if this key already exists.
222    pub fn only_if_exists(mut self) -> Self {
223        self.options().check = Some(KeyCheck::OnlyIfPresent);
224        self
225    }
226
227    /// Only set the value if this key isn't present.
228    pub fn only_if_vacant(mut self) -> Self {
229        self.options().check = Some(KeyCheck::OnlyIfVacant);
230        self
231    }
232
233    /// Executes the Set operation, requesting the previous value be returned.
234    /// If no change is made, None will be returned.
235    #[allow(clippy::missing_panics_doc)]
236    pub async fn returning_previous(self) -> Result<Option<Value>, Error> {
237        if let BuilderState::Pending(Some(builder)) = self.state {
238            let Options {
239                kv,
240                namespace,
241                key,
242                value,
243                expiration,
244                keep_existing_expiration,
245                check,
246            } = builder;
247
248            let result = kv
249                .execute_key_operation(KeyOperation {
250                    namespace,
251                    key,
252                    command: Command::Set(SetCommand {
253                        value: value.prepare()?,
254                        expiration,
255                        keep_existing_expiration,
256                        check,
257                        return_previous_value: true,
258                    }),
259                })
260                .await?;
261            match result {
262                Output::Value(value) => Ok(value),
263                Output::Status(KeyStatus::NotChanged) => Ok(None),
264                Output::Status(_) => unreachable!("Unexpected output from Set"),
265            }
266        } else {
267            panic!("Using future after it's been executed")
268        }
269    }
270
271    /// Executes the Set operation, requesting the previous value be returned.
272    /// If no change is made, None will be returned.
273    #[allow(clippy::missing_panics_doc)]
274    pub async fn returning_previous_as<OtherV: for<'de> Deserialize<'de>>(
275        self,
276    ) -> Result<Option<OtherV>, Error> {
277        self.returning_previous()
278            .await?
279            .map(|value| value.deserialize())
280            .transpose()
281    }
282}
283
284impl<'a, K, V> Future for AsyncBuilder<'a, K, V>
285where
286    K: AsyncKeyValue,
287    V: Serialize + Send + Sync,
288{
289    type Output = Result<KeyStatus, Error>;
290
291    fn poll(
292        mut self: std::pin::Pin<&mut Self>,
293        cx: &mut std::task::Context<'_>,
294    ) -> std::task::Poll<Self::Output> {
295        match &mut self.state {
296            BuilderState::Executing(future) => future.as_mut().poll(cx),
297            BuilderState::Pending(builder) => {
298                let Options {
299                    kv,
300                    namespace,
301                    key,
302                    value,
303                    expiration,
304                    keep_existing_expiration,
305                    check,
306                } = builder.take().expect("expected builder to have options");
307                let future = async move {
308                    let result = kv
309                        .execute_key_operation(KeyOperation {
310                            namespace,
311                            key,
312                            command: Command::Set(SetCommand {
313                                value: value.prepare()?,
314                                expiration,
315                                keep_existing_expiration,
316                                check,
317                                return_previous_value: false,
318                            }),
319                        })
320                        .await?;
321                    if let Output::Status(status) = result {
322                        Ok(status)
323                    } else {
324                        unreachable!("Unexpected output from Set")
325                    }
326                }
327                .boxed();
328
329                self.state = BuilderState::Executing(future);
330                self.poll(cx)
331            }
332        }
333    }
334}