record_query/value/
messagepack.rs

1use std::io;
2
3use ordered_float;
4use rmpv;
5
6use crate::error;
7use crate::value;
8
/// Wrapper around an [`io::Read`] stream from which MessagePack values are
/// decoded.
///
/// Construct one with [`source`].
#[derive(Debug)]
pub struct MessagePackSource<R: io::Read>(R);
13
/// Wrapper around an [`io::Write`] stream to which MessagePack values are
/// encoded.
///
/// Construct one with [`sink`].
#[derive(Debug)]
pub struct MessagePackSink<W: io::Write>(W);
18
19#[inline]
20pub fn source<R>(r: R) -> MessagePackSource<R>
21where
22    R: io::Read,
23{
24    MessagePackSource(r)
25}
26
27#[inline]
28pub fn sink<W>(w: W) -> MessagePackSink<W>
29where
30    W: io::Write,
31{
32    MessagePackSink(w)
33}
34
35impl<R> value::Source for MessagePackSource<R>
36where
37    R: io::Read,
38{
39    #[inline]
40    fn read(&mut self) -> error::Result<Option<value::Value>> {
41        use rmpv::decode::Error;
42
43        match rmpv::decode::value::read_value(&mut self.0) {
44            Ok(v) => Ok(Some(value_from_message_pack(v)?)),
45            Err(Error::InvalidMarkerRead(ref e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
46                Ok(None)
47            }
48            Err(e) => Err(error::Error::MessagePackDecode(e)),
49        }
50    }
51}
52
53impl<W> value::Sink for MessagePackSink<W>
54where
55    W: io::Write,
56{
57    #[inline]
58    fn write(&mut self, v: value::Value) -> error::Result<()> {
59        rmpv::encode::write_value(&mut self.0, &value_to_message_pack(v)).map_err(From::from)
60    }
61}
62
63fn value_from_message_pack(value: rmpv::Value) -> error::Result<value::Value> {
64    use rmpv::Value;
65    match value {
66        Value::Nil => Ok(value::Value::Unit),
67        Value::Boolean(v) => Ok(value::Value::Bool(v)),
68        Value::Integer(i) if i.is_u64() => Ok(value::Value::U64(i.as_u64().unwrap())),
69        Value::Integer(i) if i.is_i64() => Ok(value::Value::I64(i.as_i64().unwrap())),
70        Value::Integer(_) => unreachable!(),
71        Value::F32(v) => Ok(value::Value::from_f32(v)),
72        Value::F64(v) => Ok(value::Value::from_f64(v)),
73        Value::String(v) => {
74            if v.is_err() {
75                Err(error::Error::Format {
76                    msg: v.as_err().unwrap().to_string(),
77                })
78            } else {
79                Ok(value::Value::String(v.into_str().unwrap()))
80            }
81        }
82        Value::Ext(_, v) | Value::Binary(v) => Ok(value::Value::Bytes(v)),
83        Value::Array(v) => Ok(value::Value::Sequence(
84            v.into_iter()
85                .map(value_from_message_pack)
86                .collect::<error::Result<_>>()?,
87        )),
88        Value::Map(v) => Ok(value::Value::Map(
89            v.into_iter()
90                .map(|(k, v)| Ok((value_from_message_pack(k)?, value_from_message_pack(v)?)))
91                .collect::<error::Result<_>>()?,
92        )),
93    }
94}
95
96fn value_to_message_pack(value: value::Value) -> rmpv::Value {
97    use rmpv::Value;
98    match value {
99        value::Value::Unit => Value::Nil,
100        value::Value::Bool(v) => Value::Boolean(v),
101
102        value::Value::I8(v) => Value::Integer(v.into()),
103        value::Value::I16(v) => Value::Integer(v.into()),
104        value::Value::I32(v) => Value::Integer(v.into()),
105        value::Value::I64(v) => Value::Integer(v.into()),
106
107        value::Value::U8(v) => Value::Integer(v.into()),
108        value::Value::U16(v) => Value::Integer(v.into()),
109        value::Value::U32(v) => Value::Integer(v.into()),
110        value::Value::U64(v) => Value::Integer(v.into()),
111
112        value::Value::F32(ordered_float::OrderedFloat(v)) => Value::F32(v),
113        value::Value::F64(ordered_float::OrderedFloat(v)) => Value::F64(v),
114
115        value::Value::Char(v) => Value::String(format!("{}", v).into()),
116        value::Value::String(v) => Value::String(v.into()),
117        value::Value::Bytes(v) => Value::Binary(v),
118
119        value::Value::Sequence(v) => {
120            Value::Array(v.into_iter().map(value_to_message_pack).collect())
121        }
122        value::Value::Map(v) => Value::Map(
123            v.into_iter()
124                .map(|(k, v)| (value_to_message_pack(k), value_to_message_pack(v)))
125                .collect(),
126        ),
127    }
128}