mux 0.1.1

mux codecs for rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
//! Codec functions for encoding and decoding mux frames.
//!
//! Decode messages from a continuous stream using `read_message` to
//! address message framing. Individual message decoding functions assume the
//! remainder of the stream comprises the message.

extern crate byteorder;

use byteorder::{ReadBytesExt, BigEndian, WriteBytesExt};

use std::io;
use std::io::{ErrorKind, Read, Write};
use std::time::Duration;

use super::*;

use std::{u8, u16};

pub mod size;

// concise length checking for encoding length delimited fields
macro_rules! chklen {
    ($e:expr, $len:expr) => ({
        chklen!($e, $len, "Length overflow.")
    });
    ($e:expr, $len:expr, $msg:expr) => {
        if $e.len() > $len as usize {
            return Err(io::Error::new(ErrorKind::InvalidInput, $msg));
        }
    };
}

/// Synchronously read a whole mux `Message`
///
/// This function will synchronously read from the provided `&mut Read` until
/// it has received an entire mux `Message`
///
/// ```rust
/// use std::io::Cursor;
/// use mux::MessageFrame;
/// use mux::codec;
///
/// let mut r = Cursor::new(vec![0,0,0,4,65,0,0,0,1,0,0,0,0]); // ping frame plus 4 0's
/// let frame = codec::read_message(&mut r).unwrap().frame;
/// assert_eq!(frame, MessageFrame::Tping);
/// ```
pub fn read_message<R: Read + ?Sized>(input: &mut R) -> io::Result<Message> {
    let size = {
        let size = try!(input.read_i32::<BigEndian>());
        if size < 4 {
            let msg = format!("Invalid mux frame size: {}. Minimum 4 bytes.", size);
            return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
        }

        size as u64
    };

    decode_message(input.take(size))
}

/// Synchronously decode a mux `Message`
///
/// This function will synchronously read from the provided `&mut Read` until
/// it has decoded a message. Message length is assumed to be triggered by an EOF.
/// When using use a continuous stream, such as a `TcpStream`, use `read_message`.
///
/// ```rust
/// use std::io::Cursor;
/// use mux::MessageFrame;
/// use mux::codec;
///
/// let mut r = Cursor::new(vec![65,0,0,0,1]); // ping frame
/// let frame = codec::decode_message(&mut r).unwrap().frame;
/// assert_eq!(frame, MessageFrame::Tping);
/// ```
pub fn decode_message<R: Read>(mut read: R) -> io::Result<Message> {
    let tpe = try!(read.read_i8());
    let tag = try!(decode_tag(&mut read));
    let frame = try!(decode_frame(tpe, &mut read));

    Ok(Message {
        tag: tag,
        frame: frame,
    })
}

/// Synchronously encode a `Message` to the `Write` with the frame size
///
/// Convert the `Message` to a stream of bytes and write it too the `Write`
/// reference. If the `Write` blocks, this function blocks.
///
/// ```rust
/// use std::io::Cursor;
/// use mux::{Message, MessageFrame, Tag};
/// use mux::codec;
///
/// let mut w = Cursor::new(Vec::new());
/// let tag = Tag::new(true, 1);
/// let _ = codec::write_message(&mut w, &Message { tag: tag, frame: MessageFrame::Tping });
/// assert_eq!(w.into_inner(), vec![0,0,0,4,65,0,0,1]);
/// ```
pub fn write_message<W: Write + ?Sized>(buffer: &mut W, msg: &Message) -> io::Result<()> {
    // the size is the buffer size + the header (id + tag)
    try!(buffer.write_i32::<BigEndian>(size::frame_size(&msg.frame) as i32 + 4));
    encode_message(buffer, msg)
}

/// Synchronously encode a `Message` to the `Write`
///
/// Convert the `Message` to a stream of bytes and write it too the `Write`
/// reference. If the `Write` blocks, this function blocks.
///
/// ```rust
/// use std::io::Cursor;
/// use mux::{Message, MessageFrame, Tag};
/// use mux::codec;
///
/// let mut w = Cursor::new(Vec::new());
/// let tag = Tag::new(true, 1);
/// let _ = codec::encode_message(&mut w, &Message { tag: tag, frame: MessageFrame::Tping });
/// assert_eq!(w.into_inner(), vec![65,0,0,1]);
/// ```
pub fn encode_message<W: Write + ?Sized>(buffer: &mut W, msg: &Message) -> io::Result<()> {
    try!(buffer.write_i8(msg.frame.frame_id()));
    try!(encode_tag(buffer, &msg.tag));
    encode_frame(buffer, &msg.frame)
}

// frame codec functions

/// Synchronously encode a `MessageFrame` to the `Write`
///
/// Convert the `FrameMessage` to a stream of bytes and write it too the `Write`
/// reference. If the `Write` blocks, this function blocks.
///
/// ```rust
/// use std::io::Cursor;
/// use mux::{Message, MessageFrame, Tag};
/// use mux::codec;
///
/// let mut w = Cursor::new(Vec::new());
/// let tag = Tag::new(true, 1);
/// let _ = codec::encode_frame(&mut w, &MessageFrame::Tping);
/// assert_eq!(w.into_inner(), vec![]); // Tping is 0 length
/// ```
pub fn encode_frame<W: Write + ?Sized>(writer: &mut W, frame: &MessageFrame) -> io::Result<()> {
    match *frame {
        MessageFrame::Treq(ref f) => encode_treq(writer, f),
        MessageFrame::Rreq(ref f) => encode_rreq(writer, f),
        MessageFrame::Tdispatch(ref f) => encode_tdispatch(writer, f),
        MessageFrame::Rdispatch(ref f) => encode_rdispatch(writer, f),
        MessageFrame::Tinit(ref f) => encode_init(writer, f),
        MessageFrame::Rinit(ref f) => encode_init(writer, f),
        // the following are empty messages
        MessageFrame::Tping => Ok(()),
        MessageFrame::Rping => Ok(()),
        MessageFrame::Tdrain => Ok(()),
        MessageFrame::Rdrain => Ok(()),
        MessageFrame::Tdiscarded(ref f) => encode_tdiscarded(writer, f),
        MessageFrame::Tlease(ref f) => encode_tlease(writer, f),
        MessageFrame::Rerr(ref f) => encode_rerr(writer, f),
    }
}

/// Synchronously decode a mux `MessageFrame`
///
/// This function will synchronously read from the provided `&mut Read` until
/// it has decoded a frame. Frame length is assumed to be triggered by an EOF.
/// When using use a continuous stream, such as a `TcpStream`, use `Read.take(n)`
/// if you know the frame length or use read_message above to decode an entire
/// `Message` and then extract the frame.
///
/// ```rust
/// use std::io::Cursor;
/// use mux::MessageFrame;
/// use mux::codec;
/// use mux::types;
///
/// let mut r = Cursor::new(vec![]); // ping frame is empty
/// let frame = codec::decode_frame(types::TPING, &mut r).unwrap();
/// assert_eq!(frame, MessageFrame::Tping);
/// ```
pub fn decode_frame<R: Read>(tpe: i8, reader: R) -> io::Result<MessageFrame> {
    Ok(match tpe {
        types::TREQ => MessageFrame::Treq(try!(decode_treq(reader))),
        types::RREQ => MessageFrame::Rreq(try!(decode_rreq(reader))),
        types::TDISPATCH => MessageFrame::Tdispatch(try!(decode_tdispatch(reader))),
        types::RDISPATCH => MessageFrame::Rdispatch(try!(decode_rdispatch(reader))),
        types::TINIT => MessageFrame::Tinit(try!(decode_init(reader))),
        types::RINIT => MessageFrame::Rinit(try!(decode_init(reader))),
        types::TDRAIN => MessageFrame::Tdrain,
        types::RDRAIN => MessageFrame::Rdrain,
        types::TPING => MessageFrame::Tping,
        types::RPING => MessageFrame::Rping,
        types::TLEASE => MessageFrame::Tlease(try!(decode_tlease(reader))),
        types::RERR => MessageFrame::Rerr(try!(decode_rerr(reader))),
        other => {
            return Err(
                io::Error::new(io::ErrorKind::InvalidInput,
                    format!("Invalid frame type: {}", other))
                );
        }
    })
}


///////////// Tlease codec function

pub fn decode_tlease<R: Read>(mut reader: R) -> io::Result<Tlease> {
    let howmuch = try!(reader.read_u8());
    let ticks = try!(reader.read_u64::<BigEndian>());

    if howmuch == 0 {
        Ok(Tlease {
            duration: Duration::from_millis(ticks),
        })
    } else {
        let message = format!("Unknown Tlease 'howmuch' code: {}", howmuch);
        Err(io::Error::new(ErrorKind::InvalidData, message))
    }
}

pub fn encode_tlease<W: Write + ?Sized>(writer: &mut W, tlease: &Tlease) -> io::Result<()> {
    let d = &tlease.duration;
    let millis = d.as_secs()*1000 + (((d.subsec_nanos() as f64)/1e6) as u64);
    try!(writer.write_u8(0));
    try!(writer.write_u64::<BigEndian>(millis));
    Ok(())
}

///////////// Tdiscarded codecs

#[inline]
pub fn decode_tdiscarded<R: Read>(mut reader: R) -> io::Result<Tdiscarded> {
    let mut bts = [0;3];
    try!(reader.read_exact(&mut bts[..]));
    let id = (bts[0] as u32) << 16 | (bts[1] as u32) <<  8 | (bts[2] as u32);
    let msg = try!(body_as_string(reader));

    Ok(Tdiscarded {
        id: id,
        msg: msg,
    })
}

#[inline]
pub fn encode_tdiscarded<W: Write + ?Sized>(writer: &mut W, msg: &Tdiscarded) -> io::Result<()> {
    let bts = [
        ((msg.id >> 16) & 0xff) as u8,
        ((msg.id >>  8) & 0xff) as u8,
        ( msg.id        & 0xff) as u8,
    ];
    try!(writer.write_all(&bts[..]));
    writer.write_all(msg.msg.as_bytes())
}

///////////// Tag codec functions

const TAG_END_MASK: u32 = 1 << 23; // 24th bit of tag
const TAG_ID_MASK: u32 = MAX_TAG;

pub fn decode_tag<R: Read + ?Sized>(reader: &mut R) -> io::Result<Tag> {
    let mut bts = [0; 3];
    let _ = try!(reader.read(&mut bts));

    let id = (bts[0] as u32) << 16 |
             (bts[1] as u32) <<  8 |
             (bts[2] as u32);

    Ok(Tag {
        end: id & TAG_END_MASK == 0,
        id: id & TAG_ID_MASK, // clear the last bit, its for the end flag
    })
}

#[inline]
pub fn encode_tag<W: Write + ?Sized>(buffer: &mut W, tag: &Tag) -> io::Result<()> {
    let bytes = {
        let id = tag.id;
        let endbit = if tag.end { 0 } else { 1 };
        [(id >> 16 & 0x7f) as u8 | (endbit << 7),
         (id >> 8 & 0xff) as u8,
         (id >> 0 & 0xff) as u8]
    };

    buffer.write_all(&bytes)
}

///////////// headers codec functions

pub fn encode_headers<W: Write + ?Sized>(writer: &mut W, headers: &Headers) -> io::Result<()> {
    chklen!(headers, u8::MAX, "Header count overflow");
    try!(writer.write_u8(headers.len() as u8));

    for &(ref k, ref v) in headers {
        chklen!(v, u8::MAX, "Header length overflow");

        try!(writer.write_u8(*k));
        try!(writer.write_u8(v.len() as u8));
        try!(writer.write_all(v));
    }
    Ok(())
}

pub fn decode_headers<R: Read + ?Sized>(reader: &mut R) -> io::Result<Headers> {
    let len = try!(reader.read_u8()) as usize;
    let mut acc = Vec::with_capacity(len);

    for _ in 0..len {
        let key = try!(reader.read_u8());
        let val_len = try!(reader.read_u8());
        let mut val = vec![0;val_len as usize];
        try!(reader.read_exact(&mut val[..]));
        acc.push((key, val));
    }

    Ok(acc)
}

///////////// Contexts codec functions

pub fn encode_contexts<W: Write + ?Sized>(writer: &mut W, contexts: &Contexts) -> io::Result<()> {
    chklen!(contexts, u16::MAX, "Context entries overflow");

    try!(writer.write_u16::<BigEndian>(contexts.len() as u16));
    for &(ref k, ref v) in contexts {
        chklen!(k, u16::MAX, "Context key overflow");
        try!(writer.write_u16::<BigEndian>(k.len() as u16));
        try!(writer.write_all(&k[..]));

        chklen!(v, u16::MAX, "Context value overflow");
        try!(writer.write_u16::<BigEndian>(v.len() as u16));
        try!(writer.write_all(&v[..]));
    }

    Ok(())
}

pub fn decode_contexts<R: Read + ?Sized>(reader: &mut R) -> io::Result<Contexts> {
    let len = try!(reader.read_u16::<BigEndian>()) as usize;

    let mut acc = Vec::with_capacity(len);

    for _ in 0..len {
        let key_len = try!(reader.read_u16::<BigEndian>());
        let mut key = vec![0;key_len as usize];
        try!(reader.read_exact(&mut key[..]));

        let val_len = try!(reader.read_u16::<BigEndian>());
        let mut val = vec![0;val_len as usize];
        try!(reader.read_exact(&mut val[..]));
        acc.push((key, val));
    }

    Ok(acc)
}

///////////// Dtab codec functions
pub fn decode_dtab<R: Read + ?Sized>(reader: &mut R) -> io::Result<Dtab> {
    let len = try!(reader.read_u16::<BigEndian>()) as usize;
    let mut acc = Vec::with_capacity(len);

    for _ in 0..len {
        let key_len = try!(reader.read_u16::<BigEndian>());
        let mut key = vec![0;key_len as usize];
        try!(reader.read_exact(&mut key[..]));

        let val_len = try!(reader.read_u16::<BigEndian>());
        let mut val = vec![0;val_len as usize];
        try!(reader.read_exact(&mut val[..]));
        acc.push(Dentry::new(try!(to_string(key)), try!(to_string(val))));
    }

    Ok(Dtab::from_entries(acc))
}

pub fn encode_dtab<W: Write + ?Sized>(writer: &mut W, table: &Dtab) -> io::Result<()> {
    chklen!(table.entries, u16::MAX, "Dtable length overflow");
    try!(writer.write_u16::<BigEndian>(table.entries.len() as u16));

    for dentry in &table.entries {
        // the string encoder will check for overflows
        try!(encode_u16_string(writer, &dentry.key));
        try!(encode_u16_string(writer, &dentry.val));
    }
    Ok(())
}

///////////// Rerr codec functions

#[inline]
pub fn decode_rerr<R: Read>(reader: R) -> io::Result<Rerr> {
    let msg = try!(body_as_string(reader));
    Ok(Rerr { msg: msg, })
}

#[inline]
pub fn encode_rerr<W: Write + ?Sized>(writer: &mut W, rerr: &Rerr) -> io::Result<()> {
    writer.write_all(rerr.msg.as_bytes())
}

///////////// Init codec functions

pub fn encode_init<W: Write + ?Sized>(writer: &mut W, msg: &Init) -> io::Result<()> {
    try!(writer.write_u16::<BigEndian>(msg.version));

    // Not going to bother checking for overflow: if a single one of the
    // entries overflows then the entire frame overflows which is not the
    // pervue of this function.

    for &(ref k, ref v) in &msg.headers {
        try!(writer.write_u32::<BigEndian>(k.len() as u32));
        try!(writer.write_all(k));
        try!(writer.write_u32::<BigEndian>(v.len() as u32));
        try!(writer.write_all(v));
    }

    Ok(())
}

pub fn decode_init<R: Read>(mut reader: R) -> io::Result<Init> {
    let mut headers = Vec::new();
    let version = try!(reader.read_u16::<BigEndian>());

    loop {
        let klen = match reader.read_u32::<BigEndian>() {
            Ok(len) => len,
            Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
                // termination: out of buffer.
                return Ok(
                    Init {
                        version: version,
                        headers: headers,
                    }
                );
            }
            Err(other) => { return Err(other); }
        };

        let mut k = vec![0;klen as usize];
        try!(reader.read_exact(&mut k));

        let vlen = try!(reader.read_u32::<BigEndian>());
        let mut v = vec![0;vlen as usize];
        try!(reader.read_exact(&mut v));

        headers.push((k, v));
    }
}

///////////// Rdispatch codec functions

pub fn encode_rdispatch<W: Write + ?Sized>(writer: &mut W, frame: &Rdispatch) -> io::Result<()> {
    let (status, body) = rmsg_status_body(&frame.msg);

    try!(writer.write_u8(status));
    try!(encode_contexts(writer, &frame.contexts));
    writer.write_all(body)
}

// Expects to consume the whole stream
pub fn decode_rdispatch<R: Read>(mut reader: R) -> io::Result<Rdispatch> {
    let status = try!(reader.read_u8());
    let contexts = try!(decode_contexts(&mut reader));
    let mut body = Vec::new();
    let _ = try!(reader.read_to_end(&mut body));

    Ok(Rdispatch {
        contexts: contexts,
        msg: try!(decode_rmsg_body(status, body)),
    })
}

///////////// Rreq codec functions

pub fn encode_rreq<W: Write + ?Sized>(writer: &mut W, frame: &Rmsg) -> io::Result<()> {
    let (status, body) = rmsg_status_body(frame);
    try!(writer.write_u8(status));
    writer.write_all(body)
}

pub fn decode_rreq<R: Read>(mut reader: R) -> io::Result<Rmsg> {
    let status = try!(reader.read_u8());
    let mut body = Vec::new();
    try!(reader.read_to_end(&mut body));
    decode_rmsg_body(status, body)
}

///////////// Tdispatch codec functions

pub fn decode_tdispatch<R: Read>(mut reader: R) -> io::Result<Tdispatch> {
    let contexts = try!(decode_contexts(&mut reader));
    let dest = try!(decode_u16_string(&mut reader));
    let dtab = try!(decode_dtab(&mut reader));

    let mut body = Vec::new();
    let _ = try!(reader.read_to_end(&mut body));

    Ok(Tdispatch {
        contexts: contexts,
        dest: dest,
        dtab: dtab,
        body: body,
    })
}

pub fn encode_tdispatch<W: Write + ?Sized>(writer: &mut W, msg: &Tdispatch) -> io::Result<()> {
    try!(encode_contexts(writer, &msg.contexts));
    try!(encode_u16_string(writer, &msg.dest));
    try!(encode_dtab(writer, &msg.dtab));
    writer.write_all(&msg.body)
}

///////////// Treq codec functions

pub fn decode_treq<R: Read>(mut reader: R) -> io::Result<Treq> {
    let headers = try!(decode_headers(&mut reader));
    let mut body = Vec::new();

    let _ = try!(reader.read_to_end(&mut body));
    Ok(Treq {
        headers: headers,
        body: body,
    })
}

#[inline]
pub fn encode_treq<W: Write + ?Sized>(writer: &mut W, msg: &Treq) -> io::Result<()> {
    try!(encode_headers(writer, &msg.headers));
    writer.write_all(&msg.body)
}

#[inline]
fn rmsg_status_body(msg: &Rmsg) -> (u8, &[u8]) {
    match *msg {
        Rmsg::Ok(ref body) => (0, body.as_ref()),
        Rmsg::Error(ref msg) => (1, msg.as_bytes()),
        Rmsg::Nack(ref msg) => (2, msg.as_bytes()),
    }
}

#[inline]
fn decode_rmsg_body(status: u8, body: Vec<u8>) -> io::Result<Rmsg> {
    match status {
        0 => Ok(Rmsg::Ok(body)),
        1 => Ok(Rmsg::Error(try!(to_string(body)))),
        2 => Ok(Rmsg::Nack(try!(to_string(body)))),
        other => Err(
            io::Error::new(ErrorKind::InvalidData, format!("Invalid status code: {}", other))
        )
    }
}

// tools for operating on Strings

// decode a utf8 string with length specified by a u16 prefix byte
#[inline]
pub fn decode_u16_string<R: Read + ?Sized>(reader: &mut R) -> io::Result<String> {
    let str_len = try!(reader.read_u16::<BigEndian>());
    let mut s = vec![0; str_len as usize];

    try!(reader.read_exact(&mut s));

    to_string(s)
}

#[inline]
pub fn encode_u16_string<W: Write + ?Sized>(writer: &mut W, s: &str) -> io::Result<()> {
    let bytes = s.as_bytes();

    chklen!(bytes, u16::MAX, "16bit length delimited string overflow");
    try!(writer.write_u16::<BigEndian>(bytes.len() as u16));
    writer.write_all(bytes)
}

#[inline]
fn to_string(vec: Vec<u8>) -> io::Result<String> {
    match String::from_utf8(vec) {
        Ok(s) => Ok(s),
        Err(_) => Err(io::Error::new(ErrorKind::InvalidData, "Invalid UTF8 field")),
    }
}

#[inline]
fn body_as_string<R: Read>(mut reader: R) -> io::Result<String> {
    let mut data = Vec::new();
    let _ = try!(reader.read_to_end(&mut data));
    to_string(data)
}