1use std::{
5 error::Error,
6 io::{self, Cursor, ErrorKind},
7 string::FromUtf8Error,
8};
9
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::{TimeZone, Utc};
12use nu_engine::command_prelude::*;
13use nu_protocol::Signals;
14use rmp::decode::{self as mp, ValueReadError};
15
/// Maximum nesting depth of arrays and maps accepted while decoding.
///
/// `read_value` fails with `ReadError::MaxDepth` once this depth is reached,
/// which bounds the recursion through `read_array` / `read_map`.
const MAX_DEPTH: usize = 50;
18
/// The `from msgpack` command: converts MessagePack binary data into Nu values.
#[derive(Clone)]
pub struct FromMsgpack;
21
22impl Command for FromMsgpack {
23 fn name(&self) -> &str {
24 "from msgpack"
25 }
26
27 fn signature(&self) -> Signature {
28 Signature::build(self.name())
29 .input_output_type(Type::Binary, Type::Any)
30 .switch("objects", "Read multiple objects from input", None)
31 .category(Category::Formats)
32 }
33
34 fn description(&self) -> &str {
35 "Convert MessagePack data into Nu values."
36 }
37
38 fn extra_description(&self) -> &str {
39 r#"
40Not all values are representable as MessagePack.
41
42The datetime extension type is read as dates. MessagePack binary values are
43read to their Nu equivalent. Most other types are read in an analogous way to
44`from json`, and may not convert to the exact same type if `to msgpack` was
45used originally to create the data.
46
47MessagePack: https://msgpack.org/
48"#
49 }
50
51 fn examples(&self) -> Vec<Example> {
52 vec![
53 Example {
54 description: "Read a list of values from MessagePack",
55 example: "0x[93A3666F6F2AC2] | from msgpack",
56 result: Some(Value::test_list(vec![
57 Value::test_string("foo"),
58 Value::test_int(42),
59 Value::test_bool(false),
60 ])),
61 },
62 Example {
63 description: "Read a stream of multiple values from MessagePack",
64 example: "0x[81A76E757368656C6CA5726F636B73A9736572696F75736C79] | from msgpack --objects",
65 result: Some(Value::test_list(vec![
66 Value::test_record(record! {
67 "nushell" => Value::test_string("rocks"),
68 }),
69 Value::test_string("seriously"),
70 ])),
71 },
72 Example {
73 description: "Read a table from MessagePack",
74 example: "0x[9282AA6576656E745F6E616D65B141706F6C6C6F203131204C616E64696E67A474696D65C70CFF00000000FFFFFFFFFF2CAB5B82AA6576656E745F6E616D65B44E757368656C6C20666972737420636F6D6D6974A474696D65D6FF5CD5ADE0] | from msgpack",
75 result: Some(Value::test_list(vec![
76 Value::test_record(record! {
77 "event_name" => Value::test_string("Apollo 11 Landing"),
78 "time" => Value::test_date(Utc.with_ymd_and_hms(
79 1969,
80 7,
81 24,
82 16,
83 50,
84 35,
85 ).unwrap().into())
86 }),
87 Value::test_record(record! {
88 "event_name" => Value::test_string("Nushell first commit"),
89 "time" => Value::test_date(Utc.with_ymd_and_hms(
90 2019,
91 5,
92 10,
93 16,
94 59,
95 12,
96 ).unwrap().into())
97 }),
98 ])),
99 },
100 ]
101 }
102
103 fn run(
104 &self,
105 engine_state: &EngineState,
106 stack: &mut Stack,
107 call: &Call,
108 input: PipelineData,
109 ) -> Result<PipelineData, ShellError> {
110 let objects = call.has_flag(engine_state, stack, "objects")?;
111 let opts = Opts {
112 span: call.head,
113 objects,
114 signals: engine_state.signals().clone(),
115 };
116 let metadata = input.metadata().map(|md| md.with_content_type(None));
117 let out = match input {
118 PipelineData::Value(Value::Binary { val: bytes, .. }, _) => {
120 read_msgpack(Cursor::new(bytes), opts)
121 }
122 PipelineData::ByteStream(stream, ..) => {
124 let span = stream.span();
125 if let Some(reader) = stream.reader() {
126 read_msgpack(reader, opts)
127 } else {
128 Err(ShellError::PipelineMismatch {
129 exp_input_type: "binary or byte stream".into(),
130 dst_span: call.head,
131 src_span: span,
132 })
133 }
134 }
135 input => Err(ShellError::PipelineMismatch {
136 exp_input_type: "binary or byte stream".into(),
137 dst_span: call.head,
138 src_span: input.span().unwrap_or(call.head),
139 }),
140 };
141 out.map(|pd| pd.set_metadata(metadata))
142 }
143}
144
/// Errors produced while decoding MessagePack input.
///
/// Every variant except `Shell` carries the span to report the error at; all
/// variants are converted into a `ShellError` for the user.
#[derive(Debug)]
pub(crate) enum ReadError {
    /// Arrays/maps were nested `MAX_DEPTH` levels deep or more.
    MaxDepth(Span),
    /// The underlying reader failed, including running out of data
    /// (`ErrorKind::UnexpectedEof`).
    Io(io::Error, Span),
    /// A marker that cannot be decoded (e.g. the reserved marker).
    TypeMismatch(rmp::Marker, Span),
    /// A string payload was not valid UTF-8.
    Utf8(FromUtf8Error, Span),
    /// Any other failure, already expressed as a `ShellError`.
    Shell(Box<ShellError>),
}
153
154impl From<Box<ShellError>> for ReadError {
155 fn from(v: Box<ShellError>) -> Self {
156 Self::Shell(v)
157 }
158}
159
160impl From<ShellError> for ReadError {
161 fn from(value: ShellError) -> Self {
162 Box::new(value).into()
163 }
164}
165
166impl From<Spanned<ValueReadError>> for ReadError {
167 fn from(value: Spanned<ValueReadError>) -> Self {
168 match value.item {
169 ValueReadError::InvalidMarkerRead(err) | ValueReadError::InvalidDataRead(err) => {
171 ReadError::Io(err, value.span)
172 }
173 ValueReadError::TypeMismatch(marker) => ReadError::TypeMismatch(marker, value.span),
174 }
175 }
176}
177
178impl From<Spanned<io::Error>> for ReadError {
179 fn from(value: Spanned<io::Error>) -> Self {
180 ReadError::Io(value.item, value.span)
181 }
182}
183
184impl From<Spanned<FromUtf8Error>> for ReadError {
185 fn from(value: Spanned<FromUtf8Error>) -> Self {
186 ReadError::Utf8(value.item, value.span)
187 }
188}
189
190impl From<ReadError> for ShellError {
191 fn from(value: ReadError) -> Self {
192 match value {
193 ReadError::MaxDepth(span) => ShellError::GenericError {
194 error: "MessagePack data is nested too deeply".into(),
195 msg: format!("exceeded depth limit ({MAX_DEPTH})"),
196 span: Some(span),
197 help: None,
198 inner: vec![],
199 },
200 ReadError::Io(err, span) => ShellError::GenericError {
201 error: "Error while reading MessagePack data".into(),
202 msg: err.to_string(),
203 span: Some(span),
204 help: None,
205 inner: err
207 .source()
208 .and_then(|s| s.downcast_ref::<ShellError>())
209 .cloned()
210 .into_iter()
211 .collect(),
212 },
213 ReadError::TypeMismatch(marker, span) => ShellError::GenericError {
214 error: "Invalid marker while reading MessagePack data".into(),
215 msg: format!("unexpected {:?} in data", marker),
216 span: Some(span),
217 help: None,
218 inner: vec![],
219 },
220 ReadError::Utf8(err, span) => ShellError::NonUtf8Custom {
221 msg: format!("in MessagePack data: {err}"),
222 span,
223 },
224 ReadError::Shell(err) => *err,
225 }
226 }
227}
228
/// Options for `read_msgpack`.
pub(crate) struct Opts {
    /// Span attached to every decoded value and error.
    pub span: Span,
    /// When true, read a sequence of objects (`--objects`) instead of exactly one.
    pub objects: bool,
    /// Signals handed to the output stream in `--objects` mode.
    pub signals: Signals,
}
234
235pub(crate) fn read_msgpack(
237 mut input: impl io::Read + Send + 'static,
238 opts: Opts,
239) -> Result<PipelineData, ShellError> {
240 let Opts {
241 span,
242 objects,
243 signals,
244 } = opts;
245 if objects {
246 let mut done = false;
248 Ok(std::iter::from_fn(move || {
249 if !done {
250 let result = read_value(&mut input, span, 0);
251 match result {
252 Ok(value) => Some(value),
253 Err(ReadError::Io(err, _)) if err.kind() == ErrorKind::UnexpectedEof => {
255 done = true;
256 None
257 }
258 Err(other_err) => {
259 done = true;
260 Some(Value::error(other_err.into(), span))
261 }
262 }
263 } else {
264 None
265 }
266 })
267 .into_pipeline_data(span, signals))
268 } else {
269 let result = read_value(&mut input, span, 0)?;
271 assert_eof(&mut input, span)?;
272 Ok(result.into_pipeline_data())
273 }
274}
275
276fn read_value(input: &mut impl io::Read, span: Span, depth: usize) -> Result<Value, ReadError> {
277 if depth >= MAX_DEPTH {
279 return Err(ReadError::MaxDepth(span));
280 }
281
282 let marker = mp::read_marker(input)
283 .map_err(ValueReadError::from)
284 .err_span(span)?;
285
286 match marker {
291 rmp::Marker::FixPos(num) => Ok(Value::int(num as i64, span)),
292 rmp::Marker::FixNeg(num) => Ok(Value::int(num as i64, span)),
293 rmp::Marker::Null => Ok(Value::nothing(span)),
294 rmp::Marker::True => Ok(Value::bool(true, span)),
295 rmp::Marker::False => Ok(Value::bool(false, span)),
296 rmp::Marker::U8 => from_int(input.read_u8(), span),
297 rmp::Marker::U16 => from_int(input.read_u16::<BigEndian>(), span),
298 rmp::Marker::U32 => from_int(input.read_u32::<BigEndian>(), span),
299 rmp::Marker::U64 => {
300 let val_u64 = input.read_u64::<BigEndian>().err_span(span)?;
302 val_u64
303 .try_into()
304 .map(|val| Value::int(val, span))
305 .map_err(|err| {
306 ShellError::GenericError {
307 error: "MessagePack integer too big for Nushell".into(),
308 msg: err.to_string(),
309 span: Some(span),
310 help: None,
311 inner: vec![],
312 }
313 .into()
314 })
315 }
316 rmp::Marker::I8 => from_int(input.read_i8(), span),
317 rmp::Marker::I16 => from_int(input.read_i16::<BigEndian>(), span),
318 rmp::Marker::I32 => from_int(input.read_i32::<BigEndian>(), span),
319 rmp::Marker::I64 => from_int(input.read_i64::<BigEndian>(), span),
320 rmp::Marker::F32 => Ok(Value::float(
321 input.read_f32::<BigEndian>().err_span(span)? as f64,
322 span,
323 )),
324 rmp::Marker::F64 => Ok(Value::float(
325 input.read_f64::<BigEndian>().err_span(span)?,
326 span,
327 )),
328 rmp::Marker::FixStr(len) => read_str(input, len as usize, span),
329 rmp::Marker::Str8 => {
330 let len = input.read_u8().err_span(span)?;
331 read_str(input, len as usize, span)
332 }
333 rmp::Marker::Str16 => {
334 let len = input.read_u16::<BigEndian>().err_span(span)?;
335 read_str(input, len as usize, span)
336 }
337 rmp::Marker::Str32 => {
338 let len = input.read_u32::<BigEndian>().err_span(span)?;
339 read_str(input, len as usize, span)
340 }
341 rmp::Marker::Bin8 => {
342 let len = input.read_u8().err_span(span)?;
343 read_bin(input, len as usize, span)
344 }
345 rmp::Marker::Bin16 => {
346 let len = input.read_u16::<BigEndian>().err_span(span)?;
347 read_bin(input, len as usize, span)
348 }
349 rmp::Marker::Bin32 => {
350 let len = input.read_u32::<BigEndian>().err_span(span)?;
351 read_bin(input, len as usize, span)
352 }
353 rmp::Marker::FixArray(len) => read_array(input, len as usize, span, depth),
354 rmp::Marker::Array16 => {
355 let len = input.read_u16::<BigEndian>().err_span(span)?;
356 read_array(input, len as usize, span, depth)
357 }
358 rmp::Marker::Array32 => {
359 let len = input.read_u32::<BigEndian>().err_span(span)?;
360 read_array(input, len as usize, span, depth)
361 }
362 rmp::Marker::FixMap(len) => read_map(input, len as usize, span, depth),
363 rmp::Marker::Map16 => {
364 let len = input.read_u16::<BigEndian>().err_span(span)?;
365 read_map(input, len as usize, span, depth)
366 }
367 rmp::Marker::Map32 => {
368 let len = input.read_u32::<BigEndian>().err_span(span)?;
369 read_map(input, len as usize, span, depth)
370 }
371 rmp::Marker::FixExt1 => read_ext(input, 1, span),
372 rmp::Marker::FixExt2 => read_ext(input, 2, span),
373 rmp::Marker::FixExt4 => read_ext(input, 4, span),
374 rmp::Marker::FixExt8 => read_ext(input, 8, span),
375 rmp::Marker::FixExt16 => read_ext(input, 16, span),
376 rmp::Marker::Ext8 => {
377 let len = input.read_u8().err_span(span)?;
378 read_ext(input, len as usize, span)
379 }
380 rmp::Marker::Ext16 => {
381 let len = input.read_u16::<BigEndian>().err_span(span)?;
382 read_ext(input, len as usize, span)
383 }
384 rmp::Marker::Ext32 => {
385 let len = input.read_u32::<BigEndian>().err_span(span)?;
386 read_ext(input, len as usize, span)
387 }
388 mk @ rmp::Marker::Reserved => Err(ReadError::TypeMismatch(mk, span)),
389 }
390}
391
392fn read_str(input: &mut impl io::Read, len: usize, span: Span) -> Result<Value, ReadError> {
393 let mut buf = vec![0; len];
394 input.read_exact(&mut buf).err_span(span)?;
395 Ok(Value::string(String::from_utf8(buf).err_span(span)?, span))
396}
397
398fn read_bin(input: &mut impl io::Read, len: usize, span: Span) -> Result<Value, ReadError> {
399 let mut buf = vec![0; len];
400 input.read_exact(&mut buf).err_span(span)?;
401 Ok(Value::binary(buf, span))
402}
403
404fn read_array(
405 input: &mut impl io::Read,
406 len: usize,
407 span: Span,
408 depth: usize,
409) -> Result<Value, ReadError> {
410 let vec = (0..len)
411 .map(|_| read_value(input, span, depth + 1))
412 .collect::<Result<Vec<Value>, ReadError>>()?;
413 Ok(Value::list(vec, span))
414}
415
416fn read_map(
417 input: &mut impl io::Read,
418 len: usize,
419 span: Span,
420 depth: usize,
421) -> Result<Value, ReadError> {
422 let rec = (0..len)
423 .map(|_| {
424 let key = read_value(input, span, depth + 1)?
425 .into_string()
426 .map_err(|_| ShellError::GenericError {
427 error: "Invalid non-string value in MessagePack map".into(),
428 msg: "only maps with string keys are supported".into(),
429 span: Some(span),
430 help: None,
431 inner: vec![],
432 })?;
433 let val = read_value(input, span, depth + 1)?;
434 Ok((key, val))
435 })
436 .collect::<Result<Record, ReadError>>()?;
437 Ok(Value::record(rec, span))
438}
439
440fn read_ext(input: &mut impl io::Read, len: usize, span: Span) -> Result<Value, ReadError> {
441 let ty = input.read_i8().err_span(span)?;
442 match (ty, len) {
443 (-1, 4) => {
445 let seconds = input.read_u32::<BigEndian>().err_span(span)?;
446 make_date(seconds as i64, 0, span)
447 }
448 (-1, 8) => {
450 let packed = input.read_u64::<BigEndian>().err_span(span)?;
451 let nanos = packed >> 34;
452 let secs = packed & ((1 << 34) - 1);
453 make_date(secs as i64, nanos as u32, span)
454 }
455 (-1, 12) => {
457 let nanos = input.read_u32::<BigEndian>().err_span(span)?;
458 let secs = input.read_i64::<BigEndian>().err_span(span)?;
459 make_date(secs, nanos, span)
460 }
461 _ => Err(ShellError::GenericError {
462 error: "Unknown MessagePack extension".into(),
463 msg: format!("encountered extension type {ty}, length {len}"),
464 span: Some(span),
465 help: Some("only the timestamp extension (-1) is supported".into()),
466 inner: vec![],
467 }
468 .into()),
469 }
470}
471
472fn make_date(secs: i64, nanos: u32, span: Span) -> Result<Value, ReadError> {
473 match Utc.timestamp_opt(secs, nanos) {
474 chrono::offset::LocalResult::Single(dt) => Ok(Value::date(dt.into(), span)),
475 _ => Err(ShellError::GenericError {
476 error: "Invalid MessagePack timestamp".into(),
477 msg: "datetime is out of supported range".into(),
478 span: Some(span),
479 help: Some("nanoseconds must be less than 1 billion".into()),
480 inner: vec![],
481 }
482 .into()),
483 }
484}
485
486fn from_int<T>(num: Result<T, std::io::Error>, span: Span) -> Result<Value, ReadError>
487where
488 T: Into<i64>,
489{
490 num.map(|num| Value::int(num.into(), span))
491 .map_err(|err| ReadError::Io(err, span))
492}
493
494fn assert_eof(input: &mut impl io::Read, span: Span) -> Result<(), ShellError> {
498 let mut buf = [0u8];
499 match input.read_exact(&mut buf) {
500 Err(_) => Ok(()),
502 Ok(()) => Err(ShellError::GenericError {
504 error: "Additional data after end of MessagePack object".into(),
505 msg: "there was more data available after parsing".into(),
506 span: Some(span),
507 help: Some("this might be invalid data, but you can use `from msgpack --objects` to read multiple objects".into()),
508 inner: vec![],
509 })
510 }
511}
512
#[cfg(test)]
mod test {
    use nu_cmd_lang::eval_pipeline_without_terminal_expression;

    use crate::{Metadata, MetadataSet, ToMsgpack};

    use super::*;

    /// Runs the command's documented examples and checks their results.
    #[test]
    fn test_examples() {
        use crate::test_examples;

        test_examples(FromMsgpack {})
    }

    /// Checks that upstream metadata (the `ls` data source set via
    /// `metadata set --datasource-ls`) survives `from msgpack`, and that the
    /// output's metadata contains nothing beyond that source.
    #[test]
    fn test_content_type_metadata() {
        let mut engine_state = Box::new(EngineState::new());
        // Register only the commands used by the pipeline below.
        let delta = {
            let mut working_set = StateWorkingSet::new(&engine_state);

            working_set.add_decl(Box::new(ToMsgpack {}));
            working_set.add_decl(Box::new(FromMsgpack {}));
            working_set.add_decl(Box::new(Metadata {}));
            working_set.add_decl(Box::new(MetadataSet {}));

            working_set.render()
        };

        engine_state
            .merge_delta(delta)
            .expect("Error merging delta");

        let cmd = r#"{a: 1 b: 2} | to msgpack | metadata set --datasource-ls | from msgpack | metadata | $in"#;
        let result = eval_pipeline_without_terminal_expression(
            cmd,
            std::env::temp_dir().as_ref(),
            &mut engine_state,
        );
        assert_eq!(
            Value::test_record(record!("source" => Value::test_string("ls"))),
            result.expect("There should be a result")
        )
    }
}