Skip to main content

nu_command/formats/from/
msgpack.rs

1// Credit to https://github.com/hulthe/nu_plugin_msgpack for the original idea, though the
2// implementation here is unique.
3
4use std::{
5    error::Error,
6    io::{self, Cursor, ErrorKind},
7    string::FromUtf8Error,
8};
9
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::{TimeZone, Utc};
12use nu_engine::command_prelude::*;
13use nu_protocol::{Signals, shell_error::generic::GenericError};
14use rmp::decode::{self as mp, ValueReadError};
15
/// Maximum nesting depth accepted while decoding.
///
/// `read_value` fails with `ReadError::MaxDepth` once its `depth` argument reaches this value,
/// which bounds recursion through nested arrays and maps and so prevents a stack overflow.
const MAX_DEPTH: usize = 50;
18
/// The `from msgpack` command: converts MessagePack-encoded binary input into Nu values.
#[derive(Clone)]
pub struct FromMsgpack;
21
impl Command for FromMsgpack {
    /// The name the command is invoked by.
    fn name(&self) -> &str {
        "from msgpack"
    }

    /// Binary input, any output, plus the `--objects` switch for reading several objects.
    fn signature(&self) -> Signature {
        Signature::build(self.name())
            .input_output_type(Type::Binary, Type::Any)
            .switch("objects", "Read multiple objects from input.", None)
            .category(Category::Formats)
    }

    /// One-line summary of the command.
    fn description(&self) -> &str {
        "Convert MessagePack data into Nu values."
    }

    /// Longer help text describing how MessagePack types map to Nu values.
    fn extra_description(&self) -> &str {
        "
Not all values are representable as MessagePack.

The datetime extension type is read as dates. MessagePack binary values are
read to their Nu equivalent. Most other types are read in an analogous way to
`from json`, and may not convert to the exact same type if `to msgpack` was
used originally to create the data.

MessagePack: https://msgpack.org/
"
    }

    /// Usage examples; `test_examples` in this file's test module runs them.
    fn examples(&self) -> Vec<Example<'_>> {
        vec![
            Example {
                description: "Read a list of values from MessagePack.",
                example: "0x[93A3666F6F2AC2] | from msgpack",
                result: Some(Value::test_list(vec![
                    Value::test_string("foo"),
                    Value::test_int(42),
                    Value::test_bool(false),
                ])),
            },
            Example {
                description: "Read a stream of multiple values from MessagePack.",
                example: "0x[81A76E757368656C6CA5726F636B73A9736572696F75736C79] | from msgpack --objects",
                result: Some(Value::test_list(vec![
                    Value::test_record(record! {
                        "nushell" => Value::test_string("rocks"),
                    }),
                    Value::test_string("seriously"),
                ])),
            },
            Example {
                description: "Read a table from MessagePack.",
                example: "0x[9282AA6576656E745F6E616D65B141706F6C6C6F203131204C616E64696E67A474696D65C70CFF00000000FFFFFFFFFF2CAB5B82AA6576656E745F6E616D65B44E757368656C6C20666972737420636F6D6D6974A474696D65D6FF5CD5ADE0] | from msgpack",
                result: Some(Value::test_list(vec![
                    Value::test_record(record! {
                        "event_name" => Value::test_string("Apollo 11 Landing"),
                        "time" => Value::test_date(Utc.with_ymd_and_hms(
                            1969,
                            7,
                            24,
                            16,
                            50,
                            35,
                        ).unwrap().into())
                    }),
                    Value::test_record(record! {
                        "event_name" => Value::test_string("Nushell first commit"),
                        "time" => Value::test_date(Utc.with_ymd_and_hms(
                            2019,
                            5,
                            10,
                            16,
                            59,
                            12,
                        ).unwrap().into())
                    }),
                ])),
            },
        ]
    }

    /// Decode the pipeline input as MessagePack.
    ///
    /// Accepts either a binary value or a byte stream; any other input (or a byte stream that
    /// yields no reader) produces `ShellError::PipelineMismatch`. The input's metadata is carried
    /// over to the output with its content type cleared.
    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &Call,
        mut input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        let objects = call.has_flag(engine_state, stack, "objects")?;
        let opts = Opts {
            span: call.head,
            objects,
            signals: engine_state.signals().clone(),
        };
        // Keep the incoming metadata but drop its content type before re-attaching it below.
        let metadata = input.take_metadata().map(|md| md.with_content_type(None));
        let out = match input {
            // Deserialize from a byte buffer
            PipelineData::Value(Value::Binary { val: bytes, .. }, _) => {
                read_msgpack(Cursor::new(bytes), opts)
            }
            // Deserialize from a raw stream directly without having to collect it
            PipelineData::ByteStream(stream, ..) => {
                let span = stream.span();
                if let Some(reader) = stream.reader() {
                    read_msgpack(reader, opts)
                } else {
                    // No reader could be obtained from the stream; report it as a mismatch
                    Err(ShellError::PipelineMismatch {
                        exp_input_type: "binary or byte stream".into(),
                        dst_span: call.head,
                        src_span: span,
                    })
                }
            }
            // Anything else is not binary data; fall back to the call span if the input has none
            input => Err(ShellError::PipelineMismatch {
                exp_input_type: "binary or byte stream".into(),
                dst_span: call.head,
                src_span: input.span().unwrap_or(call.head),
            }),
        };
        out.map(|pd| pd.set_metadata(metadata))
    }
}
144
/// Errors that can occur while decoding MessagePack data.
///
/// Every variant except `Shell` carries the span to report the error at; all of them convert
/// into a `ShellError` through the `From<ReadError>` impl in this file.
#[derive(Debug)]
pub(crate) enum ReadError {
    /// Nesting reached `MAX_DEPTH`.
    MaxDepth(Span),
    /// The underlying reader failed (including an unexpected end of input).
    Io(io::Error, Span),
    /// A marker was read that cannot be decoded at this position.
    TypeMismatch(rmp::Marker, Span),
    /// A MessagePack string was not valid UTF-8.
    Utf8(FromUtf8Error, Span),
    /// An already-built shell error, boxed.
    Shell(Box<ShellError>),
}
153
154impl From<Box<ShellError>> for ReadError {
155    fn from(v: Box<ShellError>) -> Self {
156        Self::Shell(v)
157    }
158}
159
160impl From<ShellError> for ReadError {
161    fn from(value: ShellError) -> Self {
162        Box::new(value).into()
163    }
164}
165
166impl From<Spanned<ValueReadError>> for ReadError {
167    fn from(value: Spanned<ValueReadError>) -> Self {
168        match value.item {
169            // All I/O errors:
170            ValueReadError::InvalidMarkerRead(err) | ValueReadError::InvalidDataRead(err) => {
171                ReadError::Io(err, value.span)
172            }
173            ValueReadError::TypeMismatch(marker) => ReadError::TypeMismatch(marker, value.span),
174        }
175    }
176}
177
178impl From<Spanned<io::Error>> for ReadError {
179    fn from(value: Spanned<io::Error>) -> Self {
180        ReadError::Io(value.item, value.span)
181    }
182}
183
184impl From<Spanned<FromUtf8Error>> for ReadError {
185    fn from(value: Spanned<FromUtf8Error>) -> Self {
186        ReadError::Utf8(value.item, value.span)
187    }
188}
189
190impl From<ReadError> for ShellError {
191    fn from(value: ReadError) -> Self {
192        match value {
193            ReadError::MaxDepth(span) => ShellError::Generic(GenericError::new(
194                "MessagePack data is nested too deeply",
195                format!("exceeded depth limit ({MAX_DEPTH})"),
196                span,
197            )),
198            ReadError::Io(err, span) => ShellError::Generic(
199                GenericError::new(
200                    "Error while reading MessagePack data",
201                    err.to_string(),
202                    span,
203                )
204                .with_inner(
205                    err.source()
206                        .and_then(|s| s.downcast_ref::<ShellError>())
207                        .cloned()
208                        .into_iter()
209                        .collect::<Vec<_>>(),
210                ),
211            ),
212            ReadError::TypeMismatch(marker, span) => ShellError::Generic(GenericError::new(
213                "Invalid marker while reading MessagePack data",
214                format!("unexpected {marker:?} in data"),
215                span,
216            )),
217            ReadError::Utf8(err, span) => ShellError::NonUtf8Custom {
218                msg: format!("in MessagePack data: {err}"),
219                span,
220            },
221            ReadError::Shell(err) => *err,
222        }
223    }
224}
225
/// Options controlling how `read_msgpack` decodes its input.
pub(crate) struct Opts {
    /// Span attached to every decoded value and reported on every error.
    pub span: Span,
    /// When true, keep reading objects until end of input; otherwise read exactly one object.
    pub objects: bool,
    /// Signals handed to the resulting pipeline stream when `objects` is set.
    pub signals: Signals,
}
231
232/// Read single or multiple values into PipelineData
233pub(crate) fn read_msgpack(
234    mut input: impl io::Read + Send + 'static,
235    opts: Opts,
236) -> Result<PipelineData, ShellError> {
237    let Opts {
238        span,
239        objects,
240        signals,
241    } = opts;
242    if objects {
243        // Make an iterator that reads multiple values from the reader
244        let mut done = false;
245        Ok(std::iter::from_fn(move || {
246            if !done {
247                let result = read_value(&mut input, span, 0);
248                match result {
249                    Ok(value) => Some(value),
250                    // Any error should cause us to not read anymore
251                    Err(ReadError::Io(err, _)) if err.kind() == ErrorKind::UnexpectedEof => {
252                        done = true;
253                        None
254                    }
255                    Err(other_err) => {
256                        done = true;
257                        Some(Value::error(other_err.into(), span))
258                    }
259                }
260            } else {
261                None
262            }
263        })
264        .into_pipeline_data(span, signals))
265    } else {
266        // Read a single value and then make sure it's EOF
267        let result = read_value(&mut input, span, 0)?;
268        assert_eof(&mut input, span)?;
269        Ok(result.into_pipeline_data())
270    }
271}
272
/// Read one MessagePack value from `input`.
///
/// `span` is attached to the resulting value and to any error. `depth` is the current nesting
/// level: top-level callers pass 0, and `read_array` / `read_map` pass `depth + 1` for their
/// elements. Fails with `ReadError::MaxDepth` once `depth` reaches `MAX_DEPTH`.
fn read_value(input: &mut impl io::Read, span: Span, depth: usize) -> Result<Value, ReadError> {
    // Prevent stack overflow
    if depth >= MAX_DEPTH {
        return Err(ReadError::MaxDepth(span));
    }

    let marker = mp::read_marker(input)
        .map_err(ValueReadError::from)
        .err_span(span)?;

    // We decide what kind of value to make depending on the marker. rmp doesn't really provide us
    // a lot of utilities for reading the data after the marker, I think they assume you want to
    // use rmp-serde or rmpv, but we don't have that kind of serde implementation for Value and so
    // hand-written deserialization is going to be the fastest
    match marker {
        // Values carried entirely by the marker itself
        rmp::Marker::FixPos(num) => Ok(Value::int(num as i64, span)),
        rmp::Marker::FixNeg(num) => Ok(Value::int(num as i64, span)),
        rmp::Marker::Null => Ok(Value::nothing(span)),
        rmp::Marker::True => Ok(Value::bool(true, span)),
        rmp::Marker::False => Ok(Value::bool(false, span)),
        // Fixed-width big-endian integers
        rmp::Marker::U8 => from_int(input.read_u8(), span),
        rmp::Marker::U16 => from_int(input.read_u16::<BigEndian>(), span),
        rmp::Marker::U32 => from_int(input.read_u32::<BigEndian>(), span),
        rmp::Marker::U64 => {
            // u64 can be too big
            let val_u64 = input.read_u64::<BigEndian>().err_span(span)?;
            val_u64
                .try_into()
                .map(|val| Value::int(val, span))
                .map_err(|err| {
                    ShellError::Generic(GenericError::new(
                        "MessagePack integer too big for Nushell",
                        err.to_string(),
                        span,
                    ))
                    .into()
                })
        }
        rmp::Marker::I8 => from_int(input.read_i8(), span),
        rmp::Marker::I16 => from_int(input.read_i16::<BigEndian>(), span),
        rmp::Marker::I32 => from_int(input.read_i32::<BigEndian>(), span),
        rmp::Marker::I64 => from_int(input.read_i64::<BigEndian>(), span),
        // Floats; f32 is widened to f64
        rmp::Marker::F32 => Ok(Value::float(
            input.read_f32::<BigEndian>().err_span(span)? as f64,
            span,
        )),
        rmp::Marker::F64 => Ok(Value::float(
            input.read_f64::<BigEndian>().err_span(span)?,
            span,
        )),
        // Strings: the length is in the marker or in a 1/2/4-byte prefix
        rmp::Marker::FixStr(len) => read_str(input, len as usize, span),
        rmp::Marker::Str8 => {
            let len = input.read_u8().err_span(span)?;
            read_str(input, len as usize, span)
        }
        rmp::Marker::Str16 => {
            let len = input.read_u16::<BigEndian>().err_span(span)?;
            read_str(input, len as usize, span)
        }
        rmp::Marker::Str32 => {
            let len = input.read_u32::<BigEndian>().err_span(span)?;
            read_str(input, len as usize, span)
        }
        // Binary blobs with a 1/2/4-byte length prefix
        rmp::Marker::Bin8 => {
            let len = input.read_u8().err_span(span)?;
            read_bin(input, len as usize, span)
        }
        rmp::Marker::Bin16 => {
            let len = input.read_u16::<BigEndian>().err_span(span)?;
            read_bin(input, len as usize, span)
        }
        rmp::Marker::Bin32 => {
            let len = input.read_u32::<BigEndian>().err_span(span)?;
            read_bin(input, len as usize, span)
        }
        // Arrays: `len` is the element count; elements are read recursively
        rmp::Marker::FixArray(len) => read_array(input, len as usize, span, depth),
        rmp::Marker::Array16 => {
            let len = input.read_u16::<BigEndian>().err_span(span)?;
            read_array(input, len as usize, span, depth)
        }
        rmp::Marker::Array32 => {
            let len = input.read_u32::<BigEndian>().err_span(span)?;
            read_array(input, len as usize, span, depth)
        }
        // Maps: `len` is the number of key/value pairs
        rmp::Marker::FixMap(len) => read_map(input, len as usize, span, depth),
        rmp::Marker::Map16 => {
            let len = input.read_u16::<BigEndian>().err_span(span)?;
            read_map(input, len as usize, span, depth)
        }
        rmp::Marker::Map32 => {
            let len = input.read_u32::<BigEndian>().err_span(span)?;
            read_map(input, len as usize, span, depth)
        }
        // Extension types: `len` is the payload length passed on to `read_ext`
        rmp::Marker::FixExt1 => read_ext(input, 1, span),
        rmp::Marker::FixExt2 => read_ext(input, 2, span),
        rmp::Marker::FixExt4 => read_ext(input, 4, span),
        rmp::Marker::FixExt8 => read_ext(input, 8, span),
        rmp::Marker::FixExt16 => read_ext(input, 16, span),
        rmp::Marker::Ext8 => {
            let len = input.read_u8().err_span(span)?;
            read_ext(input, len as usize, span)
        }
        rmp::Marker::Ext16 => {
            let len = input.read_u16::<BigEndian>().err_span(span)?;
            read_ext(input, len as usize, span)
        }
        rmp::Marker::Ext32 => {
            let len = input.read_u32::<BigEndian>().err_span(span)?;
            read_ext(input, len as usize, span)
        }
        // The reserved marker is rejected as a type mismatch
        mk @ rmp::Marker::Reserved => Err(ReadError::TypeMismatch(mk, span)),
    }
}
386
387fn read_str(input: &mut impl io::Read, len: usize, span: Span) -> Result<Value, ReadError> {
388    let mut buf = vec![0; len];
389    input.read_exact(&mut buf).err_span(span)?;
390    Ok(Value::string(String::from_utf8(buf).err_span(span)?, span))
391}
392
393fn read_bin(input: &mut impl io::Read, len: usize, span: Span) -> Result<Value, ReadError> {
394    let mut buf = vec![0; len];
395    input.read_exact(&mut buf).err_span(span)?;
396    Ok(Value::binary(buf, span))
397}
398
399fn read_array(
400    input: &mut impl io::Read,
401    len: usize,
402    span: Span,
403    depth: usize,
404) -> Result<Value, ReadError> {
405    let vec = (0..len)
406        .map(|_| read_value(input, span, depth + 1))
407        .collect::<Result<Vec<Value>, ReadError>>()?;
408    Ok(Value::list(vec, span))
409}
410
411fn read_map(
412    input: &mut impl io::Read,
413    len: usize,
414    span: Span,
415    depth: usize,
416) -> Result<Value, ReadError> {
417    let rec = (0..len)
418        .map(|_| {
419            let key = read_value(input, span, depth + 1)?
420                .into_string()
421                .map_err(|_| {
422                    ShellError::Generic(GenericError::new(
423                        "Invalid non-string value in MessagePack map",
424                        "only maps with string keys are supported",
425                        span,
426                    ))
427                })?;
428            let val = read_value(input, span, depth + 1)?;
429            Ok((key, val))
430        })
431        .collect::<Result<Record, ReadError>>()?;
432    Ok(Value::record(rec, span))
433}
434
435fn read_ext(input: &mut impl io::Read, len: usize, span: Span) -> Result<Value, ReadError> {
436    let ty = input.read_i8().err_span(span)?;
437    match (ty, len) {
438        // "timestamp 32" - u32 seconds only
439        (-1, 4) => {
440            let seconds = input.read_u32::<BigEndian>().err_span(span)?;
441            make_date(seconds as i64, 0, span)
442        }
443        // "timestamp 64" - nanoseconds + seconds packed into u64
444        (-1, 8) => {
445            let packed = input.read_u64::<BigEndian>().err_span(span)?;
446            let nanos = packed >> 34;
447            let secs = packed & ((1 << 34) - 1);
448            make_date(secs as i64, nanos as u32, span)
449        }
450        // "timestamp 96" - nanoseconds + seconds
451        (-1, 12) => {
452            let nanos = input.read_u32::<BigEndian>().err_span(span)?;
453            let secs = input.read_i64::<BigEndian>().err_span(span)?;
454            make_date(secs, nanos, span)
455        }
456        _ => Err(ShellError::Generic(
457            GenericError::new(
458                "Unknown MessagePack extension",
459                format!("encountered extension type {ty}, length {len}"),
460                span,
461            )
462            .with_help("only the timestamp extension (-1) is supported"),
463        )
464        .into()),
465    }
466}
467
468fn make_date(secs: i64, nanos: u32, span: Span) -> Result<Value, ReadError> {
469    match Utc.timestamp_opt(secs, nanos) {
470        chrono::offset::LocalResult::Single(dt) => Ok(Value::date(dt.into(), span)),
471        _ => Err(ShellError::Generic(
472            GenericError::new(
473                "Invalid MessagePack timestamp",
474                "datetime is out of supported range",
475                span,
476            )
477            .with_help("nanoseconds must be less than 1 billion"),
478        )
479        .into()),
480    }
481}
482
483fn from_int<T>(num: Result<T, std::io::Error>, span: Span) -> Result<Value, ReadError>
484where
485    T: Into<i64>,
486{
487    num.map(|num| Value::int(num.into(), span))
488        .map_err(|err| ReadError::Io(err, span))
489}
490
491/// Return an error if this is not the end of file.
492///
493/// This can help detect if parsing succeeded incorrectly, perhaps due to corruption.
494fn assert_eof(input: &mut impl io::Read, span: Span) -> Result<(), ShellError> {
495    let mut buf = [0u8];
496    match input.read_exact(&mut buf) {
497        // End of file
498        Err(_) => Ok(()),
499        // More bytes
500        Ok(()) => Err(ShellError::Generic(
501            GenericError::new(
502                "Additional data after end of MessagePack object",
503                "there was more data available after parsing",
504                span,
505            )
506            .with_help(
507                "this might be invalid data, but you can use `from msgpack --objects` to read multiple objects",
508            ),
509        )),
510    }
511}
512
#[cfg(test)]
mod test {
    use nu_cmd_lang::eval_pipeline_without_terminal_expression;

    use crate::Reject;
    use crate::{Metadata, MetadataSet, ToMsgpack};

    use super::*;

    /// Runs every example declared by `FromMsgpack::examples`.
    #[test]
    fn test_examples() -> nu_test_support::Result {
        nu_test_support::test().examples(FromMsgpack)
    }

    /// Metadata set before `from msgpack` must still be present on its output.
    #[test]
    fn test_content_type_metadata() {
        let mut engine_state = Box::new(EngineState::new());

        // Register just the commands the pipeline below needs.
        let mut working_set = StateWorkingSet::new(&engine_state);
        working_set.add_decl(Box::new(ToMsgpack {}));
        working_set.add_decl(Box::new(FromMsgpack {}));
        working_set.add_decl(Box::new(Metadata {}));
        working_set.add_decl(Box::new(MetadataSet {}));
        working_set.add_decl(Box::new(Reject {}));
        let delta = working_set.render();

        engine_state
            .merge_delta(delta)
            .expect("Error merging delta");

        let expected = Value::test_record(
            record!("path_columns" => Value::test_list(vec![Value::test_string("name")])),
        );

        let cmd = "{a: 1 b: 2} | to msgpack | metadata set --path-columns [name] | from msgpack | metadata | reject span | $in";
        let result = eval_pipeline_without_terminal_expression(
            cmd,
            std::env::temp_dir().as_ref(),
            &mut engine_state,
        );

        assert_eq!(expected, result.expect("There should be a result"));
    }
}