tokio_memcache/client/
connection.rs1use tokio_core::net::TcpStream;
2use tokio_service::Service;
3use tokio_proto::pipeline::ClientService;
4use futures::{future, Future, BoxFuture};
5
6use super::protocol::{MemcachedProto};
7use protocol::{Request, Response, Command, extras};
8use errors::MemcacheError;
9use types::{AsArgument, FromResponse, Expiration};
10
/// Shorthand for the boxed future returned by client operations: it resolves
/// to `T` on success or fails with a `MemcacheError`.
type Result<T> = BoxFuture<T, MemcacheError>;
12
13pub struct Connection {
14 pub inner: ClientService<TcpStream, MemcachedProto>,
15}
16
17impl Connection {
18
19 pub fn get<K, R>(&self, key: &K) -> BoxFuture<R, MemcacheError>
20 where K: AsArgument, R: FromResponse {
21 let mut request = Request::new(Command::Get);
22 request.set_key(key);
23
24 self.send(request).and_then(|resp| {
25 match R::try_from(resp.value().unwrap()) {
26 Err(e) => future::err(MemcacheError::from(e)),
27 Ok(res) => future::ok(res)
28 }
29 }).boxed()
30 }
31
32 pub fn set<K, V, E>(&self, key: &K, value: &V, expiration: E) -> Result<()>
33 where K: AsArgument, V: AsArgument, E: Expiration {
34 let mut request = Request::new(Command::Set);
35 request.set_key(key);
36 request.set_value(value);
37 request.set_extras(extras::SetExtras{
38 flags: 0,
39 expiration: expiration.as_value(),
40 });
41
42 self.send(request).and_then(|_| future::ok(())).boxed()
43 }
44
45 pub fn add<K, V, E>(&self, key: &K, value: &V, expiration: E) -> BoxFuture<(), MemcacheError>
46 where K: AsArgument, V: AsArgument, E: Expiration {
47 let mut request = Request::new(Command::Add);
48 request.set_key(key);
49 request.set_extras(extras::AddExtras{
50 flags: 0,
51 expiration: expiration.as_value(),
52 });
53 request.set_value(value);
54
55 self.send(request).and_then(|_| future::ok(())).boxed()
56 }
57
58 pub fn send(&self, req: Request) -> Result<Response> {
65 self.call(req)
66 }
67
68}
69
70impl Service for Connection {
71 type Request = Request;
72 type Response = Response;
73 type Error = MemcacheError;
74 type Future = BoxFuture<Self::Response, Self::Error>;
75
76 fn call(&self, req: Self::Request) -> Self::Future {
77 self.inner.call(req).then(|res| {
78 match res {
79 Ok(resp) => {
80 if resp.is_ok() {
81 future::ok(resp)
82 } else {
83 future::err(MemcacheError::from(resp))
84 }
85 },
86 Err(e) => future::err(MemcacheError::from(e)),
87 }
88 }).boxed()
89 }
90}
91