nu_command/formats/from/
msgpack.rs

1// Credit to https://github.com/hulthe/nu_plugin_msgpack for the original idea, though the
2// implementation here is unique.
3
4use std::{
5    error::Error,
6    io::{self, Cursor, ErrorKind},
7    string::FromUtf8Error,
8};
9
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::{TimeZone, Utc};
12use nu_engine::command_prelude::*;
13use nu_protocol::Signals;
14use rmp::decode::{self as mp, ValueReadError};
15
/// Max recursion depth
///
/// Limits how deeply nested arrays and maps [`read_value`] will descend into. Each nested
/// array/map increments the depth by one, and reaching this limit aborts decoding with
/// [`ReadError::MaxDepth`], so that deeply nested (possibly hostile) input cannot overflow the
/// stack.
const MAX_DEPTH: usize = 50;
18
/// The `from msgpack` command: converts MessagePack binary data (a binary value or a byte
/// stream) into Nu values.
#[derive(Clone)]
pub struct FromMsgpack;
21
22impl Command for FromMsgpack {
23    fn name(&self) -> &str {
24        "from msgpack"
25    }
26
27    fn signature(&self) -> Signature {
28        Signature::build(self.name())
29            .input_output_type(Type::Binary, Type::Any)
30            .switch("objects", "Read multiple objects from input", None)
31            .category(Category::Formats)
32    }
33
34    fn description(&self) -> &str {
35        "Convert MessagePack data into Nu values."
36    }
37
38    fn extra_description(&self) -> &str {
39        r#"
40Not all values are representable as MessagePack.
41
42The datetime extension type is read as dates. MessagePack binary values are
43read to their Nu equivalent. Most other types are read in an analogous way to
44`from json`, and may not convert to the exact same type if `to msgpack` was
45used originally to create the data.
46
47MessagePack: https://msgpack.org/
48"#
49    }
50
51    fn examples(&self) -> Vec<Example> {
52        vec![
53            Example {
54                description: "Read a list of values from MessagePack",
55                example: "0x[93A3666F6F2AC2] | from msgpack",
56                result: Some(Value::test_list(vec![
57                    Value::test_string("foo"),
58                    Value::test_int(42),
59                    Value::test_bool(false),
60                ])),
61            },
62            Example {
63                description: "Read a stream of multiple values from MessagePack",
64                example: "0x[81A76E757368656C6CA5726F636B73A9736572696F75736C79] | from msgpack --objects",
65                result: Some(Value::test_list(vec![
66                    Value::test_record(record! {
67                        "nushell" => Value::test_string("rocks"),
68                    }),
69                    Value::test_string("seriously"),
70                ])),
71            },
72            Example {
73                description: "Read a table from MessagePack",
74                example: "0x[9282AA6576656E745F6E616D65B141706F6C6C6F203131204C616E64696E67A474696D65C70CFF00000000FFFFFFFFFF2CAB5B82AA6576656E745F6E616D65B44E757368656C6C20666972737420636F6D6D6974A474696D65D6FF5CD5ADE0] | from msgpack",
75                result: Some(Value::test_list(vec![
76                    Value::test_record(record! {
77                        "event_name" => Value::test_string("Apollo 11 Landing"),
78                        "time" => Value::test_date(Utc.with_ymd_and_hms(
79                            1969,
80                            7,
81                            24,
82                            16,
83                            50,
84                            35,
85                        ).unwrap().into())
86                    }),
87                    Value::test_record(record! {
88                        "event_name" => Value::test_string("Nushell first commit"),
89                        "time" => Value::test_date(Utc.with_ymd_and_hms(
90                            2019,
91                            5,
92                            10,
93                            16,
94                            59,
95                            12,
96                        ).unwrap().into())
97                    }),
98                ])),
99            },
100        ]
101    }
102
103    fn run(
104        &self,
105        engine_state: &EngineState,
106        stack: &mut Stack,
107        call: &Call,
108        input: PipelineData,
109    ) -> Result<PipelineData, ShellError> {
110        let objects = call.has_flag(engine_state, stack, "objects")?;
111        let opts = Opts {
112            span: call.head,
113            objects,
114            signals: engine_state.signals().clone(),
115        };
116        let metadata = input.metadata().map(|md| md.with_content_type(None));
117        let out = match input {
118            // Deserialize from a byte buffer
119            PipelineData::Value(Value::Binary { val: bytes, .. }, _) => {
120                read_msgpack(Cursor::new(bytes), opts)
121            }
122            // Deserialize from a raw stream directly without having to collect it
123            PipelineData::ByteStream(stream, ..) => {
124                let span = stream.span();
125                if let Some(reader) = stream.reader() {
126                    read_msgpack(reader, opts)
127                } else {
128                    Err(ShellError::PipelineMismatch {
129                        exp_input_type: "binary or byte stream".into(),
130                        dst_span: call.head,
131                        src_span: span,
132                    })
133                }
134            }
135            input => Err(ShellError::PipelineMismatch {
136                exp_input_type: "binary or byte stream".into(),
137                dst_span: call.head,
138                src_span: input.span().unwrap_or(call.head),
139            }),
140        };
141        out.map(|pd| pd.set_metadata(metadata))
142    }
143}
144
145#[derive(Debug)]
146pub(crate) enum ReadError {
147    MaxDepth(Span),
148    Io(io::Error, Span),
149    TypeMismatch(rmp::Marker, Span),
150    Utf8(FromUtf8Error, Span),
151    Shell(Box<ShellError>),
152}
153
154impl From<Box<ShellError>> for ReadError {
155    fn from(v: Box<ShellError>) -> Self {
156        Self::Shell(v)
157    }
158}
159
160impl From<ShellError> for ReadError {
161    fn from(value: ShellError) -> Self {
162        Box::new(value).into()
163    }
164}
165
166impl From<Spanned<ValueReadError>> for ReadError {
167    fn from(value: Spanned<ValueReadError>) -> Self {
168        match value.item {
169            // All I/O errors:
170            ValueReadError::InvalidMarkerRead(err) | ValueReadError::InvalidDataRead(err) => {
171                ReadError::Io(err, value.span)
172            }
173            ValueReadError::TypeMismatch(marker) => ReadError::TypeMismatch(marker, value.span),
174        }
175    }
176}
177
178impl From<Spanned<io::Error>> for ReadError {
179    fn from(value: Spanned<io::Error>) -> Self {
180        ReadError::Io(value.item, value.span)
181    }
182}
183
184impl From<Spanned<FromUtf8Error>> for ReadError {
185    fn from(value: Spanned<FromUtf8Error>) -> Self {
186        ReadError::Utf8(value.item, value.span)
187    }
188}
189
190impl From<ReadError> for ShellError {
191    fn from(value: ReadError) -> Self {
192        match value {
193            ReadError::MaxDepth(span) => ShellError::GenericError {
194                error: "MessagePack data is nested too deeply".into(),
195                msg: format!("exceeded depth limit ({MAX_DEPTH})"),
196                span: Some(span),
197                help: None,
198                inner: vec![],
199            },
200            ReadError::Io(err, span) => ShellError::GenericError {
201                error: "Error while reading MessagePack data".into(),
202                msg: err.to_string(),
203                span: Some(span),
204                help: None,
205                // Take the inner ShellError
206                inner: err
207                    .source()
208                    .and_then(|s| s.downcast_ref::<ShellError>())
209                    .cloned()
210                    .into_iter()
211                    .collect(),
212            },
213            ReadError::TypeMismatch(marker, span) => ShellError::GenericError {
214                error: "Invalid marker while reading MessagePack data".into(),
215                msg: format!("unexpected {:?} in data", marker),
216                span: Some(span),
217                help: None,
218                inner: vec![],
219            },
220            ReadError::Utf8(err, span) => ShellError::NonUtf8Custom {
221                msg: format!("in MessagePack data: {err}"),
222                span,
223            },
224            ReadError::Shell(err) => *err,
225        }
226    }
227}
228
229pub(crate) struct Opts {
230    pub span: Span,
231    pub objects: bool,
232    pub signals: Signals,
233}
234
235/// Read single or multiple values into PipelineData
236pub(crate) fn read_msgpack(
237    mut input: impl io::Read + Send + 'static,
238    opts: Opts,
239) -> Result<PipelineData, ShellError> {
240    let Opts {
241        span,
242        objects,
243        signals,
244    } = opts;
245    if objects {
246        // Make an iterator that reads multiple values from the reader
247        let mut done = false;
248        Ok(std::iter::from_fn(move || {
249            if !done {
250                let result = read_value(&mut input, span, 0);
251                match result {
252                    Ok(value) => Some(value),
253                    // Any error should cause us to not read anymore
254                    Err(ReadError::Io(err, _)) if err.kind() == ErrorKind::UnexpectedEof => {
255                        done = true;
256                        None
257                    }
258                    Err(other_err) => {
259                        done = true;
260                        Some(Value::error(other_err.into(), span))
261                    }
262                }
263            } else {
264                None
265            }
266        })
267        .into_pipeline_data(span, signals))
268    } else {
269        // Read a single value and then make sure it's EOF
270        let result = read_value(&mut input, span, 0)?;
271        assert_eof(&mut input, span)?;
272        Ok(result.into_pipeline_data())
273    }
274}
275
276fn read_value(input: &mut impl io::Read, span: Span, depth: usize) -> Result<Value, ReadError> {
277    // Prevent stack overflow
278    if depth >= MAX_DEPTH {
279        return Err(ReadError::MaxDepth(span));
280    }
281
282    let marker = mp::read_marker(input)
283        .map_err(ValueReadError::from)
284        .err_span(span)?;
285
286    // We decide what kind of value to make depending on the marker. rmp doesn't really provide us
287    // a lot of utilities for reading the data after the marker, I think they assume you want to
288    // use rmp-serde or rmpv, but we don't have that kind of serde implementation for Value and so
289    // hand-written deserialization is going to be the fastest
290    match marker {
291        rmp::Marker::FixPos(num) => Ok(Value::int(num as i64, span)),
292        rmp::Marker::FixNeg(num) => Ok(Value::int(num as i64, span)),
293        rmp::Marker::Null => Ok(Value::nothing(span)),
294        rmp::Marker::True => Ok(Value::bool(true, span)),
295        rmp::Marker::False => Ok(Value::bool(false, span)),
296        rmp::Marker::U8 => from_int(input.read_u8(), span),
297        rmp::Marker::U16 => from_int(input.read_u16::<BigEndian>(), span),
298        rmp::Marker::U32 => from_int(input.read_u32::<BigEndian>(), span),
299        rmp::Marker::U64 => {
300            // u64 can be too big
301            let val_u64 = input.read_u64::<BigEndian>().err_span(span)?;
302            val_u64
303                .try_into()
304                .map(|val| Value::int(val, span))
305                .map_err(|err| {
306                    ShellError::GenericError {
307                        error: "MessagePack integer too big for Nushell".into(),
308                        msg: err.to_string(),
309                        span: Some(span),
310                        help: None,
311                        inner: vec![],
312                    }
313                    .into()
314                })
315        }
316        rmp::Marker::I8 => from_int(input.read_i8(), span),
317        rmp::Marker::I16 => from_int(input.read_i16::<BigEndian>(), span),
318        rmp::Marker::I32 => from_int(input.read_i32::<BigEndian>(), span),
319        rmp::Marker::I64 => from_int(input.read_i64::<BigEndian>(), span),
320        rmp::Marker::F32 => Ok(Value::float(
321            input.read_f32::<BigEndian>().err_span(span)? as f64,
322            span,
323        )),
324        rmp::Marker::F64 => Ok(Value::float(
325            input.read_f64::<BigEndian>().err_span(span)?,
326            span,
327        )),
328        rmp::Marker::FixStr(len) => read_str(input, len as usize, span),
329        rmp::Marker::Str8 => {
330            let len = input.read_u8().err_span(span)?;
331            read_str(input, len as usize, span)
332        }
333        rmp::Marker::Str16 => {
334            let len = input.read_u16::<BigEndian>().err_span(span)?;
335            read_str(input, len as usize, span)
336        }
337        rmp::Marker::Str32 => {
338            let len = input.read_u32::<BigEndian>().err_span(span)?;
339            read_str(input, len as usize, span)
340        }
341        rmp::Marker::Bin8 => {
342            let len = input.read_u8().err_span(span)?;
343            read_bin(input, len as usize, span)
344        }
345        rmp::Marker::Bin16 => {
346            let len = input.read_u16::<BigEndian>().err_span(span)?;
347            read_bin(input, len as usize, span)
348        }
349        rmp::Marker::Bin32 => {
350            let len = input.read_u32::<BigEndian>().err_span(span)?;
351            read_bin(input, len as usize, span)
352        }
353        rmp::Marker::FixArray(len) => read_array(input, len as usize, span, depth),
354        rmp::Marker::Array16 => {
355            let len = input.read_u16::<BigEndian>().err_span(span)?;
356            read_array(input, len as usize, span, depth)
357        }
358        rmp::Marker::Array32 => {
359            let len = input.read_u32::<BigEndian>().err_span(span)?;
360            read_array(input, len as usize, span, depth)
361        }
362        rmp::Marker::FixMap(len) => read_map(input, len as usize, span, depth),
363        rmp::Marker::Map16 => {
364            let len = input.read_u16::<BigEndian>().err_span(span)?;
365            read_map(input, len as usize, span, depth)
366        }
367        rmp::Marker::Map32 => {
368            let len = input.read_u32::<BigEndian>().err_span(span)?;
369            read_map(input, len as usize, span, depth)
370        }
371        rmp::Marker::FixExt1 => read_ext(input, 1, span),
372        rmp::Marker::FixExt2 => read_ext(input, 2, span),
373        rmp::Marker::FixExt4 => read_ext(input, 4, span),
374        rmp::Marker::FixExt8 => read_ext(input, 8, span),
375        rmp::Marker::FixExt16 => read_ext(input, 16, span),
376        rmp::Marker::Ext8 => {
377            let len = input.read_u8().err_span(span)?;
378            read_ext(input, len as usize, span)
379        }
380        rmp::Marker::Ext16 => {
381            let len = input.read_u16::<BigEndian>().err_span(span)?;
382            read_ext(input, len as usize, span)
383        }
384        rmp::Marker::Ext32 => {
385            let len = input.read_u32::<BigEndian>().err_span(span)?;
386            read_ext(input, len as usize, span)
387        }
388        mk @ rmp::Marker::Reserved => Err(ReadError::TypeMismatch(mk, span)),
389    }
390}
391
392fn read_str(input: &mut impl io::Read, len: usize, span: Span) -> Result<Value, ReadError> {
393    let mut buf = vec![0; len];
394    input.read_exact(&mut buf).err_span(span)?;
395    Ok(Value::string(String::from_utf8(buf).err_span(span)?, span))
396}
397
398fn read_bin(input: &mut impl io::Read, len: usize, span: Span) -> Result<Value, ReadError> {
399    let mut buf = vec![0; len];
400    input.read_exact(&mut buf).err_span(span)?;
401    Ok(Value::binary(buf, span))
402}
403
404fn read_array(
405    input: &mut impl io::Read,
406    len: usize,
407    span: Span,
408    depth: usize,
409) -> Result<Value, ReadError> {
410    let vec = (0..len)
411        .map(|_| read_value(input, span, depth + 1))
412        .collect::<Result<Vec<Value>, ReadError>>()?;
413    Ok(Value::list(vec, span))
414}
415
416fn read_map(
417    input: &mut impl io::Read,
418    len: usize,
419    span: Span,
420    depth: usize,
421) -> Result<Value, ReadError> {
422    let rec = (0..len)
423        .map(|_| {
424            let key = read_value(input, span, depth + 1)?
425                .into_string()
426                .map_err(|_| ShellError::GenericError {
427                    error: "Invalid non-string value in MessagePack map".into(),
428                    msg: "only maps with string keys are supported".into(),
429                    span: Some(span),
430                    help: None,
431                    inner: vec![],
432                })?;
433            let val = read_value(input, span, depth + 1)?;
434            Ok((key, val))
435        })
436        .collect::<Result<Record, ReadError>>()?;
437    Ok(Value::record(rec, span))
438}
439
440fn read_ext(input: &mut impl io::Read, len: usize, span: Span) -> Result<Value, ReadError> {
441    let ty = input.read_i8().err_span(span)?;
442    match (ty, len) {
443        // "timestamp 32" - u32 seconds only
444        (-1, 4) => {
445            let seconds = input.read_u32::<BigEndian>().err_span(span)?;
446            make_date(seconds as i64, 0, span)
447        }
448        // "timestamp 64" - nanoseconds + seconds packed into u64
449        (-1, 8) => {
450            let packed = input.read_u64::<BigEndian>().err_span(span)?;
451            let nanos = packed >> 34;
452            let secs = packed & ((1 << 34) - 1);
453            make_date(secs as i64, nanos as u32, span)
454        }
455        // "timestamp 96" - nanoseconds + seconds
456        (-1, 12) => {
457            let nanos = input.read_u32::<BigEndian>().err_span(span)?;
458            let secs = input.read_i64::<BigEndian>().err_span(span)?;
459            make_date(secs, nanos, span)
460        }
461        _ => Err(ShellError::GenericError {
462            error: "Unknown MessagePack extension".into(),
463            msg: format!("encountered extension type {ty}, length {len}"),
464            span: Some(span),
465            help: Some("only the timestamp extension (-1) is supported".into()),
466            inner: vec![],
467        }
468        .into()),
469    }
470}
471
472fn make_date(secs: i64, nanos: u32, span: Span) -> Result<Value, ReadError> {
473    match Utc.timestamp_opt(secs, nanos) {
474        chrono::offset::LocalResult::Single(dt) => Ok(Value::date(dt.into(), span)),
475        _ => Err(ShellError::GenericError {
476            error: "Invalid MessagePack timestamp".into(),
477            msg: "datetime is out of supported range".into(),
478            span: Some(span),
479            help: Some("nanoseconds must be less than 1 billion".into()),
480            inner: vec![],
481        }
482        .into()),
483    }
484}
485
486fn from_int<T>(num: Result<T, std::io::Error>, span: Span) -> Result<Value, ReadError>
487where
488    T: Into<i64>,
489{
490    num.map(|num| Value::int(num.into(), span))
491        .map_err(|err| ReadError::Io(err, span))
492}
493
494/// Return an error if this is not the end of file.
495///
496/// This can help detect if parsing succeeded incorrectly, perhaps due to corruption.
497fn assert_eof(input: &mut impl io::Read, span: Span) -> Result<(), ShellError> {
498    let mut buf = [0u8];
499    match input.read_exact(&mut buf) {
500        // End of file
501        Err(_) => Ok(()),
502        // More bytes
503        Ok(()) => Err(ShellError::GenericError {
504            error: "Additional data after end of MessagePack object".into(),
505            msg: "there was more data available after parsing".into(),
506            span: Some(span),
507            help: Some("this might be invalid data, but you can use `from msgpack --objects` to read multiple objects".into()),
508            inner: vec![],
509        })
510    }
511}
512
#[cfg(test)]
mod test {
    use nu_cmd_lang::eval_pipeline_without_terminal_expression;

    use crate::{Metadata, MetadataSet, ToMsgpack};

    use super::*;

    /// Every documented example must evaluate to its stated result.
    #[test]
    fn test_examples() {
        use crate::test_examples;

        test_examples(FromMsgpack {})
    }

    /// Decoding drops the content type but keeps the rest of the input's metadata.
    #[test]
    fn test_content_type_metadata() {
        let mut engine_state = Box::new(EngineState::new());

        // Register only the commands the pipeline below needs
        let delta = {
            let mut working_set = StateWorkingSet::new(&engine_state);
            let decls: [Box<dyn Command>; 4] = [
                Box::new(ToMsgpack {}),
                Box::new(FromMsgpack {}),
                Box::new(Metadata {}),
                Box::new(MetadataSet {}),
            ];
            for decl in decls {
                working_set.add_decl(decl);
            }
            working_set.render()
        };
        engine_state
            .merge_delta(delta)
            .expect("Error merging delta");

        let cmd = r#"{a: 1 b: 2} | to msgpack | metadata set --datasource-ls | from msgpack | metadata | $in"#;
        let expected = Value::test_record(record!("source" => Value::test_string("ls")));
        let actual = eval_pipeline_without_terminal_expression(
            cmd,
            std::env::temp_dir().as_ref(),
            &mut engine_state,
        )
        .expect("There should be a result");
        assert_eq!(expected, actual)
    }
}