record_query/value/
messagepack.rs1use std::io;
2
3use ordered_float;
4use rmpv;
5
6use crate::error;
7use crate::value;
8
9#[derive(Debug)]
10pub struct MessagePackSource<R>(R)
11where
12 R: io::Read;
13
14#[derive(Debug)]
15pub struct MessagePackSink<W>(W)
16where
17 W: io::Write;
18
19#[inline]
20pub fn source<R>(r: R) -> MessagePackSource<R>
21where
22 R: io::Read,
23{
24 MessagePackSource(r)
25}
26
27#[inline]
28pub fn sink<W>(w: W) -> MessagePackSink<W>
29where
30 W: io::Write,
31{
32 MessagePackSink(w)
33}
34
35impl<R> value::Source for MessagePackSource<R>
36where
37 R: io::Read,
38{
39 #[inline]
40 fn read(&mut self) -> error::Result<Option<value::Value>> {
41 use rmpv::decode::Error;
42
43 match rmpv::decode::value::read_value(&mut self.0) {
44 Ok(v) => Ok(Some(value_from_message_pack(v)?)),
45 Err(Error::InvalidMarkerRead(ref e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
46 Ok(None)
47 }
48 Err(e) => Err(error::Error::MessagePackDecode(e)),
49 }
50 }
51}
52
53impl<W> value::Sink for MessagePackSink<W>
54where
55 W: io::Write,
56{
57 #[inline]
58 fn write(&mut self, v: value::Value) -> error::Result<()> {
59 rmpv::encode::write_value(&mut self.0, &value_to_message_pack(v)).map_err(From::from)
60 }
61}
62
63fn value_from_message_pack(value: rmpv::Value) -> error::Result<value::Value> {
64 use rmpv::Value;
65 match value {
66 Value::Nil => Ok(value::Value::Unit),
67 Value::Boolean(v) => Ok(value::Value::Bool(v)),
68 Value::Integer(i) if i.is_u64() => Ok(value::Value::U64(i.as_u64().unwrap())),
69 Value::Integer(i) if i.is_i64() => Ok(value::Value::I64(i.as_i64().unwrap())),
70 Value::Integer(_) => unreachable!(),
71 Value::F32(v) => Ok(value::Value::from_f32(v)),
72 Value::F64(v) => Ok(value::Value::from_f64(v)),
73 Value::String(v) => {
74 if v.is_err() {
75 Err(error::Error::Format {
76 msg: v.as_err().unwrap().to_string(),
77 })
78 } else {
79 Ok(value::Value::String(v.into_str().unwrap()))
80 }
81 }
82 Value::Ext(_, v) | Value::Binary(v) => Ok(value::Value::Bytes(v)),
83 Value::Array(v) => Ok(value::Value::Sequence(
84 v.into_iter()
85 .map(value_from_message_pack)
86 .collect::<error::Result<_>>()?,
87 )),
88 Value::Map(v) => Ok(value::Value::Map(
89 v.into_iter()
90 .map(|(k, v)| Ok((value_from_message_pack(k)?, value_from_message_pack(v)?)))
91 .collect::<error::Result<_>>()?,
92 )),
93 }
94}
95
96fn value_to_message_pack(value: value::Value) -> rmpv::Value {
97 use rmpv::Value;
98 match value {
99 value::Value::Unit => Value::Nil,
100 value::Value::Bool(v) => Value::Boolean(v),
101
102 value::Value::I8(v) => Value::Integer(v.into()),
103 value::Value::I16(v) => Value::Integer(v.into()),
104 value::Value::I32(v) => Value::Integer(v.into()),
105 value::Value::I64(v) => Value::Integer(v.into()),
106
107 value::Value::U8(v) => Value::Integer(v.into()),
108 value::Value::U16(v) => Value::Integer(v.into()),
109 value::Value::U32(v) => Value::Integer(v.into()),
110 value::Value::U64(v) => Value::Integer(v.into()),
111
112 value::Value::F32(ordered_float::OrderedFloat(v)) => Value::F32(v),
113 value::Value::F64(ordered_float::OrderedFloat(v)) => Value::F64(v),
114
115 value::Value::Char(v) => Value::String(format!("{}", v).into()),
116 value::Value::String(v) => Value::String(v.into()),
117 value::Value::Bytes(v) => Value::Binary(v),
118
119 value::Value::Sequence(v) => {
120 Value::Array(v.into_iter().map(value_to_message_pack).collect())
121 }
122 value::Value::Map(v) => Value::Map(
123 v.into_iter()
124 .map(|(k, v)| (value_to_message_pack(k), value_to_message_pack(v)))
125 .collect(),
126 ),
127 }
128}