1use std::cell::RefCell;
13use std::collections::{BTreeMap, HashMap};
14use std::io;
15use std::net::TcpStream;
16use std::ops::Deref;
17use std::path::Path;
18use std::rc::Rc;
19
20use conhash::{ConsistentHash, Node};
21
22use bufstream::BufStream;
23
24#[cfg(unix)]
25use unix_socket::UnixStream;
26
27use crate::proto::{self, AuthResponse, MemCachedResult};
28use crate::proto::{CasOperation, MultiOperation, NoReplyOperation, Operation, Proto};
29
30struct Sasl<'a> {
31 username: &'a str,
32 password: &'a str,
33}
34
35struct Server {
36 pub proto: Box<dyn Proto + Send>,
37 addr: String,
38}
39
40impl Server {
41 fn connect(
42 addr: String,
43 protocol: proto::ProtoType,
44 o_sasl: &Option<Sasl>,
45 ) -> io::Result<Server> {
46 let proto = {
47 let mut split = addr.split("://");
48 match protocol {
49 proto::ProtoType::Binary => match (split.next(), split.next()) {
50 (Some("tcp"), Some(addr)) => {
51 let stream = TcpStream::connect(addr)?;
52 stream.set_nodelay(true)?;
53 let mut proto = Box::new(proto::BinaryProto::new(BufStream::new(stream)))
54 as Box<dyn Proto + Send>;
55 if let Some(sasl) = o_sasl {
56 let auth_str = format!("\x00{}\x00{}", sasl.username, sasl.password);
57 match proto.auth_start("PLAIN", auth_str.as_bytes()) {
58 Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)),
59 Ok(AuthResponse::Succeeded) => (),
60 Ok(resp) => {
61 let msg =
62 format!("SASL auth failed with AuthResponse: {:?}", resp);
63 return Err(io::Error::new(io::ErrorKind::Other, msg));
64 }
65 }
66 }
67 proto
68 }
69 #[cfg(unix)]
70 (Some("unix"), Some(addr)) => {
71 let stream = UnixStream::connect(&Path::new(addr))?;
72 Box::new(proto::BinaryProto::new(BufStream::new(stream)))
73 as Box<dyn Proto + Send>
74 }
75 (Some(prot), _) => {
76 panic!("Unsupported protocol: {}", prot);
77 }
78 _ => panic!("Malformed address"),
79 },
80 }
81 };
82 Ok(Server { proto, addr })
83 }
84}
85
86#[derive(Clone)]
87struct ServerRef(Rc<RefCell<Server>>);
88
89impl Node for ServerRef {
90 fn name(&self) -> String {
91 self.0.borrow().addr.clone()
92 }
93}
94
95impl Deref for ServerRef {
96 type Target = Rc<RefCell<Server>>;
97
98 fn deref(&self) -> &Rc<RefCell<Server>> {
99 &self.0
100 }
101}
102
103pub struct Client {
128 servers: ConsistentHash<ServerRef>,
129}
130
131impl Client {
132 pub fn connect<S: ToString>(svrs: &[(S, usize)], p: proto::ProtoType) -> io::Result<Client> {
139 Client::conn(svrs, p, None)
140 }
141
142 pub fn connect_sasl<S: ToString>(
149 svrs: &[(S, usize)],
150 p: proto::ProtoType,
151 username: &str,
152 password: &str,
153 ) -> io::Result<Client> {
154 Client::conn(svrs, p, Some(Sasl { username, password }))
155 }
156
157 fn conn<S: ToString>(
158 svrs: &[(S, usize)],
159 p: proto::ProtoType,
160 sasl: Option<Sasl>,
161 ) -> io::Result<Client> {
162 assert!(!svrs.is_empty(), "Server list should not be empty");
163
164 let mut servers = ConsistentHash::new();
165 for (addr, weight) in svrs.iter() {
166 let svr = Server::connect(addr.to_string(), p, &sasl)?;
167 servers.add(&ServerRef(Rc::new(RefCell::new(svr))), *weight);
168 }
169
170 Ok(Client { servers })
171 }
172
173 fn find_server_by_key<'a>(&'a mut self, key: &[u8]) -> &'a mut ServerRef {
174 self.servers.get_mut(key).expect("No valid server found")
175 }
176}
177
178impl Operation for Client {
179 fn set(
180 &mut self,
181 key: &[u8],
182 value: &[u8],
183 flags: u32,
184 expiration: u32,
185 ) -> MemCachedResult<()> {
186 let server = self.find_server_by_key(key);
187 server.borrow_mut().proto.set(key, value, flags, expiration)
188 }
189
190 fn add(
191 &mut self,
192 key: &[u8],
193 value: &[u8],
194 flags: u32,
195 expiration: u32,
196 ) -> MemCachedResult<()> {
197 let server = self.find_server_by_key(key);
198 server.borrow_mut().proto.add(key, value, flags, expiration)
199 }
200
201 fn delete(&mut self, key: &[u8]) -> MemCachedResult<()> {
202 let server = self.find_server_by_key(key);
203 server.borrow_mut().proto.delete(key)
204 }
205
206 fn replace(
207 &mut self,
208 key: &[u8],
209 value: &[u8],
210 flags: u32,
211 expiration: u32,
212 ) -> MemCachedResult<()> {
213 let server = self.find_server_by_key(key);
214 server
215 .borrow_mut()
216 .proto
217 .replace(key, value, flags, expiration)
218 }
219
220 fn get(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, u32)> {
221 let server = self.find_server_by_key(key);
222 server.borrow_mut().proto.get(key)
223 }
224
225 fn getk(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, Vec<u8>, u32)> {
226 let server = self.find_server_by_key(key);
227 server.borrow_mut().proto.getk(key)
228 }
229
230 fn increment(
231 &mut self,
232 key: &[u8],
233 amount: u64,
234 initial: u64,
235 expiration: u32,
236 ) -> MemCachedResult<u64> {
237 let server = self.find_server_by_key(key);
238 server
239 .borrow_mut()
240 .proto
241 .increment(key, amount, initial, expiration)
242 }
243
244 fn decrement(
245 &mut self,
246 key: &[u8],
247 amount: u64,
248 initial: u64,
249 expiration: u32,
250 ) -> MemCachedResult<u64> {
251 let server = self.find_server_by_key(key);
252 server
253 .borrow_mut()
254 .proto
255 .increment(key, amount, initial, expiration)
256 }
257
258 fn append(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
259 let server = self.find_server_by_key(key);
260 server.borrow_mut().proto.append(key, value)
261 }
262
263 fn prepend(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
264 let server = self.find_server_by_key(key);
265 server.borrow_mut().proto.prepend(key, value)
266 }
267
268 fn touch(&mut self, key: &[u8], expiration: u32) -> MemCachedResult<()> {
269 let server = self.find_server_by_key(key);
270 server.borrow_mut().proto.touch(key, expiration)
271 }
272}
273
274impl NoReplyOperation for Client {
275 fn set_noreply(
276 &mut self,
277 key: &[u8],
278 value: &[u8],
279 flags: u32,
280 expiration: u32,
281 ) -> MemCachedResult<()> {
282 let server = self.find_server_by_key(key);
283 server
284 .borrow_mut()
285 .proto
286 .set_noreply(key, value, flags, expiration)
287 }
288
289 fn add_noreply(
290 &mut self,
291 key: &[u8],
292 value: &[u8],
293 flags: u32,
294 expiration: u32,
295 ) -> MemCachedResult<()> {
296 let server = self.find_server_by_key(key);
297 server
298 .borrow_mut()
299 .proto
300 .add_noreply(key, value, flags, expiration)
301 }
302
303 fn delete_noreply(&mut self, key: &[u8]) -> MemCachedResult<()> {
304 let server = self.find_server_by_key(key);
305 server.borrow_mut().proto.delete_noreply(key)
306 }
307
308 fn replace_noreply(
309 &mut self,
310 key: &[u8],
311 value: &[u8],
312 flags: u32,
313 expiration: u32,
314 ) -> MemCachedResult<()> {
315 let server = self.find_server_by_key(key);
316 server
317 .borrow_mut()
318 .proto
319 .replace_noreply(key, value, flags, expiration)
320 }
321
322 fn increment_noreply(
323 &mut self,
324 key: &[u8],
325 amount: u64,
326 initial: u64,
327 expiration: u32,
328 ) -> MemCachedResult<()> {
329 let server = self.find_server_by_key(key);
330 server
331 .borrow_mut()
332 .proto
333 .increment_noreply(key, amount, initial, expiration)
334 }
335
336 fn decrement_noreply(
337 &mut self,
338 key: &[u8],
339 amount: u64,
340 initial: u64,
341 expiration: u32,
342 ) -> MemCachedResult<()> {
343 let server = self.find_server_by_key(key);
344 server
345 .borrow_mut()
346 .proto
347 .decrement_noreply(key, amount, initial, expiration)
348 }
349
350 fn append_noreply(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
351 let server = self.find_server_by_key(key);
352 server.borrow_mut().proto.append_noreply(key, value)
353 }
354
355 fn prepend_noreply(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
356 let server = self.find_server_by_key(key);
357 server.borrow_mut().proto.prepend_noreply(key, value)
358 }
359}
360
361impl CasOperation for Client {
362 fn set_cas(
363 &mut self,
364 key: &[u8],
365 value: &[u8],
366 flags: u32,
367 expiration: u32,
368 cas: u64,
369 ) -> MemCachedResult<u64> {
370 let server = self.find_server_by_key(key);
371 server
372 .borrow_mut()
373 .proto
374 .set_cas(key, value, flags, expiration, cas)
375 }
376
377 fn add_cas(
378 &mut self,
379 key: &[u8],
380 value: &[u8],
381 flags: u32,
382 expiration: u32,
383 ) -> MemCachedResult<u64> {
384 let server = self.find_server_by_key(key);
385 server
386 .borrow_mut()
387 .proto
388 .add_cas(key, value, flags, expiration)
389 }
390
391 fn replace_cas(
392 &mut self,
393 key: &[u8],
394 value: &[u8],
395 flags: u32,
396 expiration: u32,
397 cas: u64,
398 ) -> MemCachedResult<u64> {
399 let server = self.find_server_by_key(key);
400 server
401 .borrow_mut()
402 .proto
403 .replace_cas(key, value, flags, expiration, cas)
404 }
405
406 fn get_cas(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, u32, u64)> {
407 let server = self.find_server_by_key(key);
408 server.borrow_mut().proto.get_cas(key)
409 }
410
411 fn getk_cas(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, Vec<u8>, u32, u64)> {
412 let server = self.find_server_by_key(key);
413 server.borrow_mut().proto.getk_cas(key)
414 }
415
416 fn increment_cas(
417 &mut self,
418 key: &[u8],
419 amount: u64,
420 initial: u64,
421 expiration: u32,
422 cas: u64,
423 ) -> MemCachedResult<(u64, u64)> {
424 let server = self.find_server_by_key(key);
425 server
426 .borrow_mut()
427 .proto
428 .increment_cas(key, amount, initial, expiration, cas)
429 }
430
431 fn decrement_cas(
432 &mut self,
433 key: &[u8],
434 amount: u64,
435 initial: u64,
436 expiration: u32,
437 cas: u64,
438 ) -> MemCachedResult<(u64, u64)> {
439 let server = self.find_server_by_key(key);
440 server
441 .borrow_mut()
442 .proto
443 .decrement_cas(key, amount, initial, expiration, cas)
444 }
445
446 fn append_cas(&mut self, key: &[u8], value: &[u8], cas: u64) -> MemCachedResult<u64> {
447 let server = self.find_server_by_key(key);
448 server.borrow_mut().proto.append_cas(key, value, cas)
449 }
450
451 fn prepend_cas(&mut self, key: &[u8], value: &[u8], cas: u64) -> MemCachedResult<u64> {
452 let server = self.find_server_by_key(key);
453 server.borrow_mut().proto.prepend_cas(key, value, cas)
454 }
455
456 fn touch_cas(&mut self, key: &[u8], expiration: u32, cas: u64) -> MemCachedResult<u64> {
457 let server = self.find_server_by_key(key);
458 server.borrow_mut().proto.touch_cas(key, expiration, cas)
459 }
460}
461
462impl MultiOperation for Client {
463 fn set_multi(&mut self, kv: BTreeMap<&[u8], (&[u8], u32, u32)>) -> MemCachedResult<()> {
464 assert_eq!(self.servers.len(), 1);
465 let server = self.find_server_by_key(kv.keys().next().unwrap());
466 server.borrow_mut().proto.set_multi(kv)
467 }
468 fn delete_multi(&mut self, keys: &[&[u8]]) -> MemCachedResult<()> {
469 assert_eq!(self.servers.len(), 1);
470 let server = self.find_server_by_key(keys[0]);
471 server.borrow_mut().proto.delete_multi(keys)
472 }
473 fn increment_multi<'a>(
474 &mut self,
475 kv: HashMap<&'a [u8], (u64, u64, u32)>,
476 ) -> MemCachedResult<HashMap<&'a [u8], u64>> {
477 assert_eq!(self.servers.len(), 1);
478 let server = self.find_server_by_key(kv.keys().next().unwrap());
479 server.borrow_mut().proto.increment_multi(kv)
480 }
481 fn get_multi(&mut self, keys: &[&[u8]]) -> MemCachedResult<HashMap<Vec<u8>, (Vec<u8>, u32)>> {
482 assert_eq!(self.servers.len(), 1);
483 let server = self.find_server_by_key(keys[0]);
484 server.borrow_mut().proto.get_multi(keys)
485 }
486}
487
488#[cfg(all(test, feature = "nightly"))]
489mod test {
490 use super::Client;
491 use crate::proto::{NoReplyOperation, Operation, ProtoType};
492 use rand::random;
493 use test::Bencher;
494
495 fn generate_data(len: usize) -> Vec<u8> {
496 (0..len).map(|_| random()).collect()
497 }
498
499 #[bench]
500 fn bench_set_64(b: &mut Bencher) {
501 let key = b"test:test_bench";
502 let val = generate_data(64);
503
504 let mut client =
505 Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
506
507 b.iter(|| client.set(key, &val[..], 0, 2));
508 }
509
510 #[bench]
511 fn bench_set_noreply_64(b: &mut Bencher) {
512 let key = b"test:test_bench";
513 let val = generate_data(64);
514
515 let mut client =
516 Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
517
518 b.iter(|| client.set_noreply(key, &val[..], 0, 2));
519 }
520
521 #[bench]
522 fn bench_set_512(b: &mut Bencher) {
523 let key = b"test:test_bench";
524 let val = generate_data(512);
525
526 let mut client =
527 Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
528
529 b.iter(|| client.set(key, &val[..], 0, 2));
530 }
531
532 #[bench]
533 fn bench_set_noreply_512(b: &mut Bencher) {
534 let key = b"test:test_bench";
535 let val = generate_data(512);
536
537 let mut client =
538 Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
539
540 b.iter(|| client.set_noreply(key, &val[..], 0, 2));
541 }
542
543 #[bench]
544 fn bench_set_1024(b: &mut Bencher) {
545 let key = b"test:test_bench";
546 let val = generate_data(1024);
547
548 let mut client =
549 Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
550
551 b.iter(|| client.set(key, &val[..], 0, 2));
552 }
553
554 #[bench]
555 fn bench_set_noreply_1024(b: &mut Bencher) {
556 let key = b"test:test_bench";
557 let val = generate_data(1024);
558
559 let mut client =
560 Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
561
562 b.iter(|| client.set_noreply(key, &val[..], 0, 2));
563 }
564
565 #[bench]
566 fn bench_set_4096(b: &mut Bencher) {
567 let key = b"test:test_bench";
568 let val = generate_data(4096);
569
570 let mut client =
571 Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
572
573 b.iter(|| client.set(key, &val[..], 0, 2));
574 }
575
576 #[bench]
577 fn bench_set_noreply_4096(b: &mut Bencher) {
578 let key = b"test:test_bench";
579 let val = generate_data(4096);
580
581 let mut client =
582 Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
583
584 b.iter(|| client.set_noreply(key, &val[..], 0, 2));
585 }
586
587 #[bench]
588 fn bench_set_16384(b: &mut Bencher) {
589 let key = b"test:test_bench";
590 let val = generate_data(16384);
591
592 let mut client =
593 Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
594
595 b.iter(|| client.set(key, &val[..], 0, 2));
596 }
597
598 #[bench]
599 fn bench_set_noreply_16384(b: &mut Bencher) {
600 let key = b"test:test_bench";
601 let val = generate_data(16384);
602
603 let mut client =
604 Client::connect(&[("tcp://127.0.0.1:11211", 1)], ProtoType::Binary).unwrap();
605
606 b.iter(|| client.set_noreply(key, &val[..], 0, 2));
607 }
608}