mux/codec/
mod.rs

1//! Codec functions for encoding and decoding mux frames.
2//!
3//! Decode messages from a continuous stream using `read_message` to
4//! address message framing. Individual message decoding functions assume the
5//! remainder of the stream comprises the message.
6
7extern crate byteorder;
8
9use byteorder::{ReadBytesExt, BigEndian, WriteBytesExt};
10
11use std::io;
12use std::io::{ErrorKind, Read, Write};
13use std::time::Duration;
14
15use super::*;
16
17use std::{u8, u16};
18
19pub mod size;
20
21// concise length checking for encoding length delimited fields
// Guard for length-delimited fields: makes the enclosing function return an
// `InvalidInput` I/O error when `field.len()` exceeds `limit`.
//
// * `chklen!(field, limit, reason)` uses `reason` as the error message.
// * `chklen!(field, limit)` falls back to a generic message.
macro_rules! chklen {
    ($field:expr, $limit:expr, $reason:expr) => {
        if $field.len() > $limit as usize {
            return Err(io::Error::new(ErrorKind::InvalidInput, $reason));
        }
    };
    ($field:expr, $limit:expr) => {{
        chklen!($field, $limit, "Length overflow.")
    }};
}
32
33/// Synchronously read a whole mux `Message`
34///
35/// This function will synchronously read from the provided `&mut Read` until
36/// it has received an entire mux `Message`
37///
38/// ```rust
39/// use std::io::Cursor;
40/// use mux::MessageFrame;
41/// use mux::codec;
42///
43/// let mut r = Cursor::new(vec![0,0,0,4,65,0,0,0,1,0,0,0,0]); // ping frame plus 4 0's
44/// let frame = codec::read_message(&mut r).unwrap().frame;
45/// assert_eq!(frame, MessageFrame::Tping);
46/// ```
47pub fn read_message<R: Read + ?Sized>(input: &mut R) -> io::Result<Message> {
48    let size = {
49        let size = try!(input.read_i32::<BigEndian>());
50        if size < 4 {
51            let msg = format!("Invalid mux frame size: {}. Minimum 4 bytes.", size);
52            return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
53        }
54
55        size as u64
56    };
57
58    decode_message(input.take(size))
59}
60
61/// Synchronously decode a mux `Message`
62///
63/// This function will synchronously read from the provided `&mut Read` until
64/// it has decoded a message. Message length is assumed to be triggered by an EOF.
65/// When using use a continuous stream, such as a `TcpStream`, use `read_message`.
66///
67/// ```rust
68/// use std::io::Cursor;
69/// use mux::MessageFrame;
70/// use mux::codec;
71///
72/// let mut r = Cursor::new(vec![65,0,0,0,1]); // ping frame
73/// let frame = codec::decode_message(&mut r).unwrap().frame;
74/// assert_eq!(frame, MessageFrame::Tping);
75/// ```
76pub fn decode_message<R: Read>(mut read: R) -> io::Result<Message> {
77    let tpe = try!(read.read_i8());
78    let tag = try!(decode_tag(&mut read));
79    let frame = try!(decode_frame(tpe, &mut read));
80
81    Ok(Message {
82        tag: tag,
83        frame: frame,
84    })
85}
86
87/// Synchronously encode a `Message` to the `Write` with the frame size
88///
89/// Convert the `Message` to a stream of bytes and write it too the `Write`
90/// reference. If the `Write` blocks, this function blocks.
91///
92/// ```rust
93/// use std::io::Cursor;
94/// use mux::{Message, MessageFrame, Tag};
95/// use mux::codec;
96///
97/// let mut w = Cursor::new(Vec::new());
98/// let tag = Tag::new(true, 1);
99/// let _ = codec::write_message(&mut w, &Message { tag: tag, frame: MessageFrame::Tping });
100/// assert_eq!(w.into_inner(), vec![0,0,0,4,65,0,0,1]);
101/// ```
102pub fn write_message<W: Write + ?Sized>(buffer: &mut W, msg: &Message) -> io::Result<()> {
103    // the size is the buffer size + the header (id + tag)
104    try!(buffer.write_i32::<BigEndian>(size::frame_size(&msg.frame) as i32 + 4));
105    encode_message(buffer, msg)
106}
107
108/// Synchronously encode a `Message` to the `Write`
109///
110/// Convert the `Message` to a stream of bytes and write it too the `Write`
111/// reference. If the `Write` blocks, this function blocks.
112///
113/// ```rust
114/// use std::io::Cursor;
115/// use mux::{Message, MessageFrame, Tag};
116/// use mux::codec;
117///
118/// let mut w = Cursor::new(Vec::new());
119/// let tag = Tag::new(true, 1);
120/// let _ = codec::encode_message(&mut w, &Message { tag: tag, frame: MessageFrame::Tping });
121/// assert_eq!(w.into_inner(), vec![65,0,0,1]);
122/// ```
123pub fn encode_message<W: Write + ?Sized>(buffer: &mut W, msg: &Message) -> io::Result<()> {
124    try!(buffer.write_i8(msg.frame.frame_id()));
125    try!(encode_tag(buffer, &msg.tag));
126    encode_frame(buffer, &msg.frame)
127}
128
129// frame codec functions
130
131/// Synchronously encode a `MessageFrame` to the `Write`
132///
133/// Convert the `FrameMessage` to a stream of bytes and write it too the `Write`
134/// reference. If the `Write` blocks, this function blocks.
135///
136/// ```rust
137/// use std::io::Cursor;
138/// use mux::{Message, MessageFrame, Tag};
139/// use mux::codec;
140///
141/// let mut w = Cursor::new(Vec::new());
142/// let tag = Tag::new(true, 1);
143/// let _ = codec::encode_frame(&mut w, &MessageFrame::Tping);
144/// assert_eq!(w.into_inner(), vec![]); // Tping is 0 length
145/// ```
146pub fn encode_frame<W: Write + ?Sized>(writer: &mut W, frame: &MessageFrame) -> io::Result<()> {
147    match *frame {
148        MessageFrame::Treq(ref f) => encode_treq(writer, f),
149        MessageFrame::Rreq(ref f) => encode_rreq(writer, f),
150        MessageFrame::Tdispatch(ref f) => encode_tdispatch(writer, f),
151        MessageFrame::Rdispatch(ref f) => encode_rdispatch(writer, f),
152        MessageFrame::Tinit(ref f) => encode_init(writer, f),
153        MessageFrame::Rinit(ref f) => encode_init(writer, f),
154        // the following are empty messages
155        MessageFrame::Tping => Ok(()),
156        MessageFrame::Rping => Ok(()),
157        MessageFrame::Tdrain => Ok(()),
158        MessageFrame::Rdrain => Ok(()),
159        MessageFrame::Tdiscarded(ref f) => encode_tdiscarded(writer, f),
160        MessageFrame::Tlease(ref f) => encode_tlease(writer, f),
161        MessageFrame::Rerr(ref f) => encode_rerr(writer, f),
162    }
163}
164
165/// Synchronously decode a mux `MessageFrame`
166///
167/// This function will synchronously read from the provided `&mut Read` until
168/// it has decoded a frame. Frame length is assumed to be triggered by an EOF.
169/// When using use a continuous stream, such as a `TcpStream`, use `Read.take(n)`
170/// if you know the frame length or use read_message above to decode an entire
171/// `Message` and then extract the frame.
172///
173/// ```rust
174/// use std::io::Cursor;
175/// use mux::MessageFrame;
176/// use mux::codec;
177/// use mux::types;
178///
179/// let mut r = Cursor::new(vec![]); // ping frame is empty
180/// let frame = codec::decode_frame(types::TPING, &mut r).unwrap();
181/// assert_eq!(frame, MessageFrame::Tping);
182/// ```
183pub fn decode_frame<R: Read>(tpe: i8, reader: R) -> io::Result<MessageFrame> {
184    Ok(match tpe {
185        types::TREQ => MessageFrame::Treq(try!(decode_treq(reader))),
186        types::RREQ => MessageFrame::Rreq(try!(decode_rreq(reader))),
187        types::TDISPATCH => MessageFrame::Tdispatch(try!(decode_tdispatch(reader))),
188        types::RDISPATCH => MessageFrame::Rdispatch(try!(decode_rdispatch(reader))),
189        types::TINIT => MessageFrame::Tinit(try!(decode_init(reader))),
190        types::RINIT => MessageFrame::Rinit(try!(decode_init(reader))),
191        types::TDRAIN => MessageFrame::Tdrain,
192        types::RDRAIN => MessageFrame::Rdrain,
193        types::TPING => MessageFrame::Tping,
194        types::RPING => MessageFrame::Rping,
195        types::TLEASE => MessageFrame::Tlease(try!(decode_tlease(reader))),
196        types::RERR => MessageFrame::Rerr(try!(decode_rerr(reader))),
197        other => {
198            return Err(
199                io::Error::new(io::ErrorKind::InvalidInput,
200                    format!("Invalid frame type: {}", other))
201                );
202        }
203    })
204}
205
206
207///////////// Tlease codec function
208
209pub fn decode_tlease<R: Read>(mut reader: R) -> io::Result<Tlease> {
210    let howmuch = try!(reader.read_u8());
211    let ticks = try!(reader.read_u64::<BigEndian>());
212
213    if howmuch == 0 {
214        Ok(Tlease {
215            duration: Duration::from_millis(ticks),
216        })
217    } else {
218        let message = format!("Unknown Tlease 'howmuch' code: {}", howmuch);
219        Err(io::Error::new(ErrorKind::InvalidData, message))
220    }
221}
222
223pub fn encode_tlease<W: Write + ?Sized>(writer: &mut W, tlease: &Tlease) -> io::Result<()> {
224    let d = &tlease.duration;
225    let millis = d.as_secs()*1000 + (((d.subsec_nanos() as f64)/1e6) as u64);
226    try!(writer.write_u8(0));
227    try!(writer.write_u64::<BigEndian>(millis));
228    Ok(())
229}
230
231///////////// Tdiscarded codecs
232
233#[inline]
234pub fn decode_tdiscarded<R: Read>(mut reader: R) -> io::Result<Tdiscarded> {
235    let mut bts = [0;3];
236    try!(reader.read_exact(&mut bts[..]));
237    let id = (bts[0] as u32) << 16 | (bts[1] as u32) <<  8 | (bts[2] as u32);
238    let msg = try!(body_as_string(reader));
239
240    Ok(Tdiscarded {
241        id: id,
242        msg: msg,
243    })
244}
245
246#[inline]
247pub fn encode_tdiscarded<W: Write + ?Sized>(writer: &mut W, msg: &Tdiscarded) -> io::Result<()> {
248    let bts = [
249        ((msg.id >> 16) & 0xff) as u8,
250        ((msg.id >>  8) & 0xff) as u8,
251        ( msg.id        & 0xff) as u8,
252    ];
253    try!(writer.write_all(&bts[..]));
254    writer.write_all(msg.msg.as_bytes())
255}
256
257///////////// Tag codec functions
258
259const TAG_END_MASK: u32 = 1 << 23; // 24th bit of tag
260const TAG_ID_MASK: u32 = MAX_TAG;
261
262pub fn decode_tag<R: Read + ?Sized>(reader: &mut R) -> io::Result<Tag> {
263    let mut bts = [0; 3];
264    let _ = try!(reader.read(&mut bts));
265
266    let id = (bts[0] as u32) << 16 |
267             (bts[1] as u32) <<  8 |
268             (bts[2] as u32);
269
270    Ok(Tag {
271        end: id & TAG_END_MASK == 0,
272        id: id & TAG_ID_MASK, // clear the last bit, its for the end flag
273    })
274}
275
276#[inline]
277pub fn encode_tag<W: Write + ?Sized>(buffer: &mut W, tag: &Tag) -> io::Result<()> {
278    let bytes = {
279        let id = tag.id;
280        let endbit = if tag.end { 0 } else { 1 };
281        [(id >> 16 & 0x7f) as u8 | (endbit << 7),
282         (id >> 8 & 0xff) as u8,
283         (id >> 0 & 0xff) as u8]
284    };
285
286    buffer.write_all(&bytes)
287}
288
289///////////// headers codec functions
290
291pub fn encode_headers<W: Write + ?Sized>(writer: &mut W, headers: &Headers) -> io::Result<()> {
292    chklen!(headers, u8::MAX, "Header count overflow");
293    try!(writer.write_u8(headers.len() as u8));
294
295    for &(ref k, ref v) in headers {
296        chklen!(v, u8::MAX, "Header length overflow");
297
298        try!(writer.write_u8(*k));
299        try!(writer.write_u8(v.len() as u8));
300        try!(writer.write_all(v));
301    }
302    Ok(())
303}
304
305pub fn decode_headers<R: Read + ?Sized>(reader: &mut R) -> io::Result<Headers> {
306    let len = try!(reader.read_u8()) as usize;
307    let mut acc = Vec::with_capacity(len);
308
309    for _ in 0..len {
310        let key = try!(reader.read_u8());
311        let val_len = try!(reader.read_u8());
312        let mut val = vec![0;val_len as usize];
313        try!(reader.read_exact(&mut val[..]));
314        acc.push((key, val));
315    }
316
317    Ok(acc)
318}
319
320///////////// Contexts codec functions
321
322pub fn encode_contexts<W: Write + ?Sized>(writer: &mut W, contexts: &Contexts) -> io::Result<()> {
323    chklen!(contexts, u16::MAX, "Context entries overflow");
324
325    try!(writer.write_u16::<BigEndian>(contexts.len() as u16));
326    for &(ref k, ref v) in contexts {
327        chklen!(k, u16::MAX, "Context key overflow");
328        try!(writer.write_u16::<BigEndian>(k.len() as u16));
329        try!(writer.write_all(&k[..]));
330
331        chklen!(v, u16::MAX, "Context value overflow");
332        try!(writer.write_u16::<BigEndian>(v.len() as u16));
333        try!(writer.write_all(&v[..]));
334    }
335
336    Ok(())
337}
338
339pub fn decode_contexts<R: Read + ?Sized>(reader: &mut R) -> io::Result<Contexts> {
340    let len = try!(reader.read_u16::<BigEndian>()) as usize;
341
342    let mut acc = Vec::with_capacity(len);
343
344    for _ in 0..len {
345        let key_len = try!(reader.read_u16::<BigEndian>());
346        let mut key = vec![0;key_len as usize];
347        try!(reader.read_exact(&mut key[..]));
348
349        let val_len = try!(reader.read_u16::<BigEndian>());
350        let mut val = vec![0;val_len as usize];
351        try!(reader.read_exact(&mut val[..]));
352        acc.push((key, val));
353    }
354
355    Ok(acc)
356}
357
358///////////// Dtab codec functions
359pub fn decode_dtab<R: Read + ?Sized>(reader: &mut R) -> io::Result<Dtab> {
360    let len = try!(reader.read_u16::<BigEndian>()) as usize;
361    let mut acc = Vec::with_capacity(len);
362
363    for _ in 0..len {
364        let key_len = try!(reader.read_u16::<BigEndian>());
365        let mut key = vec![0;key_len as usize];
366        try!(reader.read_exact(&mut key[..]));
367
368        let val_len = try!(reader.read_u16::<BigEndian>());
369        let mut val = vec![0;val_len as usize];
370        try!(reader.read_exact(&mut val[..]));
371        acc.push(Dentry::new(try!(to_string(key)), try!(to_string(val))));
372    }
373
374    Ok(Dtab::from_entries(acc))
375}
376
377pub fn encode_dtab<W: Write + ?Sized>(writer: &mut W, table: &Dtab) -> io::Result<()> {
378    chklen!(table.entries, u16::MAX, "Dtable length overflow");
379    try!(writer.write_u16::<BigEndian>(table.entries.len() as u16));
380
381    for dentry in &table.entries {
382        // the string encoder will check for overflows
383        try!(encode_u16_string(writer, &dentry.key));
384        try!(encode_u16_string(writer, &dentry.val));
385    }
386    Ok(())
387}
388
389///////////// Rerr codec functions
390
391#[inline]
392pub fn decode_rerr<R: Read>(reader: R) -> io::Result<Rerr> {
393    let msg = try!(body_as_string(reader));
394    Ok(Rerr { msg: msg, })
395}
396
397#[inline]
398pub fn encode_rerr<W: Write + ?Sized>(writer: &mut W, rerr: &Rerr) -> io::Result<()> {
399    writer.write_all(rerr.msg.as_bytes())
400}
401
402///////////// Init codec functions
403
404pub fn encode_init<W: Write + ?Sized>(writer: &mut W, msg: &Init) -> io::Result<()> {
405    try!(writer.write_u16::<BigEndian>(msg.version));
406
407    // Not going to bother checking for overflow: if a single one of the
408    // entries overflows then the entire frame overflows which is not the
409    // pervue of this function.
410
411    for &(ref k, ref v) in &msg.headers {
412        try!(writer.write_u32::<BigEndian>(k.len() as u32));
413        try!(writer.write_all(k));
414        try!(writer.write_u32::<BigEndian>(v.len() as u32));
415        try!(writer.write_all(v));
416    }
417
418    Ok(())
419}
420
421pub fn decode_init<R: Read>(mut reader: R) -> io::Result<Init> {
422    let mut headers = Vec::new();
423    let version = try!(reader.read_u16::<BigEndian>());
424
425    loop {
426        let klen = match reader.read_u32::<BigEndian>() {
427            Ok(len) => len,
428            Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
429                // termination: out of buffer.
430                return Ok(
431                    Init {
432                        version: version,
433                        headers: headers,
434                    }
435                );
436            }
437            Err(other) => { return Err(other); }
438        };
439
440        let mut k = vec![0;klen as usize];
441        try!(reader.read_exact(&mut k));
442
443        let vlen = try!(reader.read_u32::<BigEndian>());
444        let mut v = vec![0;vlen as usize];
445        try!(reader.read_exact(&mut v));
446
447        headers.push((k, v));
448    }
449}
450
451///////////// Rdispatch codec functions
452
453pub fn encode_rdispatch<W: Write + ?Sized>(writer: &mut W, frame: &Rdispatch) -> io::Result<()> {
454    let (status, body) = rmsg_status_body(&frame.msg);
455
456    try!(writer.write_u8(status));
457    try!(encode_contexts(writer, &frame.contexts));
458    writer.write_all(body)
459}
460
461// Expects to consume the whole stream
462pub fn decode_rdispatch<R: Read>(mut reader: R) -> io::Result<Rdispatch> {
463    let status = try!(reader.read_u8());
464    let contexts = try!(decode_contexts(&mut reader));
465    let mut body = Vec::new();
466    let _ = try!(reader.read_to_end(&mut body));
467
468    Ok(Rdispatch {
469        contexts: contexts,
470        msg: try!(decode_rmsg_body(status, body)),
471    })
472}
473
474///////////// Rreq codec functions
475
476pub fn encode_rreq<W: Write + ?Sized>(writer: &mut W, frame: &Rmsg) -> io::Result<()> {
477    let (status, body) = rmsg_status_body(frame);
478    try!(writer.write_u8(status));
479    writer.write_all(body)
480}
481
482pub fn decode_rreq<R: Read>(mut reader: R) -> io::Result<Rmsg> {
483    let status = try!(reader.read_u8());
484    let mut body = Vec::new();
485    try!(reader.read_to_end(&mut body));
486    decode_rmsg_body(status, body)
487}
488
489///////////// Tdispatch codec functions
490
491pub fn decode_tdispatch<R: Read>(mut reader: R) -> io::Result<Tdispatch> {
492    let contexts = try!(decode_contexts(&mut reader));
493    let dest = try!(decode_u16_string(&mut reader));
494    let dtab = try!(decode_dtab(&mut reader));
495
496    let mut body = Vec::new();
497    let _ = try!(reader.read_to_end(&mut body));
498
499    Ok(Tdispatch {
500        contexts: contexts,
501        dest: dest,
502        dtab: dtab,
503        body: body,
504    })
505}
506
507pub fn encode_tdispatch<W: Write + ?Sized>(writer: &mut W, msg: &Tdispatch) -> io::Result<()> {
508    try!(encode_contexts(writer, &msg.contexts));
509    try!(encode_u16_string(writer, &msg.dest));
510    try!(encode_dtab(writer, &msg.dtab));
511    writer.write_all(&msg.body)
512}
513
514///////////// Treq codec functions
515
516pub fn decode_treq<R: Read>(mut reader: R) -> io::Result<Treq> {
517    let headers = try!(decode_headers(&mut reader));
518    let mut body = Vec::new();
519
520    let _ = try!(reader.read_to_end(&mut body));
521    Ok(Treq {
522        headers: headers,
523        body: body,
524    })
525}
526
527#[inline]
528pub fn encode_treq<W: Write + ?Sized>(writer: &mut W, msg: &Treq) -> io::Result<()> {
529    try!(encode_headers(writer, &msg.headers));
530    writer.write_all(&msg.body)
531}
532
533#[inline]
534fn rmsg_status_body(msg: &Rmsg) -> (u8, &[u8]) {
535    match *msg {
536        Rmsg::Ok(ref body) => (0, body.as_ref()),
537        Rmsg::Error(ref msg) => (1, msg.as_bytes()),
538        Rmsg::Nack(ref msg) => (2, msg.as_bytes()),
539    }
540}
541
542#[inline]
543fn decode_rmsg_body(status: u8, body: Vec<u8>) -> io::Result<Rmsg> {
544    match status {
545        0 => Ok(Rmsg::Ok(body)),
546        1 => Ok(Rmsg::Error(try!(to_string(body)))),
547        2 => Ok(Rmsg::Nack(try!(to_string(body)))),
548        other => Err(
549            io::Error::new(ErrorKind::InvalidData, format!("Invalid status code: {}", other))
550        )
551    }
552}
553
554// tools for operating on Strings
555
556// decode a utf8 string with length specified by a u16 prefix byte
557#[inline]
558pub fn decode_u16_string<R: Read + ?Sized>(reader: &mut R) -> io::Result<String> {
559    let str_len = try!(reader.read_u16::<BigEndian>());
560    let mut s = vec![0; str_len as usize];
561
562    try!(reader.read_exact(&mut s));
563
564    to_string(s)
565}
566
567#[inline]
568pub fn encode_u16_string<W: Write + ?Sized>(writer: &mut W, s: &str) -> io::Result<()> {
569    let bytes = s.as_bytes();
570
571    chklen!(bytes, u16::MAX, "16bit length delimited string overflow");
572    try!(writer.write_u16::<BigEndian>(bytes.len() as u16));
573    writer.write_all(bytes)
574}
575
// Convert owned bytes into a `String`, mapping invalid UTF-8 to an
// `InvalidData` I/O error.
#[inline]
fn to_string(vec: Vec<u8>) -> io::Result<String> {
    String::from_utf8(vec)
        .map_err(|_| io::Error::new(ErrorKind::InvalidData, "Invalid UTF8 field"))
}
583
584#[inline]
585fn body_as_string<R: Read>(mut reader: R) -> io::Result<String> {
586    let mut data = Vec::new();
587    let _ = try!(reader.read_to_end(&mut data));
588    to_string(data)
589}