1use std::collections::{BTreeMap, HashMap};
11use std::convert::From;
12use std::error;
13use std::fmt;
14use std::io::{BufRead, BufReader, Cursor, Write};
15use std::str;
16use std::string::String;
17
18use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
19use log::debug;
20
21use crate::proto::binarydef::{
22 Command, DataType, RequestHeader, RequestPacket, RequestPacketRef, ResponsePacket,
23};
24use crate::proto::{self, AuthResponse, MemCachedResult};
25use crate::proto::{
26 AuthOperation, CasOperation, MultiOperation, NoReplyOperation, Operation, ServerOperation,
27};
28
29use semver::Version;
30
31use rand::random;
32
33pub use proto::binarydef::Status;
34
/// Error built from a response `Status`, with a static description and an
/// optional detail string.
#[derive(Debug, Clone)]
pub struct Error {
    // Status the error was built from (set in `from_status`).
    status: Status,
    // Static description; `from_status` fills it from `Status::desc()`.
    desc: &'static str,
    // Optional extra text; the `Display` impl appends it in parentheses.
    detail: Option<String>,
}
41
42impl Error {
43 fn from_status(status: Status, detail: Option<String>) -> Error {
44 Error {
45 status,
46 desc: status.desc(),
47 detail,
48 }
49 }
50
51 pub fn detail(&self) -> Option<String> {
53 self.detail.clone()
54 }
55
56 pub fn status(&self) -> Status {
58 self.status
59 }
60}
61
62impl fmt::Display for Error {
63 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
64 write!(f, "{}", self.desc)?;
65 match self.detail {
66 Some(ref s) => write!(f, " ({})", s),
67 None => Ok(()),
68 }
69 }
70}
71
// Empty impl: `Error` uses the trait's default methods, backed by the `Debug`
// derive and the `Display` impl on this type.
impl error::Error for Error {}
73
/// Binary protocol handler that owns the stream it writes requests to and
/// reads responses from.
pub struct BinaryProto<T>
where
    T: BufRead + Write + Send,
{
    stream: T,
}
77
78impl<T: BufRead + Write + Send> BinaryProto<T> {
85 pub fn new(stream: T) -> BinaryProto<T> {
86 BinaryProto { stream }
87 }
88
89 fn send_noop(&mut self) -> MemCachedResult<u32> {
90 let opaque = random::<u32>();
91 debug!("Sending NOOP");
92 let req_packet = RequestPacket::new(
93 Command::Noop,
94 DataType::RawBytes,
95 0,
96 opaque,
97 0,
98 Vec::new(),
99 Vec::new(),
100 Vec::new(),
101 );
102
103 req_packet.write_to(&mut self.stream)?;
104 self.stream.flush()?;
105
106 Ok(opaque)
107 }
108}
109
110impl<T: BufRead + Write + Send> Operation for BinaryProto<T> {
111 fn set(
112 &mut self,
113 key: &[u8],
114 value: &[u8],
115 flags: u32,
116 expiration: u32,
117 ) -> MemCachedResult<()> {
118 let opaque = random::<u32>();
119 debug!(
120 "Set key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
121 key,
122 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
123 value,
124 flags,
125 expiration
126 );
127 let mut extra = [0u8; 8];
128 {
129 let mut extra_buf = Cursor::new(&mut extra[..]);
130 extra_buf.write_u32::<BigEndian>(flags)?;
131 extra_buf.write_u32::<BigEndian>(expiration)?;
132 }
133
134 let req_header = RequestHeader::from_payload(
135 Command::Set,
136 DataType::RawBytes,
137 0,
138 opaque,
139 0,
140 key,
141 &extra,
142 value,
143 );
144 let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
145
146 req_packet.write_to(&mut self.stream)?;
147 self.stream.flush()?;
148
149 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
150 while resp.header.opaque != opaque {
151 debug!(
152 "Expecting opaque: {} but got {}, trying again ...",
153 opaque, resp.header.opaque
154 );
155 resp = ResponsePacket::read_from(&mut self.stream)?;
156 }
157
158 match resp.header.status {
159 Status::NoError => Ok(()),
160 _ => Err(From::from(Error::from_status(resp.header.status, None))),
161 }
162 }
163
164 fn add(
165 &mut self,
166 key: &[u8],
167 value: &[u8],
168 flags: u32,
169 expiration: u32,
170 ) -> MemCachedResult<()> {
171 let opaque = random::<u32>();
172 debug!(
173 "Add key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
174 key,
175 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
176 value,
177 flags,
178 expiration
179 );
180 let mut extra = [0u8; 8];
181 {
182 let mut extra_buf = Cursor::new(&mut extra[..]);
183 extra_buf.write_u32::<BigEndian>(flags)?;
184 extra_buf.write_u32::<BigEndian>(expiration)?;
185 }
186
187 let req_header = RequestHeader::from_payload(
188 Command::Add,
189 DataType::RawBytes,
190 0,
191 opaque,
192 0,
193 key,
194 &extra,
195 value,
196 );
197 let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
198
199 req_packet.write_to(&mut self.stream)?;
200 self.stream.flush()?;
201
202 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
203 while resp.header.opaque != opaque {
204 debug!(
205 "Expecting opaque: {} but got {}, trying again ...",
206 opaque, resp.header.opaque
207 );
208 resp = ResponsePacket::read_from(&mut self.stream)?;
209 }
210
211 match resp.header.status {
212 Status::NoError => Ok(()),
213 _ => Err(From::from(Error::from_status(resp.header.status, None))),
214 }
215 }
216
217 fn delete(&mut self, key: &[u8]) -> MemCachedResult<()> {
218 let opaque = random::<u32>();
219 debug!(
220 "Delete key: {:?} {:?}",
221 key,
222 str::from_utf8(key).unwrap_or("<not-utf8-key>")
223 );
224 let req_header = RequestHeader::from_payload(
225 Command::Delete,
226 DataType::RawBytes,
227 0,
228 opaque,
229 0,
230 key,
231 &[],
232 &[],
233 );
234 let req_packet = RequestPacketRef::new(&req_header, &[], key, &[]);
235
236 req_packet.write_to(&mut self.stream)?;
237 self.stream.flush()?;
238
239 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
240 while resp.header.opaque != opaque {
241 debug!(
242 "Expecting opaque: {} but got {}, trying again ...",
243 opaque, resp.header.opaque
244 );
245 resp = ResponsePacket::read_from(&mut self.stream)?;
246 }
247
248 match resp.header.status {
249 Status::NoError => Ok(()),
250 _ => Err(From::from(Error::from_status(resp.header.status, None))),
251 }
252 }
253
254 fn replace(
255 &mut self,
256 key: &[u8],
257 value: &[u8],
258 flags: u32,
259 expiration: u32,
260 ) -> MemCachedResult<()> {
261 let opaque = random::<u32>();
262 debug!(
263 "Replace key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
264 key,
265 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
266 value,
267 flags,
268 expiration
269 );
270 let mut extra = [0u8; 8];
271 {
272 let mut extra_buf = Cursor::new(&mut extra[..]);
273 extra_buf.write_u32::<BigEndian>(flags)?;
274 extra_buf.write_u32::<BigEndian>(expiration)?;
275 }
276
277 let req_header = RequestHeader::from_payload(
278 Command::Replace,
279 DataType::RawBytes,
280 0,
281 opaque,
282 0,
283 key,
284 &extra,
285 value,
286 );
287 let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
288
289 req_packet.write_to(&mut self.stream)?;
290 self.stream.flush()?;
291
292 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
293 while resp.header.opaque != opaque {
294 debug!(
295 "Expecting opaque: {} but got {}, trying again ...",
296 opaque, resp.header.opaque
297 );
298 resp = ResponsePacket::read_from(&mut self.stream)?;
299 }
300
301 match resp.header.status {
302 Status::NoError => Ok(()),
303 _ => Err(From::from(Error::from_status(resp.header.status, None))),
304 }
305 }
306
307 fn get(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, u32)> {
308 let opaque = random::<u32>();
309 debug!(
310 "Get key: {:?} {:?}",
311 key,
312 str::from_utf8(key).unwrap_or("<not-utf8-key>")
313 );
314 let req_header = RequestHeader::from_payload(
315 Command::Get,
316 DataType::RawBytes,
317 0,
318 opaque,
319 0,
320 key,
321 &[],
322 &[],
323 );
324 let req_packet = RequestPacketRef::new(&req_header, &[], key, &[]);
325
326 req_packet.write_to(&mut self.stream)?;
327 self.stream.flush()?;
328
329 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
330 while resp.header.opaque != opaque {
331 debug!(
332 "Expecting opaque: {} but got {}, trying again ...",
333 opaque, resp.header.opaque
334 );
335 resp = ResponsePacket::read_from(&mut self.stream)?;
336 }
337
338 match resp.header.status {
339 Status::NoError => {
340 let mut extrabufr = BufReader::new(&resp.extra[..]);
341 let flags = extrabufr.read_u32::<BigEndian>()?;
342
343 Ok((resp.value, flags))
344 }
345 _ => Err(From::from(Error::from_status(resp.header.status, None))),
346 }
347 }
348
349 fn getk(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, Vec<u8>, u32)> {
350 let opaque = random::<u32>();
351 debug!(
352 "GetK key: {:?} {:?}",
353 key,
354 str::from_utf8(key).unwrap_or("<not-utf8-key>")
355 );
356 let req_header = RequestHeader::from_payload(
357 Command::GetKey,
358 DataType::RawBytes,
359 0,
360 opaque,
361 0,
362 key,
363 &[],
364 &[],
365 );
366 let req_packet = RequestPacketRef::new(&req_header, &[], key, &[]);
367
368 req_packet.write_to(&mut self.stream)?;
369 self.stream.flush()?;
370
371 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
372 while resp.header.opaque != opaque {
373 debug!(
374 "Expecting opaque: {} but got {}, trying again ...",
375 opaque, resp.header.opaque
376 );
377 resp = ResponsePacket::read_from(&mut self.stream)?;
378 }
379
380 match resp.header.status {
381 Status::NoError => {
382 let mut extrabufr = BufReader::new(&resp.extra[..]);
383 let flags = extrabufr.read_u32::<BigEndian>()?;
384
385 Ok((resp.key, resp.value, flags))
386 }
387 _ => Err(From::from(Error::from_status(resp.header.status, None))),
388 }
389 }
390
391 fn increment(
392 &mut self,
393 key: &[u8],
394 amount: u64,
395 initial: u64,
396 expiration: u32,
397 ) -> MemCachedResult<u64> {
398 let opaque = random::<u32>();
399 debug!(
400 "Increment key: {:?} {:?}, amount: {}, initial: {}, expiration: {}",
401 key,
402 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
403 amount,
404 initial,
405 expiration
406 );
407 let mut extra = [0u8; 20];
408 {
409 let mut extra_buf = Cursor::new(&mut extra[..]);
410 extra_buf.write_u64::<BigEndian>(amount)?;
411 extra_buf.write_u64::<BigEndian>(initial)?;
412 extra_buf.write_u32::<BigEndian>(expiration)?;
413 }
414
415 let req_header = RequestHeader::from_payload(
416 Command::Increment,
417 DataType::RawBytes,
418 0,
419 opaque,
420 0,
421 key,
422 &extra,
423 &[],
424 );
425 let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
426
427 req_packet.write_to(&mut self.stream)?;
428 self.stream.flush()?;
429
430 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
431 while resp.header.opaque != opaque {
432 debug!(
433 "Expecting opaque: {} but got {}, trying again ...",
434 opaque, resp.header.opaque
435 );
436 resp = ResponsePacket::read_from(&mut self.stream)?;
437 }
438
439 match resp.header.status {
440 Status::NoError => {
441 let mut bufr = BufReader::new(&resp.value[..]);
442 Ok(bufr.read_u64::<BigEndian>()?)
443 }
444 _ => Err(From::from(Error::from_status(resp.header.status, None))),
445 }
446 }
447
448 fn decrement(
449 &mut self,
450 key: &[u8],
451 amount: u64,
452 initial: u64,
453 expiration: u32,
454 ) -> MemCachedResult<u64> {
455 let opaque = random::<u32>();
456 debug!(
457 "Decrement key: {:?} {:?}, amount: {}, initial: {}, expiration: {}",
458 key,
459 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
460 amount,
461 initial,
462 expiration
463 );
464 let mut extra = [0u8; 20];
465 {
466 let mut extra_buf = Cursor::new(&mut extra[..]);
467 extra_buf.write_u64::<BigEndian>(amount)?;
468 extra_buf.write_u64::<BigEndian>(initial)?;
469 extra_buf.write_u32::<BigEndian>(expiration)?;
470 }
471
472 let req_header = RequestHeader::from_payload(
473 Command::Decrement,
474 DataType::RawBytes,
475 0,
476 opaque,
477 0,
478 key,
479 &extra,
480 &[],
481 );
482 let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
483
484 req_packet.write_to(&mut self.stream)?;
485 self.stream.flush()?;
486
487 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
488 while resp.header.opaque != opaque {
489 debug!(
490 "Expecting opaque: {} but got {}, trying again ...",
491 opaque, resp.header.opaque
492 );
493 resp = ResponsePacket::read_from(&mut self.stream)?;
494 }
495
496 match resp.header.status {
497 Status::NoError => {
498 let mut bufr = BufReader::new(&resp.value[..]);
499 Ok(bufr.read_u64::<BigEndian>()?)
500 }
501 _ => Err(From::from(Error::from_status(resp.header.status, None))),
502 }
503 }
504
505 fn append(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
506 let opaque = random::<u32>();
507 debug!(
508 "Append key: {:?} {:?}, value: {:?}",
509 key,
510 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
511 value
512 );
513 let req_header = RequestHeader::from_payload(
514 Command::Append,
515 DataType::RawBytes,
516 0,
517 opaque,
518 0,
519 key,
520 &[],
521 value,
522 );
523 let req_packet = RequestPacketRef::new(&req_header, &[], key, value);
524
525 req_packet.write_to(&mut self.stream)?;
526 self.stream.flush()?;
527
528 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
529 while resp.header.opaque != opaque {
530 debug!(
531 "Expecting opaque: {} but got {}, trying again ...",
532 opaque, resp.header.opaque
533 );
534 resp = ResponsePacket::read_from(&mut self.stream)?;
535 }
536
537 match resp.header.status {
538 Status::NoError => Ok(()),
539 _ => Err(From::from(Error::from_status(resp.header.status, None))),
540 }
541 }
542
543 fn prepend(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
544 let opaque = random::<u32>();
545 debug!(
546 "Prepend key: {:?} {:?}, value: {:?}",
547 key,
548 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
549 value
550 );
551 let req_header = RequestHeader::from_payload(
552 Command::Prepend,
553 DataType::RawBytes,
554 0,
555 opaque,
556 0,
557 key,
558 &[],
559 value,
560 );
561 let req_packet = RequestPacketRef::new(&req_header, &[], key, value);
562
563 req_packet.write_to(&mut self.stream)?;
564 self.stream.flush()?;
565
566 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
567 while resp.header.opaque != opaque {
568 debug!(
569 "Expecting opaque: {} but got {}, trying again ...",
570 opaque, resp.header.opaque
571 );
572 resp = ResponsePacket::read_from(&mut self.stream)?;
573 }
574
575 match resp.header.status {
576 Status::NoError => Ok(()),
577 _ => Err(From::from(Error::from_status(resp.header.status, None))),
578 }
579 }
580
581 fn touch(&mut self, key: &[u8], expiration: u32) -> MemCachedResult<()> {
582 let opaque = random::<u32>();
583 debug!(
584 "Touch key: {:?} {:?}, expiration: {}",
585 key,
586 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
587 expiration
588 );
589 let mut extra = [0u8; 4];
590 {
591 let mut extra_buf = Cursor::new(&mut extra[..]);
592 extra_buf.write_u32::<BigEndian>(expiration)?;
593 }
594
595 let req_header = RequestHeader::from_payload(
596 Command::Touch,
597 DataType::RawBytes,
598 0,
599 opaque,
600 0,
601 key,
602 &extra,
603 &[],
604 );
605 let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
606
607 req_packet.write_to(&mut self.stream)?;
608 self.stream.flush()?;
609
610 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
611 while resp.header.opaque != opaque {
612 debug!(
613 "Expecting opaque: {} but got {}, trying again ...",
614 opaque, resp.header.opaque
615 );
616 resp = ResponsePacket::read_from(&mut self.stream)?;
617 }
618
619 match resp.header.status {
620 Status::NoError => Ok(()),
621 _ => Err(From::from(Error::from_status(resp.header.status, None))),
622 }
623 }
624}
625
626impl<T: BufRead + Write + Send> ServerOperation for BinaryProto<T> {
627 fn quit(&mut self) -> MemCachedResult<()> {
628 let opaque = random::<u32>();
629 debug!("Quit");
630 let req_header = RequestHeader::from_payload(
631 Command::Quit,
632 DataType::RawBytes,
633 0,
634 opaque,
635 0,
636 &[],
637 &[],
638 &[],
639 );
640 let req_packet = RequestPacketRef::new(&req_header, &[], &[], &[]);
641
642 req_packet.write_to(&mut self.stream)?;
643 self.stream.flush()?;
644
645 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
646 while resp.header.opaque != opaque {
647 debug!(
648 "Expecting opaque: {} but got {}, trying again ...",
649 opaque, resp.header.opaque
650 );
651 resp = ResponsePacket::read_from(&mut self.stream)?;
652 }
653
654 match resp.header.status {
655 Status::NoError => Ok(()),
656 _ => Err(From::from(Error::from_status(resp.header.status, None))),
657 }
658 }
659
660 fn flush(&mut self, expiration: u32) -> MemCachedResult<()> {
661 let opaque = random::<u32>();
662 debug!("Expiration flush: {}", expiration);
663 let mut extra = [0u8; 4];
664 {
665 let mut extra_buf = Cursor::new(&mut extra[..]);
666 extra_buf.write_u32::<BigEndian>(expiration)?;
667 }
668
669 let req_header = RequestHeader::from_payload(
670 Command::Flush,
671 DataType::RawBytes,
672 0,
673 opaque,
674 0,
675 &[],
676 &extra,
677 &[],
678 );
679 let req_packet = RequestPacketRef::new(&req_header, &extra, &[], &[]);
680
681 req_packet.write_to(&mut self.stream)?;
682 self.stream.flush()?;
683
684 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
685 while resp.header.opaque != opaque {
686 debug!(
687 "Expecting opaque: {} but got {}, trying again ...",
688 opaque, resp.header.opaque
689 );
690 resp = ResponsePacket::read_from(&mut self.stream)?;
691 }
692
693 match resp.header.status {
694 Status::NoError => Ok(()),
695 _ => Err(From::from(Error::from_status(resp.header.status, None))),
696 }
697 }
698
699 fn noop(&mut self) -> MemCachedResult<()> {
700 debug!("Noop");
701 let opaque = self.send_noop()?;
702 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
703 while resp.header.opaque != opaque {
704 debug!(
705 "Expecting opaque: {} but got {}, trying again ...",
706 opaque, resp.header.opaque
707 );
708 resp = ResponsePacket::read_from(&mut self.stream)?;
709 }
710
711 match resp.header.status {
712 Status::NoError => Ok(()),
713 _ => Err(From::from(Error::from_status(resp.header.status, None))),
714 }
715 }
716
717 fn version(&mut self) -> MemCachedResult<Version> {
718 let opaque = random::<u32>();
719 debug!("Version");
720 let req_header =
721 RequestHeader::new(Command::Version, DataType::RawBytes, 0, opaque, 0, 0, 0, 0);
722 let req_packet = RequestPacketRef::new(&req_header, &[], &[], &[]);
723
724 req_packet.write_to(&mut self.stream)?;
725 self.stream.flush()?;
726
727 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
728 while resp.header.opaque != opaque {
729 debug!(
730 "Expecting opaque: {} but got {}, trying again ...",
731 opaque, resp.header.opaque
732 );
733 resp = ResponsePacket::read_from(&mut self.stream)?;
734 }
735
736 match resp.header.status {
737 Status::NoError => {
738 let val = resp.value;
739 let verstr = match str::from_utf8(&val[..]) {
740 Ok(vs) => vs,
741 Err(..) => {
742 return Err(proto::Error::OtherError {
743 desc: "Response is not a string",
744 detail: None,
745 })
746 }
747 };
748
749 Ok(match Version::parse(verstr) {
750 Ok(v) => v,
751 Err(err) => {
752 return Err(proto::Error::OtherError {
753 desc: "Unrecognized version string",
754 detail: Some(err.to_string()),
755 })
756 }
757 })
758 }
759 _ => Err(From::from(Error::from_status(resp.header.status, None))),
760 }
761 }
762
763 fn stat(&mut self) -> MemCachedResult<BTreeMap<String, String>> {
764 let opaque = random::<u32>();
765 debug!("Stat");
766 let req_header =
767 RequestHeader::new(Command::Stat, DataType::RawBytes, 0, opaque, 0, 0, 0, 0);
768 let req_packet = RequestPacketRef::new(&req_header, &[], &[], &[]);
769
770 req_packet.write_to(&mut self.stream)?;
771 self.stream.flush()?;
772
773 let mut result = BTreeMap::new();
774 loop {
775 let resp = ResponsePacket::read_from(&mut self.stream)?;
776 if resp.header.opaque != opaque {
777 debug!(
778 "Expecting opaque: {} but got {}, trying again ...",
779 opaque, resp.header.opaque
780 );
781 continue;
782 }
783 match resp.header.status {
784 Status::NoError => {}
785 _ => return Err(From::from(Error::from_status(resp.header.status, None))),
786 }
787
788 if resp.key.is_empty() && resp.value.is_empty() {
789 break;
790 }
791
792 let key = match String::from_utf8(resp.key) {
793 Ok(k) => k,
794 Err(..) => {
795 return Err(proto::Error::OtherError {
796 desc: "Key is not a string",
797 detail: None,
798 })
799 }
800 };
801
802 let val = match String::from_utf8(resp.value) {
803 Ok(k) => k,
804 Err(..) => {
805 return Err(proto::Error::OtherError {
806 desc: "Value is not a string",
807 detail: None,
808 })
809 }
810 };
811
812 result.insert(key, val);
813 }
814
815 Ok(result)
816 }
817}
818
819impl<T: BufRead + Write + Send> MultiOperation for BinaryProto<T> {
820 fn set_multi(&mut self, kv: BTreeMap<&[u8], (&[u8], u32, u32)>) -> MemCachedResult<()> {
821 for (key, (value, flags, expiration)) in kv.into_iter() {
822 let mut extra = [0u8; 8];
823 {
824 let mut extra_buf = Cursor::new(&mut extra[..]);
825 extra_buf.write_u32::<BigEndian>(flags)?;
826 extra_buf.write_u32::<BigEndian>(expiration)?;
827 }
828
829 let req_header = RequestHeader::from_payload(
830 Command::SetQuietly,
831 DataType::RawBytes,
832 0,
833 0,
834 0,
835 key,
836 &extra,
837 value,
838 );
839 let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
840
841 req_packet.write_to(&mut self.stream)?;
842 }
843 self.send_noop()?;
844
845 loop {
846 let resp = ResponsePacket::read_from(&mut self.stream)?;
847
848 match resp.header.status {
849 Status::NoError => {}
850 _ => return Err(From::from(Error::from_status(resp.header.status, None))),
851 }
852
853 if resp.header.command == Command::Noop {
854 return Ok(());
855 }
856 }
857 }
858
859 fn delete_multi(&mut self, keys: &[&[u8]]) -> MemCachedResult<()> {
860 for key in keys.iter() {
861 let req_header = RequestHeader::from_payload(
862 Command::DeleteQuietly,
863 DataType::RawBytes,
864 0,
865 0,
866 0,
867 *key,
868 &[],
869 &[],
870 );
871 let req_packet = RequestPacketRef::new(&req_header, &[], *key, &[]);
872
873 req_packet.write_to(&mut self.stream)?;
874 }
875 self.send_noop()?;
876
877 loop {
878 let resp = ResponsePacket::read_from(&mut self.stream)?;
879
880 match resp.header.status {
881 Status::NoError | Status::KeyNotFound => {}
882 _ => return Err(From::from(Error::from_status(resp.header.status, None))),
883 }
884
885 if resp.header.command == Command::Noop {
886 return Ok(());
887 }
888 }
889 }
890
891 fn increment_multi<'a>(
892 &mut self,
893 kv: HashMap<&'a [u8], (u64, u64, u32)>,
894 ) -> MemCachedResult<HashMap<&'a [u8], u64>> {
895 let opaques: MemCachedResult<HashMap<_, _>> = kv
896 .into_iter()
897 .map(|(key, (amount, initial, expiration))| {
898 let opaque = random::<u32>();
899 let mut extra = [0u8; 20];
900 {
901 let mut extra_buf = Cursor::new(&mut extra[..]);
902 extra_buf.write_u64::<BigEndian>(amount)?;
903 extra_buf.write_u64::<BigEndian>(initial)?;
904 extra_buf.write_u32::<BigEndian>(expiration)?;
905 }
906
907 let req_header = RequestHeader::from_payload(
908 Command::Increment,
909 DataType::RawBytes,
910 0,
911 opaque,
912 0,
913 key,
914 &extra,
915 &[],
916 );
917 let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
918
919 req_packet.write_to(&mut self.stream)?;
920 Ok((opaque, key))
921 })
922 .collect();
923
924 let opaques = opaques?;
925
926 self.send_noop()?;
927 self.stream.flush()?;
928
929 let mut results = HashMap::with_capacity(opaques.len());
930 loop {
931 let resp = ResponsePacket::read_from(&mut self.stream)?;
932 match resp.header.status {
933 Status::NoError => {}
934 _ => return Err(From::from(Error::from_status(resp.header.status, None))),
935 }
936
937 if resp.header.command == Command::Noop {
938 return Ok(results);
939 }
940
941 if let Some(key) = opaques.get(&resp.header.opaque) {
942 let mut bufr = BufReader::new(&resp.value[..]);
943 let val = bufr.read_u64::<BigEndian>()?;
944 results.insert(key, val);
945 }
946 }
947 }
948
949 fn get_multi(&mut self, keys: &[&[u8]]) -> MemCachedResult<HashMap<Vec<u8>, (Vec<u8>, u32)>> {
950 for key in keys.iter() {
951 let req_header = RequestHeader::from_payload(
952 Command::GetKeyQuietly,
953 DataType::RawBytes,
954 0,
955 0,
956 0,
957 *key,
958 &[],
959 &[],
960 );
961 let req_packet = RequestPacketRef::new(&req_header, &[], *key, &[]);
962
963 req_packet.write_to(&mut self.stream)?;
964 }
965 self.send_noop()?;
966
967 let mut result = HashMap::with_capacity(keys.len());
968 loop {
969 let resp = ResponsePacket::read_from(&mut self.stream)?;
970 match resp.header.status {
971 Status::NoError => {}
972 _ => return Err(From::from(Error::from_status(resp.header.status, None))),
973 }
974
975 if resp.header.command == Command::Noop {
976 return Ok(result);
977 }
978
979 let mut extrabufr = BufReader::new(&resp.extra[..]);
980 let flags = extrabufr.read_u32::<BigEndian>()?;
981
982 result.insert(resp.key, (resp.value, flags));
983 }
984 }
985}
986
987impl<T: BufRead + Write + Send> NoReplyOperation for BinaryProto<T> {
988 fn set_noreply(
989 &mut self,
990 key: &[u8],
991 value: &[u8],
992 flags: u32,
993 expiration: u32,
994 ) -> MemCachedResult<()> {
995 let opaque = random::<u32>();
996 debug!(
997 "Set noreply key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
998 key,
999 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1000 value,
1001 flags,
1002 expiration
1003 );
1004 let mut extra = [0u8; 8];
1005 {
1006 let mut extra_buf = Cursor::new(&mut extra[..]);
1007 extra_buf.write_u32::<BigEndian>(flags)?;
1008 extra_buf.write_u32::<BigEndian>(expiration)?;
1009 }
1010
1011 let req_header = RequestHeader::from_payload(
1012 Command::SetQuietly,
1013 DataType::RawBytes,
1014 0,
1015 opaque,
1016 0,
1017 key,
1018 &extra,
1019 value,
1020 );
1021 let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
1022
1023 req_packet.write_to(&mut self.stream)?;
1024 self.stream.flush()?;
1025
1026 Ok(())
1027 }
1028
1029 fn add_noreply(
1030 &mut self,
1031 key: &[u8],
1032 value: &[u8],
1033 flags: u32,
1034 expiration: u32,
1035 ) -> MemCachedResult<()> {
1036 let opaque = random::<u32>();
1037 debug!(
1038 "Add noreply key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
1039 key,
1040 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1041 value,
1042 flags,
1043 expiration
1044 );
1045 let mut extra = [0u8; 8];
1046 {
1047 let mut extra_buf = Cursor::new(&mut extra[..]);
1048 extra_buf.write_u32::<BigEndian>(flags)?;
1049 extra_buf.write_u32::<BigEndian>(expiration)?;
1050 }
1051
1052 let req_header = RequestHeader::from_payload(
1053 Command::AddQuietly,
1054 DataType::RawBytes,
1055 0,
1056 opaque,
1057 0,
1058 key,
1059 &extra,
1060 value,
1061 );
1062 let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
1063
1064 req_packet.write_to(&mut self.stream)?;
1065 self.stream.flush()?;
1066
1067 Ok(())
1068 }
1069
1070 fn delete_noreply(&mut self, key: &[u8]) -> MemCachedResult<()> {
1071 let opaque = random::<u32>();
1072 debug!(
1073 "Delete noreply key: {:?} {:?}",
1074 key,
1075 str::from_utf8(key).unwrap_or("<not-utf8-key>")
1076 );
1077 let req_header = RequestHeader::from_payload(
1078 Command::DeleteQuietly,
1079 DataType::RawBytes,
1080 0,
1081 opaque,
1082 0,
1083 key,
1084 &[],
1085 &[],
1086 );
1087 let req_packet = RequestPacketRef::new(&req_header, &[], key, &[]);
1088
1089 req_packet.write_to(&mut self.stream)?;
1090 self.stream.flush()?;
1091
1092 Ok(())
1093 }
1094
1095 fn replace_noreply(
1096 &mut self,
1097 key: &[u8],
1098 value: &[u8],
1099 flags: u32,
1100 expiration: u32,
1101 ) -> MemCachedResult<()> {
1102 let opaque = random::<u32>();
1103 debug!(
1104 "Replace noreply key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
1105 key,
1106 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1107 value,
1108 flags,
1109 expiration
1110 );
1111 let mut extra = [0u8; 8];
1112 {
1113 let mut extra_buf = Cursor::new(&mut extra[..]);
1114 extra_buf.write_u32::<BigEndian>(flags)?;
1115 extra_buf.write_u32::<BigEndian>(expiration)?;
1116 }
1117
1118 let req_header = RequestHeader::from_payload(
1119 Command::ReplaceQuietly,
1120 DataType::RawBytes,
1121 0,
1122 opaque,
1123 0,
1124 key,
1125 &extra,
1126 value,
1127 );
1128 let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
1129
1130 req_packet.write_to(&mut self.stream)?;
1131 self.stream.flush()?;
1132
1133 Ok(())
1134 }
1135
1136 fn increment_noreply(
1137 &mut self,
1138 key: &[u8],
1139 amount: u64,
1140 initial: u64,
1141 expiration: u32,
1142 ) -> MemCachedResult<()> {
1143 let opaque = random::<u32>();
1144 debug!(
1145 "Increment noreply key: {:?} {:?}, amount: {}, initial: {}, expiration: {}",
1146 key,
1147 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1148 amount,
1149 initial,
1150 expiration
1151 );
1152 let mut extra = [0u8; 20];
1153 {
1154 let mut extra_buf = Cursor::new(&mut extra[..]);
1155 extra_buf.write_u64::<BigEndian>(amount)?;
1156 extra_buf.write_u64::<BigEndian>(initial)?;
1157 extra_buf.write_u32::<BigEndian>(expiration)?;
1158 }
1159
1160 let req_header = RequestHeader::from_payload(
1161 Command::IncrementQuietly,
1162 DataType::RawBytes,
1163 0,
1164 opaque,
1165 0,
1166 key,
1167 &extra,
1168 &[],
1169 );
1170 let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
1171
1172 req_packet.write_to(&mut self.stream)?;
1173 self.stream.flush()?;
1174
1175 Ok(())
1176 }
1177
1178 fn decrement_noreply(
1179 &mut self,
1180 key: &[u8],
1181 amount: u64,
1182 initial: u64,
1183 expiration: u32,
1184 ) -> MemCachedResult<()> {
1185 let opaque = random::<u32>();
1186 debug!(
1187 "Decrement noreply key: {:?} {:?}, amount: {}, initial: {}, expiration: {}",
1188 key,
1189 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1190 amount,
1191 initial,
1192 expiration
1193 );
1194 let mut extra = [0u8; 20];
1195 {
1196 let mut extra_buf = Cursor::new(&mut extra[..]);
1197 extra_buf.write_u64::<BigEndian>(amount)?;
1198 extra_buf.write_u64::<BigEndian>(initial)?;
1199 extra_buf.write_u32::<BigEndian>(expiration)?;
1200 }
1201
1202 let req_header = RequestHeader::from_payload(
1203 Command::DecrementQuietly,
1204 DataType::RawBytes,
1205 0,
1206 opaque,
1207 0,
1208 key,
1209 &extra,
1210 &[],
1211 );
1212 let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
1213
1214 req_packet.write_to(&mut self.stream)?;
1215 self.stream.flush()?;
1216
1217 Ok(())
1218 }
1219
1220 fn append_noreply(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
1221 let opaque = random::<u32>();
1222 debug!(
1223 "Append noreply key: {:?} {:?}, value: {:?}",
1224 key,
1225 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1226 value
1227 );
1228 let req_header = RequestHeader::from_payload(
1229 Command::AppendQuietly,
1230 DataType::RawBytes,
1231 0,
1232 opaque,
1233 0,
1234 key,
1235 &[],
1236 value,
1237 );
1238 let req_packet = RequestPacketRef::new(&req_header, &[], key, value);
1239
1240 req_packet.write_to(&mut self.stream)?;
1241 self.stream.flush()?;
1242
1243 Ok(())
1244 }
1245
1246 fn prepend_noreply(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
1247 let opaque = random::<u32>();
1248 debug!(
1249 "Prepend noreply key: {:?} {:?}, value: {:?}",
1250 key,
1251 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1252 value
1253 );
1254 let req_header = RequestHeader::from_payload(
1255 Command::PrependQuietly,
1256 DataType::RawBytes,
1257 0,
1258 opaque,
1259 0,
1260 key,
1261 &[],
1262 value,
1263 );
1264 let req_packet = RequestPacketRef::new(&req_header, &[], key, value);
1265
1266 req_packet.write_to(&mut self.stream)?;
1267 self.stream.flush()?;
1268
1269 Ok(())
1270 }
1271}
1272
1273impl<T: BufRead + Write + Send> CasOperation for BinaryProto<T> {
1274 fn set_cas(
1275 &mut self,
1276 key: &[u8],
1277 value: &[u8],
1278 flags: u32,
1279 expiration: u32,
1280 cas: u64,
1281 ) -> MemCachedResult<u64> {
1282 let opaque = random::<u32>();
1283 debug!(
1284 "Set cas key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}, cas: {}",
1285 key,
1286 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1287 value,
1288 flags,
1289 expiration,
1290 cas
1291 );
1292 let mut extra = [0u8; 8];
1293 {
1294 let mut extra_buf = Cursor::new(&mut extra[..]);
1295 extra_buf.write_u32::<BigEndian>(flags)?;
1296 extra_buf.write_u32::<BigEndian>(expiration)?;
1297 }
1298
1299 let req_header = RequestHeader::from_payload(
1300 Command::Set,
1301 DataType::RawBytes,
1302 0,
1303 opaque,
1304 cas,
1305 key,
1306 &extra,
1307 value,
1308 );
1309 let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
1310
1311 req_packet.write_to(&mut self.stream)?;
1312 self.stream.flush()?;
1313
1314 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1315 while resp.header.opaque != opaque {
1316 debug!(
1317 "Expecting opaque: {} but got {}, trying again ...",
1318 opaque, resp.header.opaque
1319 );
1320 resp = ResponsePacket::read_from(&mut self.stream)?;
1321 }
1322
1323 match resp.header.status {
1324 Status::NoError => Ok(resp.header.cas),
1325 _ => Err(From::from(Error::from_status(resp.header.status, None))),
1326 }
1327 }
1328
1329 fn add_cas(
1330 &mut self,
1331 key: &[u8],
1332 value: &[u8],
1333 flags: u32,
1334 expiration: u32,
1335 ) -> MemCachedResult<u64> {
1336 let opaque = random::<u32>();
1337 debug!(
1338 "Add cas key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
1339 key,
1340 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1341 value,
1342 flags,
1343 expiration
1344 );
1345 let mut extra = [0u8; 8];
1346 {
1347 let mut extra_buf = Cursor::new(&mut extra[..]);
1348 extra_buf.write_u32::<BigEndian>(flags)?;
1349 extra_buf.write_u32::<BigEndian>(expiration)?;
1350 }
1351
1352 let req_header = RequestHeader::from_payload(
1353 Command::Add,
1354 DataType::RawBytes,
1355 0,
1356 opaque,
1357 0,
1358 key,
1359 &extra,
1360 value,
1361 );
1362 let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
1363
1364 req_packet.write_to(&mut self.stream)?;
1365 self.stream.flush()?;
1366
1367 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1368 while resp.header.opaque != opaque {
1369 debug!(
1370 "Expecting opaque: {} but got {}, trying again ...",
1371 opaque, resp.header.opaque
1372 );
1373 resp = ResponsePacket::read_from(&mut self.stream)?;
1374 }
1375
1376 match resp.header.status {
1377 Status::NoError => Ok(resp.header.cas),
1378 _ => Err(From::from(Error::from_status(resp.header.status, None))),
1379 }
1380 }
1381
1382 fn replace_cas(
1383 &mut self,
1384 key: &[u8],
1385 value: &[u8],
1386 flags: u32,
1387 expiration: u32,
1388 cas: u64,
1389 ) -> MemCachedResult<u64> {
1390 let opaque = random::<u32>();
1391 debug!(
1392 "Replace cas key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}, cas: {}",
1393 key,
1394 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1395 value,
1396 flags,
1397 expiration,
1398 cas
1399 );
1400 let mut extra = [0u8; 8];
1401 {
1402 let mut extra_buf = Cursor::new(&mut extra[..]);
1403 extra_buf.write_u32::<BigEndian>(flags)?;
1404 extra_buf.write_u32::<BigEndian>(expiration)?;
1405 }
1406
1407 let req_header = RequestHeader::from_payload(
1408 Command::Replace,
1409 DataType::RawBytes,
1410 0,
1411 opaque,
1412 cas,
1413 key,
1414 &extra,
1415 value,
1416 );
1417 let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
1418
1419 req_packet.write_to(&mut self.stream)?;
1420 self.stream.flush()?;
1421
1422 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1423 while resp.header.opaque != opaque {
1424 debug!(
1425 "Expecting opaque: {} but got {}, trying again ...",
1426 opaque, resp.header.opaque
1427 );
1428 resp = ResponsePacket::read_from(&mut self.stream)?;
1429 }
1430
1431 match resp.header.status {
1432 Status::NoError => Ok(resp.header.cas),
1433 _ => Err(From::from(Error::from_status(resp.header.status, None))),
1434 }
1435 }
1436
1437 fn get_cas(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, u32, u64)> {
1438 let opaque = random::<u32>();
1439 debug!(
1440 "Get cas key: {:?} {:?}",
1441 key,
1442 str::from_utf8(key).unwrap_or("<not-utf8-key>")
1443 );
1444 let req_header = RequestHeader::from_payload(
1445 Command::Get,
1446 DataType::RawBytes,
1447 0,
1448 opaque,
1449 0,
1450 key,
1451 &[],
1452 &[],
1453 );
1454 let req_packet = RequestPacketRef::new(&req_header, &[], key, &[]);
1455
1456 req_packet.write_to(&mut self.stream)?;
1457 self.stream.flush()?;
1458
1459 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1460 while resp.header.opaque != opaque {
1461 debug!(
1462 "Expecting opaque: {} but got {}, trying again ...",
1463 opaque, resp.header.opaque
1464 );
1465 resp = ResponsePacket::read_from(&mut self.stream)?;
1466 }
1467
1468 match resp.header.status {
1469 Status::NoError => {
1470 let mut extrabufr = BufReader::new(&resp.extra[..]);
1471 let flags = extrabufr.read_u32::<BigEndian>()?;
1472
1473 Ok((resp.value, flags, resp.header.cas))
1474 }
1475 _ => Err(From::from(Error::from_status(resp.header.status, None))),
1476 }
1477 }
1478
1479 fn getk_cas(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, Vec<u8>, u32, u64)> {
1480 let opaque = random::<u32>();
1481 debug!(
1482 "GetK cas key: {:?} {:?}",
1483 key,
1484 str::from_utf8(key).unwrap_or("<not-utf8-key>")
1485 );
1486 let req_header = RequestHeader::from_payload(
1487 Command::GetKey,
1488 DataType::RawBytes,
1489 0,
1490 opaque,
1491 0,
1492 key,
1493 &[],
1494 &[],
1495 );
1496 let req_packet = RequestPacketRef::new(&req_header, &[], key, &[]);
1497
1498 req_packet.write_to(&mut self.stream)?;
1499 self.stream.flush()?;
1500
1501 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1502 while resp.header.opaque != opaque {
1503 debug!(
1504 "Expecting opaque: {} but got {}, trying again ...",
1505 opaque, resp.header.opaque
1506 );
1507 resp = ResponsePacket::read_from(&mut self.stream)?;
1508 }
1509
1510 match resp.header.status {
1511 Status::NoError => {
1512 let mut extrabufr = BufReader::new(&resp.extra[..]);
1513 let flags = extrabufr.read_u32::<BigEndian>()?;
1514
1515 Ok((resp.key, resp.value, flags, resp.header.cas))
1516 }
1517 _ => Err(From::from(Error::from_status(resp.header.status, None))),
1518 }
1519 }
1520
1521 fn increment_cas(
1522 &mut self,
1523 key: &[u8],
1524 amount: u64,
1525 initial: u64,
1526 expiration: u32,
1527 cas: u64,
1528 ) -> MemCachedResult<(u64, u64)> {
1529 let opaque = random::<u32>();
1530 debug!(
1531 "Increment cas key: {:?} {:?}, amount: {}, initial: {}, expiration: {}, cas: {}",
1532 key,
1533 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1534 amount,
1535 initial,
1536 expiration,
1537 cas
1538 );
1539 let mut extra = [0u8; 20];
1540 {
1541 let mut extra_buf = Cursor::new(&mut extra[..]);
1542 extra_buf.write_u64::<BigEndian>(amount)?;
1543 extra_buf.write_u64::<BigEndian>(initial)?;
1544 extra_buf.write_u32::<BigEndian>(expiration)?;
1545 }
1546
1547 let req_header = RequestHeader::from_payload(
1548 Command::Increment,
1549 DataType::RawBytes,
1550 0,
1551 opaque,
1552 cas,
1553 key,
1554 &extra,
1555 &[],
1556 );
1557 let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
1558
1559 req_packet.write_to(&mut self.stream)?;
1560 self.stream.flush()?;
1561
1562 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1563 while resp.header.opaque != opaque {
1564 debug!(
1565 "Expecting opaque: {} but got {}, trying again ...",
1566 opaque, resp.header.opaque
1567 );
1568 resp = ResponsePacket::read_from(&mut self.stream)?;
1569 }
1570
1571 match resp.header.status {
1572 Status::NoError => {
1573 let mut bufr = BufReader::new(&resp.value[..]);
1574 Ok((bufr.read_u64::<BigEndian>()?, resp.header.cas))
1575 }
1576 _ => Err(From::from(Error::from_status(resp.header.status, None))),
1577 }
1578 }
1579
1580 fn decrement_cas(
1581 &mut self,
1582 key: &[u8],
1583 amount: u64,
1584 initial: u64,
1585 expiration: u32,
1586 cas: u64,
1587 ) -> MemCachedResult<(u64, u64)> {
1588 let opaque = random::<u32>();
1589 debug!(
1590 "Decrement cas key: {:?} {:?}, amount: {}, initial: {}, expiration: {}, cas: {}",
1591 key,
1592 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1593 amount,
1594 initial,
1595 expiration,
1596 cas
1597 );
1598 let mut extra = [0u8; 20];
1599 {
1600 let mut extra_buf = Cursor::new(&mut extra[..]);
1601 extra_buf.write_u64::<BigEndian>(amount)?;
1602 extra_buf.write_u64::<BigEndian>(initial)?;
1603 extra_buf.write_u32::<BigEndian>(expiration)?;
1604 }
1605
1606 let req_header = RequestHeader::from_payload(
1607 Command::Decrement,
1608 DataType::RawBytes,
1609 0,
1610 opaque,
1611 cas,
1612 key,
1613 &extra,
1614 &[],
1615 );
1616 let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
1617
1618 req_packet.write_to(&mut self.stream)?;
1619 self.stream.flush()?;
1620
1621 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1622 while resp.header.opaque != opaque {
1623 debug!(
1624 "Expecting opaque: {} but got {}, trying again ...",
1625 opaque, resp.header.opaque
1626 );
1627 resp = ResponsePacket::read_from(&mut self.stream)?;
1628 }
1629
1630 match resp.header.status {
1631 Status::NoError => {
1632 let mut bufr = BufReader::new(&resp.value[..]);
1633 Ok((bufr.read_u64::<BigEndian>()?, resp.header.cas))
1634 }
1635 _ => Err(From::from(Error::from_status(resp.header.status, None))),
1636 }
1637 }
1638
1639 fn append_cas(&mut self, key: &[u8], value: &[u8], cas: u64) -> MemCachedResult<u64> {
1640 let opaque = random::<u32>();
1641 debug!(
1642 "Append cas key: {:?} {:?}, value: {:?}, cas: {}",
1643 key,
1644 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1645 value,
1646 cas
1647 );
1648 let req_header = RequestHeader::from_payload(
1649 Command::Append,
1650 DataType::RawBytes,
1651 0,
1652 opaque,
1653 cas,
1654 key,
1655 &[],
1656 value,
1657 );
1658 let req_packet = RequestPacketRef::new(&req_header, &[], key, value);
1659
1660 req_packet.write_to(&mut self.stream)?;
1661 self.stream.flush()?;
1662
1663 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1664 while resp.header.opaque != opaque {
1665 debug!(
1666 "Expecting opaque: {} but got {}, trying again ...",
1667 opaque, resp.header.opaque
1668 );
1669 resp = ResponsePacket::read_from(&mut self.stream)?;
1670 }
1671
1672 match resp.header.status {
1673 Status::NoError => Ok(resp.header.cas),
1674 _ => Err(From::from(Error::from_status(resp.header.status, None))),
1675 }
1676 }
1677
1678 fn prepend_cas(&mut self, key: &[u8], value: &[u8], cas: u64) -> MemCachedResult<u64> {
1679 let opaque = random::<u32>();
1680 debug!(
1681 "Prepend cas key: {:?} {:?}, value: {:?}, cas: {}",
1682 key,
1683 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1684 value,
1685 cas
1686 );
1687 let req_header = RequestHeader::from_payload(
1688 Command::Prepend,
1689 DataType::RawBytes,
1690 0,
1691 opaque,
1692 cas,
1693 key,
1694 &[],
1695 value,
1696 );
1697 let req_packet = RequestPacketRef::new(&req_header, &[], key, value);
1698
1699 req_packet.write_to(&mut self.stream)?;
1700 self.stream.flush()?;
1701
1702 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1703 while resp.header.opaque != opaque {
1704 debug!(
1705 "Expecting opaque: {} but got {}, trying again ...",
1706 opaque, resp.header.opaque
1707 );
1708 resp = ResponsePacket::read_from(&mut self.stream)?;
1709 }
1710
1711 match resp.header.status {
1712 Status::NoError => Ok(resp.header.cas),
1713 _ => Err(From::from(Error::from_status(resp.header.status, None))),
1714 }
1715 }
1716
1717 fn touch_cas(&mut self, key: &[u8], expiration: u32, cas: u64) -> MemCachedResult<u64> {
1718 let opaque = random::<u32>();
1719 debug!(
1720 "Touch cas key: {:?} {:?}, expiration: {:?}, cas: {}",
1721 key,
1722 str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1723 expiration,
1724 cas
1725 );
1726 let mut extra = [0u8; 4];
1727 {
1728 let mut extra_buf = Cursor::new(&mut extra[..]);
1729 extra_buf.write_u32::<BigEndian>(expiration)?;
1730 }
1731
1732 let req_header = RequestHeader::from_payload(
1733 Command::Touch,
1734 DataType::RawBytes,
1735 0,
1736 opaque,
1737 cas,
1738 key,
1739 &extra,
1740 &[],
1741 );
1742 let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
1743
1744 req_packet.write_to(&mut self.stream)?;
1745 self.stream.flush()?;
1746
1747 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1748 while resp.header.opaque != opaque {
1749 debug!(
1750 "Expecting opaque: {} but got {}, trying again ...",
1751 opaque, resp.header.opaque
1752 );
1753 resp = ResponsePacket::read_from(&mut self.stream)?;
1754 }
1755
1756 match resp.header.status {
1757 Status::NoError => Ok(resp.header.cas),
1758 _ => Err(From::from(Error::from_status(resp.header.status, None))),
1759 }
1760 }
1761}
1762
1763impl<T: BufRead + Write + Send> AuthOperation for BinaryProto<T> {
1764 fn list_mechanisms(&mut self) -> MemCachedResult<Vec<String>> {
1765 let opaque = random::<u32>();
1766 debug!("List mechanisms");
1767 let req_header = RequestHeader::new(
1768 Command::SaslListMechanisms,
1769 DataType::RawBytes,
1770 0,
1771 opaque,
1772 0,
1773 0,
1774 0,
1775 0,
1776 );
1777 let req_packet = RequestPacketRef::new(&req_header, &[], &[], &[]);
1778 req_packet.write_to(&mut self.stream)?;
1779 self.stream.flush()?;
1780
1781 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1782 while resp.header.opaque != opaque {
1783 debug!(
1784 "Expecting opaque: {} but got {}, trying again ...",
1785 opaque, resp.header.opaque
1786 );
1787 resp = ResponsePacket::read_from(&mut self.stream)?;
1788 }
1789
1790 match resp.header.status {
1791 Status::NoError => {}
1792 _ => return Err(From::from(Error::from_status(resp.header.status, None))),
1793 }
1794
1795 match str::from_utf8(&resp.value[..]) {
1796 Ok(s) => Ok(s.split(' ').map(|mech| mech.to_string()).collect()),
1797 Err(..) => Err(proto::Error::OtherError {
1798 desc: "Mechanism decode error",
1799 detail: None,
1800 }),
1801 }
1802 }
1803
1804 fn auth_start(&mut self, mech: &str, init: &[u8]) -> MemCachedResult<AuthResponse> {
1805 let opaque = random::<u32>();
1806 debug!("Auth start, mechanism: {:?}, init: {:?}", mech, init);
1807 let req_header = RequestHeader::from_payload(
1808 Command::SaslAuthenticate,
1809 DataType::RawBytes,
1810 0,
1811 opaque,
1812 0,
1813 mech.as_bytes(),
1814 &[],
1815 init,
1816 );
1817 let req_packet = RequestPacketRef::new(&req_header, &[], mech.as_bytes(), init);
1818 req_packet.write_to(&mut self.stream)?;
1819 self.stream.flush()?;
1820
1821 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1822 while resp.header.opaque != opaque {
1823 debug!(
1824 "Expecting opaque: {} but got {}, trying again ...",
1825 opaque, resp.header.opaque
1826 );
1827 resp = ResponsePacket::read_from(&mut self.stream)?;
1828 }
1829
1830 match resp.header.status {
1831 Status::AuthenticationFurtherStepRequired => Ok(AuthResponse::Continue(resp.value)),
1832 Status::NoError => Ok(AuthResponse::Succeeded),
1833 Status::AuthenticationRequired => Ok(AuthResponse::Failed),
1834 _ => Err(From::from(Error::from_status(resp.header.status, None))),
1835 }
1836 }
1837
1838 fn auth_continue(&mut self, mech: &str, data: &[u8]) -> MemCachedResult<AuthResponse> {
1839 let opaque = random::<u32>();
1840 debug!("Auth continue, mechanism: {:?}, data: {:?}", mech, data);
1841 let req_header = RequestHeader::from_payload(
1842 Command::SaslStep,
1843 DataType::RawBytes,
1844 0,
1845 opaque,
1846 0,
1847 mech.as_bytes(),
1848 &[],
1849 data,
1850 );
1851 let req_packet = RequestPacketRef::new(&req_header, &[], mech.as_bytes(), data);
1852 req_packet.write_to(&mut self.stream)?;
1853 self.stream.flush()?;
1854
1855 let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1856 while resp.header.opaque != opaque {
1857 debug!(
1858 "Expecting opaque: {} but got {}, trying again ...",
1859 opaque, resp.header.opaque
1860 );
1861 resp = ResponsePacket::read_from(&mut self.stream)?;
1862 }
1863
1864 match resp.header.status {
1865 Status::AuthenticationFurtherStepRequired => Ok(AuthResponse::Continue(resp.value)),
1866 Status::NoError => Ok(AuthResponse::Succeeded),
1867 Status::AuthenticationRequired => Ok(AuthResponse::Failed),
1868 _ => Err(From::from(Error::from_status(resp.header.status, None))),
1869 }
1870 }
1871}
1872
1873#[cfg(test)]
1874mod test {
1875 use crate::proto::{
1876 BinaryProto, CasOperation, MultiOperation, NoReplyOperation, Operation, ServerOperation,
1877 };
1878 use std::collections::{BTreeMap, HashMap};
1879 use std::net::TcpStream;
1880
1881 use bufstream::BufStream;
1882
1883 const SERVER_ADDR: &str = "127.0.0.1:11211";
1884
1885 fn get_client() -> BinaryProto<BufStream<TcpStream>> {
1886 let stream = TcpStream::connect(SERVER_ADDR).unwrap();
1887 BinaryProto::new(BufStream::new(stream))
1888 }
1889
1890 #[test]
1891 fn test_set_get_delete() {
1892 const KEY: &[u8] = b"test:set_get_delete";
1893 const VAL: &[u8] = b"world";
1894
1895 let mut client = get_client();
1896 client.set(KEY, VAL, 0xdead_beef, 120).unwrap();
1897
1898 let get_resp = client.get(KEY);
1899 assert_eq!(get_resp.unwrap(), (VAL.to_vec(), 0xdead_beef));
1900
1901 let getk_resp = client.getk(KEY);
1902 assert_eq!(
1903 getk_resp.unwrap(),
1904 (KEY.to_vec(), VAL.to_vec(), 0xdead_beef)
1905 );
1906
1907 client.delete(KEY).unwrap();
1908 }
1909
1910 #[test]
1911 fn test_incr_decr() {
1912 const KEY: &[u8] = b"test:incr_decr";
1913
1914 let mut client = get_client();
1915 let _ = client.delete(KEY);
1916 {
1917 let incr_resp = client.increment(KEY, 1, 0, 120);
1918 assert_eq!(incr_resp.unwrap(), 0);
1919 }
1920
1921 {
1922 let incr_resp = client.increment(KEY, 10, 0, 120);
1923 assert_eq!(incr_resp.unwrap(), 10);
1924 }
1925
1926 {
1927 let decr_resp = client.decrement(KEY, 5, 0, 120);
1928 assert_eq!(decr_resp.unwrap(), 5);
1929 }
1930
1931 {
1932 let decr_resp = client.decrement(KEY, 20, 0, 120);
1933 assert_eq!(decr_resp.unwrap(), 0);
1934 }
1935
1936 client.delete(KEY).unwrap();
1937 }
1938
1939 #[test]
1940 fn test_version() {
1941 let mut client = get_client();
1942 client.version().unwrap();
1943 }
1944
1945 #[test]
1946 fn test_noop() {
1947 let mut client = get_client();
1948 client.noop().unwrap();
1949 }
1950
1951 #[test]
1952 #[should_panic]
1953 fn test_quit() {
1954 let mut client = get_client();
1955 client.quit().unwrap();
1956
1957 client.noop().unwrap();
1958 }
1959
1960 #[test]
1961 fn test_flush() {
1962 let mut client = get_client();
1963 client.flush(2).unwrap();
1964 }
1965
1966 #[test]
1967 fn test_add() {
1968 const KEY: &[u8] = b"test:add";
1969 const INIT_VAL: &[u8] = b"initial";
1970 const ADD_VAL: &[u8] = b"added";
1971
1972 let mut client = get_client();
1973
1974 let _ = client.delete(KEY);
1975
1976 client.add(KEY, INIT_VAL, 0xdead_beef, 120).unwrap();
1977
1978 {
1979 let get_resp = client.get(KEY);
1980 assert_eq!(get_resp.unwrap(), (INIT_VAL.to_vec(), 0xdead_beef));
1981 client.add(KEY, ADD_VAL, 0xdead_beef, 120).unwrap_err();
1982 }
1983
1984 client.delete(KEY).unwrap();
1985 }
1986
1987 #[test]
1988 fn test_replace() {
1989 let mut client = get_client();
1990 client
1991 .replace(b"test:replace_key", b"replaced", 0xdead_beef, 120)
1992 .unwrap_err();
1993 client
1994 .add(b"test:replace_key", b"just_add", 0xdead_beef, 120)
1995 .unwrap();
1996 client
1997 .replace(b"test:replace_key", b"replaced", 0xdead_beef, 120)
1998 .unwrap();
1999 client.delete(b"test:replace_key").unwrap();
2000 }
2001
2002 #[test]
2003 fn test_append_prepend() {
2004 let mut client = get_client();
2005 client.append(b"test:append_key", b"appended").unwrap_err();
2006 client
2007 .prepend(b"test:append_key", b"prepended")
2008 .unwrap_err();
2009
2010 {
2011 client
2012 .add(b"test:append_key", b"just_add", 0xdead_beef, 120)
2013 .unwrap();
2014 client.append(b"test:append_key", b"appended").unwrap();
2015
2016 let get_resp = client.get(b"test:append_key");
2017 assert_eq!(
2018 get_resp.unwrap(),
2019 (b"just_addappended".to_vec(), 0xdead_beef)
2020 );
2021
2022 client.prepend(b"test:append_key", b"prepended").unwrap();
2023 let get_resp = client.get(b"test:append_key");
2024 assert_eq!(
2025 get_resp.unwrap(),
2026 (b"prependedjust_addappended".to_vec(), 0xdead_beef)
2027 );
2028 }
2029
2030 client.delete(b"test:append_key").unwrap();
2031 }
2032
2033 #[test]
2034 fn test_stat() {
2035 let mut client = get_client();
2036 client.stat().unwrap();
2037 }
2038
2039 #[test]
2040 fn test_touch() {
2041 let mut client = get_client();
2042 client.touch(b"test:touch", 120).unwrap_err();
2043 client.add(b"test:touch", b"val", 0xcafe_babe, 100).unwrap();
2044 client.touch(b"test:touch", 120).unwrap();
2045 client.delete(b"test:touch").unwrap();
2046 }
2047
2048 #[test]
2049 fn test_set_get_delete_incr_muti() {
2050 let mut client = get_client();
2051
2052 let mut data = BTreeMap::new();
2053 data.insert(
2054 &b"test:multi_hello1"[..],
2055 (&b"world1"[..], 0xdead_beef, 120),
2056 );
2057 data.insert(
2058 &b"test:multi_hello2"[..],
2059 (&b"world2"[..], 0xdead_beef, 120),
2060 );
2061 data.insert(&b"test:multi_num1"[..], (&b"100"[..], 0xdead_beef, 120));
2062 data.insert(&b"test:multi_num2"[..], (&b"200"[..], 0xdead_beef, 120));
2063 data.insert(
2064 &b"test:multi_lastone"[..],
2065 (&b"last!"[..], 0xdead_beef, 120),
2066 );
2067
2068 client.set_multi(data).unwrap();
2069
2070 let get_resp_map = client
2071 .get_multi(&[
2072 b"test:multi_hello1",
2073 b"test:multi_hello2",
2074 b"test:multi_lastone",
2075 ])
2076 .unwrap();
2077 assert_eq!(
2078 get_resp_map.get(&b"test:multi_hello1".to_vec()),
2079 Some(&(b"world1".to_vec(), 0xdead_beef))
2080 );
2081 assert_eq!(
2082 get_resp_map.get(&b"test:multi_hello2".to_vec()),
2083 Some(&(b"world2".to_vec(), 0xdead_beef))
2084 );
2085 assert_eq!(
2086 get_resp_map.get(&b"test:multi_lastone".to_vec()),
2087 Some(&(b"last!".to_vec(), 0xdead_beef))
2088 );
2089
2090 client
2091 .delete_multi(&[
2092 b"test:multi_hello1",
2093 b"test:multi_hello2",
2094 b"test:multi_num3",
2095 ])
2096 .unwrap();
2097
2098 let get_resp_map = client
2099 .get_multi(&[
2100 b"test:multi_hello1",
2101 b"test:multi_hello2",
2102 b"test:multi_lastone",
2103 ])
2104 .unwrap();
2105 assert_eq!(get_resp_map.get(&b"test:multi_hello1".to_vec()), None);
2106 assert_eq!(get_resp_map.get(&b"test:multi_hello2".to_vec()), None);
2107 assert_eq!(
2108 get_resp_map.get(&b"test:multi_lastone".to_vec()),
2109 Some(&(b"last!".to_vec(), 0xdead_beef))
2110 );
2111
2112 let mut data = HashMap::new();
2113 data.insert(&b"test:multi_num1"[..], (10, 50, 120));
2114 data.insert(&b"test:multi_num2"[..], (20, 50, 120));
2115 data.insert(&b"test:multi_num3"[..], (30, 50, 120));
2116 client.increment_multi(data).unwrap();
2117
2118 let get_resp_map = client
2119 .get_multi(&[b"test:multi_num1", b"test:multi_num2", b"test:multi_num3"])
2120 .unwrap();
2121 assert_eq!(
2122 get_resp_map.get(&b"test:multi_num1".to_vec()),
2123 Some(&(b"110".to_vec(), 0xdead_beef))
2124 );
2125 assert_eq!(
2126 get_resp_map.get(&b"test:multi_num2".to_vec()),
2127 Some(&(b"220".to_vec(), 0xdead_beef))
2128 );
2129 assert_eq!(
2130 get_resp_map.get(&b"test:multi_num3".to_vec()),
2131 Some(&(b"50".to_vec(), 0x0))
2132 );
2133
2134 client
2135 .delete_multi(&[b"lastone", b"not_exists!!!!"])
2136 .unwrap();
2137 }
2138
2139 #[test]
2140 fn test_set_add_replace_noreply() {
2141 let key = b"test:noreply_key";
2142 let set_val = b"value";
2143 let add_val = b"just add";
2144 let rep_val = b"replaced";
2145
2146 let mut client = get_client();
2147
2148 let _ = client.delete(key);
2149
2150 client.add_noreply(key, add_val, 0xdead_beef, 120).unwrap();
2151
2152 let get_resp = client.get(key);
2153 assert_eq!(get_resp.unwrap(), (add_val.to_vec(), 0xdead_beef));
2154
2155 client.set_noreply(key, set_val, 0xdead_beef, 120).unwrap();
2156
2157 let get_resp = client.get(key);
2158 assert_eq!(get_resp.unwrap(), (set_val.to_vec(), 0xdead_beef));
2159
2160 client
2161 .replace_noreply(key, rep_val, 0xcafe_babe, 120)
2162 .unwrap();
2163
2164 let get_resp = client.get(key);
2165 assert_eq!(get_resp.unwrap(), (rep_val.to_vec(), 0xcafe_babe));
2166
2167 client.delete(key).unwrap();
2168 }
2169
2170 #[test]
2171 fn test_set_add_replace_cas() {
2172 let key = b"test:cas_key";
2173 let set_val = b"value";
2174 let add_val = b"just add";
2175 let rep_val = b"replaced";
2176
2177 let mut client = get_client();
2178
2179 let add_cas = client.add_cas(key, add_val, 0xdead_beef, 120).unwrap();
2180
2181 {
2182 client
2183 .set_cas(key, set_val, 0xdead_beef, 120, add_cas + 1)
2184 .unwrap_err();
2185
2186 let (_, _, get_cas) = client.get_cas(key).unwrap();
2187 assert_eq!(get_cas, add_cas);
2188
2189 client
2190 .replace_cas(key, rep_val, 0xdead_beef, 120, add_cas + 1)
2191 .unwrap_err();
2192 }
2193
2194 {
2195 let set_cas = client
2196 .set_cas(key, set_val, 0xdead_beef, 120, add_cas)
2197 .unwrap();
2198 let (_, _, get_cas) = client.get_cas(key).unwrap();
2199 assert_eq!(get_cas, set_cas);
2200
2201 client
2202 .replace_cas(key, rep_val, 0xdead_beef, 120, set_cas)
2203 .unwrap();
2204 }
2205
2206 client.delete(key).unwrap();
2207 }
2208
2209 #[test]
2210 fn test_incr_decr_cas() {
2211 let key = b"test:incr_decr_cas";
2212 let mut client = get_client();
2213
2214 let _ = client.delete(key);
2215
2216 let (_, incr_cas) = client.increment_cas(key, 0, 100, 120, 0).unwrap();
2217
2218 client
2219 .increment_cas(key, 0, 10, 120, incr_cas + 1)
2220 .unwrap_err();
2221
2222 let (_, incr_cas) = client.increment_cas(key, 0, 10, 120, incr_cas).unwrap();
2223
2224 client
2225 .decrement_cas(key, 0, 10, 120, incr_cas + 1)
2226 .unwrap_err();
2227 client.decrement_cas(key, 0, 10, 120, incr_cas).unwrap();
2228
2229 client.delete(key).unwrap();
2230 }
2231
2232 #[test]
2233 fn test_append_prepend_cas() {
2234 const KEY: &[u8] = b"test:append_prepend_cas";
2235 let mut client = get_client();
2236
2237 let _ = client.delete(KEY);
2238
2239 let set_cas = client.set_cas(KEY, b"appended", 0, 120, 0).unwrap();
2240 client
2241 .append_cas(KEY, b"appended", set_cas + 1)
2242 .unwrap_err();
2243
2244 let ap_cas = client.append_cas(KEY, b"appended", set_cas).unwrap();
2245 client.prepend_cas(KEY, b"prepend", ap_cas + 1).unwrap_err();
2246 client.prepend_cas(KEY, b"prepend", ap_cas).unwrap();
2247
2248 client.delete(KEY).unwrap();
2249 }
2250
2251 #[test]
2252 fn test_if_noreply_failed() {
2253 let key = b"test:noreply_fail_key";
2254 let set_val = b"value";
2255 let add_val = b"just add";
2256
2257 let mut client = get_client();
2258
2259 let _ = client.delete(key);
2260
2261 client.set_noreply(key, set_val, 0xdead_beef, 120).unwrap();
2262
2263 client.add_noreply(key, add_val, 0xdead_beef, 120).unwrap();
2265
2266 let get_resp = client.get(key);
2267 assert_eq!(get_resp.unwrap(), (set_val.to_vec(), 0xdead_beef));
2268 }
2269}