bb8_memcached/
lib.rs

1//! Memcached support for the `bb8` connection pool.
2//!
3//! # Example
4//! ```
5//! use futures::future::join_all;
6//! use bb8_memcached::{bb8, MemcacheConnectionManager};
7//!
8//! #[tokio::main]
9//! async fn main() {
10//!     let manager = MemcacheConnectionManager::new("tcp://localhost:11211").unwrap();
11//!     let pool = bb8::Pool::builder().build(manager).await.unwrap();
12//!
13//!     let mut handles = vec![];
14//!
15//!     for _i in 0..10 {
16//!         let pool = pool.clone();
17//!
18//!         handles.push(tokio::spawn(async move {
19//!             let mut conn = pool.get().await.unwrap();
20//!
21//!             let version = conn.version().await.unwrap();
22//!         }));
23//!     }
24//!
25//!     join_all(handles).await;
26//! }
27//! ```
28
29#![allow(clippy::needless_doctest_main)]
30
31pub use bb8;
32pub use memcache_async;
33
34mod client;
35
36use async_trait::async_trait;
37use client::{Connectable, Connection};
38use std::io;
39use url::Url;
40
41/// A `bb8::ManageConnection` for `memcache_async::ascii::Protocol`.
42#[derive(Clone, Debug)]
43pub struct MemcacheConnectionManager {
44    uri: Url,
45}
46
47impl MemcacheConnectionManager {
48    pub fn new<U: Connectable>(u: U) -> Result<MemcacheConnectionManager, io::Error> {
49        Ok(MemcacheConnectionManager { uri: u.get_uri() })
50    }
51}
52
53#[async_trait]
54impl bb8::ManageConnection for MemcacheConnectionManager {
55    type Connection = Connection;
56    type Error = io::Error;
57
58    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
59        Connection::connect(&self.uri).await
60    }
61
62    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
63        conn.version().await.map(|_| ())
64    }
65
66    fn has_broken(&self, _: &mut Self::Connection) -> bool {
67        false
68    }
69}
70
#[cfg(test)]
mod test {
    // These tests talk to real servers: a memcached on `tcp://localhost:11211`
    // and another reachable through the unix socket `/tmp/memcached.sock`.
    //
    // The test harness runs tests in parallel by default, so the TCP tests must
    // not issue a global `flush`: that would erase keys another test has just
    // written. Each TCP test instead uses its own key and removes only that key
    // before starting.
    use super::*;
    use std::io::ErrorKind;

    /// Builds a default-configured pool for `uri`, panicking if that fails.
    async fn pool_for(uri: &'static str) -> bb8::Pool<MemcacheConnectionManager> {
        let manager = MemcacheConnectionManager::new(uri).unwrap();
        bb8::Pool::builder().build(manager).await.unwrap()
    }

    #[tokio::test]
    async fn test_cache_get() {
        let pool = pool_for("tcp://localhost:11211").await;
        let mut conn = pool.get().await.unwrap();

        let (key, val) = ("hello", "world");

        // Start from a clean slate for this key only. The result is ignored on
        // purpose: the key is normally absent, which `delete` reports as an error.
        let _ = conn.delete(&key).await;

        assert_eq!(
            conn.get(&key).await.unwrap_err().kind(),
            ErrorKind::NotFound
        );
        assert!(conn.set(&key, val.as_bytes(), 0).await.is_ok());
        assert_eq!(conn.get(&key).await.unwrap(), val.as_bytes());
    }

    #[tokio::test]
    async fn test_cache_add_delete() {
        let pool = pool_for("tcp://localhost:11211").await;
        let mut conn = pool.get().await.unwrap();

        let (key, val) = ("hello_add_delete", "world");

        // Remove leftovers from an earlier run without touching other tests' keys.
        let _ = conn.delete(&key).await;

        assert!(conn.add(&key, val.as_bytes(), 0).await.is_ok());
        assert_eq!(conn.get(&key).await.unwrap(), val.as_bytes());

        // add the same key will fail
        assert!(conn.add(&key, val.as_bytes(), 0).await.is_err());

        assert!(conn.delete(&key).await.is_ok());
        assert_eq!(
            conn.get(&key).await.unwrap_err().kind(),
            ErrorKind::NotFound
        );
    }

    #[tokio::test]
    async fn test_cache_unix_socket() {
        let pool = pool_for("unix:/tmp/memcached.sock").await;
        let mut conn = pool.get().await.unwrap();

        // NOTE(review): assumes the socket is served by a separate memcached
        // instance from the TCP tests, so flushing here cannot disturb them.
        assert!(conn.flush().await.is_ok());
    }
}