1use std::{
5 error::Error,
6 io::{self, Cursor, ErrorKind},
7 string::FromUtf8Error,
8};
9
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::{TimeZone, Utc};
12use nu_engine::command_prelude::*;
13use nu_protocol::{Signals, shell_error::generic::GenericError};
14use rmp::decode::{self as mp, ValueReadError};
15
/// Recursion limit for `read_value`: a value encountered at this nesting depth (or deeper) is
/// rejected with `ReadError::MaxDepth` instead of being decoded.
const MAX_DEPTH: usize = 50;
18
/// The `from msgpack` command, which converts MessagePack binary data into Nu values.
#[derive(Clone)]
pub struct FromMsgpack;
21
22impl Command for FromMsgpack {
23 fn name(&self) -> &str {
24 "from msgpack"
25 }
26
27 fn signature(&self) -> Signature {
28 Signature::build(self.name())
29 .input_output_type(Type::Binary, Type::Any)
30 .switch("objects", "Read multiple objects from input.", None)
31 .category(Category::Formats)
32 }
33
34 fn description(&self) -> &str {
35 "Convert MessagePack data into Nu values."
36 }
37
38 fn extra_description(&self) -> &str {
39 "
40Not all values are representable as MessagePack.
41
42The datetime extension type is read as dates. MessagePack binary values are
43read to their Nu equivalent. Most other types are read in an analogous way to
44`from json`, and may not convert to the exact same type if `to msgpack` was
45used originally to create the data.
46
47MessagePack: https://msgpack.org/
48"
49 }
50
51 fn examples(&self) -> Vec<Example<'_>> {
52 vec![
53 Example {
54 description: "Read a list of values from MessagePack.",
55 example: "0x[93A3666F6F2AC2] | from msgpack",
56 result: Some(Value::test_list(vec![
57 Value::test_string("foo"),
58 Value::test_int(42),
59 Value::test_bool(false),
60 ])),
61 },
62 Example {
63 description: "Read a stream of multiple values from MessagePack.",
64 example: "0x[81A76E757368656C6CA5726F636B73A9736572696F75736C79] | from msgpack --objects",
65 result: Some(Value::test_list(vec![
66 Value::test_record(record! {
67 "nushell" => Value::test_string("rocks"),
68 }),
69 Value::test_string("seriously"),
70 ])),
71 },
72 Example {
73 description: "Read a table from MessagePack.",
74 example: "0x[9282AA6576656E745F6E616D65B141706F6C6C6F203131204C616E64696E67A474696D65C70CFF00000000FFFFFFFFFF2CAB5B82AA6576656E745F6E616D65B44E757368656C6C20666972737420636F6D6D6974A474696D65D6FF5CD5ADE0] | from msgpack",
75 result: Some(Value::test_list(vec![
76 Value::test_record(record! {
77 "event_name" => Value::test_string("Apollo 11 Landing"),
78 "time" => Value::test_date(Utc.with_ymd_and_hms(
79 1969,
80 7,
81 24,
82 16,
83 50,
84 35,
85 ).unwrap().into())
86 }),
87 Value::test_record(record! {
88 "event_name" => Value::test_string("Nushell first commit"),
89 "time" => Value::test_date(Utc.with_ymd_and_hms(
90 2019,
91 5,
92 10,
93 16,
94 59,
95 12,
96 ).unwrap().into())
97 }),
98 ])),
99 },
100 ]
101 }
102
103 fn run(
104 &self,
105 engine_state: &EngineState,
106 stack: &mut Stack,
107 call: &Call,
108 mut input: PipelineData,
109 ) -> Result<PipelineData, ShellError> {
110 let objects = call.has_flag(engine_state, stack, "objects")?;
111 let opts = Opts {
112 span: call.head,
113 objects,
114 signals: engine_state.signals().clone(),
115 };
116 let metadata = input.take_metadata().map(|md| md.with_content_type(None));
117 let out = match input {
118 PipelineData::Value(Value::Binary { val: bytes, .. }, _) => {
120 read_msgpack(Cursor::new(bytes), opts)
121 }
122 PipelineData::ByteStream(stream, ..) => {
124 let span = stream.span();
125 if let Some(reader) = stream.reader() {
126 read_msgpack(reader, opts)
127 } else {
128 Err(ShellError::PipelineMismatch {
129 exp_input_type: "binary or byte stream".into(),
130 dst_span: call.head,
131 src_span: span,
132 })
133 }
134 }
135 input => Err(ShellError::PipelineMismatch {
136 exp_input_type: "binary or byte stream".into(),
137 dst_span: call.head,
138 src_span: input.span().unwrap_or(call.head),
139 }),
140 };
141 out.map(|pd| pd.set_metadata(metadata))
142 }
143}
144
/// Errors that can occur while decoding MessagePack data.
///
/// Every variant except `Shell` carries the span that is attached to the `ShellError` it is
/// eventually converted into.
#[derive(Debug)]
pub(crate) enum ReadError {
    /// `read_value` was entered at a nesting depth of `MAX_DEPTH` or more.
    MaxDepth(Span),
    /// The underlying reader failed (this includes running out of data).
    Io(io::Error, Span),
    /// A marker was encountered that cannot be decoded into a value.
    TypeMismatch(rmp::Marker, Span),
    /// The bytes of a MessagePack string were not valid UTF-8.
    Utf8(FromUtf8Error, Span),
    /// An already constructed shell error, passed through unchanged.
    Shell(Box<ShellError>),
}
153
154impl From<Box<ShellError>> for ReadError {
155 fn from(v: Box<ShellError>) -> Self {
156 Self::Shell(v)
157 }
158}
159
160impl From<ShellError> for ReadError {
161 fn from(value: ShellError) -> Self {
162 Box::new(value).into()
163 }
164}
165
166impl From<Spanned<ValueReadError>> for ReadError {
167 fn from(value: Spanned<ValueReadError>) -> Self {
168 match value.item {
169 ValueReadError::InvalidMarkerRead(err) | ValueReadError::InvalidDataRead(err) => {
171 ReadError::Io(err, value.span)
172 }
173 ValueReadError::TypeMismatch(marker) => ReadError::TypeMismatch(marker, value.span),
174 }
175 }
176}
177
178impl From<Spanned<io::Error>> for ReadError {
179 fn from(value: Spanned<io::Error>) -> Self {
180 ReadError::Io(value.item, value.span)
181 }
182}
183
184impl From<Spanned<FromUtf8Error>> for ReadError {
185 fn from(value: Spanned<FromUtf8Error>) -> Self {
186 ReadError::Utf8(value.item, value.span)
187 }
188}
189
190impl From<ReadError> for ShellError {
191 fn from(value: ReadError) -> Self {
192 match value {
193 ReadError::MaxDepth(span) => ShellError::Generic(GenericError::new(
194 "MessagePack data is nested too deeply",
195 format!("exceeded depth limit ({MAX_DEPTH})"),
196 span,
197 )),
198 ReadError::Io(err, span) => ShellError::Generic(
199 GenericError::new(
200 "Error while reading MessagePack data",
201 err.to_string(),
202 span,
203 )
204 .with_inner(
205 err.source()
206 .and_then(|s| s.downcast_ref::<ShellError>())
207 .cloned()
208 .into_iter()
209 .collect::<Vec<_>>(),
210 ),
211 ),
212 ReadError::TypeMismatch(marker, span) => ShellError::Generic(GenericError::new(
213 "Invalid marker while reading MessagePack data",
214 format!("unexpected {marker:?} in data"),
215 span,
216 )),
217 ReadError::Utf8(err, span) => ShellError::NonUtf8Custom {
218 msg: format!("in MessagePack data: {err}"),
219 span,
220 },
221 ReadError::Shell(err) => *err,
222 }
223 }
224}
225
/// Options controlling `read_msgpack`.
pub(crate) struct Opts {
    /// Span attached to every decoded value and to any error that is reported.
    pub span: Span,
    /// When set, keep decoding values until the input ends instead of requiring exactly one.
    pub objects: bool,
    /// Signals handed to the output stream that is created when `objects` is set.
    pub signals: Signals,
}
231
232pub(crate) fn read_msgpack(
234 mut input: impl io::Read + Send + 'static,
235 opts: Opts,
236) -> Result<PipelineData, ShellError> {
237 let Opts {
238 span,
239 objects,
240 signals,
241 } = opts;
242 if objects {
243 let mut done = false;
245 Ok(std::iter::from_fn(move || {
246 if !done {
247 let result = read_value(&mut input, span, 0);
248 match result {
249 Ok(value) => Some(value),
250 Err(ReadError::Io(err, _)) if err.kind() == ErrorKind::UnexpectedEof => {
252 done = true;
253 None
254 }
255 Err(other_err) => {
256 done = true;
257 Some(Value::error(other_err.into(), span))
258 }
259 }
260 } else {
261 None
262 }
263 })
264 .into_pipeline_data(span, signals))
265 } else {
266 let result = read_value(&mut input, span, 0)?;
268 assert_eof(&mut input, span)?;
269 Ok(result.into_pipeline_data())
270 }
271}
272
273fn read_value(input: &mut impl io::Read, span: Span, depth: usize) -> Result<Value, ReadError> {
274 if depth >= MAX_DEPTH {
276 return Err(ReadError::MaxDepth(span));
277 }
278
279 let marker = mp::read_marker(input)
280 .map_err(ValueReadError::from)
281 .err_span(span)?;
282
283 match marker {
288 rmp::Marker::FixPos(num) => Ok(Value::int(num as i64, span)),
289 rmp::Marker::FixNeg(num) => Ok(Value::int(num as i64, span)),
290 rmp::Marker::Null => Ok(Value::nothing(span)),
291 rmp::Marker::True => Ok(Value::bool(true, span)),
292 rmp::Marker::False => Ok(Value::bool(false, span)),
293 rmp::Marker::U8 => from_int(input.read_u8(), span),
294 rmp::Marker::U16 => from_int(input.read_u16::<BigEndian>(), span),
295 rmp::Marker::U32 => from_int(input.read_u32::<BigEndian>(), span),
296 rmp::Marker::U64 => {
297 let val_u64 = input.read_u64::<BigEndian>().err_span(span)?;
299 val_u64
300 .try_into()
301 .map(|val| Value::int(val, span))
302 .map_err(|err| {
303 ShellError::Generic(GenericError::new(
304 "MessagePack integer too big for Nushell",
305 err.to_string(),
306 span,
307 ))
308 .into()
309 })
310 }
311 rmp::Marker::I8 => from_int(input.read_i8(), span),
312 rmp::Marker::I16 => from_int(input.read_i16::<BigEndian>(), span),
313 rmp::Marker::I32 => from_int(input.read_i32::<BigEndian>(), span),
314 rmp::Marker::I64 => from_int(input.read_i64::<BigEndian>(), span),
315 rmp::Marker::F32 => Ok(Value::float(
316 input.read_f32::<BigEndian>().err_span(span)? as f64,
317 span,
318 )),
319 rmp::Marker::F64 => Ok(Value::float(
320 input.read_f64::<BigEndian>().err_span(span)?,
321 span,
322 )),
323 rmp::Marker::FixStr(len) => read_str(input, len as usize, span),
324 rmp::Marker::Str8 => {
325 let len = input.read_u8().err_span(span)?;
326 read_str(input, len as usize, span)
327 }
328 rmp::Marker::Str16 => {
329 let len = input.read_u16::<BigEndian>().err_span(span)?;
330 read_str(input, len as usize, span)
331 }
332 rmp::Marker::Str32 => {
333 let len = input.read_u32::<BigEndian>().err_span(span)?;
334 read_str(input, len as usize, span)
335 }
336 rmp::Marker::Bin8 => {
337 let len = input.read_u8().err_span(span)?;
338 read_bin(input, len as usize, span)
339 }
340 rmp::Marker::Bin16 => {
341 let len = input.read_u16::<BigEndian>().err_span(span)?;
342 read_bin(input, len as usize, span)
343 }
344 rmp::Marker::Bin32 => {
345 let len = input.read_u32::<BigEndian>().err_span(span)?;
346 read_bin(input, len as usize, span)
347 }
348 rmp::Marker::FixArray(len) => read_array(input, len as usize, span, depth),
349 rmp::Marker::Array16 => {
350 let len = input.read_u16::<BigEndian>().err_span(span)?;
351 read_array(input, len as usize, span, depth)
352 }
353 rmp::Marker::Array32 => {
354 let len = input.read_u32::<BigEndian>().err_span(span)?;
355 read_array(input, len as usize, span, depth)
356 }
357 rmp::Marker::FixMap(len) => read_map(input, len as usize, span, depth),
358 rmp::Marker::Map16 => {
359 let len = input.read_u16::<BigEndian>().err_span(span)?;
360 read_map(input, len as usize, span, depth)
361 }
362 rmp::Marker::Map32 => {
363 let len = input.read_u32::<BigEndian>().err_span(span)?;
364 read_map(input, len as usize, span, depth)
365 }
366 rmp::Marker::FixExt1 => read_ext(input, 1, span),
367 rmp::Marker::FixExt2 => read_ext(input, 2, span),
368 rmp::Marker::FixExt4 => read_ext(input, 4, span),
369 rmp::Marker::FixExt8 => read_ext(input, 8, span),
370 rmp::Marker::FixExt16 => read_ext(input, 16, span),
371 rmp::Marker::Ext8 => {
372 let len = input.read_u8().err_span(span)?;
373 read_ext(input, len as usize, span)
374 }
375 rmp::Marker::Ext16 => {
376 let len = input.read_u16::<BigEndian>().err_span(span)?;
377 read_ext(input, len as usize, span)
378 }
379 rmp::Marker::Ext32 => {
380 let len = input.read_u32::<BigEndian>().err_span(span)?;
381 read_ext(input, len as usize, span)
382 }
383 mk @ rmp::Marker::Reserved => Err(ReadError::TypeMismatch(mk, span)),
384 }
385}
386
387fn read_str(input: &mut impl io::Read, len: usize, span: Span) -> Result<Value, ReadError> {
388 let mut buf = vec![0; len];
389 input.read_exact(&mut buf).err_span(span)?;
390 Ok(Value::string(String::from_utf8(buf).err_span(span)?, span))
391}
392
393fn read_bin(input: &mut impl io::Read, len: usize, span: Span) -> Result<Value, ReadError> {
394 let mut buf = vec![0; len];
395 input.read_exact(&mut buf).err_span(span)?;
396 Ok(Value::binary(buf, span))
397}
398
399fn read_array(
400 input: &mut impl io::Read,
401 len: usize,
402 span: Span,
403 depth: usize,
404) -> Result<Value, ReadError> {
405 let vec = (0..len)
406 .map(|_| read_value(input, span, depth + 1))
407 .collect::<Result<Vec<Value>, ReadError>>()?;
408 Ok(Value::list(vec, span))
409}
410
411fn read_map(
412 input: &mut impl io::Read,
413 len: usize,
414 span: Span,
415 depth: usize,
416) -> Result<Value, ReadError> {
417 let rec = (0..len)
418 .map(|_| {
419 let key = read_value(input, span, depth + 1)?
420 .into_string()
421 .map_err(|_| {
422 ShellError::Generic(GenericError::new(
423 "Invalid non-string value in MessagePack map",
424 "only maps with string keys are supported",
425 span,
426 ))
427 })?;
428 let val = read_value(input, span, depth + 1)?;
429 Ok((key, val))
430 })
431 .collect::<Result<Record, ReadError>>()?;
432 Ok(Value::record(rec, span))
433}
434
435fn read_ext(input: &mut impl io::Read, len: usize, span: Span) -> Result<Value, ReadError> {
436 let ty = input.read_i8().err_span(span)?;
437 match (ty, len) {
438 (-1, 4) => {
440 let seconds = input.read_u32::<BigEndian>().err_span(span)?;
441 make_date(seconds as i64, 0, span)
442 }
443 (-1, 8) => {
445 let packed = input.read_u64::<BigEndian>().err_span(span)?;
446 let nanos = packed >> 34;
447 let secs = packed & ((1 << 34) - 1);
448 make_date(secs as i64, nanos as u32, span)
449 }
450 (-1, 12) => {
452 let nanos = input.read_u32::<BigEndian>().err_span(span)?;
453 let secs = input.read_i64::<BigEndian>().err_span(span)?;
454 make_date(secs, nanos, span)
455 }
456 _ => Err(ShellError::Generic(
457 GenericError::new(
458 "Unknown MessagePack extension",
459 format!("encountered extension type {ty}, length {len}"),
460 span,
461 )
462 .with_help("only the timestamp extension (-1) is supported"),
463 )
464 .into()),
465 }
466}
467
468fn make_date(secs: i64, nanos: u32, span: Span) -> Result<Value, ReadError> {
469 match Utc.timestamp_opt(secs, nanos) {
470 chrono::offset::LocalResult::Single(dt) => Ok(Value::date(dt.into(), span)),
471 _ => Err(ShellError::Generic(
472 GenericError::new(
473 "Invalid MessagePack timestamp",
474 "datetime is out of supported range",
475 span,
476 )
477 .with_help("nanoseconds must be less than 1 billion"),
478 )
479 .into()),
480 }
481}
482
483fn from_int<T>(num: Result<T, std::io::Error>, span: Span) -> Result<Value, ReadError>
484where
485 T: Into<i64>,
486{
487 num.map(|num| Value::int(num.into(), span))
488 .map_err(|err| ReadError::Io(err, span))
489}
490
491fn assert_eof(input: &mut impl io::Read, span: Span) -> Result<(), ShellError> {
495 let mut buf = [0u8];
496 match input.read_exact(&mut buf) {
497 Err(_) => Ok(()),
499 Ok(()) => Err(ShellError::Generic(
501 GenericError::new(
502 "Additional data after end of MessagePack object",
503 "there was more data available after parsing",
504 span,
505 )
506 .with_help(
507 "this might be invalid data, but you can use `from msgpack --objects` to read multiple objects",
508 ),
509 )),
510 }
511}
512
#[cfg(test)]
mod test {
    use nu_cmd_lang::eval_pipeline_without_terminal_expression;

    use crate::{Metadata, MetadataSet, Reject, ToMsgpack};

    use super::*;

    /// Runs every example declared by the command and checks its result.
    #[test]
    fn test_examples() -> nu_test_support::Result {
        nu_test_support::test().examples(FromMsgpack)
    }

    /// After a round trip through `to msgpack | from msgpack`, the only metadata left (besides
    /// the rejected span) is the `path_columns` entry set explicitly in the pipeline.
    #[test]
    fn test_content_type_metadata() {
        let mut engine_state = Box::new(EngineState::new());

        // Register just the commands the pipeline below needs.
        let delta = {
            let mut working_set = StateWorkingSet::new(&engine_state);

            working_set.add_decl(Box::new(ToMsgpack {}));
            working_set.add_decl(Box::new(FromMsgpack {}));
            working_set.add_decl(Box::new(Metadata {}));
            working_set.add_decl(Box::new(MetadataSet {}));
            working_set.add_decl(Box::new(Reject {}));

            working_set.render()
        };
        engine_state
            .merge_delta(delta)
            .expect("Error merging delta");

        let pipeline = "{a: 1 b: 2} | to msgpack | metadata set --path-columns [name] | from msgpack | metadata | reject span | $in";
        let cwd = std::env::temp_dir();
        let outcome =
            eval_pipeline_without_terminal_expression(pipeline, cwd.as_ref(), &mut engine_state);

        let expected = Value::test_record(
            record!("path_columns" => Value::test_list(vec![Value::test_string("name")])),
        );
        assert_eq!(expected, outcome.expect("There should be a result"))
    }
}