memcached/proto/
binary.rs

1// Copyright (c) 2015 Y. T. Chung <zonyitoo@gmail.com>
2// Licensed under the Apache License, Version 2.0
3// <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
6// at your option. All files in the project carrying such
7// notice may not be copied, modified, or distributed except
8// according to those terms.
9
10use std::collections::{BTreeMap, HashMap};
11use std::convert::From;
12use std::error;
13use std::fmt;
14use std::io::{BufRead, BufReader, Cursor, Write};
15use std::str;
16use std::string::String;
17
18use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
19use log::debug;
20
21use crate::proto::binarydef::{
22    Command, DataType, RequestHeader, RequestPacket, RequestPacketRef, ResponsePacket,
23};
24use crate::proto::{self, AuthResponse, MemCachedResult};
25use crate::proto::{
26    AuthOperation, CasOperation, MultiOperation, NoReplyOperation, Operation, ServerOperation,
27};
28
29use semver::Version;
30
31use rand::random;
32
33pub use proto::binarydef::Status;
34
35#[derive(Debug, Clone)]
36pub struct Error {
37    status: Status,
38    desc: &'static str,
39    detail: Option<String>,
40}
41
42impl Error {
43    fn from_status(status: Status, detail: Option<String>) -> Error {
44        Error {
45            status,
46            desc: status.desc(),
47            detail,
48        }
49    }
50
51    /// Get error description
52    pub fn detail(&self) -> Option<String> {
53        self.detail.clone()
54    }
55
56    /// Get status code
57    pub fn status(&self) -> Status {
58        self.status
59    }
60}
61
62impl fmt::Display for Error {
63    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
64        write!(f, "{}", self.desc)?;
65        match self.detail {
66            Some(ref s) => write!(f, " ({})", s),
67            None => Ok(()),
68        }
69    }
70}
71
72impl error::Error for Error {}
73
/// Binary-protocol client state: owns the stream that requests are written
/// to and responses are read from.
pub struct BinaryProto<T>
where
    T: BufRead + Write + Send,
{
    /// Underlying transport; must support buffered reads and writes.
    stream: T,
}
77
78// impl<T: BufRead + Write + Send> Proto for BinaryProto<T> {
79//     fn clone(&self) -> Box<Proto + Send> {
80//         box BinaryProto { stream: BufStream::new(self.stream.get_ref().clone()) }
81//     }
82// }
83
84impl<T: BufRead + Write + Send> BinaryProto<T> {
85    pub fn new(stream: T) -> BinaryProto<T> {
86        BinaryProto { stream }
87    }
88
89    fn send_noop(&mut self) -> MemCachedResult<u32> {
90        let opaque = random::<u32>();
91        debug!("Sending NOOP");
92        let req_packet = RequestPacket::new(
93            Command::Noop,
94            DataType::RawBytes,
95            0,
96            opaque,
97            0,
98            Vec::new(),
99            Vec::new(),
100            Vec::new(),
101        );
102
103        req_packet.write_to(&mut self.stream)?;
104        self.stream.flush()?;
105
106        Ok(opaque)
107    }
108}
109
110impl<T: BufRead + Write + Send> Operation for BinaryProto<T> {
111    fn set(
112        &mut self,
113        key: &[u8],
114        value: &[u8],
115        flags: u32,
116        expiration: u32,
117    ) -> MemCachedResult<()> {
118        let opaque = random::<u32>();
119        debug!(
120            "Set key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
121            key,
122            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
123            value,
124            flags,
125            expiration
126        );
127        let mut extra = [0u8; 8];
128        {
129            let mut extra_buf = Cursor::new(&mut extra[..]);
130            extra_buf.write_u32::<BigEndian>(flags)?;
131            extra_buf.write_u32::<BigEndian>(expiration)?;
132        }
133
134        let req_header = RequestHeader::from_payload(
135            Command::Set,
136            DataType::RawBytes,
137            0,
138            opaque,
139            0,
140            key,
141            &extra,
142            value,
143        );
144        let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
145
146        req_packet.write_to(&mut self.stream)?;
147        self.stream.flush()?;
148
149        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
150        while resp.header.opaque != opaque {
151            debug!(
152                "Expecting opaque: {} but got {}, trying again ...",
153                opaque, resp.header.opaque
154            );
155            resp = ResponsePacket::read_from(&mut self.stream)?;
156        }
157
158        match resp.header.status {
159            Status::NoError => Ok(()),
160            _ => Err(From::from(Error::from_status(resp.header.status, None))),
161        }
162    }
163
164    fn add(
165        &mut self,
166        key: &[u8],
167        value: &[u8],
168        flags: u32,
169        expiration: u32,
170    ) -> MemCachedResult<()> {
171        let opaque = random::<u32>();
172        debug!(
173            "Add key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
174            key,
175            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
176            value,
177            flags,
178            expiration
179        );
180        let mut extra = [0u8; 8];
181        {
182            let mut extra_buf = Cursor::new(&mut extra[..]);
183            extra_buf.write_u32::<BigEndian>(flags)?;
184            extra_buf.write_u32::<BigEndian>(expiration)?;
185        }
186
187        let req_header = RequestHeader::from_payload(
188            Command::Add,
189            DataType::RawBytes,
190            0,
191            opaque,
192            0,
193            key,
194            &extra,
195            value,
196        );
197        let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
198
199        req_packet.write_to(&mut self.stream)?;
200        self.stream.flush()?;
201
202        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
203        while resp.header.opaque != opaque {
204            debug!(
205                "Expecting opaque: {} but got {}, trying again ...",
206                opaque, resp.header.opaque
207            );
208            resp = ResponsePacket::read_from(&mut self.stream)?;
209        }
210
211        match resp.header.status {
212            Status::NoError => Ok(()),
213            _ => Err(From::from(Error::from_status(resp.header.status, None))),
214        }
215    }
216
217    fn delete(&mut self, key: &[u8]) -> MemCachedResult<()> {
218        let opaque = random::<u32>();
219        debug!(
220            "Delete key: {:?} {:?}",
221            key,
222            str::from_utf8(key).unwrap_or("<not-utf8-key>")
223        );
224        let req_header = RequestHeader::from_payload(
225            Command::Delete,
226            DataType::RawBytes,
227            0,
228            opaque,
229            0,
230            key,
231            &[],
232            &[],
233        );
234        let req_packet = RequestPacketRef::new(&req_header, &[], key, &[]);
235
236        req_packet.write_to(&mut self.stream)?;
237        self.stream.flush()?;
238
239        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
240        while resp.header.opaque != opaque {
241            debug!(
242                "Expecting opaque: {} but got {}, trying again ...",
243                opaque, resp.header.opaque
244            );
245            resp = ResponsePacket::read_from(&mut self.stream)?;
246        }
247
248        match resp.header.status {
249            Status::NoError => Ok(()),
250            _ => Err(From::from(Error::from_status(resp.header.status, None))),
251        }
252    }
253
254    fn replace(
255        &mut self,
256        key: &[u8],
257        value: &[u8],
258        flags: u32,
259        expiration: u32,
260    ) -> MemCachedResult<()> {
261        let opaque = random::<u32>();
262        debug!(
263            "Replace key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
264            key,
265            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
266            value,
267            flags,
268            expiration
269        );
270        let mut extra = [0u8; 8];
271        {
272            let mut extra_buf = Cursor::new(&mut extra[..]);
273            extra_buf.write_u32::<BigEndian>(flags)?;
274            extra_buf.write_u32::<BigEndian>(expiration)?;
275        }
276
277        let req_header = RequestHeader::from_payload(
278            Command::Replace,
279            DataType::RawBytes,
280            0,
281            opaque,
282            0,
283            key,
284            &extra,
285            value,
286        );
287        let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
288
289        req_packet.write_to(&mut self.stream)?;
290        self.stream.flush()?;
291
292        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
293        while resp.header.opaque != opaque {
294            debug!(
295                "Expecting opaque: {} but got {}, trying again ...",
296                opaque, resp.header.opaque
297            );
298            resp = ResponsePacket::read_from(&mut self.stream)?;
299        }
300
301        match resp.header.status {
302            Status::NoError => Ok(()),
303            _ => Err(From::from(Error::from_status(resp.header.status, None))),
304        }
305    }
306
307    fn get(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, u32)> {
308        let opaque = random::<u32>();
309        debug!(
310            "Get key: {:?} {:?}",
311            key,
312            str::from_utf8(key).unwrap_or("<not-utf8-key>")
313        );
314        let req_header = RequestHeader::from_payload(
315            Command::Get,
316            DataType::RawBytes,
317            0,
318            opaque,
319            0,
320            key,
321            &[],
322            &[],
323        );
324        let req_packet = RequestPacketRef::new(&req_header, &[], key, &[]);
325
326        req_packet.write_to(&mut self.stream)?;
327        self.stream.flush()?;
328
329        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
330        while resp.header.opaque != opaque {
331            debug!(
332                "Expecting opaque: {} but got {}, trying again ...",
333                opaque, resp.header.opaque
334            );
335            resp = ResponsePacket::read_from(&mut self.stream)?;
336        }
337
338        match resp.header.status {
339            Status::NoError => {
340                let mut extrabufr = BufReader::new(&resp.extra[..]);
341                let flags = extrabufr.read_u32::<BigEndian>()?;
342
343                Ok((resp.value, flags))
344            }
345            _ => Err(From::from(Error::from_status(resp.header.status, None))),
346        }
347    }
348
349    fn getk(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, Vec<u8>, u32)> {
350        let opaque = random::<u32>();
351        debug!(
352            "GetK key: {:?} {:?}",
353            key,
354            str::from_utf8(key).unwrap_or("<not-utf8-key>")
355        );
356        let req_header = RequestHeader::from_payload(
357            Command::GetKey,
358            DataType::RawBytes,
359            0,
360            opaque,
361            0,
362            key,
363            &[],
364            &[],
365        );
366        let req_packet = RequestPacketRef::new(&req_header, &[], key, &[]);
367
368        req_packet.write_to(&mut self.stream)?;
369        self.stream.flush()?;
370
371        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
372        while resp.header.opaque != opaque {
373            debug!(
374                "Expecting opaque: {} but got {}, trying again ...",
375                opaque, resp.header.opaque
376            );
377            resp = ResponsePacket::read_from(&mut self.stream)?;
378        }
379
380        match resp.header.status {
381            Status::NoError => {
382                let mut extrabufr = BufReader::new(&resp.extra[..]);
383                let flags = extrabufr.read_u32::<BigEndian>()?;
384
385                Ok((resp.key, resp.value, flags))
386            }
387            _ => Err(From::from(Error::from_status(resp.header.status, None))),
388        }
389    }
390
391    fn increment(
392        &mut self,
393        key: &[u8],
394        amount: u64,
395        initial: u64,
396        expiration: u32,
397    ) -> MemCachedResult<u64> {
398        let opaque = random::<u32>();
399        debug!(
400            "Increment key: {:?} {:?}, amount: {}, initial: {}, expiration: {}",
401            key,
402            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
403            amount,
404            initial,
405            expiration
406        );
407        let mut extra = [0u8; 20];
408        {
409            let mut extra_buf = Cursor::new(&mut extra[..]);
410            extra_buf.write_u64::<BigEndian>(amount)?;
411            extra_buf.write_u64::<BigEndian>(initial)?;
412            extra_buf.write_u32::<BigEndian>(expiration)?;
413        }
414
415        let req_header = RequestHeader::from_payload(
416            Command::Increment,
417            DataType::RawBytes,
418            0,
419            opaque,
420            0,
421            key,
422            &extra,
423            &[],
424        );
425        let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
426
427        req_packet.write_to(&mut self.stream)?;
428        self.stream.flush()?;
429
430        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
431        while resp.header.opaque != opaque {
432            debug!(
433                "Expecting opaque: {} but got {}, trying again ...",
434                opaque, resp.header.opaque
435            );
436            resp = ResponsePacket::read_from(&mut self.stream)?;
437        }
438
439        match resp.header.status {
440            Status::NoError => {
441                let mut bufr = BufReader::new(&resp.value[..]);
442                Ok(bufr.read_u64::<BigEndian>()?)
443            }
444            _ => Err(From::from(Error::from_status(resp.header.status, None))),
445        }
446    }
447
448    fn decrement(
449        &mut self,
450        key: &[u8],
451        amount: u64,
452        initial: u64,
453        expiration: u32,
454    ) -> MemCachedResult<u64> {
455        let opaque = random::<u32>();
456        debug!(
457            "Decrement key: {:?} {:?}, amount: {}, initial: {}, expiration: {}",
458            key,
459            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
460            amount,
461            initial,
462            expiration
463        );
464        let mut extra = [0u8; 20];
465        {
466            let mut extra_buf = Cursor::new(&mut extra[..]);
467            extra_buf.write_u64::<BigEndian>(amount)?;
468            extra_buf.write_u64::<BigEndian>(initial)?;
469            extra_buf.write_u32::<BigEndian>(expiration)?;
470        }
471
472        let req_header = RequestHeader::from_payload(
473            Command::Decrement,
474            DataType::RawBytes,
475            0,
476            opaque,
477            0,
478            key,
479            &extra,
480            &[],
481        );
482        let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
483
484        req_packet.write_to(&mut self.stream)?;
485        self.stream.flush()?;
486
487        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
488        while resp.header.opaque != opaque {
489            debug!(
490                "Expecting opaque: {} but got {}, trying again ...",
491                opaque, resp.header.opaque
492            );
493            resp = ResponsePacket::read_from(&mut self.stream)?;
494        }
495
496        match resp.header.status {
497            Status::NoError => {
498                let mut bufr = BufReader::new(&resp.value[..]);
499                Ok(bufr.read_u64::<BigEndian>()?)
500            }
501            _ => Err(From::from(Error::from_status(resp.header.status, None))),
502        }
503    }
504
505    fn append(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
506        let opaque = random::<u32>();
507        debug!(
508            "Append key: {:?} {:?}, value: {:?}",
509            key,
510            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
511            value
512        );
513        let req_header = RequestHeader::from_payload(
514            Command::Append,
515            DataType::RawBytes,
516            0,
517            opaque,
518            0,
519            key,
520            &[],
521            value,
522        );
523        let req_packet = RequestPacketRef::new(&req_header, &[], key, value);
524
525        req_packet.write_to(&mut self.stream)?;
526        self.stream.flush()?;
527
528        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
529        while resp.header.opaque != opaque {
530            debug!(
531                "Expecting opaque: {} but got {}, trying again ...",
532                opaque, resp.header.opaque
533            );
534            resp = ResponsePacket::read_from(&mut self.stream)?;
535        }
536
537        match resp.header.status {
538            Status::NoError => Ok(()),
539            _ => Err(From::from(Error::from_status(resp.header.status, None))),
540        }
541    }
542
543    fn prepend(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
544        let opaque = random::<u32>();
545        debug!(
546            "Prepend key: {:?} {:?}, value: {:?}",
547            key,
548            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
549            value
550        );
551        let req_header = RequestHeader::from_payload(
552            Command::Prepend,
553            DataType::RawBytes,
554            0,
555            opaque,
556            0,
557            key,
558            &[],
559            value,
560        );
561        let req_packet = RequestPacketRef::new(&req_header, &[], key, value);
562
563        req_packet.write_to(&mut self.stream)?;
564        self.stream.flush()?;
565
566        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
567        while resp.header.opaque != opaque {
568            debug!(
569                "Expecting opaque: {} but got {}, trying again ...",
570                opaque, resp.header.opaque
571            );
572            resp = ResponsePacket::read_from(&mut self.stream)?;
573        }
574
575        match resp.header.status {
576            Status::NoError => Ok(()),
577            _ => Err(From::from(Error::from_status(resp.header.status, None))),
578        }
579    }
580
581    fn touch(&mut self, key: &[u8], expiration: u32) -> MemCachedResult<()> {
582        let opaque = random::<u32>();
583        debug!(
584            "Touch key: {:?} {:?}, expiration: {}",
585            key,
586            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
587            expiration
588        );
589        let mut extra = [0u8; 4];
590        {
591            let mut extra_buf = Cursor::new(&mut extra[..]);
592            extra_buf.write_u32::<BigEndian>(expiration)?;
593        }
594
595        let req_header = RequestHeader::from_payload(
596            Command::Touch,
597            DataType::RawBytes,
598            0,
599            opaque,
600            0,
601            key,
602            &extra,
603            &[],
604        );
605        let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
606
607        req_packet.write_to(&mut self.stream)?;
608        self.stream.flush()?;
609
610        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
611        while resp.header.opaque != opaque {
612            debug!(
613                "Expecting opaque: {} but got {}, trying again ...",
614                opaque, resp.header.opaque
615            );
616            resp = ResponsePacket::read_from(&mut self.stream)?;
617        }
618
619        match resp.header.status {
620            Status::NoError => Ok(()),
621            _ => Err(From::from(Error::from_status(resp.header.status, None))),
622        }
623    }
624}
625
626impl<T: BufRead + Write + Send> ServerOperation for BinaryProto<T> {
627    fn quit(&mut self) -> MemCachedResult<()> {
628        let opaque = random::<u32>();
629        debug!("Quit");
630        let req_header = RequestHeader::from_payload(
631            Command::Quit,
632            DataType::RawBytes,
633            0,
634            opaque,
635            0,
636            &[],
637            &[],
638            &[],
639        );
640        let req_packet = RequestPacketRef::new(&req_header, &[], &[], &[]);
641
642        req_packet.write_to(&mut self.stream)?;
643        self.stream.flush()?;
644
645        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
646        while resp.header.opaque != opaque {
647            debug!(
648                "Expecting opaque: {} but got {}, trying again ...",
649                opaque, resp.header.opaque
650            );
651            resp = ResponsePacket::read_from(&mut self.stream)?;
652        }
653
654        match resp.header.status {
655            Status::NoError => Ok(()),
656            _ => Err(From::from(Error::from_status(resp.header.status, None))),
657        }
658    }
659
660    fn flush(&mut self, expiration: u32) -> MemCachedResult<()> {
661        let opaque = random::<u32>();
662        debug!("Expiration flush: {}", expiration);
663        let mut extra = [0u8; 4];
664        {
665            let mut extra_buf = Cursor::new(&mut extra[..]);
666            extra_buf.write_u32::<BigEndian>(expiration)?;
667        }
668
669        let req_header = RequestHeader::from_payload(
670            Command::Flush,
671            DataType::RawBytes,
672            0,
673            opaque,
674            0,
675            &[],
676            &extra,
677            &[],
678        );
679        let req_packet = RequestPacketRef::new(&req_header, &extra, &[], &[]);
680
681        req_packet.write_to(&mut self.stream)?;
682        self.stream.flush()?;
683
684        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
685        while resp.header.opaque != opaque {
686            debug!(
687                "Expecting opaque: {} but got {}, trying again ...",
688                opaque, resp.header.opaque
689            );
690            resp = ResponsePacket::read_from(&mut self.stream)?;
691        }
692
693        match resp.header.status {
694            Status::NoError => Ok(()),
695            _ => Err(From::from(Error::from_status(resp.header.status, None))),
696        }
697    }
698
699    fn noop(&mut self) -> MemCachedResult<()> {
700        debug!("Noop");
701        let opaque = self.send_noop()?;
702        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
703        while resp.header.opaque != opaque {
704            debug!(
705                "Expecting opaque: {} but got {}, trying again ...",
706                opaque, resp.header.opaque
707            );
708            resp = ResponsePacket::read_from(&mut self.stream)?;
709        }
710
711        match resp.header.status {
712            Status::NoError => Ok(()),
713            _ => Err(From::from(Error::from_status(resp.header.status, None))),
714        }
715    }
716
717    fn version(&mut self) -> MemCachedResult<Version> {
718        let opaque = random::<u32>();
719        debug!("Version");
720        let req_header =
721            RequestHeader::new(Command::Version, DataType::RawBytes, 0, opaque, 0, 0, 0, 0);
722        let req_packet = RequestPacketRef::new(&req_header, &[], &[], &[]);
723
724        req_packet.write_to(&mut self.stream)?;
725        self.stream.flush()?;
726
727        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
728        while resp.header.opaque != opaque {
729            debug!(
730                "Expecting opaque: {} but got {}, trying again ...",
731                opaque, resp.header.opaque
732            );
733            resp = ResponsePacket::read_from(&mut self.stream)?;
734        }
735
736        match resp.header.status {
737            Status::NoError => {
738                let val = resp.value;
739                let verstr = match str::from_utf8(&val[..]) {
740                    Ok(vs) => vs,
741                    Err(..) => {
742                        return Err(proto::Error::OtherError {
743                            desc: "Response is not a string",
744                            detail: None,
745                        })
746                    }
747                };
748
749                Ok(match Version::parse(verstr) {
750                    Ok(v) => v,
751                    Err(err) => {
752                        return Err(proto::Error::OtherError {
753                            desc: "Unrecognized version string",
754                            detail: Some(err.to_string()),
755                        })
756                    }
757                })
758            }
759            _ => Err(From::from(Error::from_status(resp.header.status, None))),
760        }
761    }
762
763    fn stat(&mut self) -> MemCachedResult<BTreeMap<String, String>> {
764        let opaque = random::<u32>();
765        debug!("Stat");
766        let req_header =
767            RequestHeader::new(Command::Stat, DataType::RawBytes, 0, opaque, 0, 0, 0, 0);
768        let req_packet = RequestPacketRef::new(&req_header, &[], &[], &[]);
769
770        req_packet.write_to(&mut self.stream)?;
771        self.stream.flush()?;
772
773        let mut result = BTreeMap::new();
774        loop {
775            let resp = ResponsePacket::read_from(&mut self.stream)?;
776            if resp.header.opaque != opaque {
777                debug!(
778                    "Expecting opaque: {} but got {}, trying again ...",
779                    opaque, resp.header.opaque
780                );
781                continue;
782            }
783            match resp.header.status {
784                Status::NoError => {}
785                _ => return Err(From::from(Error::from_status(resp.header.status, None))),
786            }
787
788            if resp.key.is_empty() && resp.value.is_empty() {
789                break;
790            }
791
792            let key = match String::from_utf8(resp.key) {
793                Ok(k) => k,
794                Err(..) => {
795                    return Err(proto::Error::OtherError {
796                        desc: "Key is not a string",
797                        detail: None,
798                    })
799                }
800            };
801
802            let val = match String::from_utf8(resp.value) {
803                Ok(k) => k,
804                Err(..) => {
805                    return Err(proto::Error::OtherError {
806                        desc: "Value is not a string",
807                        detail: None,
808                    })
809                }
810            };
811
812            result.insert(key, val);
813        }
814
815        Ok(result)
816    }
817}
818
819impl<T: BufRead + Write + Send> MultiOperation for BinaryProto<T> {
820    fn set_multi(&mut self, kv: BTreeMap<&[u8], (&[u8], u32, u32)>) -> MemCachedResult<()> {
821        for (key, (value, flags, expiration)) in kv.into_iter() {
822            let mut extra = [0u8; 8];
823            {
824                let mut extra_buf = Cursor::new(&mut extra[..]);
825                extra_buf.write_u32::<BigEndian>(flags)?;
826                extra_buf.write_u32::<BigEndian>(expiration)?;
827            }
828
829            let req_header = RequestHeader::from_payload(
830                Command::SetQuietly,
831                DataType::RawBytes,
832                0,
833                0,
834                0,
835                key,
836                &extra,
837                value,
838            );
839            let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
840
841            req_packet.write_to(&mut self.stream)?;
842        }
843        self.send_noop()?;
844
845        loop {
846            let resp = ResponsePacket::read_from(&mut self.stream)?;
847
848            match resp.header.status {
849                Status::NoError => {}
850                _ => return Err(From::from(Error::from_status(resp.header.status, None))),
851            }
852
853            if resp.header.command == Command::Noop {
854                return Ok(());
855            }
856        }
857    }
858
859    fn delete_multi(&mut self, keys: &[&[u8]]) -> MemCachedResult<()> {
860        for key in keys.iter() {
861            let req_header = RequestHeader::from_payload(
862                Command::DeleteQuietly,
863                DataType::RawBytes,
864                0,
865                0,
866                0,
867                *key,
868                &[],
869                &[],
870            );
871            let req_packet = RequestPacketRef::new(&req_header, &[], *key, &[]);
872
873            req_packet.write_to(&mut self.stream)?;
874        }
875        self.send_noop()?;
876
877        loop {
878            let resp = ResponsePacket::read_from(&mut self.stream)?;
879
880            match resp.header.status {
881                Status::NoError | Status::KeyNotFound => {}
882                _ => return Err(From::from(Error::from_status(resp.header.status, None))),
883            }
884
885            if resp.header.command == Command::Noop {
886                return Ok(());
887            }
888        }
889    }
890
891    fn increment_multi<'a>(
892        &mut self,
893        kv: HashMap<&'a [u8], (u64, u64, u32)>,
894    ) -> MemCachedResult<HashMap<&'a [u8], u64>> {
895        let opaques: MemCachedResult<HashMap<_, _>> = kv
896            .into_iter()
897            .map(|(key, (amount, initial, expiration))| {
898                let opaque = random::<u32>();
899                let mut extra = [0u8; 20];
900                {
901                    let mut extra_buf = Cursor::new(&mut extra[..]);
902                    extra_buf.write_u64::<BigEndian>(amount)?;
903                    extra_buf.write_u64::<BigEndian>(initial)?;
904                    extra_buf.write_u32::<BigEndian>(expiration)?;
905                }
906
907                let req_header = RequestHeader::from_payload(
908                    Command::Increment,
909                    DataType::RawBytes,
910                    0,
911                    opaque,
912                    0,
913                    key,
914                    &extra,
915                    &[],
916                );
917                let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
918
919                req_packet.write_to(&mut self.stream)?;
920                Ok((opaque, key))
921            })
922            .collect();
923
924        let opaques = opaques?;
925
926        self.send_noop()?;
927        self.stream.flush()?;
928
929        let mut results = HashMap::with_capacity(opaques.len());
930        loop {
931            let resp = ResponsePacket::read_from(&mut self.stream)?;
932            match resp.header.status {
933                Status::NoError => {}
934                _ => return Err(From::from(Error::from_status(resp.header.status, None))),
935            }
936
937            if resp.header.command == Command::Noop {
938                return Ok(results);
939            }
940
941            if let Some(key) = opaques.get(&resp.header.opaque) {
942                let mut bufr = BufReader::new(&resp.value[..]);
943                let val = bufr.read_u64::<BigEndian>()?;
944                results.insert(key, val);
945            }
946        }
947    }
948
949    fn get_multi(&mut self, keys: &[&[u8]]) -> MemCachedResult<HashMap<Vec<u8>, (Vec<u8>, u32)>> {
950        for key in keys.iter() {
951            let req_header = RequestHeader::from_payload(
952                Command::GetKeyQuietly,
953                DataType::RawBytes,
954                0,
955                0,
956                0,
957                *key,
958                &[],
959                &[],
960            );
961            let req_packet = RequestPacketRef::new(&req_header, &[], *key, &[]);
962
963            req_packet.write_to(&mut self.stream)?;
964        }
965        self.send_noop()?;
966
967        let mut result = HashMap::with_capacity(keys.len());
968        loop {
969            let resp = ResponsePacket::read_from(&mut self.stream)?;
970            match resp.header.status {
971                Status::NoError => {}
972                _ => return Err(From::from(Error::from_status(resp.header.status, None))),
973            }
974
975            if resp.header.command == Command::Noop {
976                return Ok(result);
977            }
978
979            let mut extrabufr = BufReader::new(&resp.extra[..]);
980            let flags = extrabufr.read_u32::<BigEndian>()?;
981
982            result.insert(resp.key, (resp.value, flags));
983        }
984    }
985}
986
987impl<T: BufRead + Write + Send> NoReplyOperation for BinaryProto<T> {
988    fn set_noreply(
989        &mut self,
990        key: &[u8],
991        value: &[u8],
992        flags: u32,
993        expiration: u32,
994    ) -> MemCachedResult<()> {
995        let opaque = random::<u32>();
996        debug!(
997            "Set noreply key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
998            key,
999            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1000            value,
1001            flags,
1002            expiration
1003        );
1004        let mut extra = [0u8; 8];
1005        {
1006            let mut extra_buf = Cursor::new(&mut extra[..]);
1007            extra_buf.write_u32::<BigEndian>(flags)?;
1008            extra_buf.write_u32::<BigEndian>(expiration)?;
1009        }
1010
1011        let req_header = RequestHeader::from_payload(
1012            Command::SetQuietly,
1013            DataType::RawBytes,
1014            0,
1015            opaque,
1016            0,
1017            key,
1018            &extra,
1019            value,
1020        );
1021        let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
1022
1023        req_packet.write_to(&mut self.stream)?;
1024        self.stream.flush()?;
1025
1026        Ok(())
1027    }
1028
1029    fn add_noreply(
1030        &mut self,
1031        key: &[u8],
1032        value: &[u8],
1033        flags: u32,
1034        expiration: u32,
1035    ) -> MemCachedResult<()> {
1036        let opaque = random::<u32>();
1037        debug!(
1038            "Add noreply key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
1039            key,
1040            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1041            value,
1042            flags,
1043            expiration
1044        );
1045        let mut extra = [0u8; 8];
1046        {
1047            let mut extra_buf = Cursor::new(&mut extra[..]);
1048            extra_buf.write_u32::<BigEndian>(flags)?;
1049            extra_buf.write_u32::<BigEndian>(expiration)?;
1050        }
1051
1052        let req_header = RequestHeader::from_payload(
1053            Command::AddQuietly,
1054            DataType::RawBytes,
1055            0,
1056            opaque,
1057            0,
1058            key,
1059            &extra,
1060            value,
1061        );
1062        let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
1063
1064        req_packet.write_to(&mut self.stream)?;
1065        self.stream.flush()?;
1066
1067        Ok(())
1068    }
1069
1070    fn delete_noreply(&mut self, key: &[u8]) -> MemCachedResult<()> {
1071        let opaque = random::<u32>();
1072        debug!(
1073            "Delete noreply key: {:?} {:?}",
1074            key,
1075            str::from_utf8(key).unwrap_or("<not-utf8-key>")
1076        );
1077        let req_header = RequestHeader::from_payload(
1078            Command::DeleteQuietly,
1079            DataType::RawBytes,
1080            0,
1081            opaque,
1082            0,
1083            key,
1084            &[],
1085            &[],
1086        );
1087        let req_packet = RequestPacketRef::new(&req_header, &[], key, &[]);
1088
1089        req_packet.write_to(&mut self.stream)?;
1090        self.stream.flush()?;
1091
1092        Ok(())
1093    }
1094
1095    fn replace_noreply(
1096        &mut self,
1097        key: &[u8],
1098        value: &[u8],
1099        flags: u32,
1100        expiration: u32,
1101    ) -> MemCachedResult<()> {
1102        let opaque = random::<u32>();
1103        debug!(
1104            "Replace noreply key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
1105            key,
1106            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1107            value,
1108            flags,
1109            expiration
1110        );
1111        let mut extra = [0u8; 8];
1112        {
1113            let mut extra_buf = Cursor::new(&mut extra[..]);
1114            extra_buf.write_u32::<BigEndian>(flags)?;
1115            extra_buf.write_u32::<BigEndian>(expiration)?;
1116        }
1117
1118        let req_header = RequestHeader::from_payload(
1119            Command::ReplaceQuietly,
1120            DataType::RawBytes,
1121            0,
1122            opaque,
1123            0,
1124            key,
1125            &extra,
1126            value,
1127        );
1128        let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
1129
1130        req_packet.write_to(&mut self.stream)?;
1131        self.stream.flush()?;
1132
1133        Ok(())
1134    }
1135
1136    fn increment_noreply(
1137        &mut self,
1138        key: &[u8],
1139        amount: u64,
1140        initial: u64,
1141        expiration: u32,
1142    ) -> MemCachedResult<()> {
1143        let opaque = random::<u32>();
1144        debug!(
1145            "Increment noreply key: {:?} {:?}, amount: {}, initial: {}, expiration: {}",
1146            key,
1147            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1148            amount,
1149            initial,
1150            expiration
1151        );
1152        let mut extra = [0u8; 20];
1153        {
1154            let mut extra_buf = Cursor::new(&mut extra[..]);
1155            extra_buf.write_u64::<BigEndian>(amount)?;
1156            extra_buf.write_u64::<BigEndian>(initial)?;
1157            extra_buf.write_u32::<BigEndian>(expiration)?;
1158        }
1159
1160        let req_header = RequestHeader::from_payload(
1161            Command::IncrementQuietly,
1162            DataType::RawBytes,
1163            0,
1164            opaque,
1165            0,
1166            key,
1167            &extra,
1168            &[],
1169        );
1170        let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
1171
1172        req_packet.write_to(&mut self.stream)?;
1173        self.stream.flush()?;
1174
1175        Ok(())
1176    }
1177
1178    fn decrement_noreply(
1179        &mut self,
1180        key: &[u8],
1181        amount: u64,
1182        initial: u64,
1183        expiration: u32,
1184    ) -> MemCachedResult<()> {
1185        let opaque = random::<u32>();
1186        debug!(
1187            "Decrement noreply key: {:?} {:?}, amount: {}, initial: {}, expiration: {}",
1188            key,
1189            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1190            amount,
1191            initial,
1192            expiration
1193        );
1194        let mut extra = [0u8; 20];
1195        {
1196            let mut extra_buf = Cursor::new(&mut extra[..]);
1197            extra_buf.write_u64::<BigEndian>(amount)?;
1198            extra_buf.write_u64::<BigEndian>(initial)?;
1199            extra_buf.write_u32::<BigEndian>(expiration)?;
1200        }
1201
1202        let req_header = RequestHeader::from_payload(
1203            Command::DecrementQuietly,
1204            DataType::RawBytes,
1205            0,
1206            opaque,
1207            0,
1208            key,
1209            &extra,
1210            &[],
1211        );
1212        let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
1213
1214        req_packet.write_to(&mut self.stream)?;
1215        self.stream.flush()?;
1216
1217        Ok(())
1218    }
1219
1220    fn append_noreply(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
1221        let opaque = random::<u32>();
1222        debug!(
1223            "Append noreply key: {:?} {:?}, value: {:?}",
1224            key,
1225            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1226            value
1227        );
1228        let req_header = RequestHeader::from_payload(
1229            Command::AppendQuietly,
1230            DataType::RawBytes,
1231            0,
1232            opaque,
1233            0,
1234            key,
1235            &[],
1236            value,
1237        );
1238        let req_packet = RequestPacketRef::new(&req_header, &[], key, value);
1239
1240        req_packet.write_to(&mut self.stream)?;
1241        self.stream.flush()?;
1242
1243        Ok(())
1244    }
1245
1246    fn prepend_noreply(&mut self, key: &[u8], value: &[u8]) -> MemCachedResult<()> {
1247        let opaque = random::<u32>();
1248        debug!(
1249            "Prepend noreply key: {:?} {:?}, value: {:?}",
1250            key,
1251            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1252            value
1253        );
1254        let req_header = RequestHeader::from_payload(
1255            Command::PrependQuietly,
1256            DataType::RawBytes,
1257            0,
1258            opaque,
1259            0,
1260            key,
1261            &[],
1262            value,
1263        );
1264        let req_packet = RequestPacketRef::new(&req_header, &[], key, value);
1265
1266        req_packet.write_to(&mut self.stream)?;
1267        self.stream.flush()?;
1268
1269        Ok(())
1270    }
1271}
1272
1273impl<T: BufRead + Write + Send> CasOperation for BinaryProto<T> {
1274    fn set_cas(
1275        &mut self,
1276        key: &[u8],
1277        value: &[u8],
1278        flags: u32,
1279        expiration: u32,
1280        cas: u64,
1281    ) -> MemCachedResult<u64> {
1282        let opaque = random::<u32>();
1283        debug!(
1284            "Set cas key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}, cas: {}",
1285            key,
1286            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1287            value,
1288            flags,
1289            expiration,
1290            cas
1291        );
1292        let mut extra = [0u8; 8];
1293        {
1294            let mut extra_buf = Cursor::new(&mut extra[..]);
1295            extra_buf.write_u32::<BigEndian>(flags)?;
1296            extra_buf.write_u32::<BigEndian>(expiration)?;
1297        }
1298
1299        let req_header = RequestHeader::from_payload(
1300            Command::Set,
1301            DataType::RawBytes,
1302            0,
1303            opaque,
1304            cas,
1305            key,
1306            &extra,
1307            value,
1308        );
1309        let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
1310
1311        req_packet.write_to(&mut self.stream)?;
1312        self.stream.flush()?;
1313
1314        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1315        while resp.header.opaque != opaque {
1316            debug!(
1317                "Expecting opaque: {} but got {}, trying again ...",
1318                opaque, resp.header.opaque
1319            );
1320            resp = ResponsePacket::read_from(&mut self.stream)?;
1321        }
1322
1323        match resp.header.status {
1324            Status::NoError => Ok(resp.header.cas),
1325            _ => Err(From::from(Error::from_status(resp.header.status, None))),
1326        }
1327    }
1328
1329    fn add_cas(
1330        &mut self,
1331        key: &[u8],
1332        value: &[u8],
1333        flags: u32,
1334        expiration: u32,
1335    ) -> MemCachedResult<u64> {
1336        let opaque = random::<u32>();
1337        debug!(
1338            "Add cas key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}",
1339            key,
1340            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1341            value,
1342            flags,
1343            expiration
1344        );
1345        let mut extra = [0u8; 8];
1346        {
1347            let mut extra_buf = Cursor::new(&mut extra[..]);
1348            extra_buf.write_u32::<BigEndian>(flags)?;
1349            extra_buf.write_u32::<BigEndian>(expiration)?;
1350        }
1351
1352        let req_header = RequestHeader::from_payload(
1353            Command::Add,
1354            DataType::RawBytes,
1355            0,
1356            opaque,
1357            0,
1358            key,
1359            &extra,
1360            value,
1361        );
1362        let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
1363
1364        req_packet.write_to(&mut self.stream)?;
1365        self.stream.flush()?;
1366
1367        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1368        while resp.header.opaque != opaque {
1369            debug!(
1370                "Expecting opaque: {} but got {}, trying again ...",
1371                opaque, resp.header.opaque
1372            );
1373            resp = ResponsePacket::read_from(&mut self.stream)?;
1374        }
1375
1376        match resp.header.status {
1377            Status::NoError => Ok(resp.header.cas),
1378            _ => Err(From::from(Error::from_status(resp.header.status, None))),
1379        }
1380    }
1381
1382    fn replace_cas(
1383        &mut self,
1384        key: &[u8],
1385        value: &[u8],
1386        flags: u32,
1387        expiration: u32,
1388        cas: u64,
1389    ) -> MemCachedResult<u64> {
1390        let opaque = random::<u32>();
1391        debug!(
1392            "Replace cas key: {:?} {:?}, value: {:?}, flags: 0x{:x}, expiration: {}, cas: {}",
1393            key,
1394            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1395            value,
1396            flags,
1397            expiration,
1398            cas
1399        );
1400        let mut extra = [0u8; 8];
1401        {
1402            let mut extra_buf = Cursor::new(&mut extra[..]);
1403            extra_buf.write_u32::<BigEndian>(flags)?;
1404            extra_buf.write_u32::<BigEndian>(expiration)?;
1405        }
1406
1407        let req_header = RequestHeader::from_payload(
1408            Command::Replace,
1409            DataType::RawBytes,
1410            0,
1411            opaque,
1412            cas,
1413            key,
1414            &extra,
1415            value,
1416        );
1417        let req_packet = RequestPacketRef::new(&req_header, &extra, key, value);
1418
1419        req_packet.write_to(&mut self.stream)?;
1420        self.stream.flush()?;
1421
1422        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1423        while resp.header.opaque != opaque {
1424            debug!(
1425                "Expecting opaque: {} but got {}, trying again ...",
1426                opaque, resp.header.opaque
1427            );
1428            resp = ResponsePacket::read_from(&mut self.stream)?;
1429        }
1430
1431        match resp.header.status {
1432            Status::NoError => Ok(resp.header.cas),
1433            _ => Err(From::from(Error::from_status(resp.header.status, None))),
1434        }
1435    }
1436
1437    fn get_cas(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, u32, u64)> {
1438        let opaque = random::<u32>();
1439        debug!(
1440            "Get cas key: {:?} {:?}",
1441            key,
1442            str::from_utf8(key).unwrap_or("<not-utf8-key>")
1443        );
1444        let req_header = RequestHeader::from_payload(
1445            Command::Get,
1446            DataType::RawBytes,
1447            0,
1448            opaque,
1449            0,
1450            key,
1451            &[],
1452            &[],
1453        );
1454        let req_packet = RequestPacketRef::new(&req_header, &[], key, &[]);
1455
1456        req_packet.write_to(&mut self.stream)?;
1457        self.stream.flush()?;
1458
1459        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1460        while resp.header.opaque != opaque {
1461            debug!(
1462                "Expecting opaque: {} but got {}, trying again ...",
1463                opaque, resp.header.opaque
1464            );
1465            resp = ResponsePacket::read_from(&mut self.stream)?;
1466        }
1467
1468        match resp.header.status {
1469            Status::NoError => {
1470                let mut extrabufr = BufReader::new(&resp.extra[..]);
1471                let flags = extrabufr.read_u32::<BigEndian>()?;
1472
1473                Ok((resp.value, flags, resp.header.cas))
1474            }
1475            _ => Err(From::from(Error::from_status(resp.header.status, None))),
1476        }
1477    }
1478
1479    fn getk_cas(&mut self, key: &[u8]) -> MemCachedResult<(Vec<u8>, Vec<u8>, u32, u64)> {
1480        let opaque = random::<u32>();
1481        debug!(
1482            "GetK cas key: {:?} {:?}",
1483            key,
1484            str::from_utf8(key).unwrap_or("<not-utf8-key>")
1485        );
1486        let req_header = RequestHeader::from_payload(
1487            Command::GetKey,
1488            DataType::RawBytes,
1489            0,
1490            opaque,
1491            0,
1492            key,
1493            &[],
1494            &[],
1495        );
1496        let req_packet = RequestPacketRef::new(&req_header, &[], key, &[]);
1497
1498        req_packet.write_to(&mut self.stream)?;
1499        self.stream.flush()?;
1500
1501        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1502        while resp.header.opaque != opaque {
1503            debug!(
1504                "Expecting opaque: {} but got {}, trying again ...",
1505                opaque, resp.header.opaque
1506            );
1507            resp = ResponsePacket::read_from(&mut self.stream)?;
1508        }
1509
1510        match resp.header.status {
1511            Status::NoError => {
1512                let mut extrabufr = BufReader::new(&resp.extra[..]);
1513                let flags = extrabufr.read_u32::<BigEndian>()?;
1514
1515                Ok((resp.key, resp.value, flags, resp.header.cas))
1516            }
1517            _ => Err(From::from(Error::from_status(resp.header.status, None))),
1518        }
1519    }
1520
1521    fn increment_cas(
1522        &mut self,
1523        key: &[u8],
1524        amount: u64,
1525        initial: u64,
1526        expiration: u32,
1527        cas: u64,
1528    ) -> MemCachedResult<(u64, u64)> {
1529        let opaque = random::<u32>();
1530        debug!(
1531            "Increment cas key: {:?} {:?}, amount: {}, initial: {}, expiration: {}, cas: {}",
1532            key,
1533            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1534            amount,
1535            initial,
1536            expiration,
1537            cas
1538        );
1539        let mut extra = [0u8; 20];
1540        {
1541            let mut extra_buf = Cursor::new(&mut extra[..]);
1542            extra_buf.write_u64::<BigEndian>(amount)?;
1543            extra_buf.write_u64::<BigEndian>(initial)?;
1544            extra_buf.write_u32::<BigEndian>(expiration)?;
1545        }
1546
1547        let req_header = RequestHeader::from_payload(
1548            Command::Increment,
1549            DataType::RawBytes,
1550            0,
1551            opaque,
1552            cas,
1553            key,
1554            &extra,
1555            &[],
1556        );
1557        let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
1558
1559        req_packet.write_to(&mut self.stream)?;
1560        self.stream.flush()?;
1561
1562        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1563        while resp.header.opaque != opaque {
1564            debug!(
1565                "Expecting opaque: {} but got {}, trying again ...",
1566                opaque, resp.header.opaque
1567            );
1568            resp = ResponsePacket::read_from(&mut self.stream)?;
1569        }
1570
1571        match resp.header.status {
1572            Status::NoError => {
1573                let mut bufr = BufReader::new(&resp.value[..]);
1574                Ok((bufr.read_u64::<BigEndian>()?, resp.header.cas))
1575            }
1576            _ => Err(From::from(Error::from_status(resp.header.status, None))),
1577        }
1578    }
1579
1580    fn decrement_cas(
1581        &mut self,
1582        key: &[u8],
1583        amount: u64,
1584        initial: u64,
1585        expiration: u32,
1586        cas: u64,
1587    ) -> MemCachedResult<(u64, u64)> {
1588        let opaque = random::<u32>();
1589        debug!(
1590            "Decrement cas key: {:?} {:?}, amount: {}, initial: {}, expiration: {}, cas: {}",
1591            key,
1592            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1593            amount,
1594            initial,
1595            expiration,
1596            cas
1597        );
1598        let mut extra = [0u8; 20];
1599        {
1600            let mut extra_buf = Cursor::new(&mut extra[..]);
1601            extra_buf.write_u64::<BigEndian>(amount)?;
1602            extra_buf.write_u64::<BigEndian>(initial)?;
1603            extra_buf.write_u32::<BigEndian>(expiration)?;
1604        }
1605
1606        let req_header = RequestHeader::from_payload(
1607            Command::Decrement,
1608            DataType::RawBytes,
1609            0,
1610            opaque,
1611            cas,
1612            key,
1613            &extra,
1614            &[],
1615        );
1616        let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
1617
1618        req_packet.write_to(&mut self.stream)?;
1619        self.stream.flush()?;
1620
1621        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1622        while resp.header.opaque != opaque {
1623            debug!(
1624                "Expecting opaque: {} but got {}, trying again ...",
1625                opaque, resp.header.opaque
1626            );
1627            resp = ResponsePacket::read_from(&mut self.stream)?;
1628        }
1629
1630        match resp.header.status {
1631            Status::NoError => {
1632                let mut bufr = BufReader::new(&resp.value[..]);
1633                Ok((bufr.read_u64::<BigEndian>()?, resp.header.cas))
1634            }
1635            _ => Err(From::from(Error::from_status(resp.header.status, None))),
1636        }
1637    }
1638
1639    fn append_cas(&mut self, key: &[u8], value: &[u8], cas: u64) -> MemCachedResult<u64> {
1640        let opaque = random::<u32>();
1641        debug!(
1642            "Append cas key: {:?} {:?}, value: {:?}, cas: {}",
1643            key,
1644            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1645            value,
1646            cas
1647        );
1648        let req_header = RequestHeader::from_payload(
1649            Command::Append,
1650            DataType::RawBytes,
1651            0,
1652            opaque,
1653            cas,
1654            key,
1655            &[],
1656            value,
1657        );
1658        let req_packet = RequestPacketRef::new(&req_header, &[], key, value);
1659
1660        req_packet.write_to(&mut self.stream)?;
1661        self.stream.flush()?;
1662
1663        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1664        while resp.header.opaque != opaque {
1665            debug!(
1666                "Expecting opaque: {} but got {}, trying again ...",
1667                opaque, resp.header.opaque
1668            );
1669            resp = ResponsePacket::read_from(&mut self.stream)?;
1670        }
1671
1672        match resp.header.status {
1673            Status::NoError => Ok(resp.header.cas),
1674            _ => Err(From::from(Error::from_status(resp.header.status, None))),
1675        }
1676    }
1677
1678    fn prepend_cas(&mut self, key: &[u8], value: &[u8], cas: u64) -> MemCachedResult<u64> {
1679        let opaque = random::<u32>();
1680        debug!(
1681            "Prepend cas key: {:?} {:?}, value: {:?}, cas: {}",
1682            key,
1683            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1684            value,
1685            cas
1686        );
1687        let req_header = RequestHeader::from_payload(
1688            Command::Prepend,
1689            DataType::RawBytes,
1690            0,
1691            opaque,
1692            cas,
1693            key,
1694            &[],
1695            value,
1696        );
1697        let req_packet = RequestPacketRef::new(&req_header, &[], key, value);
1698
1699        req_packet.write_to(&mut self.stream)?;
1700        self.stream.flush()?;
1701
1702        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1703        while resp.header.opaque != opaque {
1704            debug!(
1705                "Expecting opaque: {} but got {}, trying again ...",
1706                opaque, resp.header.opaque
1707            );
1708            resp = ResponsePacket::read_from(&mut self.stream)?;
1709        }
1710
1711        match resp.header.status {
1712            Status::NoError => Ok(resp.header.cas),
1713            _ => Err(From::from(Error::from_status(resp.header.status, None))),
1714        }
1715    }
1716
1717    fn touch_cas(&mut self, key: &[u8], expiration: u32, cas: u64) -> MemCachedResult<u64> {
1718        let opaque = random::<u32>();
1719        debug!(
1720            "Touch cas key: {:?} {:?}, expiration: {:?}, cas: {}",
1721            key,
1722            str::from_utf8(key).unwrap_or("<not-utf8-key>"),
1723            expiration,
1724            cas
1725        );
1726        let mut extra = [0u8; 4];
1727        {
1728            let mut extra_buf = Cursor::new(&mut extra[..]);
1729            extra_buf.write_u32::<BigEndian>(expiration)?;
1730        }
1731
1732        let req_header = RequestHeader::from_payload(
1733            Command::Touch,
1734            DataType::RawBytes,
1735            0,
1736            opaque,
1737            cas,
1738            key,
1739            &extra,
1740            &[],
1741        );
1742        let req_packet = RequestPacketRef::new(&req_header, &extra, key, &[]);
1743
1744        req_packet.write_to(&mut self.stream)?;
1745        self.stream.flush()?;
1746
1747        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1748        while resp.header.opaque != opaque {
1749            debug!(
1750                "Expecting opaque: {} but got {}, trying again ...",
1751                opaque, resp.header.opaque
1752            );
1753            resp = ResponsePacket::read_from(&mut self.stream)?;
1754        }
1755
1756        match resp.header.status {
1757            Status::NoError => Ok(resp.header.cas),
1758            _ => Err(From::from(Error::from_status(resp.header.status, None))),
1759        }
1760    }
1761}
1762
1763impl<T: BufRead + Write + Send> AuthOperation for BinaryProto<T> {
1764    fn list_mechanisms(&mut self) -> MemCachedResult<Vec<String>> {
1765        let opaque = random::<u32>();
1766        debug!("List mechanisms");
1767        let req_header = RequestHeader::new(
1768            Command::SaslListMechanisms,
1769            DataType::RawBytes,
1770            0,
1771            opaque,
1772            0,
1773            0,
1774            0,
1775            0,
1776        );
1777        let req_packet = RequestPacketRef::new(&req_header, &[], &[], &[]);
1778        req_packet.write_to(&mut self.stream)?;
1779        self.stream.flush()?;
1780
1781        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1782        while resp.header.opaque != opaque {
1783            debug!(
1784                "Expecting opaque: {} but got {}, trying again ...",
1785                opaque, resp.header.opaque
1786            );
1787            resp = ResponsePacket::read_from(&mut self.stream)?;
1788        }
1789
1790        match resp.header.status {
1791            Status::NoError => {}
1792            _ => return Err(From::from(Error::from_status(resp.header.status, None))),
1793        }
1794
1795        match str::from_utf8(&resp.value[..]) {
1796            Ok(s) => Ok(s.split(' ').map(|mech| mech.to_string()).collect()),
1797            Err(..) => Err(proto::Error::OtherError {
1798                desc: "Mechanism decode error",
1799                detail: None,
1800            }),
1801        }
1802    }
1803
1804    fn auth_start(&mut self, mech: &str, init: &[u8]) -> MemCachedResult<AuthResponse> {
1805        let opaque = random::<u32>();
1806        debug!("Auth start, mechanism: {:?}, init: {:?}", mech, init);
1807        let req_header = RequestHeader::from_payload(
1808            Command::SaslAuthenticate,
1809            DataType::RawBytes,
1810            0,
1811            opaque,
1812            0,
1813            mech.as_bytes(),
1814            &[],
1815            init,
1816        );
1817        let req_packet = RequestPacketRef::new(&req_header, &[], mech.as_bytes(), init);
1818        req_packet.write_to(&mut self.stream)?;
1819        self.stream.flush()?;
1820
1821        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1822        while resp.header.opaque != opaque {
1823            debug!(
1824                "Expecting opaque: {} but got {}, trying again ...",
1825                opaque, resp.header.opaque
1826            );
1827            resp = ResponsePacket::read_from(&mut self.stream)?;
1828        }
1829
1830        match resp.header.status {
1831            Status::AuthenticationFurtherStepRequired => Ok(AuthResponse::Continue(resp.value)),
1832            Status::NoError => Ok(AuthResponse::Succeeded),
1833            Status::AuthenticationRequired => Ok(AuthResponse::Failed),
1834            _ => Err(From::from(Error::from_status(resp.header.status, None))),
1835        }
1836    }
1837
1838    fn auth_continue(&mut self, mech: &str, data: &[u8]) -> MemCachedResult<AuthResponse> {
1839        let opaque = random::<u32>();
1840        debug!("Auth continue, mechanism: {:?}, data: {:?}", mech, data);
1841        let req_header = RequestHeader::from_payload(
1842            Command::SaslStep,
1843            DataType::RawBytes,
1844            0,
1845            opaque,
1846            0,
1847            mech.as_bytes(),
1848            &[],
1849            data,
1850        );
1851        let req_packet = RequestPacketRef::new(&req_header, &[], mech.as_bytes(), data);
1852        req_packet.write_to(&mut self.stream)?;
1853        self.stream.flush()?;
1854
1855        let mut resp = ResponsePacket::read_from(&mut self.stream)?;
1856        while resp.header.opaque != opaque {
1857            debug!(
1858                "Expecting opaque: {} but got {}, trying again ...",
1859                opaque, resp.header.opaque
1860            );
1861            resp = ResponsePacket::read_from(&mut self.stream)?;
1862        }
1863
1864        match resp.header.status {
1865            Status::AuthenticationFurtherStepRequired => Ok(AuthResponse::Continue(resp.value)),
1866            Status::NoError => Ok(AuthResponse::Succeeded),
1867            Status::AuthenticationRequired => Ok(AuthResponse::Failed),
1868            _ => Err(From::from(Error::from_status(resp.header.status, None))),
1869        }
1870    }
1871}
1872
1873#[cfg(test)]
1874mod test {
1875    use crate::proto::{
1876        BinaryProto, CasOperation, MultiOperation, NoReplyOperation, Operation, ServerOperation,
1877    };
1878    use std::collections::{BTreeMap, HashMap};
1879    use std::net::TcpStream;
1880
1881    use bufstream::BufStream;
1882
1883    const SERVER_ADDR: &str = "127.0.0.1:11211";
1884
1885    fn get_client() -> BinaryProto<BufStream<TcpStream>> {
1886        let stream = TcpStream::connect(SERVER_ADDR).unwrap();
1887        BinaryProto::new(BufStream::new(stream))
1888    }
1889
1890    #[test]
1891    fn test_set_get_delete() {
1892        const KEY: &[u8] = b"test:set_get_delete";
1893        const VAL: &[u8] = b"world";
1894
1895        let mut client = get_client();
1896        client.set(KEY, VAL, 0xdead_beef, 120).unwrap();
1897
1898        let get_resp = client.get(KEY);
1899        assert_eq!(get_resp.unwrap(), (VAL.to_vec(), 0xdead_beef));
1900
1901        let getk_resp = client.getk(KEY);
1902        assert_eq!(
1903            getk_resp.unwrap(),
1904            (KEY.to_vec(), VAL.to_vec(), 0xdead_beef)
1905        );
1906
1907        client.delete(KEY).unwrap();
1908    }
1909
1910    #[test]
1911    fn test_incr_decr() {
1912        const KEY: &[u8] = b"test:incr_decr";
1913
1914        let mut client = get_client();
1915        let _ = client.delete(KEY);
1916        {
1917            let incr_resp = client.increment(KEY, 1, 0, 120);
1918            assert_eq!(incr_resp.unwrap(), 0);
1919        }
1920
1921        {
1922            let incr_resp = client.increment(KEY, 10, 0, 120);
1923            assert_eq!(incr_resp.unwrap(), 10);
1924        }
1925
1926        {
1927            let decr_resp = client.decrement(KEY, 5, 0, 120);
1928            assert_eq!(decr_resp.unwrap(), 5);
1929        }
1930
1931        {
1932            let decr_resp = client.decrement(KEY, 20, 0, 120);
1933            assert_eq!(decr_resp.unwrap(), 0);
1934        }
1935
1936        client.delete(KEY).unwrap();
1937    }
1938
1939    #[test]
1940    fn test_version() {
1941        let mut client = get_client();
1942        client.version().unwrap();
1943    }
1944
1945    #[test]
1946    fn test_noop() {
1947        let mut client = get_client();
1948        client.noop().unwrap();
1949    }
1950
1951    #[test]
1952    #[should_panic]
1953    fn test_quit() {
1954        let mut client = get_client();
1955        client.quit().unwrap();
1956
1957        client.noop().unwrap();
1958    }
1959
1960    #[test]
1961    fn test_flush() {
1962        let mut client = get_client();
1963        client.flush(2).unwrap();
1964    }
1965
1966    #[test]
1967    fn test_add() {
1968        const KEY: &[u8] = b"test:add";
1969        const INIT_VAL: &[u8] = b"initial";
1970        const ADD_VAL: &[u8] = b"added";
1971
1972        let mut client = get_client();
1973
1974        let _ = client.delete(KEY);
1975
1976        client.add(KEY, INIT_VAL, 0xdead_beef, 120).unwrap();
1977
1978        {
1979            let get_resp = client.get(KEY);
1980            assert_eq!(get_resp.unwrap(), (INIT_VAL.to_vec(), 0xdead_beef));
1981            client.add(KEY, ADD_VAL, 0xdead_beef, 120).unwrap_err();
1982        }
1983
1984        client.delete(KEY).unwrap();
1985    }
1986
1987    #[test]
1988    fn test_replace() {
1989        let mut client = get_client();
1990        client
1991            .replace(b"test:replace_key", b"replaced", 0xdead_beef, 120)
1992            .unwrap_err();
1993        client
1994            .add(b"test:replace_key", b"just_add", 0xdead_beef, 120)
1995            .unwrap();
1996        client
1997            .replace(b"test:replace_key", b"replaced", 0xdead_beef, 120)
1998            .unwrap();
1999        client.delete(b"test:replace_key").unwrap();
2000    }
2001
2002    #[test]
2003    fn test_append_prepend() {
2004        let mut client = get_client();
2005        client.append(b"test:append_key", b"appended").unwrap_err();
2006        client
2007            .prepend(b"test:append_key", b"prepended")
2008            .unwrap_err();
2009
2010        {
2011            client
2012                .add(b"test:append_key", b"just_add", 0xdead_beef, 120)
2013                .unwrap();
2014            client.append(b"test:append_key", b"appended").unwrap();
2015
2016            let get_resp = client.get(b"test:append_key");
2017            assert_eq!(
2018                get_resp.unwrap(),
2019                (b"just_addappended".to_vec(), 0xdead_beef)
2020            );
2021
2022            client.prepend(b"test:append_key", b"prepended").unwrap();
2023            let get_resp = client.get(b"test:append_key");
2024            assert_eq!(
2025                get_resp.unwrap(),
2026                (b"prependedjust_addappended".to_vec(), 0xdead_beef)
2027            );
2028        }
2029
2030        client.delete(b"test:append_key").unwrap();
2031    }
2032
2033    #[test]
2034    fn test_stat() {
2035        let mut client = get_client();
2036        client.stat().unwrap();
2037    }
2038
2039    #[test]
2040    fn test_touch() {
2041        let mut client = get_client();
2042        client.touch(b"test:touch", 120).unwrap_err();
2043        client.add(b"test:touch", b"val", 0xcafe_babe, 100).unwrap();
2044        client.touch(b"test:touch", 120).unwrap();
2045        client.delete(b"test:touch").unwrap();
2046    }
2047
2048    #[test]
2049    fn test_set_get_delete_incr_muti() {
2050        let mut client = get_client();
2051
2052        let mut data = BTreeMap::new();
2053        data.insert(
2054            &b"test:multi_hello1"[..],
2055            (&b"world1"[..], 0xdead_beef, 120),
2056        );
2057        data.insert(
2058            &b"test:multi_hello2"[..],
2059            (&b"world2"[..], 0xdead_beef, 120),
2060        );
2061        data.insert(&b"test:multi_num1"[..], (&b"100"[..], 0xdead_beef, 120));
2062        data.insert(&b"test:multi_num2"[..], (&b"200"[..], 0xdead_beef, 120));
2063        data.insert(
2064            &b"test:multi_lastone"[..],
2065            (&b"last!"[..], 0xdead_beef, 120),
2066        );
2067
2068        client.set_multi(data).unwrap();
2069
2070        let get_resp_map = client
2071            .get_multi(&[
2072                b"test:multi_hello1",
2073                b"test:multi_hello2",
2074                b"test:multi_lastone",
2075            ])
2076            .unwrap();
2077        assert_eq!(
2078            get_resp_map.get(&b"test:multi_hello1".to_vec()),
2079            Some(&(b"world1".to_vec(), 0xdead_beef))
2080        );
2081        assert_eq!(
2082            get_resp_map.get(&b"test:multi_hello2".to_vec()),
2083            Some(&(b"world2".to_vec(), 0xdead_beef))
2084        );
2085        assert_eq!(
2086            get_resp_map.get(&b"test:multi_lastone".to_vec()),
2087            Some(&(b"last!".to_vec(), 0xdead_beef))
2088        );
2089
2090        client
2091            .delete_multi(&[
2092                b"test:multi_hello1",
2093                b"test:multi_hello2",
2094                b"test:multi_num3",
2095            ])
2096            .unwrap();
2097
2098        let get_resp_map = client
2099            .get_multi(&[
2100                b"test:multi_hello1",
2101                b"test:multi_hello2",
2102                b"test:multi_lastone",
2103            ])
2104            .unwrap();
2105        assert_eq!(get_resp_map.get(&b"test:multi_hello1".to_vec()), None);
2106        assert_eq!(get_resp_map.get(&b"test:multi_hello2".to_vec()), None);
2107        assert_eq!(
2108            get_resp_map.get(&b"test:multi_lastone".to_vec()),
2109            Some(&(b"last!".to_vec(), 0xdead_beef))
2110        );
2111
2112        let mut data = HashMap::new();
2113        data.insert(&b"test:multi_num1"[..], (10, 50, 120));
2114        data.insert(&b"test:multi_num2"[..], (20, 50, 120));
2115        data.insert(&b"test:multi_num3"[..], (30, 50, 120));
2116        client.increment_multi(data).unwrap();
2117
2118        let get_resp_map = client
2119            .get_multi(&[b"test:multi_num1", b"test:multi_num2", b"test:multi_num3"])
2120            .unwrap();
2121        assert_eq!(
2122            get_resp_map.get(&b"test:multi_num1".to_vec()),
2123            Some(&(b"110".to_vec(), 0xdead_beef))
2124        );
2125        assert_eq!(
2126            get_resp_map.get(&b"test:multi_num2".to_vec()),
2127            Some(&(b"220".to_vec(), 0xdead_beef))
2128        );
2129        assert_eq!(
2130            get_resp_map.get(&b"test:multi_num3".to_vec()),
2131            Some(&(b"50".to_vec(), 0x0))
2132        );
2133
2134        client
2135            .delete_multi(&[b"lastone", b"not_exists!!!!"])
2136            .unwrap();
2137    }
2138
2139    #[test]
2140    fn test_set_add_replace_noreply() {
2141        let key = b"test:noreply_key";
2142        let set_val = b"value";
2143        let add_val = b"just add";
2144        let rep_val = b"replaced";
2145
2146        let mut client = get_client();
2147
2148        let _ = client.delete(key);
2149
2150        client.add_noreply(key, add_val, 0xdead_beef, 120).unwrap();
2151
2152        let get_resp = client.get(key);
2153        assert_eq!(get_resp.unwrap(), (add_val.to_vec(), 0xdead_beef));
2154
2155        client.set_noreply(key, set_val, 0xdead_beef, 120).unwrap();
2156
2157        let get_resp = client.get(key);
2158        assert_eq!(get_resp.unwrap(), (set_val.to_vec(), 0xdead_beef));
2159
2160        client
2161            .replace_noreply(key, rep_val, 0xcafe_babe, 120)
2162            .unwrap();
2163
2164        let get_resp = client.get(key);
2165        assert_eq!(get_resp.unwrap(), (rep_val.to_vec(), 0xcafe_babe));
2166
2167        client.delete(key).unwrap();
2168    }
2169
2170    #[test]
2171    fn test_set_add_replace_cas() {
2172        let key = b"test:cas_key";
2173        let set_val = b"value";
2174        let add_val = b"just add";
2175        let rep_val = b"replaced";
2176
2177        let mut client = get_client();
2178
2179        let add_cas = client.add_cas(key, add_val, 0xdead_beef, 120).unwrap();
2180
2181        {
2182            client
2183                .set_cas(key, set_val, 0xdead_beef, 120, add_cas + 1)
2184                .unwrap_err();
2185
2186            let (_, _, get_cas) = client.get_cas(key).unwrap();
2187            assert_eq!(get_cas, add_cas);
2188
2189            client
2190                .replace_cas(key, rep_val, 0xdead_beef, 120, add_cas + 1)
2191                .unwrap_err();
2192        }
2193
2194        {
2195            let set_cas = client
2196                .set_cas(key, set_val, 0xdead_beef, 120, add_cas)
2197                .unwrap();
2198            let (_, _, get_cas) = client.get_cas(key).unwrap();
2199            assert_eq!(get_cas, set_cas);
2200
2201            client
2202                .replace_cas(key, rep_val, 0xdead_beef, 120, set_cas)
2203                .unwrap();
2204        }
2205
2206        client.delete(key).unwrap();
2207    }
2208
2209    #[test]
2210    fn test_incr_decr_cas() {
2211        let key = b"test:incr_decr_cas";
2212        let mut client = get_client();
2213
2214        let _ = client.delete(key);
2215
2216        let (_, incr_cas) = client.increment_cas(key, 0, 100, 120, 0).unwrap();
2217
2218        client
2219            .increment_cas(key, 0, 10, 120, incr_cas + 1)
2220            .unwrap_err();
2221
2222        let (_, incr_cas) = client.increment_cas(key, 0, 10, 120, incr_cas).unwrap();
2223
2224        client
2225            .decrement_cas(key, 0, 10, 120, incr_cas + 1)
2226            .unwrap_err();
2227        client.decrement_cas(key, 0, 10, 120, incr_cas).unwrap();
2228
2229        client.delete(key).unwrap();
2230    }
2231
2232    #[test]
2233    fn test_append_prepend_cas() {
2234        const KEY: &[u8] = b"test:append_prepend_cas";
2235        let mut client = get_client();
2236
2237        let _ = client.delete(KEY);
2238
2239        let set_cas = client.set_cas(KEY, b"appended", 0, 120, 0).unwrap();
2240        client
2241            .append_cas(KEY, b"appended", set_cas + 1)
2242            .unwrap_err();
2243
2244        let ap_cas = client.append_cas(KEY, b"appended", set_cas).unwrap();
2245        client.prepend_cas(KEY, b"prepend", ap_cas + 1).unwrap_err();
2246        client.prepend_cas(KEY, b"prepend", ap_cas).unwrap();
2247
2248        client.delete(KEY).unwrap();
2249    }
2250
2251    #[test]
2252    fn test_if_noreply_failed() {
2253        let key = b"test:noreply_fail_key";
2254        let set_val = b"value";
2255        let add_val = b"just add";
2256
2257        let mut client = get_client();
2258
2259        let _ = client.delete(key);
2260
2261        client.set_noreply(key, set_val, 0xdead_beef, 120).unwrap();
2262
2263        // Should failed, because key is already set
2264        client.add_noreply(key, add_val, 0xdead_beef, 120).unwrap();
2265
2266        let get_resp = client.get(key);
2267        assert_eq!(get_resp.unwrap(), (set_val.to_vec(), 0xdead_beef));
2268    }
2269}