1use crate::{AsMemcachedValue, Client, Error, Status};
2
3use crate::parser::{
4 parse_meta_arithmetic_response, parse_meta_delete_response, parse_meta_get_response,
5 parse_meta_set_response,
6};
7use crate::parser::{MetaResponse, MetaValue};
8
9use std::future::Future;
10
11use tokio::io::AsyncWriteExt;
12
13pub trait MetaProtocol {
15 fn meta_get<K: AsRef<[u8]>>(
31 &mut self,
32 key: K,
33 is_quiet: bool,
34 opaque: Option<&[u8]>,
35 meta_flags: Option<&[&str]>,
36 ) -> impl Future<Output = Result<Option<MetaValue>, Error>>;
37
38 fn meta_set<K, V>(
56 &mut self,
57 key: K,
58 value: V,
59 is_quiet: bool,
60 opaque: Option<&[u8]>,
61 meta_flags: Option<&[&str]>,
62 ) -> impl Future<Output = Result<Option<MetaValue>, Error>>
63 where
64 K: AsRef<[u8]>,
65 V: AsMemcachedValue;
66
67 fn meta_delete<K: AsRef<[u8]>>(
80 &mut self,
81 key: K,
82 is_quiet: bool,
83 opaque: Option<&[u8]>,
84 meta_flags: Option<&[&str]>,
85 ) -> impl Future<Output = Result<Option<MetaValue>, Error>>;
86
87 fn meta_increment<K: AsRef<[u8]>>(
111 &mut self,
112 key: K,
113 is_quiet: bool,
114 opaque: Option<&[u8]>,
115 delta: Option<u64>,
116 meta_flags: Option<&[&str]>,
117 ) -> impl Future<Output = Result<Option<MetaValue>, Error>>;
118
119 fn meta_decrement<K: AsRef<[u8]>>(
143 &mut self,
144 key: K,
145 is_quiet: bool,
146 opaque: Option<&[u8]>,
147 delta: Option<u64>,
148 meta_flags: Option<&[&str]>,
149 ) -> impl Future<Output = Result<Option<MetaValue>, Error>>;
150}
151
152impl MetaProtocol for Client {
153 async fn meta_get<K: AsRef<[u8]>>(
154 &mut self,
155 key: K,
156 is_quiet: bool,
157 opaque: Option<&[u8]>,
158 meta_flags: Option<&[&str]>,
159 ) -> Result<Option<MetaValue>, Error> {
160 let kr = Self::validate_key_length(key.as_ref())?;
161
162 if let Some(opaque) = &opaque {
163 Self::validate_opaque_length(opaque)?;
164 }
165
166 self.conn.write_all(b"mg ").await?;
167 self.conn.write_all(kr).await?;
168
169 Self::check_and_write_opaque(self, opaque).await?;
170
171 Self::check_and_write_meta_flags(self, meta_flags, opaque).await?;
172
173 Self::check_and_write_quiet_mode(self, is_quiet).await?;
174
175 self.conn.flush().await?;
176
177 match self.drive_receive(parse_meta_get_response).await? {
178 MetaResponse::Status(Status::NotFound) => Ok(None),
179 MetaResponse::Status(Status::NoOp) => Ok(None),
180 MetaResponse::Status(s) => Err(s.into()),
181 MetaResponse::Data(d) => d
182 .map(|mut items| {
183 let item = items.remove(0);
184 Ok(item)
185 })
186 .transpose(),
187 }
188 }
189
190 async fn meta_set<K, V>(
191 &mut self,
192 key: K,
193 value: V,
194 is_quiet: bool,
195 opaque: Option<&[u8]>,
196 meta_flags: Option<&[&str]>,
197 ) -> Result<Option<MetaValue>, Error>
198 where
199 K: AsRef<[u8]>,
200 V: AsMemcachedValue,
201 {
202 let kr = Self::validate_key_length(key.as_ref())?;
203
204 if let Some(opaque) = &opaque {
205 Self::validate_opaque_length(opaque)?;
206 }
207
208 let vr = value.as_bytes();
209
210 self.conn.write_all(b"ms ").await?;
211 self.conn.write_all(kr).await?;
212
213 let vlen = vr.len().to_string();
214 self.conn.write_all(b" ").await?;
215 self.conn.write_all(vlen.as_ref()).await?;
216
217 Self::check_and_write_opaque(self, opaque).await?;
218
219 Self::check_and_write_meta_flags(self, meta_flags, opaque).await?;
220
221 if is_quiet {
222 self.conn.write_all(b" q").await?;
223 }
224
225 self.conn.write_all(b"\r\n").await?;
226 self.conn.write_all(vr.as_ref()).await?;
227 self.conn.write_all(b"\r\n").await?;
228
229 if is_quiet {
230 self.conn.write_all(b"mn\r\n").await?;
231 }
232
233 self.conn.flush().await?;
234
235 match self.drive_receive(parse_meta_set_response).await? {
236 MetaResponse::Status(Status::Stored) => Ok(None),
237 MetaResponse::Status(Status::NoOp) => Ok(None),
238 MetaResponse::Status(s) => Err(s.into()),
239 MetaResponse::Data(d) => d
240 .map(|mut items| {
241 let item = items.remove(0);
242 Ok(item)
243 })
244 .transpose(),
245 }
246 }
247
248 async fn meta_delete<K: AsRef<[u8]>>(
249 &mut self,
250 key: K,
251 is_quiet: bool,
252 opaque: Option<&[u8]>,
253 meta_flags: Option<&[&str]>,
254 ) -> Result<Option<MetaValue>, Error> {
255 let kr = Self::validate_key_length(key.as_ref())?;
256
257 if let Some(opaque) = &opaque {
258 Self::validate_opaque_length(opaque)?;
259 }
260
261 self.conn.write_all(b"md ").await?;
262 self.conn.write_all(kr).await?;
263
264 Self::check_and_write_opaque(self, opaque).await?;
265
266 Self::check_and_write_meta_flags(self, meta_flags, opaque).await?;
267
268 Self::check_and_write_quiet_mode(self, is_quiet).await?;
269
270 self.conn.flush().await?;
271
272 match self.drive_receive(parse_meta_delete_response).await? {
273 MetaResponse::Status(Status::Deleted) => Ok(None),
274 MetaResponse::Status(Status::Exists) => Err(Error::Protocol(Status::Exists)),
275 MetaResponse::Status(Status::NoOp) => Ok(None),
276 MetaResponse::Status(s) => Err(s.into()),
277 MetaResponse::Data(d) => d
278 .map(|mut items| {
279 let item = items.remove(0);
280 Ok(item)
281 })
282 .transpose(),
283 }
284 }
285
286 async fn meta_increment<K: AsRef<[u8]>>(
287 &mut self,
288 key: K,
289 is_quiet: bool,
290 opaque: Option<&[u8]>,
291 delta: Option<u64>,
292 meta_flags: Option<&[&str]>,
293 ) -> Result<Option<MetaValue>, Error> {
294 let kr = Self::validate_key_length(key.as_ref())?;
295
296 if let Some(opaque) = &opaque {
297 Self::validate_opaque_length(opaque)?;
298 }
299
300 self.conn.write_all(b"ma ").await?;
301 self.conn.write_all(kr).await?;
302
303 Self::check_and_write_opaque(self, opaque).await?;
304
305 if let Some(delta) = delta {
307 if delta != 1 {
308 self.conn.write_all(b" D").await?;
309 self.conn.write_all(delta.to_string().as_bytes()).await?;
310 }
311 }
312
313 if let Some(meta_flags) = meta_flags {
314 for flag in meta_flags {
315 if flag.starts_with('M')
318 || flag.starts_with('q')
319 || (flag.starts_with('D') && delta.is_some())
320 || (flag.starts_with('O') && opaque.is_some())
321 {
322 continue;
323 } else {
324 self.conn.write_all(b" ").await?;
325 self.conn.write_all(flag.as_bytes()).await?;
326 }
327 }
328 }
329
330 Self::check_and_write_quiet_mode(self, is_quiet).await?;
331
332 self.conn.flush().await?;
333
334 match self.drive_receive(parse_meta_arithmetic_response).await? {
335 MetaResponse::Status(Status::Stored) => Ok(None),
336 MetaResponse::Status(Status::NoOp) => Ok(None),
337 MetaResponse::Status(s) => Err(s.into()),
338 MetaResponse::Data(d) => d
339 .map(|mut items| {
340 let item = items.remove(0);
341 Ok(item)
342 })
343 .transpose(),
344 }
345 }
346
347 async fn meta_decrement<K: AsRef<[u8]>>(
348 &mut self,
349 key: K,
350 is_quiet: bool,
351 opaque: Option<&[u8]>,
352 delta: Option<u64>,
353 meta_flags: Option<&[&str]>,
354 ) -> Result<Option<MetaValue>, Error> {
355 let kr = Self::validate_key_length(key.as_ref())?;
356
357 if let Some(opaque) = &opaque {
358 Self::validate_opaque_length(opaque)?;
359 }
360
361 self.conn.write_all(b"ma ").await?;
362 self.conn.write_all(kr).await?;
363 self.conn.write_all(b" MD").await?;
364
365 Self::check_and_write_opaque(self, opaque).await?;
366
367 if let Some(delta) = delta {
368 if delta != 1 {
369 self.conn.write_all(b" D").await?;
370 self.conn.write_all(delta.to_string().as_bytes()).await?;
371 }
372 }
373
374 if let Some(meta_flags) = meta_flags {
375 for flag in meta_flags {
376 if flag.starts_with('M')
379 || flag.starts_with('q')
380 || (flag.starts_with('D') && delta.is_some())
381 || (flag.starts_with('O') && opaque.is_some())
382 {
383 continue;
384 } else {
385 self.conn.write_all(b" ").await?;
386 self.conn.write_all(flag.as_bytes()).await?;
387 }
388 }
389 }
390
391 Self::check_and_write_quiet_mode(self, is_quiet).await?;
392
393 self.conn.flush().await?;
394
395 match self.drive_receive(parse_meta_arithmetic_response).await? {
396 MetaResponse::Status(Status::Stored) => Ok(None),
397 MetaResponse::Status(Status::NoOp) => Ok(None),
398 MetaResponse::Status(s) => Err(s.into()),
399 MetaResponse::Data(d) => d
400 .map(|mut items| {
401 let item = items.remove(0);
402 Ok(item)
403 })
404 .transpose(),
405 }
406 }
407}