1use nu_engine::{command_prelude::*, ClosureEval, ClosureEvalOnce};
2use nu_protocol::ast::PathMember;
3
/// The `upsert` filter command: updates an existing column (or list index) to a
/// new value, or inserts it when it is not present yet.
///
/// The type carries no state; all behavior lives in its `Command` implementation.
#[derive(Clone)]
pub struct Upsert;
6
7impl Command for Upsert {
8 fn name(&self) -> &str {
9 "upsert"
10 }
11
12 fn signature(&self) -> Signature {
13 Signature::build("upsert")
14 .input_output_types(vec![
15 (Type::record(), Type::record()),
16 (Type::table(), Type::table()),
17 (
18 Type::List(Box::new(Type::Any)),
19 Type::List(Box::new(Type::Any)),
20 ),
21 ])
22 .required(
23 "field",
24 SyntaxShape::CellPath,
25 "The name of the column to update or insert.",
26 )
27 .required(
28 "replacement value",
29 SyntaxShape::Any,
30 "The new value to give the cell(s), or a closure to create the value.",
31 )
32 .allow_variants_without_examples(true)
33 .category(Category::Filters)
34 }
35
36 fn description(&self) -> &str {
37 "Update an existing column to have a new value, or insert a new column."
38 }
39
40 fn extra_description(&self) -> &str {
41 "When updating or inserting a column, the closure will be run for each row, and the current row will be passed as the first argument. \
42Referencing `$in` inside the closure will provide the value at the column for the current row or null if the column does not exist.
43
44When updating a specific index, the closure will instead be run once. The first argument to the closure and the `$in` value will both be the current value at the index. \
45If the command is inserting at the end of a list or table, then both of these values will be null."
46 }
47
48 fn search_terms(&self) -> Vec<&str> {
49 vec!["add"]
50 }
51
52 fn run(
53 &self,
54 engine_state: &EngineState,
55 stack: &mut Stack,
56 call: &Call,
57 input: PipelineData,
58 ) -> Result<PipelineData, ShellError> {
59 upsert(engine_state, stack, call, input)
60 }
61
62 fn examples(&self) -> Vec<Example> {
63 vec![
64 Example {
65 description: "Update a record's value",
66 example: "{'name': 'nu', 'stars': 5} | upsert name 'Nushell'",
67 result: Some(Value::test_record(record! {
68 "name" => Value::test_string("Nushell"),
69 "stars" => Value::test_int(5),
70 })),
71 },
72 Example {
73 description: "Insert a new entry into a record",
74 example: "{'name': 'nu', 'stars': 5} | upsert language 'Rust'",
75 result: Some(Value::test_record(record! {
76 "name" => Value::test_string("nu"),
77 "stars" => Value::test_int(5),
78 "language" => Value::test_string("Rust"),
79 })),
80 },
81 Example {
82 description: "Update each row of a table",
83 example: "[[name lang]; [Nushell ''] [Reedline '']] | upsert lang 'Rust'",
84 result: Some(Value::test_list(vec![
85 Value::test_record(record! {
86 "name" => Value::test_string("Nushell"),
87 "lang" => Value::test_string("Rust"),
88 }),
89 Value::test_record(record! {
90 "name" => Value::test_string("Reedline"),
91 "lang" => Value::test_string("Rust"),
92 }),
93 ])),
94 },
95 Example {
96 description: "Insert a new column with values computed based off the other columns",
97 example: "[[foo]; [7] [8] [9]] | upsert bar {|row| $row.foo * 2 }",
98 result: Some(Value::test_list(vec![
99 Value::test_record(record! {
100 "foo" => Value::test_int(7),
101 "bar" => Value::test_int(14),
102 }),
103 Value::test_record(record! {
104 "foo" => Value::test_int(8),
105 "bar" => Value::test_int(16),
106 }),
107 Value::test_record(record! {
108 "foo" => Value::test_int(9),
109 "bar" => Value::test_int(18),
110 }),
111 ])),
112 },
113 Example {
114 description: "Update null values in a column to a default value",
115 example: "[[foo]; [2] [null] [4]] | upsert foo { default 0 }",
116 result: Some(Value::test_list(vec![
117 Value::test_record(record! {
118 "foo" => Value::test_int(2),
119 }),
120 Value::test_record(record! {
121 "foo" => Value::test_int(0),
122 }),
123 Value::test_record(record! {
124 "foo" => Value::test_int(4),
125 }),
126 ])),
127 },
128 Example {
129 description: "Upsert into a list, updating an existing value at an index",
130 example: "[1 2 3] | upsert 0 2",
131 result: Some(Value::test_list(vec![
132 Value::test_int(2),
133 Value::test_int(2),
134 Value::test_int(3),
135 ])),
136 },
137 Example {
138 description: "Upsert into a list, inserting a new value at the end",
139 example: "[1 2 3] | upsert 3 4",
140 result: Some(Value::test_list(vec![
141 Value::test_int(1),
142 Value::test_int(2),
143 Value::test_int(3),
144 Value::test_int(4),
145 ])),
146 },
147 Example {
148 description: "Upsert into a nested path, creating new values as needed",
149 example: "[{} {a: [{}]}] | upsert a.0.b \"value\"",
150 result: Some(Value::test_list(vec![
151 Value::test_record(record!(
152 "a" => Value::test_list(vec![Value::test_record(record!(
153 "b" => Value::test_string("value"),
154 ))]),
155 )),
156 Value::test_record(record!(
157 "a" => Value::test_list(vec![Value::test_record(record!(
158 "b" => Value::test_string("value"),
159 ))]),
160 )),
161 ])),
162 },
163 ]
164 }
165}
166
167fn upsert(
168 engine_state: &EngineState,
169 stack: &mut Stack,
170 call: &Call,
171 input: PipelineData,
172) -> Result<PipelineData, ShellError> {
173 let head = call.head;
174 let cell_path: CellPath = call.req(engine_state, stack, 0)?;
175 let replacement: Value = call.req(engine_state, stack, 1)?;
176
177 match input {
178 PipelineData::Value(mut value, metadata) => {
179 if let Value::Closure { val, .. } = replacement {
180 match (cell_path.members.first(), &mut value) {
181 (Some(PathMember::String { .. }), Value::List { vals, .. }) => {
182 let mut closure = ClosureEval::new(engine_state, stack, *val);
183 for val in vals {
184 upsert_value_by_closure(
185 val,
186 &mut closure,
187 head,
188 &cell_path.members,
189 false,
190 )?;
191 }
192 }
193 (first, _) => {
194 upsert_single_value_by_closure(
195 &mut value,
196 ClosureEvalOnce::new(engine_state, stack, *val),
197 head,
198 &cell_path.members,
199 matches!(first, Some(PathMember::Int { .. })),
200 )?;
201 }
202 }
203 } else {
204 value.upsert_data_at_cell_path(&cell_path.members, replacement)?;
205 }
206 Ok(value.into_pipeline_data_with_metadata(metadata))
207 }
208 PipelineData::ListStream(stream, metadata) => {
209 if let Some((
210 &PathMember::Int {
211 val,
212 span: path_span,
213 ..
214 },
215 path,
216 )) = cell_path.members.split_first()
217 {
218 let mut stream = stream.into_iter();
219 let mut pre_elems = vec![];
220
221 for idx in 0..val {
222 if let Some(v) = stream.next() {
223 pre_elems.push(v);
224 } else {
225 return Err(ShellError::InsertAfterNextFreeIndex {
226 available_idx: idx,
227 span: path_span,
228 });
229 }
230 }
231
232 let value = if path.is_empty() {
233 let value = stream.next().unwrap_or(Value::nothing(head));
234 if let Value::Closure { val, .. } = replacement {
235 ClosureEvalOnce::new(engine_state, stack, *val)
236 .run_with_value(value)?
237 .into_value(head)?
238 } else {
239 replacement
240 }
241 } else if let Some(mut value) = stream.next() {
242 if let Value::Closure { val, .. } = replacement {
243 upsert_single_value_by_closure(
244 &mut value,
245 ClosureEvalOnce::new(engine_state, stack, *val),
246 head,
247 path,
248 true,
249 )?;
250 } else {
251 value.upsert_data_at_cell_path(path, replacement)?;
252 }
253 value
254 } else {
255 return Err(ShellError::AccessBeyondEnd {
256 max_idx: pre_elems.len() - 1,
257 span: path_span,
258 });
259 };
260
261 pre_elems.push(value);
262
263 Ok(pre_elems
264 .into_iter()
265 .chain(stream)
266 .into_pipeline_data_with_metadata(
267 head,
268 engine_state.signals().clone(),
269 metadata,
270 ))
271 } else if let Value::Closure { val, .. } = replacement {
272 let mut closure = ClosureEval::new(engine_state, stack, *val);
273 let stream = stream.map(move |mut value| {
274 let err = upsert_value_by_closure(
275 &mut value,
276 &mut closure,
277 head,
278 &cell_path.members,
279 false,
280 );
281
282 if let Err(e) = err {
283 Value::error(e, head)
284 } else {
285 value
286 }
287 });
288
289 Ok(PipelineData::ListStream(stream, metadata))
290 } else {
291 let stream = stream.map(move |mut value| {
292 if let Err(e) =
293 value.upsert_data_at_cell_path(&cell_path.members, replacement.clone())
294 {
295 Value::error(e, head)
296 } else {
297 value
298 }
299 });
300
301 Ok(PipelineData::ListStream(stream, metadata))
302 }
303 }
304 PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
305 type_name: "empty pipeline".to_string(),
306 span: head,
307 }),
308 PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
309 type_name: stream.type_().describe().into(),
310 span: head,
311 }),
312 }
313}
314
315fn upsert_value_by_closure(
316 value: &mut Value,
317 closure: &mut ClosureEval,
318 span: Span,
319 cell_path: &[PathMember],
320 first_path_member_int: bool,
321) -> Result<(), ShellError> {
322 let value_at_path = value.clone().follow_cell_path(cell_path, false);
323
324 let arg = if first_path_member_int {
325 value_at_path.clone().unwrap_or(Value::nothing(span))
326 } else {
327 value.clone()
328 };
329
330 let input = value_at_path
331 .map(IntoPipelineData::into_pipeline_data)
332 .unwrap_or(PipelineData::Empty);
333
334 let new_value = closure
335 .add_arg(arg)
336 .run_with_input(input)?
337 .into_value(span)?;
338
339 value.upsert_data_at_cell_path(cell_path, new_value)
340}
341
342fn upsert_single_value_by_closure(
343 value: &mut Value,
344 closure: ClosureEvalOnce,
345 span: Span,
346 cell_path: &[PathMember],
347 first_path_member_int: bool,
348) -> Result<(), ShellError> {
349 let value_at_path = value.clone().follow_cell_path(cell_path, false);
350
351 let arg = if first_path_member_int {
352 value_at_path.clone().unwrap_or(Value::nothing(span))
353 } else {
354 value.clone()
355 };
356
357 let input = value_at_path
358 .map(IntoPipelineData::into_pipeline_data)
359 .unwrap_or(PipelineData::Empty);
360
361 let new_value = closure
362 .add_arg(arg)
363 .run_with_input(input)?
364 .into_value(span)?;
365
366 value.upsert_data_at_cell_path(cell_path, new_value)
367}
368
#[cfg(test)]
mod test {
    use super::*;

    /// Passes the command to the crate's `test_examples` helper.
    #[test]
    fn test_examples() {
        // Called by full path because this test function has the same name as the helper.
        crate::test_examples(Upsert)
    }
}