// nu_command/filters/upsert.rs

1use nu_engine::{command_prelude::*, ClosureEval, ClosureEvalOnce};
2use nu_protocol::ast::PathMember;
3
/// The `upsert` filter command: updates the value at a cell path, or inserts it
/// when the path does not exist yet.
#[derive(Clone)]
pub struct Upsert;
6
7impl Command for Upsert {
8    fn name(&self) -> &str {
9        "upsert"
10    }
11
12    fn signature(&self) -> Signature {
13        Signature::build("upsert")
14            .input_output_types(vec![
15                (Type::record(), Type::record()),
16                (Type::table(), Type::table()),
17                (
18                    Type::List(Box::new(Type::Any)),
19                    Type::List(Box::new(Type::Any)),
20                ),
21            ])
22            .required(
23                "field",
24                SyntaxShape::CellPath,
25                "The name of the column to update or insert.",
26            )
27            .required(
28                "replacement value",
29                SyntaxShape::Any,
30                "The new value to give the cell(s), or a closure to create the value.",
31            )
32            .allow_variants_without_examples(true)
33            .category(Category::Filters)
34    }
35
36    fn description(&self) -> &str {
37        "Update an existing column to have a new value, or insert a new column."
38    }
39
40    fn extra_description(&self) -> &str {
41        "When updating or inserting a column, the closure will be run for each row, and the current row will be passed as the first argument. \
42Referencing `$in` inside the closure will provide the value at the column for the current row or null if the column does not exist.
43
44When updating a specific index, the closure will instead be run once. The first argument to the closure and the `$in` value will both be the current value at the index. \
45If the command is inserting at the end of a list or table, then both of these values will be null."
46    }
47
48    fn search_terms(&self) -> Vec<&str> {
49        vec!["add"]
50    }
51
52    fn run(
53        &self,
54        engine_state: &EngineState,
55        stack: &mut Stack,
56        call: &Call,
57        input: PipelineData,
58    ) -> Result<PipelineData, ShellError> {
59        upsert(engine_state, stack, call, input)
60    }
61
62    fn examples(&self) -> Vec<Example> {
63        vec![
64            Example {
65                description: "Update a record's value",
66                example: "{'name': 'nu', 'stars': 5} | upsert name 'Nushell'",
67                result: Some(Value::test_record(record! {
68                    "name" => Value::test_string("Nushell"),
69                    "stars" => Value::test_int(5),
70                })),
71            },
72            Example {
73                description: "Insert a new entry into a record",
74                example: "{'name': 'nu', 'stars': 5} | upsert language 'Rust'",
75                result: Some(Value::test_record(record! {
76                    "name" =>     Value::test_string("nu"),
77                    "stars" =>    Value::test_int(5),
78                    "language" => Value::test_string("Rust"),
79                })),
80            },
81            Example {
82                description: "Update each row of a table",
83                example: "[[name lang]; [Nushell ''] [Reedline '']] | upsert lang 'Rust'",
84                result: Some(Value::test_list(vec![
85                    Value::test_record(record! {
86                        "name" => Value::test_string("Nushell"),
87                        "lang" => Value::test_string("Rust"),
88                    }),
89                    Value::test_record(record! {
90                        "name" => Value::test_string("Reedline"),
91                        "lang" => Value::test_string("Rust"),
92                    }),
93                ])),
94            },
95            Example {
96                description: "Insert a new column with values computed based off the other columns",
97                example: "[[foo]; [7] [8] [9]] | upsert bar {|row| $row.foo * 2 }",
98                result: Some(Value::test_list(vec![
99                    Value::test_record(record! {
100                        "foo" => Value::test_int(7),
101                        "bar" => Value::test_int(14),
102                    }),
103                    Value::test_record(record! {
104                        "foo" => Value::test_int(8),
105                        "bar" => Value::test_int(16),
106                    }),
107                    Value::test_record(record! {
108                        "foo" => Value::test_int(9),
109                        "bar" => Value::test_int(18),
110                    }),
111                ])),
112            },
113            Example {
114                description: "Update null values in a column to a default value",
115                example: "[[foo]; [2] [null] [4]] | upsert foo { default 0 }",
116                result: Some(Value::test_list(vec![
117                    Value::test_record(record! {
118                        "foo" => Value::test_int(2),
119                    }),
120                    Value::test_record(record! {
121                        "foo" => Value::test_int(0),
122                    }),
123                    Value::test_record(record! {
124                        "foo" => Value::test_int(4),
125                    }),
126                ])),
127            },
128            Example {
129                description: "Upsert into a list, updating an existing value at an index",
130                example: "[1 2 3] | upsert 0 2",
131                result: Some(Value::test_list(vec![
132                    Value::test_int(2),
133                    Value::test_int(2),
134                    Value::test_int(3),
135                ])),
136            },
137            Example {
138                description: "Upsert into a list, inserting a new value at the end",
139                example: "[1 2 3] | upsert 3 4",
140                result: Some(Value::test_list(vec![
141                    Value::test_int(1),
142                    Value::test_int(2),
143                    Value::test_int(3),
144                    Value::test_int(4),
145                ])),
146            },
147            Example {
148                description: "Upsert into a nested path, creating new values as needed",
149                example: "[{} {a: [{}]}] | upsert a.0.b \"value\"",
150                result: Some(Value::test_list(vec![
151                    Value::test_record(record!(
152                        "a" => Value::test_list(vec![Value::test_record(record!(
153                            "b" => Value::test_string("value"),
154                        ))]),
155                    )),
156                    Value::test_record(record!(
157                        "a" => Value::test_list(vec![Value::test_record(record!(
158                            "b" => Value::test_string("value"),
159                        ))]),
160                    )),
161                ])),
162            },
163        ]
164    }
165}
166
167fn upsert(
168    engine_state: &EngineState,
169    stack: &mut Stack,
170    call: &Call,
171    input: PipelineData,
172) -> Result<PipelineData, ShellError> {
173    let head = call.head;
174    let cell_path: CellPath = call.req(engine_state, stack, 0)?;
175    let replacement: Value = call.req(engine_state, stack, 1)?;
176
177    match input {
178        PipelineData::Value(mut value, metadata) => {
179            if let Value::Closure { val, .. } = replacement {
180                match (cell_path.members.first(), &mut value) {
181                    (Some(PathMember::String { .. }), Value::List { vals, .. }) => {
182                        let mut closure = ClosureEval::new(engine_state, stack, *val);
183                        for val in vals {
184                            upsert_value_by_closure(
185                                val,
186                                &mut closure,
187                                head,
188                                &cell_path.members,
189                                false,
190                            )?;
191                        }
192                    }
193                    (first, _) => {
194                        upsert_single_value_by_closure(
195                            &mut value,
196                            ClosureEvalOnce::new(engine_state, stack, *val),
197                            head,
198                            &cell_path.members,
199                            matches!(first, Some(PathMember::Int { .. })),
200                        )?;
201                    }
202                }
203            } else {
204                value.upsert_data_at_cell_path(&cell_path.members, replacement)?;
205            }
206            Ok(value.into_pipeline_data_with_metadata(metadata))
207        }
208        PipelineData::ListStream(stream, metadata) => {
209            if let Some((
210                &PathMember::Int {
211                    val,
212                    span: path_span,
213                    ..
214                },
215                path,
216            )) = cell_path.members.split_first()
217            {
218                let mut stream = stream.into_iter();
219                let mut pre_elems = vec![];
220
221                for idx in 0..val {
222                    if let Some(v) = stream.next() {
223                        pre_elems.push(v);
224                    } else {
225                        return Err(ShellError::InsertAfterNextFreeIndex {
226                            available_idx: idx,
227                            span: path_span,
228                        });
229                    }
230                }
231
232                let value = if path.is_empty() {
233                    let value = stream.next().unwrap_or(Value::nothing(head));
234                    if let Value::Closure { val, .. } = replacement {
235                        ClosureEvalOnce::new(engine_state, stack, *val)
236                            .run_with_value(value)?
237                            .into_value(head)?
238                    } else {
239                        replacement
240                    }
241                } else if let Some(mut value) = stream.next() {
242                    if let Value::Closure { val, .. } = replacement {
243                        upsert_single_value_by_closure(
244                            &mut value,
245                            ClosureEvalOnce::new(engine_state, stack, *val),
246                            head,
247                            path,
248                            true,
249                        )?;
250                    } else {
251                        value.upsert_data_at_cell_path(path, replacement)?;
252                    }
253                    value
254                } else {
255                    return Err(ShellError::AccessBeyondEnd {
256                        max_idx: pre_elems.len() - 1,
257                        span: path_span,
258                    });
259                };
260
261                pre_elems.push(value);
262
263                Ok(pre_elems
264                    .into_iter()
265                    .chain(stream)
266                    .into_pipeline_data_with_metadata(
267                        head,
268                        engine_state.signals().clone(),
269                        metadata,
270                    ))
271            } else if let Value::Closure { val, .. } = replacement {
272                let mut closure = ClosureEval::new(engine_state, stack, *val);
273                let stream = stream.map(move |mut value| {
274                    let err = upsert_value_by_closure(
275                        &mut value,
276                        &mut closure,
277                        head,
278                        &cell_path.members,
279                        false,
280                    );
281
282                    if let Err(e) = err {
283                        Value::error(e, head)
284                    } else {
285                        value
286                    }
287                });
288
289                Ok(PipelineData::ListStream(stream, metadata))
290            } else {
291                let stream = stream.map(move |mut value| {
292                    if let Err(e) =
293                        value.upsert_data_at_cell_path(&cell_path.members, replacement.clone())
294                    {
295                        Value::error(e, head)
296                    } else {
297                        value
298                    }
299                });
300
301                Ok(PipelineData::ListStream(stream, metadata))
302            }
303        }
304        PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
305            type_name: "empty pipeline".to_string(),
306            span: head,
307        }),
308        PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
309            type_name: stream.type_().describe().into(),
310            span: head,
311        }),
312    }
313}
314
315fn upsert_value_by_closure(
316    value: &mut Value,
317    closure: &mut ClosureEval,
318    span: Span,
319    cell_path: &[PathMember],
320    first_path_member_int: bool,
321) -> Result<(), ShellError> {
322    let value_at_path = value.clone().follow_cell_path(cell_path, false);
323
324    let arg = if first_path_member_int {
325        value_at_path.clone().unwrap_or(Value::nothing(span))
326    } else {
327        value.clone()
328    };
329
330    let input = value_at_path
331        .map(IntoPipelineData::into_pipeline_data)
332        .unwrap_or(PipelineData::Empty);
333
334    let new_value = closure
335        .add_arg(arg)
336        .run_with_input(input)?
337        .into_value(span)?;
338
339    value.upsert_data_at_cell_path(cell_path, new_value)
340}
341
342fn upsert_single_value_by_closure(
343    value: &mut Value,
344    closure: ClosureEvalOnce,
345    span: Span,
346    cell_path: &[PathMember],
347    first_path_member_int: bool,
348) -> Result<(), ShellError> {
349    let value_at_path = value.clone().follow_cell_path(cell_path, false);
350
351    let arg = if first_path_member_int {
352        value_at_path.clone().unwrap_or(Value::nothing(span))
353    } else {
354        value.clone()
355    };
356
357    let input = value_at_path
358        .map(IntoPipelineData::into_pipeline_data)
359        .unwrap_or(PipelineData::Empty);
360
361    let new_value = closure
362        .add_arg(arg)
363        .run_with_input(input)?
364        .into_value(span)?;
365
366    value.upsert_data_at_cell_path(cell_path, new_value)
367}
368
#[cfg(test)]
mod test {
    use super::Upsert;

    /// Runs every example declared by the command through the shared example
    /// checker.
    #[test]
    fn test_examples() {
        // Fully qualified: this test function has the same name as the helper.
        crate::test_examples(Upsert {})
    }
}