1#![allow(dead_code)]
39#![allow(clippy::too_many_arguments)]
40
41use std::io::{self, Read, Write};
42
43use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
44
/// Raw numeric constants used when encoding and decoding packet headers.
///
/// NOTE(review): the names and values look like the memcached binary protocol
/// tables — confirm against the protocol specification before changing any.
#[rustfmt::skip]
mod consts {
    // Magic bytes: the first byte written for a request / response header.
    pub const MAGIC_REQUEST: u8 = 0x80;
    pub const MAGIC_RESPONSE: u8 = 0x81;

    // Status codes: the 16-bit status field of a response header.
    // Each one is mirrored by a variant of the `Status` enum.
    pub const STATUS_NO_ERROR: u16 = 0x0000;
    pub const STATUS_KEY_NOT_FOUND: u16 = 0x0001;
    pub const STATUS_KEY_EXISTS: u16 = 0x0002;
    pub const STATUS_VALUE_TOO_LARGE: u16 = 0x0003;
    pub const STATUS_INVALID_ARGUMENTS: u16 = 0x0004;
    pub const STATUS_ITEM_NOT_STORED: u16 = 0x0005;
    pub const STATUS_INCR_OR_DECR_ON_NON_NUMERIC_VALUE: u16 = 0x0006;
    pub const STATUS_VBUCKET_BELONGS_TO_OTHER_SERVER: u16 = 0x0007;
    pub const STATUS_AUTHENTICATION_ERROR: u16 = 0x0008;
    pub const STATUS_AUTHENTICATION_CONTINUE: u16 = 0x0009;
    pub const STATUS_UNKNOWN_COMMAND: u16 = 0x0081;
    pub const STATUS_OUT_OF_MEMORY: u16 = 0x0082;
    pub const STATUS_NOT_SUPPORTED: u16 = 0x0083;
    pub const STATUS_INTERNAL_ERROR: u16 = 0x0084;
    pub const STATUS_BUSY: u16 = 0x0085;
    pub const STATUS_TEMPORARY_FAILURE: u16 = 0x0086;
    // Listed out of numeric order: 0x0020 / 0x0021 sit between the two ranges above.
    pub const STATUS_AUTHENTICATION_REQUIRED: u16 = 0x0020;
    pub const STATUS_AUTHENTICATION_FURTHER_STEP_REQUIRED: u16 = 0x0021;

    // Opcodes: the command byte of a request or response header.
    // Each one is mirrored by a variant of the `Command` enum.
    pub const OPCODE_GET: u8 = 0x00;
    pub const OPCODE_SET: u8 = 0x01;
    pub const OPCODE_ADD: u8 = 0x02;
    pub const OPCODE_REPLACE: u8 = 0x03;
    pub const OPCODE_DEL: u8 = 0x04;
    pub const OPCODE_INCR: u8 = 0x05;
    pub const OPCODE_DECR: u8 = 0x06;
    pub const OPCODE_QUIT: u8 = 0x07;
    pub const OPCODE_FLUSH: u8 = 0x08;
    pub const OPCODE_GETQ: u8 = 0x09;
    pub const OPCODE_NOP: u8 = 0x0A;
    pub const OPCODE_VERSION: u8 = 0x0B;
    pub const OPCODE_GETK: u8 = 0x0C;
    pub const OPCODE_GETKQ: u8 = 0x0D;
    pub const OPCODE_APPEND: u8 = 0x0E;
    pub const OPCODE_PREPEND: u8 = 0x0F;
    pub const OPCODE_STAT: u8 = 0x10;
    pub const OPCODE_SETQ: u8 = 0x11;
    pub const OPCODE_ADDQ: u8 = 0x12;
    pub const OPCODE_REPLACEQ: u8 = 0x13;
    pub const OPCODE_DELQ: u8 = 0x14;
    pub const OPCODE_INCRQ: u8 = 0x15;
    pub const OPCODE_DECRQ: u8 = 0x16;
    pub const OPCODE_QUITQ: u8 = 0x17;
    pub const OPCODE_FLUSHQ: u8 = 0x18;
    pub const OPCODE_APPENDQ: u8 = 0x19;
    pub const OPCODE_PREPENDQ: u8 = 0x1A;
    pub const OPCODE_VERBOSITY: u8 = 0x1B;
    pub const OPCODE_TOUCH: u8 = 0x1C;
    pub const OPCODE_GAT: u8 = 0x1D;
    pub const OPCODE_GATQ: u8 = 0x1E;
    pub const OPCODE_SASL_LIST_MECHS: u8 = 0x20;
    pub const OPCODE_SASL_AUTH: u8 = 0x21;
    pub const OPCODE_SASL_STEP: u8 = 0x22;
    pub const OPCODE_RGET: u8 = 0x30;
    pub const OPCODE_RSET: u8 = 0x31;
    pub const OPCODE_RSETQ: u8 = 0x32;
    pub const OPCODE_RAPPEND: u8 = 0x33;
    pub const OPCODE_RAPPENDQ: u8 = 0x34;
    pub const OPCODE_RPREPEND: u8 = 0x35;
    pub const OPCODE_RPREPENDQ: u8 = 0x36;
    pub const OPCODE_RDEL: u8 = 0x37;
    pub const OPCODE_RDELQ: u8 = 0x38;
    pub const OPCODE_RINCR: u8 = 0x39;
    pub const OPCODE_RINCRQ: u8 = 0x3A;
    pub const OPCODE_RDECR: u8 = 0x3B;
    pub const OPCODE_RDECRQ: u8 = 0x3C;
    pub const OPCODE_SET_VBUCKET: u8 = 0x3D;
    pub const OPCODE_GET_VBUCKET: u8 = 0x3E;
    pub const OPCODE_DEL_VBUCKET: u8 = 0x3F;
    pub const OPCODE_TAP_CONNECT: u8 = 0x40;
    pub const OPCODE_TAP_MUTATION: u8 = 0x41;
    pub const OPCODE_TAP_DEL: u8 = 0x42;
    pub const OPCODE_TAP_FLUSH: u8 = 0x43;
    pub const OPCODE_TAP_OPAQUE: u8 = 0x44;
    pub const OPCODE_TAP_VBUCKET_SET: u8 = 0x45;
    pub const OPCODE_TAP_CHECKPOINT_START: u8 = 0x46;
    pub const OPCODE_TAP_CHECKPOINT_END: u8 = 0x47;

    // Data types: the data-type byte of a header. Only one value is defined.
    pub const DATA_TYPE_RAW_BYTES: u8 = 0x00;
}
130
131#[derive(Copy, Clone, Debug, Eq, PartialEq)]
133#[repr(u16)]
134#[rustfmt::skip]
135pub enum Status {
136 NoError = consts::STATUS_NO_ERROR,
137 KeyNotFound = consts::STATUS_KEY_NOT_FOUND,
138 KeyExists = consts::STATUS_KEY_EXISTS,
139 ValueTooLarge = consts::STATUS_VALUE_TOO_LARGE,
140 InvalidArguments = consts::STATUS_INVALID_ARGUMENTS,
141 ItemNotStored = consts::STATUS_ITEM_NOT_STORED,
142 IncrDecrOnNonNumericValue = consts::STATUS_INCR_OR_DECR_ON_NON_NUMERIC_VALUE,
143 VBucketBelongsToOtherServer = consts::STATUS_VBUCKET_BELONGS_TO_OTHER_SERVER,
144 AuthenticationError = consts::STATUS_AUTHENTICATION_ERROR,
145 AuthenticationContinue = consts::STATUS_AUTHENTICATION_CONTINUE,
146 UnknownCommand = consts::STATUS_UNKNOWN_COMMAND,
147 OutOfMemory = consts::STATUS_OUT_OF_MEMORY,
148 NotSupported = consts::STATUS_NOT_SUPPORTED,
149 InternalError = consts::STATUS_INTERNAL_ERROR,
150 Busy = consts::STATUS_BUSY,
151 TemporaryFailure = consts::STATUS_TEMPORARY_FAILURE,
152 AuthenticationRequired = consts::STATUS_AUTHENTICATION_REQUIRED,
153 AuthenticationFurtherStepRequired = consts::STATUS_AUTHENTICATION_FURTHER_STEP_REQUIRED,
154}
155
156impl Status {
157 #[inline]
159 pub fn to_u16(self) -> u16 {
160 self as u16
161 }
162
163 #[inline]
165 #[rustfmt::skip]
166 pub fn from_u16(code: u16) -> Option<Status> {
167 match code {
168 consts::STATUS_NO_ERROR => Some(Status::NoError),
169 consts::STATUS_KEY_NOT_FOUND => Some(Status::KeyNotFound),
170 consts::STATUS_KEY_EXISTS => Some(Status::KeyExists),
171 consts::STATUS_VALUE_TOO_LARGE => Some(Status::ValueTooLarge),
172 consts::STATUS_INVALID_ARGUMENTS => Some(Status::InvalidArguments),
173 consts::STATUS_ITEM_NOT_STORED => Some(Status::ItemNotStored),
174 consts::STATUS_INCR_OR_DECR_ON_NON_NUMERIC_VALUE => Some(Status::IncrDecrOnNonNumericValue),
175 consts::STATUS_VBUCKET_BELONGS_TO_OTHER_SERVER => Some(Status::VBucketBelongsToOtherServer),
176 consts::STATUS_AUTHENTICATION_ERROR => Some(Status::AuthenticationError),
177 consts::STATUS_AUTHENTICATION_CONTINUE => Some(Status::AuthenticationContinue),
178 consts::STATUS_UNKNOWN_COMMAND => Some(Status::UnknownCommand),
179 consts::STATUS_OUT_OF_MEMORY => Some(Status::OutOfMemory),
180 consts::STATUS_NOT_SUPPORTED => Some(Status::NotSupported),
181 consts::STATUS_INTERNAL_ERROR => Some(Status::InternalError),
182 consts::STATUS_BUSY => Some(Status::Busy),
183 consts::STATUS_TEMPORARY_FAILURE => Some(Status::TemporaryFailure),
184 consts::STATUS_AUTHENTICATION_REQUIRED => Some(Status::AuthenticationRequired),
185 consts::STATUS_AUTHENTICATION_FURTHER_STEP_REQUIRED => Some(Status::AuthenticationFurtherStepRequired),
186 _ => None,
187 }
188 }
189
190 #[inline]
192 #[rustfmt::skip]
193 pub fn desc(self) -> &'static str {
194 match self {
195 Status::NoError => "no error",
196 Status::KeyNotFound => "key not found",
197 Status::KeyExists => "key exists",
198 Status::ValueTooLarge => "value too large",
199 Status::InvalidArguments => "invalid argument",
200 Status::ItemNotStored => "item not stored",
201 Status::IncrDecrOnNonNumericValue => "incr or decr on non-numeric value",
202 Status::VBucketBelongsToOtherServer => "vbucket belongs to other server",
203 Status::AuthenticationError => "authentication error",
204 Status::AuthenticationContinue => "authentication continue",
205 Status::UnknownCommand => "unknown command",
206 Status::OutOfMemory => "out of memory",
207 Status::NotSupported => "not supported",
208 Status::InternalError => "internal error",
209 Status::Busy => "busy",
210 Status::TemporaryFailure => "temporary failure",
211 Status::AuthenticationRequired => "authentication required/not successful",
212 Status::AuthenticationFurtherStepRequired => "further authentication steps required",
213 }
214 }
215}
216
217#[derive(Clone, Copy, Debug, Eq, PartialEq)]
218#[repr(u8)]
219#[rustfmt::skip]
220pub enum Command {
221 Get = consts::OPCODE_GET,
222 Set = consts::OPCODE_SET,
223 Add = consts::OPCODE_ADD,
224 Replace = consts::OPCODE_REPLACE,
225 Delete = consts::OPCODE_DEL,
226 Increment = consts::OPCODE_INCR,
227 Decrement = consts::OPCODE_DECR,
228 Quit = consts::OPCODE_QUIT,
229 Flush = consts::OPCODE_FLUSH,
230 GetQuietly = consts::OPCODE_GETQ,
231 Noop = consts::OPCODE_NOP,
232 Version = consts::OPCODE_VERSION,
233 GetKey = consts::OPCODE_GETK,
234 GetKeyQuietly = consts::OPCODE_GETKQ,
235 Append = consts::OPCODE_APPEND,
236 Prepend = consts::OPCODE_PREPEND,
237 Stat = consts::OPCODE_STAT,
238 SetQuietly = consts::OPCODE_SETQ,
239 AddQuietly = consts::OPCODE_ADDQ,
240 ReplaceQuietly = consts::OPCODE_REPLACEQ,
241 DeleteQuietly = consts::OPCODE_DELQ,
242 IncrementQuietly = consts::OPCODE_INCRQ,
243 DecrementQuietly = consts::OPCODE_DECRQ,
244 QuitQuietly = consts::OPCODE_QUITQ,
245 FlushQuietly = consts::OPCODE_FLUSHQ,
246 AppendQuietly = consts::OPCODE_APPENDQ,
247 PrependQuietly = consts::OPCODE_PREPENDQ,
248 Verbosity = consts::OPCODE_VERBOSITY,
249 Touch = consts::OPCODE_TOUCH,
250 GetAndTouch = consts::OPCODE_GAT,
251 GetAndTouchQuietly = consts::OPCODE_GATQ,
252 SaslListMechanisms = consts::OPCODE_SASL_LIST_MECHS,
253 SaslAuthenticate = consts::OPCODE_SASL_AUTH,
254 SaslStep = consts::OPCODE_SASL_STEP,
255 RGet = consts::OPCODE_RGET,
256 RSet = consts::OPCODE_RSET,
257 RSetQuietly = consts::OPCODE_RSETQ,
258 RAppend = consts::OPCODE_RAPPEND,
259 RAppendQuietly = consts::OPCODE_RAPPENDQ,
260 RPrepend = consts::OPCODE_RPREPEND,
261 RPrependQuietly = consts::OPCODE_RPREPENDQ,
262 RDelete = consts::OPCODE_RDEL,
263 RDeleteQuietly = consts::OPCODE_RDELQ,
264 RIncrement = consts::OPCODE_RINCR,
265 RIncrementQuietly = consts::OPCODE_RINCRQ,
266 RDecrement = consts::OPCODE_RDECR,
267 RDecrementQuietly = consts::OPCODE_RDECRQ,
268 SetVBucket = consts::OPCODE_SET_VBUCKET,
269 GetVBucket = consts::OPCODE_GET_VBUCKET,
270 DelVBucket = consts::OPCODE_DEL_VBUCKET,
271 TapConnect = consts::OPCODE_TAP_CONNECT,
272 TapMutation = consts::OPCODE_TAP_MUTATION,
273 TapDelete = consts::OPCODE_TAP_DEL,
274 TapFlush = consts::OPCODE_TAP_FLUSH,
275 TapOpaque = consts::OPCODE_TAP_OPAQUE,
276 TapVBucketSet = consts::OPCODE_TAP_VBUCKET_SET,
277 TapCheckpointStart = consts::OPCODE_TAP_CHECKPOINT_START,
278 TapCheckpointEnd = consts::OPCODE_TAP_CHECKPOINT_END,
279}
280
281impl Command {
282 #[inline]
283 fn to_u8(self) -> u8 {
284 self as u8
285 }
286
287 #[inline]
288 #[rustfmt::skip]
289 fn from_u8(code: u8) -> Option<Command> {
290 match code {
291 consts::OPCODE_GET => Some(Command::Get),
292 consts::OPCODE_SET => Some(Command::Set),
293 consts::OPCODE_ADD => Some(Command::Add),
294 consts::OPCODE_REPLACE => Some(Command::Replace),
295 consts::OPCODE_DEL => Some(Command::Delete),
296 consts::OPCODE_INCR => Some(Command::Increment),
297 consts::OPCODE_DECR => Some(Command::Decrement),
298 consts::OPCODE_QUIT => Some(Command::Quit),
299 consts::OPCODE_FLUSH => Some(Command::Flush),
300 consts::OPCODE_GETQ => Some(Command::GetQuietly),
301 consts::OPCODE_NOP => Some(Command::Noop),
302 consts::OPCODE_VERSION => Some(Command::Version),
303 consts::OPCODE_GETK => Some(Command::GetKey),
304 consts::OPCODE_GETKQ => Some(Command::GetKeyQuietly),
305 consts::OPCODE_APPEND => Some(Command::Append),
306 consts::OPCODE_PREPEND => Some(Command::Prepend),
307 consts::OPCODE_STAT => Some(Command::Stat),
308 consts::OPCODE_SETQ => Some(Command::SetQuietly),
309 consts::OPCODE_ADDQ => Some(Command::AddQuietly),
310 consts::OPCODE_REPLACEQ => Some(Command::ReplaceQuietly),
311 consts::OPCODE_DELQ => Some(Command::DeleteQuietly),
312 consts::OPCODE_INCRQ => Some(Command::IncrementQuietly),
313 consts::OPCODE_DECRQ => Some(Command::DecrementQuietly),
314 consts::OPCODE_QUITQ => Some(Command::QuitQuietly),
315 consts::OPCODE_FLUSHQ => Some(Command::FlushQuietly),
316 consts::OPCODE_APPENDQ => Some(Command::AppendQuietly),
317 consts::OPCODE_PREPENDQ => Some(Command::PrependQuietly),
318 consts::OPCODE_VERBOSITY => Some(Command::Verbosity),
319 consts::OPCODE_TOUCH => Some(Command::Touch),
320 consts::OPCODE_GAT => Some(Command::GetAndTouch),
321 consts::OPCODE_GATQ => Some(Command::GetAndTouchQuietly),
322 consts::OPCODE_SASL_LIST_MECHS => Some(Command::SaslListMechanisms),
323 consts::OPCODE_SASL_AUTH => Some(Command::SaslAuthenticate),
324 consts::OPCODE_SASL_STEP => Some(Command::SaslStep),
325 consts::OPCODE_RGET => Some(Command::RGet),
326 consts::OPCODE_RSET => Some(Command::RSet),
327 consts::OPCODE_RSETQ => Some(Command::RSetQuietly),
328 consts::OPCODE_RAPPEND => Some(Command::RAppend),
329 consts::OPCODE_RAPPENDQ => Some(Command::RAppendQuietly),
330 consts::OPCODE_RPREPEND => Some(Command::RPrepend),
331 consts::OPCODE_RPREPENDQ => Some(Command::RPrependQuietly),
332 consts::OPCODE_RDEL => Some(Command::RDelete),
333 consts::OPCODE_RDELQ => Some(Command::RDeleteQuietly),
334 consts::OPCODE_RINCR => Some(Command::RIncrement),
335 consts::OPCODE_RINCRQ => Some(Command::RIncrementQuietly),
336 consts::OPCODE_RDECR => Some(Command::RDecrement),
337 consts::OPCODE_RDECRQ => Some(Command::RDecrementQuietly),
338 consts::OPCODE_SET_VBUCKET => Some(Command::SetVBucket),
339 consts::OPCODE_GET_VBUCKET => Some(Command::GetVBucket),
340 consts::OPCODE_DEL_VBUCKET => Some(Command::DelVBucket),
341 consts::OPCODE_TAP_CONNECT => Some(Command::TapConnect),
342 consts::OPCODE_TAP_MUTATION => Some(Command::TapMutation),
343 consts::OPCODE_TAP_DEL => Some(Command::TapDelete),
344 consts::OPCODE_TAP_FLUSH => Some(Command::TapFlush),
345 consts::OPCODE_TAP_OPAQUE => Some(Command::TapOpaque),
346 consts::OPCODE_TAP_VBUCKET_SET => Some(Command::TapVBucketSet),
347 consts::OPCODE_TAP_CHECKPOINT_START => Some(Command::TapCheckpointStart),
348 consts::OPCODE_TAP_CHECKPOINT_END => Some(Command::TapCheckpointEnd),
349 _ => None,
350 }
351 }
352}
353
354#[derive(Copy, Clone, Debug, Eq, PartialEq)]
355pub enum DataType {
356 RawBytes,
357}
358
359impl DataType {
360 #[inline]
361 fn to_u8(self) -> u8 {
362 match self {
363 DataType::RawBytes => consts::DATA_TYPE_RAW_BYTES,
364 }
365 }
366
367 #[inline]
368 fn from_u8(code: u8) -> Option<DataType> {
369 match code {
370 consts::DATA_TYPE_RAW_BYTES => Some(DataType::RawBytes),
371 _ => None,
372 }
373 }
374}
375
376#[derive(Clone, Debug)]
393pub struct RequestHeader {
394 pub command: Command,
395 key_len: u16,
396 extra_len: u8,
397 pub data_type: DataType,
398 pub vbucket_id: u16,
399 body_len: u32,
400 pub opaque: u32,
401 pub cas: u64,
402}
403
404impl RequestHeader {
405 pub fn new(
406 cmd: Command,
407 dtype: DataType,
408 vbid: u16,
409 opaque: u32,
410 cas: u64,
411 key_len: u16,
412 extra_len: u8,
413 body_len: u32,
414 ) -> RequestHeader {
415 RequestHeader {
416 command: cmd,
417 key_len,
418 extra_len,
419 data_type: dtype,
420 vbucket_id: vbid,
421 body_len,
422 opaque,
423 cas,
424 }
425 }
426
427 pub fn from_payload(
428 cmd: Command,
429 dtype: DataType,
430 vbid: u16,
431 opaque: u32,
432 cas: u64,
433 key: &[u8],
434 extra: &[u8],
435 value: &[u8],
436 ) -> RequestHeader {
437 let key_len = key.len() as u16;
438 let extra_len = extra.len() as u8;
439 let body_len = (key.len() + extra.len() + value.len()) as u32;
440
441 RequestHeader::new(cmd, dtype, vbid, opaque, cas, key_len, extra_len, body_len)
442 }
443
444 #[inline]
445 pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
446 writer.write_u8(consts::MAGIC_REQUEST)?;
447 writer.write_u8(self.command.to_u8())?;
448 writer.write_u16::<BigEndian>(self.key_len)?;
449 writer.write_u8(self.extra_len)?;
450 writer.write_u8(self.data_type.to_u8())?;
451 writer.write_u16::<BigEndian>(self.vbucket_id)?;
452 writer.write_u32::<BigEndian>(self.body_len)?;
453 writer.write_u32::<BigEndian>(self.opaque)?;
454 writer.write_u64::<BigEndian>(self.cas)?;
455
456 Ok(())
457 }
458
459 #[inline]
460 pub fn read_from<R: Read>(reader: &mut R) -> io::Result<RequestHeader> {
461 let magic = reader.read_u8()?;
462
463 if magic != consts::MAGIC_REQUEST {
464 return Err(io::Error::new(io::ErrorKind::Other, "Invalid magic"));
465 }
466
467 Ok(RequestHeader {
468 command: match Command::from_u8(reader.read_u8()?) {
469 Some(c) => c,
470 None => return Err(io::Error::new(io::ErrorKind::Other, "Invalid command")),
471 },
472 key_len: reader.read_u16::<BigEndian>()?,
473 extra_len: reader.read_u8()?,
474 data_type: match DataType::from_u8(reader.read_u8()?) {
475 Some(d) => d,
476 None => return Err(io::Error::new(io::ErrorKind::Other, "Invalid data type")),
477 },
478 vbucket_id: reader.read_u16::<BigEndian>()?,
479 body_len: reader.read_u32::<BigEndian>()?,
480 opaque: reader.read_u32::<BigEndian>()?,
481 cas: reader.read_u64::<BigEndian>()?,
482 })
483 }
484}
485
486#[derive(Clone, Debug)]
503pub struct ResponseHeader {
504 pub command: Command,
505 key_len: u16,
506 extra_len: u8,
507 pub data_type: DataType,
508 pub status: Status,
509 body_len: u32,
510 pub opaque: u32,
511 pub cas: u64,
512}
513
514impl ResponseHeader {
515 pub fn new(
516 command: Command,
517 data_type: DataType,
518 status: Status,
519 opaque: u32,
520 cas: u64,
521 key_len: u16,
522 extra_len: u8,
523 body_len: u32,
524 ) -> ResponseHeader {
525 ResponseHeader {
526 command,
527 key_len,
528 extra_len,
529 data_type,
530 status,
531 body_len,
532 opaque,
533 cas,
534 }
535 }
536
537 pub fn from_payload(
538 cmd: Command,
539 dtype: DataType,
540 status: Status,
541 opaque: u32,
542 cas: u64,
543 key: &[u8],
544 extra: &[u8],
545 value: &[u8],
546 ) -> ResponseHeader {
547 let key_len = key.len() as u16;
548 let extra_len = extra.len() as u8;
549 let body_len = (key.len() + extra.len() + value.len()) as u32;
550
551 ResponseHeader::new(
552 cmd, dtype, status, opaque, cas, key_len, extra_len, body_len,
553 )
554 }
555
556 #[inline]
557 pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
558 writer.write_u8(consts::MAGIC_RESPONSE)?;
559 writer.write_u8(self.command.to_u8())?;
560 writer.write_u16::<BigEndian>(self.key_len)?;
561 writer.write_u8(self.extra_len)?;
562 writer.write_u8(self.data_type.to_u8())?;
563 writer.write_u16::<BigEndian>(self.status.to_u16())?;
564 writer.write_u32::<BigEndian>(self.body_len)?;
565 writer.write_u32::<BigEndian>(self.opaque)?;
566 writer.write_u64::<BigEndian>(self.cas)?;
567
568 Ok(())
569 }
570
571 #[inline]
572 pub fn read_from<R: Read>(reader: &mut R) -> io::Result<ResponseHeader> {
573 let magic = reader.read_u8()?;
574
575 if magic != consts::MAGIC_RESPONSE {
576 return Err(io::Error::new(io::ErrorKind::Other, "Invalid magic"));
577 }
578
579 Ok(ResponseHeader {
580 command: match Command::from_u8(reader.read_u8()?) {
581 Some(c) => c,
582 None => return Err(io::Error::new(io::ErrorKind::Other, "Invalid command")),
583 },
584 key_len: reader.read_u16::<BigEndian>()?,
585 extra_len: reader.read_u8()?,
586 data_type: match DataType::from_u8(reader.read_u8()?) {
587 Some(d) => d,
588 None => return Err(io::Error::new(io::ErrorKind::Other, "Invalid data type")),
589 },
590 status: match Status::from_u16(reader.read_u16::<BigEndian>()?) {
591 Some(s) => s,
592 None => return Err(io::Error::new(io::ErrorKind::Other, "Invalid status")),
593 },
594 body_len: reader.read_u32::<BigEndian>()?,
595 opaque: reader.read_u32::<BigEndian>()?,
596 cas: reader.read_u64::<BigEndian>()?,
597 })
598 }
599}
600
601#[derive(Clone, Debug)]
602pub struct RequestPacket {
603 pub header: RequestHeader,
604 pub extra: Vec<u8>,
605 pub key: Vec<u8>,
606 pub value: Vec<u8>,
607}
608
609impl RequestPacket {
610 pub fn new(
611 cmd: Command,
612 dtype: DataType,
613 vbid: u16,
614 opaque: u32,
615 cas: u64,
616 extra: Vec<u8>,
617 key: Vec<u8>,
618 value: Vec<u8>,
619 ) -> RequestPacket {
620 RequestPacket {
621 header: RequestHeader::from_payload(
622 cmd,
623 dtype,
624 vbid,
625 opaque,
626 cas,
627 &key[..],
628 &extra[..],
629 &value[..],
630 ),
631 extra,
632 key,
633 value,
634 }
635 }
636
637 #[inline]
638 pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
639 self.header.write_to(writer)?;
640 writer.write_all(&self.extra[..])?;
641 writer.write_all(&self.key[..])?;
642 writer.write_all(&self.value[..])?;
643
644 Ok(())
645 }
646
647 #[inline]
648 pub fn read_from<R: Read>(reader: &mut R) -> io::Result<RequestPacket> {
649 let header = RequestHeader::read_from(reader)?;
650
651 let extra_len = header.extra_len as usize;
652 let key_len = header.key_len as usize;
653 let value_len = header.body_len as usize - extra_len - key_len;
654
655 let extra = {
656 let mut buf = Vec::with_capacity(extra_len as usize);
657 reader.take(extra_len as u64).read_to_end(&mut buf)?;
658 buf
659 };
660
661 let key = {
662 let mut buf = Vec::with_capacity(key_len as usize);
663 reader.take(key_len as u64).read_to_end(&mut buf)?;
664 buf
665 };
666
667 let value = {
668 let mut buf = Vec::with_capacity(value_len as usize);
669 reader.take(value_len as u64).read_to_end(&mut buf)?;
670 buf
671 };
672
673 Ok(RequestPacket {
674 header,
675 extra,
676 key,
677 value,
678 })
679 }
680
681 pub fn as_ref(&self) -> RequestPacketRef<'_> {
682 RequestPacketRef::new(
683 &self.header,
684 &self.extra[..],
685 &self.key[..],
686 &self.value[..],
687 )
688 }
689}
690
691#[derive(Debug)]
692pub struct RequestPacketRef<'a> {
693 pub header: &'a RequestHeader,
694 pub extra: &'a [u8],
695 pub key: &'a [u8],
696 pub value: &'a [u8],
697}
698
699impl<'a> RequestPacketRef<'a> {
700 pub fn new(
701 header: &'a RequestHeader,
702 extra: &'a [u8],
703 key: &'a [u8],
704 value: &'a [u8],
705 ) -> RequestPacketRef<'a> {
706 RequestPacketRef {
707 header,
708 extra,
709 key,
710 value,
711 }
712 }
713
714 #[inline]
715 pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
716 self.header.write_to(writer)?;
717 writer.write_all(self.extra)?;
718 writer.write_all(self.key)?;
719 writer.write_all(self.value)?;
720
721 Ok(())
722 }
723}
724
725#[derive(Clone, Debug)]
726pub struct ResponsePacket {
727 pub header: ResponseHeader,
728 pub extra: Vec<u8>,
729 pub key: Vec<u8>,
730 pub value: Vec<u8>,
731}
732
733impl ResponsePacket {
734 pub fn new(
735 cmd: Command,
736 dtype: DataType,
737 status: Status,
738 opaque: u32,
739 cas: u64,
740 extra: Vec<u8>,
741 key: Vec<u8>,
742 value: Vec<u8>,
743 ) -> ResponsePacket {
744 ResponsePacket {
745 header: ResponseHeader::from_payload(
746 cmd,
747 dtype,
748 status,
749 opaque,
750 cas,
751 &key[..],
752 &extra[..],
753 &value[..],
754 ),
755 extra,
756 key,
757 value,
758 }
759 }
760
761 #[inline]
762 pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
763 self.header.write_to(writer)?;
764 writer.write_all(&self.extra[..])?;
765 writer.write_all(&self.key[..])?;
766 writer.write_all(&self.value[..])?;
767
768 Ok(())
769 }
770
771 #[inline]
772 pub fn read_from<R: Read>(reader: &mut R) -> io::Result<ResponsePacket> {
773 let header = ResponseHeader::read_from(reader)?;
774
775 let extra_len = header.extra_len as usize;
776 let key_len = header.key_len as usize;
777 let value_len = header.body_len as usize - extra_len - key_len;
778
779 let extra = {
780 let mut buf = Vec::with_capacity(extra_len as usize);
781 reader.take(extra_len as u64).read_to_end(&mut buf)?;
782 buf
783 };
784
785 let key = {
786 let mut buf = Vec::with_capacity(key_len as usize);
787 reader.take(key_len as u64).read_to_end(&mut buf)?;
788 buf
789 };
790
791 let value = {
792 let mut buf = Vec::with_capacity(value_len as usize);
793 reader.take(value_len as u64).read_to_end(&mut buf)?;
794 buf
795 };
796
797 Ok(ResponsePacket {
798 header,
799 extra,
800 key,
801 value,
802 })
803 }
804}
805
806pub struct ResponsePacketRef<'a> {
807 pub header: &'a ResponseHeader,
808 pub extra: &'a [u8],
809 pub key: &'a [u8],
810 pub value: &'a [u8],
811}
812
813impl<'a> ResponsePacketRef<'a> {
814 pub fn new(
815 header: &'a ResponseHeader,
816 extra: &'a [u8],
817 key: &'a [u8],
818 value: &'a [u8],
819 ) -> ResponsePacketRef<'a> {
820 ResponsePacketRef {
821 header,
822 extra,
823 key,
824 value,
825 }
826 }
827
828 #[inline]
829 pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
830 self.header.write_to(writer)?;
831 writer.write_all(self.extra)?;
832 writer.write_all(self.key)?;
833 writer.write_all(self.value)?;
834
835 Ok(())
836 }
837}
838
// Integration test: talks to a live server on 127.0.0.1:11211 and panics if
// the connection cannot be established.
#[cfg(test)]
mod test {
    use std::io::Write;
    use std::net::TcpStream;

    use crate::proto;
    use crate::proto::binarydef::{Command, DataType, RequestPacket, ResponsePacket};

    use bufstream::BufStream;

    /// Key shared by every request in the test.
    const KEY: &[u8] = b"test:binary_proto:hello";

    /// Opens the connection used by the test.
    fn test_stream() -> TcpStream {
        TcpStream::connect("127.0.0.1:11211").unwrap()
    }

    /// Builds a raw-bytes request for `KEY` with zero vbucket id, opaque and CAS.
    fn request(cmd: Command, extra: Vec<u8>, value: Vec<u8>) -> RequestPacket {
        RequestPacket::new(cmd, DataType::RawBytes, 0, 0, 0, extra, KEY.to_vec(), value)
    }

    /// Sends `req`, flushes the stream and reads back one response.
    fn exchange(stream: &mut BufStream<TcpStream>, req: &RequestPacket) -> ResponsePacket {
        req.write_to(stream).unwrap();
        stream.flush().unwrap();

        ResponsePacket::read_from(stream).unwrap()
    }

    #[test]
    fn test_binary_protocol() {
        let mut stream = BufStream::new(test_stream());

        // Store the value under the test key.
        let set_extras = vec![0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x0e, 0x10];
        let stored = exchange(
            &mut stream,
            &request(Command::Set, set_extras, b"world".to_vec()),
        );
        assert!(stored.header.status == proto::binary::Status::NoError);

        // Read it back and check the payload.
        let fetched = exchange(&mut stream, &request(Command::Get, vec![], vec![]));
        assert!(fetched.header.status == proto::binary::Status::NoError);
        assert_eq!(&fetched.value[..], b"world");

        // Remove it again so the test leaves no entry behind.
        let deleted = exchange(&mut stream, &request(Command::Delete, vec![], vec![]));
        assert!(deleted.header.status == proto::binary::Status::NoError);
    }
}