Skip to main content

kevy_client_async/
cmd_list.rs

1//! Async mirror of list commands on `kevy_client::Connection`.
2
3use std::io;
4
5use kevy_resp::Reply;
6
7use crate::codec::AsyncRespCodec;
8use crate::conn::AsyncConnection;
9use crate::reply::{array_to_bulks, string, unexpected, vec2};
10use crate::transport::AsyncTransport;
11
12impl AsyncConnection {
13    /// `LPUSH key value [value ...]`. Returns new list length.
14    pub async fn lpush(&mut self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
15        list_push(self.codec_mut(), b"LPUSH", key, values).await
16    }
17
18    /// `RPUSH key value [value ...]`. Returns new list length.
19    pub async fn rpush(&mut self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
20        list_push(self.codec_mut(), b"RPUSH", key, values).await
21    }
22
23    /// `LPOP key count`. Returns up to `count` head values; empty if
24    /// absent / drained.
25    pub async fn lpop(&mut self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
26        list_pop(self.codec_mut(), b"LPOP", key, count).await
27    }
28
29    /// `RPOP key count`. Symmetric to `lpop` from the tail.
30    pub async fn rpop(&mut self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
31        list_pop(self.codec_mut(), b"RPOP", key, count).await
32    }
33
34    /// `LLEN key`. 0 if absent.
35    pub async fn llen(&mut self, key: &[u8]) -> io::Result<usize> {
36        match self.codec_mut().request(&vec2(b"LLEN", key)).await? {
37            Reply::Int(n) if n >= 0 => Ok(n as usize),
38            Reply::Error(e) => Err(io::Error::other(string(e))),
39            other => Err(unexpected(other)),
40        }
41    }
42
43    /// `LRANGE key start stop`. Negative offsets count from tail.
44    pub async fn lrange(
45        &mut self,
46        key: &[u8],
47        start: i64,
48        stop: i64,
49    ) -> io::Result<Vec<Vec<u8>>> {
50        let args = vec![
51            b"LRANGE".to_vec(),
52            key.to_vec(),
53            start.to_string().into_bytes(),
54            stop.to_string().into_bytes(),
55        ];
56        match self.codec_mut().request(&args).await? {
57            Reply::Array(items) => array_to_bulks(items),
58            Reply::Error(e) => Err(io::Error::other(string(e))),
59            other => Err(unexpected(other)),
60        }
61    }
62}
63
64async fn list_push<T: AsyncTransport>(
65    c: &mut AsyncRespCodec<T>,
66    verb: &[u8],
67    key: &[u8],
68    values: &[&[u8]],
69) -> io::Result<usize> {
70    let mut args = Vec::with_capacity(values.len() + 2);
71    args.push(verb.to_vec());
72    args.push(key.to_vec());
73    args.extend(values.iter().map(|v| v.to_vec()));
74    match c.request(&args).await? {
75        Reply::Int(n) if n >= 0 => Ok(n as usize),
76        Reply::Error(e) => Err(io::Error::other(string(e))),
77        other => Err(unexpected(other)),
78    }
79}
80
81async fn list_pop<T: AsyncTransport>(
82    c: &mut AsyncRespCodec<T>,
83    verb: &[u8],
84    key: &[u8],
85    count: usize,
86) -> io::Result<Vec<Vec<u8>>> {
87    let args = vec![verb.to_vec(), key.to_vec(), count.to_string().into_bytes()];
88    match c.request(&args).await? {
89        Reply::Array(items) => array_to_bulks(items),
90        Reply::Bulk(v) => Ok(vec![v]),
91        Reply::Nil => Ok(Vec::new()),
92        Reply::Error(e) => Err(io::Error::other(string(e))),
93        other => Err(unexpected(other)),
94    }
95}