Skip to main content

nu_command/filters/
upsert.rs

1use std::borrow::Cow;
2
3use nu_engine::{ClosureEval, ClosureEvalOnce, command_prelude::*};
4use nu_protocol::ast::PathMember;
5
6#[derive(Clone)]
7pub struct Upsert;
8
9impl Command for Upsert {
10    fn name(&self) -> &str {
11        "upsert"
12    }
13
14    fn signature(&self) -> Signature {
15        Signature::build("upsert")
16            .input_output_types(vec![
17                (Type::record(), Type::record()),
18                (Type::table(), Type::table()),
19                (
20                    Type::List(Box::new(Type::Any)),
21                    Type::List(Box::new(Type::Any)),
22                ),
23            ])
24            .required(
25                "field",
26                SyntaxShape::CellPath,
27                "The name of the column to update or insert.",
28            )
29            .required(
30                "replacement value",
31                SyntaxShape::Any,
32                "The new value to give the cell(s), or a closure to create the value.",
33            )
34            .allow_variants_without_examples(true)
35            .category(Category::Filters)
36    }
37
38    fn description(&self) -> &str {
39        "Update an existing column to have a new value, or insert a new column."
40    }
41
42    fn extra_description(&self) -> &str {
43        "When updating or inserting a column, the closure will be run for each row, and the current row will be passed as the first argument. \
44Referencing `$in` inside the closure will provide the value at the column for the current row or null if the column does not exist.
45
46When updating a specific index, the closure will instead be run once. The first argument to the closure and the `$in` value will both be the current value at the index. \
47If the command is inserting at the end of a list or table, then both of these values will be null."
48    }
49
50    fn search_terms(&self) -> Vec<&str> {
51        vec!["add"]
52    }
53
54    fn run(
55        &self,
56        engine_state: &EngineState,
57        stack: &mut Stack,
58        call: &Call,
59        input: PipelineData,
60    ) -> Result<PipelineData, ShellError> {
61        upsert(engine_state, stack, call, input)
62    }
63
64    fn examples(&self) -> Vec<Example<'_>> {
65        vec![
66            Example {
67                description: "Update a record's value.",
68                example: "{'name': 'nu', 'stars': 5} | upsert name 'Nushell'",
69                result: Some(Value::test_record(record! {
70                    "name" => Value::test_string("Nushell"),
71                    "stars" => Value::test_int(5),
72                })),
73            },
74            Example {
75                description: "Insert a new entry into a record.",
76                example: "{'name': 'nu', 'stars': 5} | upsert language 'Rust'",
77                result: Some(Value::test_record(record! {
78                    "name" =>     Value::test_string("nu"),
79                    "stars" =>    Value::test_int(5),
80                    "language" => Value::test_string("Rust"),
81                })),
82            },
83            Example {
84                description: "Update each row of a table.",
85                example: "[[name lang]; [Nushell ''] [Reedline '']] | upsert lang 'Rust'",
86                result: Some(Value::test_list(vec![
87                    Value::test_record(record! {
88                        "name" => Value::test_string("Nushell"),
89                        "lang" => Value::test_string("Rust"),
90                    }),
91                    Value::test_record(record! {
92                        "name" => Value::test_string("Reedline"),
93                        "lang" => Value::test_string("Rust"),
94                    }),
95                ])),
96            },
97            Example {
98                description: "Insert a new column with values computed based off the other columns.",
99                example: "[[foo]; [7] [8] [9]] | upsert bar {|row| $row.foo * 2 }",
100                result: Some(Value::test_list(vec![
101                    Value::test_record(record! {
102                        "foo" => Value::test_int(7),
103                        "bar" => Value::test_int(14),
104                    }),
105                    Value::test_record(record! {
106                        "foo" => Value::test_int(8),
107                        "bar" => Value::test_int(16),
108                    }),
109                    Value::test_record(record! {
110                        "foo" => Value::test_int(9),
111                        "bar" => Value::test_int(18),
112                    }),
113                ])),
114            },
115            Example {
116                description: "Update null values in a column to a default value.",
117                example: "[[foo]; [2] [null] [4]] | upsert foo { default 0 }",
118                result: Some(Value::test_list(vec![
119                    Value::test_record(record! {
120                        "foo" => Value::test_int(2),
121                    }),
122                    Value::test_record(record! {
123                        "foo" => Value::test_int(0),
124                    }),
125                    Value::test_record(record! {
126                        "foo" => Value::test_int(4),
127                    }),
128                ])),
129            },
130            Example {
131                description: "Upsert into a list, updating an existing value at an index.",
132                example: "[1 2 3] | upsert 0 2",
133                result: Some(Value::test_list(vec![
134                    Value::test_int(2),
135                    Value::test_int(2),
136                    Value::test_int(3),
137                ])),
138            },
139            Example {
140                description: "Upsert into a list, inserting a new value at the end.",
141                example: "[1 2 3] | upsert 3 4",
142                result: Some(Value::test_list(vec![
143                    Value::test_int(1),
144                    Value::test_int(2),
145                    Value::test_int(3),
146                    Value::test_int(4),
147                ])),
148            },
149            Example {
150                description: "Upsert into a nested path, creating new values as needed.",
151                example: "[{} {a: [{}]}] | upsert a.0.b \"value\"",
152                result: Some(Value::test_list(vec![
153                    Value::test_record(record!(
154                        "a" => Value::test_record(record!(
155                            "b" => Value::test_string("value"),
156                        )),
157                    )),
158                    Value::test_record(record!(
159                        "a" => Value::test_list(vec![Value::test_record(record!(
160                        ))]),
161                    )),
162                ])),
163            },
164        ]
165    }
166}
167
168fn upsert_recursive(
169    engine_state: &EngineState,
170    stack: &mut Stack,
171    head_span: Span,
172    replacement: Value,
173    input: PipelineData,
174    cell_paths: &[PathMember],
175) -> Result<PipelineData, ShellError> {
176    match input {
177        PipelineData::Value(mut value, metadata) => {
178            if let Value::Closure { val, .. } = replacement {
179                upsert_single_value_by_closure(
180                    &mut value,
181                    ClosureEvalOnce::new(engine_state, stack, *val),
182                    head_span,
183                    cell_paths,
184                    false,
185                )?;
186            } else {
187                value.upsert_data_at_cell_path(cell_paths, replacement)?;
188            }
189            Ok(value.into_pipeline_data_with_metadata(metadata))
190        }
191        PipelineData::ListStream(stream, metadata) => {
192            if let Some((
193                &PathMember::Int {
194                    val,
195                    span: path_span,
196                    ..
197                },
198                path,
199            )) = cell_paths.split_first()
200            {
201                let mut stream = stream.into_iter();
202                let mut pre_elems = vec![];
203
204                for idx in 0..val {
205                    if let Some(v) = stream.next() {
206                        pre_elems.push(v);
207                    } else {
208                        return Err(ShellError::InsertAfterNextFreeIndex {
209                            available_idx: idx,
210                            span: path_span,
211                        });
212                    }
213                }
214
215                let value = if path.is_empty() {
216                    let value = stream.next().unwrap_or(Value::nothing(head_span));
217                    if let Value::Closure { val, .. } = replacement {
218                        ClosureEvalOnce::new(engine_state, stack, *val)
219                            .run_with_value(value)?
220                            .into_value(head_span)?
221                    } else {
222                        replacement
223                    }
224                } else if let Some(mut value) = stream.next() {
225                    if let Value::Closure { val, .. } = replacement {
226                        upsert_single_value_by_closure(
227                            &mut value,
228                            ClosureEvalOnce::new(engine_state, stack, *val),
229                            head_span,
230                            path,
231                            true,
232                        )?;
233                    } else {
234                        value.upsert_data_at_cell_path(path, replacement)?;
235                    }
236                    value
237                } else if pre_elems.is_empty() {
238                    return Err(ShellError::AccessEmptyContent { span: path_span });
239                } else {
240                    return Err(ShellError::AccessBeyondEnd {
241                        max_idx: pre_elems.len() - 1,
242                        span: path_span,
243                    });
244                };
245
246                pre_elems.push(value);
247
248                Ok(pre_elems
249                    .into_iter()
250                    .chain(stream)
251                    .into_pipeline_data_with_metadata(
252                        head_span,
253                        engine_state.signals().clone(),
254                        metadata,
255                    ))
256            } else if let Some(new_cell_paths) = Value::try_put_int_path_member_on_top(cell_paths) {
257                upsert_recursive(
258                    engine_state,
259                    stack,
260                    head_span,
261                    replacement,
262                    PipelineData::ListStream(stream, metadata),
263                    &new_cell_paths,
264                )
265            } else if let Value::Closure { val, .. } = replacement {
266                let mut closure = ClosureEval::new(engine_state, stack, *val);
267                let cell_paths = cell_paths.to_vec();
268                let stream = stream.map(move |mut value| {
269                    let err =
270                        upsert_value_by_closure(&mut value, &mut closure, head_span, &cell_paths);
271
272                    if let Err(e) = err {
273                        Value::error(e, head_span)
274                    } else {
275                        value
276                    }
277                });
278
279                Ok(PipelineData::list_stream(stream, metadata))
280            } else {
281                let cell_paths = cell_paths.to_vec();
282                let stream = stream.map(move |mut value| {
283                    if let Err(e) = value.upsert_data_at_cell_path(&cell_paths, replacement.clone())
284                    {
285                        Value::error(e, head_span)
286                    } else {
287                        value
288                    }
289                });
290
291                Ok(PipelineData::list_stream(stream, metadata))
292            }
293        }
294        PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
295            type_name: "empty pipeline".to_string(),
296            span: head_span,
297        }),
298        PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
299            type_name: stream.type_().describe().into(),
300            span: head_span,
301        }),
302    }
303}
304
305fn upsert(
306    engine_state: &EngineState,
307    stack: &mut Stack,
308    call: &Call,
309    input: PipelineData,
310) -> Result<PipelineData, ShellError> {
311    let head = call.head;
312    let cell_path: CellPath = call.req(engine_state, stack, 0)?;
313    let replacement: Value = call.req(engine_state, stack, 1)?;
314    let input = input.into_stream_or_original(engine_state);
315
316    upsert_recursive(
317        engine_state,
318        stack,
319        head,
320        replacement,
321        input,
322        &cell_path.members,
323    )
324}
325
326fn upsert_value_by_closure(
327    value: &mut Value,
328    closure: &mut ClosureEval,
329    span: Span,
330    cell_path: &[PathMember],
331) -> Result<(), ShellError> {
332    let value_at_path = value.follow_cell_path(cell_path);
333
334    let input = value_at_path
335        .map(Cow::into_owned)
336        .map(IntoPipelineData::into_pipeline_data)
337        .unwrap_or(PipelineData::empty());
338
339    let new_value = closure
340        .add_arg(value.clone())?
341        .run_with_input(input)?
342        .into_value(span)?;
343
344    value.upsert_data_at_cell_path(cell_path, new_value)
345}
346
347fn upsert_single_value_by_closure(
348    value: &mut Value,
349    closure: ClosureEvalOnce,
350    span: Span,
351    cell_path: &[PathMember],
352    cell_value_as_arg: bool,
353) -> Result<(), ShellError> {
354    let value_at_path = value.follow_cell_path(cell_path);
355
356    // FIXME: this leads to inconsistent behaviors between
357    // `{a: b} | upsert a {|x| print $x}` and
358    // `[{a: b}] | upsert 0.a {|x| print $x}`
359    let arg = if cell_value_as_arg {
360        value_at_path
361            .as_deref()
362            .cloned()
363            .unwrap_or(Value::nothing(span))
364    } else {
365        value.clone()
366    };
367
368    let input = value_at_path
369        .map(Cow::into_owned)
370        .map(IntoPipelineData::into_pipeline_data)
371        .unwrap_or(PipelineData::empty());
372
373    let new_value = closure
374        .add_arg(arg)?
375        .run_with_input(input)?
376        .into_value(span)?;
377
378    value.upsert_data_at_cell_path(cell_path, new_value)
379}
380
381#[cfg(test)]
382mod test {
383    use super::*;
384
385    #[test]
386    fn test_examples() -> nu_test_support::Result {
387        nu_test_support::test().examples(Upsert)
388    }
389}