bonsaidb_core/keyvalue/implementation/
increment.rs1use std::marker::PhantomData;
2
3use futures::{Future, FutureExt};
4
5use super::{BuilderState, Command, KeyOperation, KeyValue, Output};
6use crate::keyvalue::{AsyncKeyValue, IncompatibleTypeError, Numeric, Value};
7use crate::Error;
8
9#[must_use = "the key-value operation is not performed until execute() is called"]
11pub struct Builder<'a, KeyValue, V> {
12 kv: &'a KeyValue,
13 namespace: Option<String>,
14 key: String,
15 increment: bool,
16 amount: Numeric,
17 saturating: bool,
18 _value: PhantomData<V>,
19}
20
21impl<'a, K, V> Builder<'a, K, V>
22where
23 K: KeyValue,
24 V: TryFrom<Numeric, Error = IncompatibleTypeError>,
25{
26 pub(crate) const fn new(
27 kv: &'a K,
28 namespace: Option<String>,
29 increment: bool,
30 key: String,
31 amount: Numeric,
32 ) -> Self {
33 Self {
34 key,
35 kv,
36 namespace,
37 increment,
38 amount,
39 saturating: true,
40 _value: PhantomData,
41 }
42 }
43
44 pub const fn allow_overflow(mut self) -> Self {
46 self.saturating = false;
47 self
48 }
49
50 #[allow(clippy::missing_panics_doc)]
52 pub fn execute(self) -> Result<V, Error> {
53 let Self {
54 kv,
55 namespace,
56 key,
57 increment,
58 amount,
59 saturating,
60 ..
61 } = self;
62 let result = kv.execute_key_operation(KeyOperation {
63 namespace,
64 key,
65 command: if increment {
66 Command::Increment { amount, saturating }
67 } else {
68 Command::Decrement { amount, saturating }
69 },
70 })?;
71 if let Output::Value(Some(Value::Numeric(value))) = result {
72 Ok(V::try_from(value).expect("server should send back identical type"))
73 } else {
74 unreachable!("Unexpected result from key value operation")
75 }
76 }
77}
78
79#[must_use = "futures do nothing unless you `.await` or poll them"]
81pub struct AsyncBuilder<'a, KeyValue, V> {
82 state: BuilderState<'a, Options<'a, KeyValue>, Result<V, Error>>,
83}
84
85struct Options<'a, KeyValue> {
86 kv: &'a KeyValue,
87 namespace: Option<String>,
88 key: String,
89 increment: bool,
90 amount: Numeric,
91 saturating: bool,
92}
93
94impl<'a, K, V> AsyncBuilder<'a, K, V>
95where
96 K: AsyncKeyValue,
97{
98 pub(crate) const fn new(
99 kv: &'a K,
100 namespace: Option<String>,
101 increment: bool,
102 key: String,
103 amount: Numeric,
104 ) -> Self {
105 Self {
106 state: BuilderState::Pending(Some(Options {
107 key,
108 kv,
109 namespace,
110 increment,
111 amount,
112 saturating: true,
113 })),
114 }
115 }
116
117 fn options(&mut self) -> &mut Options<'a, K> {
118 if let BuilderState::Pending(Some(options)) = &mut self.state {
119 options
120 } else {
121 panic!("Attempted to use after retrieving the result")
122 }
123 }
124
125 pub fn allow_overflow(mut self) -> Self {
127 self.options().saturating = false;
128 self
129 }
130}
131
132impl<'a, K, V> Future for AsyncBuilder<'a, K, V>
133where
134 K: AsyncKeyValue,
135 V: TryFrom<Numeric, Error = IncompatibleTypeError>,
136{
137 type Output = Result<V, Error>;
138
139 fn poll(
140 mut self: std::pin::Pin<&mut Self>,
141 cx: &mut std::task::Context<'_>,
142 ) -> std::task::Poll<Self::Output> {
143 match &mut self.state {
144 BuilderState::Executing(future) => future.as_mut().poll(cx),
145 BuilderState::Pending(builder) => {
146 let Options {
147 kv,
148 namespace,
149 key,
150 increment,
151 amount,
152 saturating,
153 } = builder.take().expect("expected builder to have options");
154 let future = async move {
155 let result = kv
156 .execute_key_operation(KeyOperation {
157 namespace,
158 key,
159 command: if increment {
160 Command::Increment { amount, saturating }
161 } else {
162 Command::Decrement { amount, saturating }
163 },
164 })
165 .await?;
166 if let Output::Value(Some(Value::Numeric(value))) = result {
167 Ok(V::try_from(value).expect("server should send back identical type"))
168 } else {
169 unreachable!("Unexpected result from key value operation")
170 }
171 }
172 .boxed();
173
174 self.state = BuilderState::Executing(future);
175 self.poll(cx)
176 }
177 }
178 }
179}