1#![allow(clippy::needless_doctest_main)]
30
31pub use bb8;
32pub use memcache_async;
33
34mod client;
35
36use async_trait::async_trait;
37use client::{Connectable, Connection};
38use std::{
39 io::{self, ErrorKind},
40 time::Duration,
41};
42use url::Url;
43
44#[derive(Clone, Debug)]
46pub struct MemcacheConnectionManager {
47 uri: Url,
48 memcache_read_timeout: Option<Duration>,
50 memcache_write_timeout: Option<Duration>,
52}
53
54impl MemcacheConnectionManager {
55 pub fn new<U: Connectable>(u: U) -> Result<MemcacheConnectionManager, io::Error> {
56 Ok(MemcacheConnectionManager {
57 uri: u.get_uri(),
58 memcache_read_timeout: None,
59 memcache_write_timeout: None,
60 })
61 }
62
63 pub fn with_timeouts(mut self, read_timeout: Duration, write_timeout: Duration) -> Self {
64 self.memcache_read_timeout = Some(read_timeout);
65 self.memcache_write_timeout = Some(write_timeout);
66 self
67 }
68}
69
70#[async_trait]
71impl bb8::ManageConnection for MemcacheConnectionManager {
72 type Connection = Connection;
73 type Error = io::Error;
74
75 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
76 Connection::connect(
77 &self.uri,
78 self.memcache_read_timeout,
79 self.memcache_write_timeout,
80 )
81 .await
82 }
83
84 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
85 if conn.is_tainted() {
86 return Err(ErrorKind::ConnectionAborted.into());
87 }
88 conn.version().await.map(|_| ())
89 }
90
91 fn has_broken(&self, conn: &mut Self::Connection) -> bool {
92 conn.is_tainted()
93 }
94}
95
96#[cfg(test)]
97mod test {
98 use super::*;
99 use std::{io::ErrorKind, time::Duration};
100
101 #[tokio::test]
102 async fn test_cache_get() {
103 let manager = MemcacheConnectionManager::new("tcp://localhost:11211").unwrap();
104 let pool = bb8::Pool::builder().build(manager).await.unwrap();
105
106 let pool = pool.clone();
107 let mut conn = pool.get().await.unwrap();
108
109 assert!(conn.flush().await.is_ok());
110
111 let (key, val) = ("hello", "world");
112 assert_eq!(
113 conn.get(&key).await.unwrap_err().kind(),
114 ErrorKind::NotFound
115 );
116 assert!(conn.set(&key, val.as_bytes(), 0).await.is_ok());
117 assert_eq!(conn.get(&key).await.unwrap(), val.as_bytes());
118 }
119
120 #[tokio::test]
121 async fn test_cache_add_delete() {
122 let manager = MemcacheConnectionManager::new("tcp://localhost:11211").unwrap();
123 let pool = bb8::Pool::builder().build(manager).await.unwrap();
124
125 let pool = pool.clone();
126 let mut conn = pool.get().await.unwrap();
127
128 assert!(conn.flush().await.is_ok());
129
130 let (key, val) = ("hello_add_delete", "world");
131 assert!(conn.add(&key, val.as_bytes(), 0).await.is_ok());
132 assert_eq!(conn.get(&key).await.unwrap(), val.as_bytes());
133
134 assert!(conn.add(&key, val.as_bytes(), 0).await.is_err());
136
137 assert!(conn.delete(&key).await.is_ok());
138 assert_eq!(
139 conn.get(&key).await.unwrap_err().kind(),
140 ErrorKind::NotFound
141 );
142 }
143
144 #[tokio::test]
145 async fn test_increment() {
146 let manager = MemcacheConnectionManager::new("tcp://localhost:11211").unwrap();
147 let pool = bb8::Pool::builder().build(manager).await.unwrap();
148
149 let pool = pool.clone();
150 let mut conn = pool.get().await.unwrap();
151
152 assert!(conn.flush().await.is_ok());
153
154 let (key, val) = ("increment", "0");
155 assert!(conn.set(&key, val.as_bytes(), 0).await.is_ok());
156 assert_eq!(conn.increment(&key, 1).await.unwrap(), 1);
157 }
158
159 #[tokio::test]
160 async fn test_cache_unix_socket() {
161 let manager = MemcacheConnectionManager::new("unix:/tmp/memcached.sock").unwrap();
162 let pool = bb8::Pool::builder().build(manager).await.unwrap();
163
164 let pool = pool.clone();
165 let mut conn = pool.get().await.unwrap();
166
167 assert!(conn.flush().await.is_ok());
168 }
169
170 #[tokio::test]
171 async fn test_connection_timeouts() {
172 let manager = MemcacheConnectionManager::new("tcp://localhost:11211")
173 .unwrap()
174 .with_timeouts(Duration::from_millis(2), Duration::from_millis(5));
175 let pool = bb8::Pool::builder().build(manager).await.unwrap();
176
177 {
178 let mut conn = pool.get().await.unwrap();
179
180 assert!(conn.flush().await.is_ok());
181
182 let key = "hello";
183 assert_eq!(
184 conn.get(&key).await.unwrap_err().kind(),
185 ErrorKind::NotFound
186 );
187
188 assert_eq!(pool.state().connections, 1);
189 assert_eq!(pool.state().statistics.connections_closed_broken, 0);
190 }
191
192 {
193 let mut conn = pool.get().await.unwrap();
194
195 assert!(conn.flush().await.is_ok());
196
197 let key = "hello";
198 let v = vec![1; 204_800_000];
200
201 assert_eq!(
202 conn.set(&key, &v, 0).await.unwrap_err().kind(),
203 ErrorKind::TimedOut
204 );
205 }
206
207 assert_eq!(pool.state().connections, 0);
209 assert_eq!(pool.state().statistics.connections_closed_broken, 1);
212 }
213}