1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use std::{io, net::SocketAddr};
use tokio::net::TcpStream;
use tokio_util::codec::{Decoder, Framed};
use crate::resp;
/// A TCP stream wrapped in [`resp::RespCodec`] framing.
///
/// This is the value produced by [`connect`]; the tests in this file use it both to
/// send RESP values and to read decoded replies back.
pub type RespConnection = Framed<TcpStream, resp::RespCodec>;
/// Opens a TCP connection to `addr` and wraps the socket in the RESP codec.
///
/// # Errors
///
/// Returns the `io::Error` reported by `TcpStream::connect` when the connection
/// cannot be established.
pub async fn connect(addr: &SocketAddr) -> Result<RespConnection, io::Error> {
    TcpStream::connect(addr)
        .await
        .map(|socket| resp::RespCodec.framed(socket))
}
// Integration tests: both open a real TCP connection to 127.0.0.1:6379 and panic
// if nothing accepts the connection there.
#[cfg(test)]
mod test {
    use futures_util::{
        sink::SinkExt,
        stream::{self, StreamExt},
    };

    use crate::resp;

    /// Sends `PING TEST` and checks that the single reply read back equals `"TEST"`.
    #[tokio::test]
    async fn can_connect() {
        let addr: std::net::SocketAddr = "127.0.0.1:6379".parse().unwrap();
        let mut conn = super::connect(&addr).await.expect("Cannot connect");

        let ping = resp_array!["PING", "TEST"];
        conn.send(ping).await.expect("Cannot send PING");

        // Read exactly one decoded frame; a decode error fails the test.
        let replies = conn
            .take(1)
            .map(|reply| reply.expect("Unexpected invalid data"))
            .collect::<Vec<_>>()
            .await;

        assert_eq!(replies.len(), 1);
        assert_eq!(replies[0], "TEST".into());
    }

    /// Sends 1002 commands in one batch, then checks that the final reply
    /// (to `SMEMBERS`) is an array with 1000 entries.
    #[tokio::test]
    async fn complex_test() {
        let addr: std::net::SocketAddr = "127.0.0.1:6379".parse().unwrap();
        let mut conn = super::connect(&addr).await.expect("Cannot connect");

        // One leading command, 1000 `SADD`s, then the `SMEMBERS` query.
        // NOTE(review): "FLUSH" is sent verbatim; stock Redis documents FLUSHDB/FLUSHALL
        // rather than FLUSH, so confirm the target server handles it as intended. The
        // reply accounting below expects exactly one reply for it either way.
        let ops: Vec<_> = std::iter::once(resp_array!["FLUSH"])
            .chain((0..1000).map(|i| resp_array!["SADD", "test_set", format!("VALUE: {}", i)]))
            .chain(std::iter::once(resp_array!["SMEMBERS", "test_set"]))
            .collect();

        let mut pending = stream::iter(ops).map(Ok);
        conn.send_all(&mut pending).await.expect("Cannot send");

        // Discard the first 1001 replies (1 + 1000 commands) and keep the next one.
        let replies = conn
            .skip(1001)
            .take(1)
            .map(|reply| reply.expect("Unexpected invalid data"))
            .collect::<Vec<_>>()
            .await;

        assert_eq!(replies.len(), 1);

        let members = if let resp::RespValue::Array(items) = &replies[0] {
            items
        } else {
            panic!("Not an array")
        };
        assert_eq!(members.len(), 1000);
    }
}