1extern crate byteorder;
8
9use byteorder::{ReadBytesExt, BigEndian, WriteBytesExt};
10
11use std::io;
12use std::io::{ErrorKind, Read, Write};
13use std::time::Duration;
14
15use super::*;
16
17use std::{u8, u16};
18
19pub mod size;
20
/// Early-returns an `InvalidInput` I/O error from the enclosing function when
/// `$value.len()` is greater than `$limit` (cast to `usize`).
///
/// The two-argument form uses the message "Length overflow."; the
/// three-argument form takes the error message explicitly.
macro_rules! chklen {
    ($value:expr, $limit:expr, $reason:expr) => {
        if $value.len() > $limit as usize {
            return Err(io::Error::new(ErrorKind::InvalidInput, $reason));
        }
    };
    ($value:expr, $limit:expr) => {{
        chklen!($value, $limit, "Length overflow.")
    }};
}
32
33pub fn read_message<R: Read + ?Sized>(input: &mut R) -> io::Result<Message> {
48 let size = {
49 let size = try!(input.read_i32::<BigEndian>());
50 if size < 4 {
51 let msg = format!("Invalid mux frame size: {}. Minimum 4 bytes.", size);
52 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
53 }
54
55 size as u64
56 };
57
58 decode_message(input.take(size))
59}
60
61pub fn decode_message<R: Read>(mut read: R) -> io::Result<Message> {
77 let tpe = try!(read.read_i8());
78 let tag = try!(decode_tag(&mut read));
79 let frame = try!(decode_frame(tpe, &mut read));
80
81 Ok(Message {
82 tag: tag,
83 frame: frame,
84 })
85}
86
87pub fn write_message<W: Write + ?Sized>(buffer: &mut W, msg: &Message) -> io::Result<()> {
103 try!(buffer.write_i32::<BigEndian>(size::frame_size(&msg.frame) as i32 + 4));
105 encode_message(buffer, msg)
106}
107
108pub fn encode_message<W: Write + ?Sized>(buffer: &mut W, msg: &Message) -> io::Result<()> {
124 try!(buffer.write_i8(msg.frame.frame_id()));
125 try!(encode_tag(buffer, &msg.tag));
126 encode_frame(buffer, &msg.frame)
127}
128
129pub fn encode_frame<W: Write + ?Sized>(writer: &mut W, frame: &MessageFrame) -> io::Result<()> {
147 match *frame {
148 MessageFrame::Treq(ref f) => encode_treq(writer, f),
149 MessageFrame::Rreq(ref f) => encode_rreq(writer, f),
150 MessageFrame::Tdispatch(ref f) => encode_tdispatch(writer, f),
151 MessageFrame::Rdispatch(ref f) => encode_rdispatch(writer, f),
152 MessageFrame::Tinit(ref f) => encode_init(writer, f),
153 MessageFrame::Rinit(ref f) => encode_init(writer, f),
154 MessageFrame::Tping => Ok(()),
156 MessageFrame::Rping => Ok(()),
157 MessageFrame::Tdrain => Ok(()),
158 MessageFrame::Rdrain => Ok(()),
159 MessageFrame::Tdiscarded(ref f) => encode_tdiscarded(writer, f),
160 MessageFrame::Tlease(ref f) => encode_tlease(writer, f),
161 MessageFrame::Rerr(ref f) => encode_rerr(writer, f),
162 }
163}
164
165pub fn decode_frame<R: Read>(tpe: i8, reader: R) -> io::Result<MessageFrame> {
184 Ok(match tpe {
185 types::TREQ => MessageFrame::Treq(try!(decode_treq(reader))),
186 types::RREQ => MessageFrame::Rreq(try!(decode_rreq(reader))),
187 types::TDISPATCH => MessageFrame::Tdispatch(try!(decode_tdispatch(reader))),
188 types::RDISPATCH => MessageFrame::Rdispatch(try!(decode_rdispatch(reader))),
189 types::TINIT => MessageFrame::Tinit(try!(decode_init(reader))),
190 types::RINIT => MessageFrame::Rinit(try!(decode_init(reader))),
191 types::TDRAIN => MessageFrame::Tdrain,
192 types::RDRAIN => MessageFrame::Rdrain,
193 types::TPING => MessageFrame::Tping,
194 types::RPING => MessageFrame::Rping,
195 types::TLEASE => MessageFrame::Tlease(try!(decode_tlease(reader))),
196 types::RERR => MessageFrame::Rerr(try!(decode_rerr(reader))),
197 other => {
198 return Err(
199 io::Error::new(io::ErrorKind::InvalidInput,
200 format!("Invalid frame type: {}", other))
201 );
202 }
203 })
204}
205
206
207pub fn decode_tlease<R: Read>(mut reader: R) -> io::Result<Tlease> {
210 let howmuch = try!(reader.read_u8());
211 let ticks = try!(reader.read_u64::<BigEndian>());
212
213 if howmuch == 0 {
214 Ok(Tlease {
215 duration: Duration::from_millis(ticks),
216 })
217 } else {
218 let message = format!("Unknown Tlease 'howmuch' code: {}", howmuch);
219 Err(io::Error::new(ErrorKind::InvalidData, message))
220 }
221}
222
223pub fn encode_tlease<W: Write + ?Sized>(writer: &mut W, tlease: &Tlease) -> io::Result<()> {
224 let d = &tlease.duration;
225 let millis = d.as_secs()*1000 + (((d.subsec_nanos() as f64)/1e6) as u64);
226 try!(writer.write_u8(0));
227 try!(writer.write_u64::<BigEndian>(millis));
228 Ok(())
229}
230
231#[inline]
234pub fn decode_tdiscarded<R: Read>(mut reader: R) -> io::Result<Tdiscarded> {
235 let mut bts = [0;3];
236 try!(reader.read_exact(&mut bts[..]));
237 let id = (bts[0] as u32) << 16 | (bts[1] as u32) << 8 | (bts[2] as u32);
238 let msg = try!(body_as_string(reader));
239
240 Ok(Tdiscarded {
241 id: id,
242 msg: msg,
243 })
244}
245
246#[inline]
247pub fn encode_tdiscarded<W: Write + ?Sized>(writer: &mut W, msg: &Tdiscarded) -> io::Result<()> {
248 let bts = [
249 ((msg.id >> 16) & 0xff) as u8,
250 ((msg.id >> 8) & 0xff) as u8,
251 ( msg.id & 0xff) as u8,
252 ];
253 try!(writer.write_all(&bts[..]));
254 writer.write_all(msg.msg.as_bytes())
255}
256
// Masks applied to the 24-bit tag word assembled in `decode_tag`.
// TAG_END_MASK selects bit 23; `decode_tag` reports `end` when that bit is clear.
// TAG_ID_MASK selects the id bits and equals MAX_TAG, which is defined outside this file.
const TAG_END_MASK: u32 = 1 << 23; const TAG_ID_MASK: u32 = MAX_TAG;
261
262pub fn decode_tag<R: Read + ?Sized>(reader: &mut R) -> io::Result<Tag> {
263 let mut bts = [0; 3];
264 let _ = try!(reader.read(&mut bts));
265
266 let id = (bts[0] as u32) << 16 |
267 (bts[1] as u32) << 8 |
268 (bts[2] as u32);
269
270 Ok(Tag {
271 end: id & TAG_END_MASK == 0,
272 id: id & TAG_ID_MASK, })
274}
275
276#[inline]
277pub fn encode_tag<W: Write + ?Sized>(buffer: &mut W, tag: &Tag) -> io::Result<()> {
278 let bytes = {
279 let id = tag.id;
280 let endbit = if tag.end { 0 } else { 1 };
281 [(id >> 16 & 0x7f) as u8 | (endbit << 7),
282 (id >> 8 & 0xff) as u8,
283 (id >> 0 & 0xff) as u8]
284 };
285
286 buffer.write_all(&bytes)
287}
288
289pub fn encode_headers<W: Write + ?Sized>(writer: &mut W, headers: &Headers) -> io::Result<()> {
292 chklen!(headers, u8::MAX, "Header count overflow");
293 try!(writer.write_u8(headers.len() as u8));
294
295 for &(ref k, ref v) in headers {
296 chklen!(v, u8::MAX, "Header length overflow");
297
298 try!(writer.write_u8(*k));
299 try!(writer.write_u8(v.len() as u8));
300 try!(writer.write_all(v));
301 }
302 Ok(())
303}
304
305pub fn decode_headers<R: Read + ?Sized>(reader: &mut R) -> io::Result<Headers> {
306 let len = try!(reader.read_u8()) as usize;
307 let mut acc = Vec::with_capacity(len);
308
309 for _ in 0..len {
310 let key = try!(reader.read_u8());
311 let val_len = try!(reader.read_u8());
312 let mut val = vec![0;val_len as usize];
313 try!(reader.read_exact(&mut val[..]));
314 acc.push((key, val));
315 }
316
317 Ok(acc)
318}
319
320pub fn encode_contexts<W: Write + ?Sized>(writer: &mut W, contexts: &Contexts) -> io::Result<()> {
323 chklen!(contexts, u16::MAX, "Context entries overflow");
324
325 try!(writer.write_u16::<BigEndian>(contexts.len() as u16));
326 for &(ref k, ref v) in contexts {
327 chklen!(k, u16::MAX, "Context key overflow");
328 try!(writer.write_u16::<BigEndian>(k.len() as u16));
329 try!(writer.write_all(&k[..]));
330
331 chklen!(v, u16::MAX, "Context value overflow");
332 try!(writer.write_u16::<BigEndian>(v.len() as u16));
333 try!(writer.write_all(&v[..]));
334 }
335
336 Ok(())
337}
338
339pub fn decode_contexts<R: Read + ?Sized>(reader: &mut R) -> io::Result<Contexts> {
340 let len = try!(reader.read_u16::<BigEndian>()) as usize;
341
342 let mut acc = Vec::with_capacity(len);
343
344 for _ in 0..len {
345 let key_len = try!(reader.read_u16::<BigEndian>());
346 let mut key = vec![0;key_len as usize];
347 try!(reader.read_exact(&mut key[..]));
348
349 let val_len = try!(reader.read_u16::<BigEndian>());
350 let mut val = vec![0;val_len as usize];
351 try!(reader.read_exact(&mut val[..]));
352 acc.push((key, val));
353 }
354
355 Ok(acc)
356}
357
358pub fn decode_dtab<R: Read + ?Sized>(reader: &mut R) -> io::Result<Dtab> {
360 let len = try!(reader.read_u16::<BigEndian>()) as usize;
361 let mut acc = Vec::with_capacity(len);
362
363 for _ in 0..len {
364 let key_len = try!(reader.read_u16::<BigEndian>());
365 let mut key = vec![0;key_len as usize];
366 try!(reader.read_exact(&mut key[..]));
367
368 let val_len = try!(reader.read_u16::<BigEndian>());
369 let mut val = vec![0;val_len as usize];
370 try!(reader.read_exact(&mut val[..]));
371 acc.push(Dentry::new(try!(to_string(key)), try!(to_string(val))));
372 }
373
374 Ok(Dtab::from_entries(acc))
375}
376
377pub fn encode_dtab<W: Write + ?Sized>(writer: &mut W, table: &Dtab) -> io::Result<()> {
378 chklen!(table.entries, u16::MAX, "Dtable length overflow");
379 try!(writer.write_u16::<BigEndian>(table.entries.len() as u16));
380
381 for dentry in &table.entries {
382 try!(encode_u16_string(writer, &dentry.key));
384 try!(encode_u16_string(writer, &dentry.val));
385 }
386 Ok(())
387}
388
389#[inline]
392pub fn decode_rerr<R: Read>(reader: R) -> io::Result<Rerr> {
393 let msg = try!(body_as_string(reader));
394 Ok(Rerr { msg: msg, })
395}
396
397#[inline]
398pub fn encode_rerr<W: Write + ?Sized>(writer: &mut W, rerr: &Rerr) -> io::Result<()> {
399 writer.write_all(rerr.msg.as_bytes())
400}
401
402pub fn encode_init<W: Write + ?Sized>(writer: &mut W, msg: &Init) -> io::Result<()> {
405 try!(writer.write_u16::<BigEndian>(msg.version));
406
407 for &(ref k, ref v) in &msg.headers {
412 try!(writer.write_u32::<BigEndian>(k.len() as u32));
413 try!(writer.write_all(k));
414 try!(writer.write_u32::<BigEndian>(v.len() as u32));
415 try!(writer.write_all(v));
416 }
417
418 Ok(())
419}
420
421pub fn decode_init<R: Read>(mut reader: R) -> io::Result<Init> {
422 let mut headers = Vec::new();
423 let version = try!(reader.read_u16::<BigEndian>());
424
425 loop {
426 let klen = match reader.read_u32::<BigEndian>() {
427 Ok(len) => len,
428 Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
429 return Ok(
431 Init {
432 version: version,
433 headers: headers,
434 }
435 );
436 }
437 Err(other) => { return Err(other); }
438 };
439
440 let mut k = vec![0;klen as usize];
441 try!(reader.read_exact(&mut k));
442
443 let vlen = try!(reader.read_u32::<BigEndian>());
444 let mut v = vec![0;vlen as usize];
445 try!(reader.read_exact(&mut v));
446
447 headers.push((k, v));
448 }
449}
450
451pub fn encode_rdispatch<W: Write + ?Sized>(writer: &mut W, frame: &Rdispatch) -> io::Result<()> {
454 let (status, body) = rmsg_status_body(&frame.msg);
455
456 try!(writer.write_u8(status));
457 try!(encode_contexts(writer, &frame.contexts));
458 writer.write_all(body)
459}
460
461pub fn decode_rdispatch<R: Read>(mut reader: R) -> io::Result<Rdispatch> {
463 let status = try!(reader.read_u8());
464 let contexts = try!(decode_contexts(&mut reader));
465 let mut body = Vec::new();
466 let _ = try!(reader.read_to_end(&mut body));
467
468 Ok(Rdispatch {
469 contexts: contexts,
470 msg: try!(decode_rmsg_body(status, body)),
471 })
472}
473
474pub fn encode_rreq<W: Write + ?Sized>(writer: &mut W, frame: &Rmsg) -> io::Result<()> {
477 let (status, body) = rmsg_status_body(frame);
478 try!(writer.write_u8(status));
479 writer.write_all(body)
480}
481
482pub fn decode_rreq<R: Read>(mut reader: R) -> io::Result<Rmsg> {
483 let status = try!(reader.read_u8());
484 let mut body = Vec::new();
485 try!(reader.read_to_end(&mut body));
486 decode_rmsg_body(status, body)
487}
488
489pub fn decode_tdispatch<R: Read>(mut reader: R) -> io::Result<Tdispatch> {
492 let contexts = try!(decode_contexts(&mut reader));
493 let dest = try!(decode_u16_string(&mut reader));
494 let dtab = try!(decode_dtab(&mut reader));
495
496 let mut body = Vec::new();
497 let _ = try!(reader.read_to_end(&mut body));
498
499 Ok(Tdispatch {
500 contexts: contexts,
501 dest: dest,
502 dtab: dtab,
503 body: body,
504 })
505}
506
507pub fn encode_tdispatch<W: Write + ?Sized>(writer: &mut W, msg: &Tdispatch) -> io::Result<()> {
508 try!(encode_contexts(writer, &msg.contexts));
509 try!(encode_u16_string(writer, &msg.dest));
510 try!(encode_dtab(writer, &msg.dtab));
511 writer.write_all(&msg.body)
512}
513
514pub fn decode_treq<R: Read>(mut reader: R) -> io::Result<Treq> {
517 let headers = try!(decode_headers(&mut reader));
518 let mut body = Vec::new();
519
520 let _ = try!(reader.read_to_end(&mut body));
521 Ok(Treq {
522 headers: headers,
523 body: body,
524 })
525}
526
527#[inline]
528pub fn encode_treq<W: Write + ?Sized>(writer: &mut W, msg: &Treq) -> io::Result<()> {
529 try!(encode_headers(writer, &msg.headers));
530 writer.write_all(&msg.body)
531}
532
533#[inline]
534fn rmsg_status_body(msg: &Rmsg) -> (u8, &[u8]) {
535 match *msg {
536 Rmsg::Ok(ref body) => (0, body.as_ref()),
537 Rmsg::Error(ref msg) => (1, msg.as_bytes()),
538 Rmsg::Nack(ref msg) => (2, msg.as_bytes()),
539 }
540}
541
542#[inline]
543fn decode_rmsg_body(status: u8, body: Vec<u8>) -> io::Result<Rmsg> {
544 match status {
545 0 => Ok(Rmsg::Ok(body)),
546 1 => Ok(Rmsg::Error(try!(to_string(body)))),
547 2 => Ok(Rmsg::Nack(try!(to_string(body)))),
548 other => Err(
549 io::Error::new(ErrorKind::InvalidData, format!("Invalid status code: {}", other))
550 )
551 }
552}
553
554#[inline]
558pub fn decode_u16_string<R: Read + ?Sized>(reader: &mut R) -> io::Result<String> {
559 let str_len = try!(reader.read_u16::<BigEndian>());
560 let mut s = vec![0; str_len as usize];
561
562 try!(reader.read_exact(&mut s));
563
564 to_string(s)
565}
566
567#[inline]
568pub fn encode_u16_string<W: Write + ?Sized>(writer: &mut W, s: &str) -> io::Result<()> {
569 let bytes = s.as_bytes();
570
571 chklen!(bytes, u16::MAX, "16bit length delimited string overflow");
572 try!(writer.write_u16::<BigEndian>(bytes.len() as u16));
573 writer.write_all(bytes)
574}
575
/// Converts owned bytes into a `String`.
///
/// # Errors
///
/// Returns `InvalidData` with the message "Invalid UTF8 field" when the
/// bytes are not valid UTF-8.
#[inline]
fn to_string(vec: Vec<u8>) -> io::Result<String> {
    String::from_utf8(vec)
        .map_err(|_| io::Error::new(ErrorKind::InvalidData, "Invalid UTF8 field"))
}
583
584#[inline]
585fn body_as_string<R: Read>(mut reader: R) -> io::Result<String> {
586 let mut data = Vec::new();
587 let _ = try!(reader.read_to_end(&mut data));
588 to_string(data)
589}