bb8_memcached/
lib.rs

1//! Memcached support for the `bb8` connection pool.
2//!
3//! # Example
4//! ```
5//! use futures::future::join_all;
6//! use bb8_memcached::{bb8, MemcacheConnectionManager};
7//!
8//! #[tokio::main]
9//! async fn main() {
10//!     let manager = MemcacheConnectionManager::new("tcp://localhost:11211").unwrap();
11//!     let pool = bb8::Pool::builder().build(manager).await.unwrap();
12//!
13//!     let mut handles = vec![];
14//!
15//!     for _i in 0..10 {
16//!         let pool = pool.clone();
17//!
18//!         handles.push(tokio::spawn(async move {
19//!             let mut conn = pool.get().await.unwrap();
20//!
21//!             let version = conn.version().await.unwrap();
22//!         }));
23//!     }
24//!
25//!     join_all(handles).await;
26//! }
27//! ```
28
29#![allow(clippy::needless_doctest_main)]
30
31pub use bb8;
32pub use memcache_async;
33
34mod client;
35
36use async_trait::async_trait;
37use client::{Connectable, Connection};
38use std::{
39    io::{self, ErrorKind},
40    time::Duration,
41};
42use url::Url;
43
44/// A `bb8::ManageConnection` for `memcache_async::ascii::Protocol`.
45#[derive(Clone, Debug)]
46pub struct MemcacheConnectionManager {
47    uri: Url,
48    /// A tokio controlled timeout for operations get, get_multi, version.
49    memcache_read_timeout: Option<Duration>,
50    /// A tokio controlled timeout for operations set, add, delete, incr, flush.
51    memcache_write_timeout: Option<Duration>,
52}
53
54impl MemcacheConnectionManager {
55    pub fn new<U: Connectable>(u: U) -> Result<MemcacheConnectionManager, io::Error> {
56        Ok(MemcacheConnectionManager {
57            uri: u.get_uri(),
58            memcache_read_timeout: None,
59            memcache_write_timeout: None,
60        })
61    }
62
63    pub fn with_timeouts(mut self, read_timeout: Duration, write_timeout: Duration) -> Self {
64        self.memcache_read_timeout = Some(read_timeout);
65        self.memcache_write_timeout = Some(write_timeout);
66        self
67    }
68}
69
70#[async_trait]
71impl bb8::ManageConnection for MemcacheConnectionManager {
72    type Connection = Connection;
73    type Error = io::Error;
74
75    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
76        Connection::connect(
77            &self.uri,
78            self.memcache_read_timeout,
79            self.memcache_write_timeout,
80        )
81        .await
82    }
83
84    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
85        if conn.is_tainted() {
86            return Err(ErrorKind::ConnectionAborted.into());
87        }
88        conn.version().await.map(|_| ())
89    }
90
91    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
92        conn.is_tainted()
93    }
94}
95
#[cfg(test)]
mod test {
    // These tests expect a memcached server reachable at `tcp://localhost:11211`
    // (and, for the unix-socket test, at `/tmp/memcached.sock`); every pool/connection
    // setup step is unwrapped, so a missing server makes them panic.
    use super::*;
    use std::{io::ErrorKind, time::Duration};

    /// Builds a pool with default builder settings around `manager`, panicking on failure.
    async fn build_pool(manager: MemcacheConnectionManager) -> bb8::Pool<MemcacheConnectionManager> {
        bb8::Pool::builder().build(manager).await.unwrap()
    }

    /// A missing key yields `NotFound`; after `set`, `get` returns the stored bytes.
    #[tokio::test]
    async fn test_cache_get() {
        let manager = MemcacheConnectionManager::new("tcp://localhost:11211").unwrap();
        let pool = build_pool(manager).await;
        let mut conn = pool.get().await.unwrap();

        // Start from an empty cache so the first `get` is guaranteed to miss.
        assert!(conn.flush().await.is_ok());

        let (key, val) = ("hello", "world");
        assert_eq!(
            conn.get(&key).await.unwrap_err().kind(),
            ErrorKind::NotFound
        );
        assert!(conn.set(&key, val.as_bytes(), 0).await.is_ok());
        assert_eq!(conn.get(&key).await.unwrap(), val.as_bytes());
    }

    /// `add` stores a new key, refuses an existing one, and `delete` removes it again.
    #[tokio::test]
    async fn test_cache_add_delete() {
        let manager = MemcacheConnectionManager::new("tcp://localhost:11211").unwrap();
        let pool = build_pool(manager).await;
        let mut conn = pool.get().await.unwrap();

        assert!(conn.flush().await.is_ok());

        let (key, val) = ("hello_add_delete", "world");
        assert!(conn.add(&key, val.as_bytes(), 0).await.is_ok());
        assert_eq!(conn.get(&key).await.unwrap(), val.as_bytes());

        // Adding the same key a second time must fail.
        assert!(conn.add(&key, val.as_bytes(), 0).await.is_err());

        assert!(conn.delete(&key).await.is_ok());
        assert_eq!(
            conn.get(&key).await.unwrap_err().kind(),
            ErrorKind::NotFound
        );
    }

    /// Incrementing a key stored as "0" by 1 returns 1.
    #[tokio::test]
    async fn test_increment() {
        let manager = MemcacheConnectionManager::new("tcp://localhost:11211").unwrap();
        let pool = build_pool(manager).await;
        let mut conn = pool.get().await.unwrap();

        assert!(conn.flush().await.is_ok());

        let (key, val) = ("increment", "0");
        assert!(conn.set(&key, val.as_bytes(), 0).await.is_ok());
        assert_eq!(conn.increment(&key, 1).await.unwrap(), 1);
    }

    /// A `unix:` address can be used to build a pool and issue a command.
    #[tokio::test]
    async fn test_cache_unix_socket() {
        let manager = MemcacheConnectionManager::new("unix:/tmp/memcached.sock").unwrap();
        let pool = build_pool(manager).await;
        let mut conn = pool.get().await.unwrap();

        assert!(conn.flush().await.is_ok());
    }

    /// A write that exceeds the write timeout fails with `TimedOut`, and the connection
    /// is then dropped from the pool as broken.
    #[tokio::test]
    async fn test_connection_timeouts() {
        // 2 ms read timeout, 5 ms write timeout.
        let manager = MemcacheConnectionManager::new("tcp://localhost:11211")
            .unwrap()
            .with_timeouts(Duration::from_millis(2), Duration::from_millis(5));
        let pool = build_pool(manager).await;

        // First checkout: ordinary operations finish in time, so nothing is closed.
        {
            let mut conn = pool.get().await.unwrap();

            assert!(conn.flush().await.is_ok());

            let key = "hello";
            assert_eq!(
                conn.get(&key).await.unwrap_err().kind(),
                ErrorKind::NotFound
            );

            assert_eq!(pool.state().connections, 1);
            assert_eq!(pool.state().statistics.connections_closed_broken, 0);
        }

        // Second checkout: provoke a write timeout.
        {
            let mut conn = pool.get().await.unwrap();

            assert!(conn.flush().await.is_ok());

            let key = "hello";
            // A ~200 MB payload is expected to blow the 5 ms write timeout.
            let v = vec![1; 204_800_000];

            assert_eq!(
                conn.set(&key, &v, 0).await.unwrap_err().kind(),
                ErrorKind::TimedOut
            );
        }

        // The connection should be tainted and therefore no longer held by the pool.
        assert_eq!(pool.state().connections, 0);
        // has_broken happens first because it's called when we try to put a connection back
        // into the pool. is_valid is called when we try to get a connection from the pool.
        assert_eq!(pool.state().statistics.connections_closed_broken, 1);
    }
}