1use crate::progress_bar;
2use nu_engine::{command_prelude::*, get_eval_block};
3use nu_path::{expand_path_with, is_windows_device_path};
4use nu_protocol::{
5 ByteStreamSource, DataSource, OutDest, PipelineMetadata, Signals, ast,
6 byte_stream::copy_with_signals, process::ChildPipe, shell_error::generic::GenericError,
7 shell_error::io::IoError,
8};
9use std::{
10 borrow::Cow,
11 fs::File,
12 io::{self, BufRead, BufReader, Read, Write},
13 path::{Path, PathBuf},
14 thread,
15 time::Duration,
16};
17
18use nu_utils::time::Instant;
19
/// The `save` command, which writes its pipeline input to a file.
#[derive(Clone)]
pub struct Save;
22
23impl Command for Save {
24 fn name(&self) -> &str {
25 "save"
26 }
27
28 fn description(&self) -> &str {
29 "Save a file."
30 }
31
32 fn search_terms(&self) -> Vec<&str> {
33 vec![
34 "write",
35 "write_file",
36 "append",
37 "redirection",
38 "file",
39 "io",
40 ">",
41 ">>",
42 ]
43 }
44
45 fn signature(&self) -> nu_protocol::Signature {
46 Signature::build("save")
47 .input_output_types(vec![(Type::Any, Type::Nothing)])
48 .required("filename", SyntaxShape::Filepath, "The filename to use.")
49 .named(
50 "stderr",
51 SyntaxShape::Filepath,
52 "The filename used to save stderr, only works with `-r` flag.",
53 Some('e'),
54 )
55 .switch("raw", "Save file as raw binary.", Some('r'))
56 .switch("append", "Append input to the end of the file.", Some('a'))
57 .switch("force", "Overwrite the destination.", Some('f'))
58 .switch("progress", "Enable progress bar.", Some('p'))
59 .category(Category::FileSystem)
60 }
61
62 fn run(
63 &self,
64 engine_state: &EngineState,
65 stack: &mut Stack,
66 call: &Call,
67 input: PipelineData,
68 ) -> Result<PipelineData, ShellError> {
69 let raw = call.has_flag(engine_state, stack, "raw")?;
70 let append = call.has_flag(engine_state, stack, "append")?;
71 let force = call.has_flag(engine_state, stack, "force")?;
72 let progress = call.has_flag(engine_state, stack, "progress")?;
73
74 let span = call.head;
75 let cwd = engine_state.cwd(Some(stack))?.into_std_path_buf();
76
77 let path_arg = call.req::<Spanned<PathBuf>>(engine_state, stack, 0)?;
78 let path = Spanned {
79 item: expand_path_with(path_arg.item, &cwd, true),
80 span: path_arg.span,
81 };
82
83 let stderr_path = call
84 .get_flag::<Spanned<PathBuf>>(engine_state, stack, "stderr")?
85 .map(|arg| Spanned {
86 item: expand_path_with(arg.item, cwd, true),
87 span: arg.span,
88 });
89
90 let from_io_error = IoError::factory(span, path.item.as_path());
91 match input {
92 PipelineData::ByteStream(stream, metadata) => {
93 check_saving_to_source_file(metadata.as_ref(), &path, stderr_path.as_ref())?;
94
95 let (file, stderr_file) =
96 get_files(engine_state, &path, stderr_path.as_ref(), append, force)?;
97
98 let size = stream.known_size();
99 let signals = engine_state.signals();
100
101 match stream.into_source() {
102 ByteStreamSource::Read(read) => {
103 stream_to_file(read, size, signals, file, span, progress)?;
104 }
105 ByteStreamSource::File(source) => {
106 stream_to_file(source, size, signals, file, span, progress)?;
107 }
108 #[cfg(feature = "os")]
109 ByteStreamSource::Child(mut child) => {
110 fn write_or_consume_stderr(
111 stderr: ChildPipe,
112 file: Option<File>,
113 span: Span,
114 signals: &Signals,
115 progress: bool,
116 ) -> Result<(), ShellError> {
117 if let Some(file) = file {
118 match stderr {
119 ChildPipe::Pipe(pipe) => {
120 stream_to_file(pipe, None, signals, file, span, progress)
121 }
122 ChildPipe::Tee(tee) => {
123 stream_to_file(tee, None, signals, file, span, progress)
124 }
125 }?
126 } else {
127 match stderr {
128 ChildPipe::Pipe(mut pipe) => {
129 io::copy(&mut pipe, &mut io::stderr())
130 }
131 ChildPipe::Tee(mut tee) => {
132 io::copy(&mut tee, &mut io::stderr())
133 }
134 }
135 .map_err(|err| IoError::new(err, span, None))?;
136 }
137 Ok(())
138 }
139
140 match (child.stdout.take(), child.stderr.take()) {
141 (Some(stdout), stderr) => {
142 let handler = stderr
144 .map(|stderr| {
145 let signals = signals.clone();
146 thread::Builder::new().name("stderr saver".into()).spawn(
147 move || {
148 write_or_consume_stderr(
149 stderr,
150 stderr_file,
151 span,
152 &signals,
153 progress,
154 )
155 },
156 )
157 })
158 .transpose()
159 .map_err(&from_io_error)?;
160
161 let res = match stdout {
162 ChildPipe::Pipe(pipe) => {
163 stream_to_file(pipe, None, signals, file, span, progress)
164 }
165 ChildPipe::Tee(tee) => {
166 stream_to_file(tee, None, signals, file, span, progress)
167 }
168 };
169 if let Some(h) = handler {
170 h.join().map_err(|err| ShellError::ExternalCommand {
171 label: "Fail to receive external commands stderr message"
172 .to_string(),
173 help: format!("{err:?}"),
174 span,
175 })??;
176 }
177 res?;
178 }
179 (None, Some(stderr)) => {
180 write_or_consume_stderr(
181 stderr,
182 stderr_file,
183 span,
184 signals,
185 progress,
186 )?;
187 }
188 (None, None) => {}
189 };
190
191 child.wait()?;
192 }
193 }
194
195 Ok(PipelineData::empty())
196 }
197 PipelineData::ListStream(ls, pipeline_metadata)
198 if raw || prepare_path(&path, append, force)?.0.extension().is_none() =>
199 {
200 check_saving_to_source_file(
201 pipeline_metadata.as_ref(),
202 &path,
203 stderr_path.as_ref(),
204 )?;
205
206 let (mut file, _) =
207 get_files(engine_state, &path, stderr_path.as_ref(), append, force)?;
208 for val in ls {
209 file.write_all(&value_to_bytes(val)?)
210 .map_err(&from_io_error)?;
211 file.write_all("\n".as_bytes()).map_err(&from_io_error)?;
212 }
213 file.flush().map_err(&from_io_error)?;
214
215 Ok(PipelineData::empty())
216 }
217 input => {
218 if !matches!(input, PipelineData::Value(..) | PipelineData::Empty) {
221 check_saving_to_source_file(input.metadata_ref(), &path, stderr_path.as_ref())?;
222 }
223
224 let ext = extract_extension(&input, &path.item, raw);
226 let converted = match ext {
227 None => input,
228 Some(ext) => convert_to_extension(engine_state, &ext, stack, input, span)?,
229 };
230
231 if let PipelineData::Value(v @ Value::Custom { .. }, ..) = converted {
233 let val_span = v.span();
234 let val = v.into_custom_value()?;
235 return val
236 .save(
237 Spanned {
238 item: &path.item,
239 span: path.span,
240 },
241 val_span,
242 span,
243 )
244 .map(|()| PipelineData::empty());
245 }
246
247 let bytes = value_to_bytes(converted.into_value(span)?)?;
248
249 let (mut file, _) =
251 get_files(engine_state, &path, stderr_path.as_ref(), append, force)?;
252
253 file.write_all(&bytes).map_err(&from_io_error)?;
254 file.flush().map_err(&from_io_error)?;
255
256 Ok(PipelineData::empty())
257 }
258 }
259 }
260
261 fn examples(&self) -> Vec<Example<'_>> {
262 vec![
263 Example {
264 description: "Save a string to foo.txt in the current directory.",
265 example: "'save me' | save foo.txt",
266 result: None,
267 },
268 Example {
269 description: "Append a string to the end of foo.txt.",
270 example: "'append me' | save --append foo.txt",
271 result: None,
272 },
273 Example {
274 description: "Save a record to foo.json in the current directory.",
275 example: "{ a: 1, b: 2 } | save foo.json",
276 result: None,
277 },
278 Example {
279 description: "Save a running program's stderr to foo.txt.",
280 example: "do -i {} | save foo.txt --stderr foo.txt",
281 result: None,
282 },
283 Example {
284 description: "Save a running program's stderr to separate file.",
285 example: "do -i {} | save foo.txt --stderr bar.txt",
286 result: None,
287 },
288 Example {
289 description: "Show the extensions for which the `save` command will automatically serialize.",
290 example: r#"scope commands
291 | where name starts-with "to "
292 | insert extension { get name | str replace -r "^to " "" | $"*.($in)" }
293 | select extension name
294 | rename extension command
295"#,
296 result: None,
297 },
298 ]
299 }
300
301 fn pipe_redirection(&self) -> (Option<OutDest>, Option<OutDest>) {
302 (Some(OutDest::PipeSeparate), Some(OutDest::PipeSeparate))
303 }
304}
305
306fn saving_to_source_file_error(dest: &Spanned<PathBuf>) -> ShellError {
307 ShellError::Generic(
308 GenericError::new(
309 "pipeline input and output are the same file",
310 format!(
311 "can't save output to '{}' while it's being read",
312 dest.item.display()
313 ),
314 dest.span,
315 )
316 .with_help(
317 "insert a `collect` command in the pipeline before `save` (see `help collect`).",
318 ),
319 )
320}
321
322fn check_saving_to_source_file(
323 metadata: Option<&PipelineMetadata>,
324 dest: &Spanned<PathBuf>,
325 stderr_dest: Option<&Spanned<PathBuf>>,
326) -> Result<(), ShellError> {
327 let Some(DataSource::FilePath(source)) = metadata.map(|meta| &meta.data_source) else {
328 return Ok(());
329 };
330
331 if &dest.item == source {
332 return Err(saving_to_source_file_error(dest));
333 }
334
335 if let Some(dest) = stderr_dest
336 && &dest.item == source
337 {
338 return Err(saving_to_source_file_error(dest));
339 }
340
341 Ok(())
342}
343
344fn extract_extension<'e>(input: &PipelineData, path: &'e Path, raw: bool) -> Option<Cow<'e, str>> {
346 match (raw, input) {
347 (true, _)
348 | (_, PipelineData::ByteStream(..))
349 | (_, PipelineData::Value(Value::String { .. }, ..)) => None,
350 _ => path.extension().map(|name| name.to_string_lossy()),
351 }
352}
353
354fn convert_to_extension(
358 engine_state: &EngineState,
359 extension: &str,
360 stack: &mut Stack,
361 input: PipelineData,
362 span: Span,
363) -> Result<PipelineData, ShellError> {
364 if let Some(decl_id) = engine_state.find_decl(format!("to {extension}").as_bytes(), &[]) {
365 let decl = engine_state.get_decl(decl_id);
366 if let Some(block_id) = decl.block_id() {
367 let block = engine_state.get_block(block_id);
368 let eval_block = get_eval_block(engine_state);
369 eval_block(engine_state, stack, block, input).map(|p| p.body)
370 } else {
371 let call = ast::Call::new(span);
372 decl.run(engine_state, stack, &(&call).into(), input)
373 }
374 } else {
375 Ok(input)
376 }
377}
378
379fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
383 match value {
384 Value::String { val, .. } => Ok(val.into_bytes()),
385 Value::Binary { val, .. } => Ok(val),
386 Value::List { vals, .. } => {
387 let val = vals
388 .into_iter()
389 .map(Value::coerce_into_string)
390 .collect::<Result<Vec<String>, ShellError>>()?
391 .join("\n")
392 + "\n";
393
394 Ok(val.into_bytes())
395 }
396 Value::Error { error, .. } => Err(*error),
398 other => Ok(other.coerce_into_string()?.into_bytes()),
399 }
400}
401
402fn prepare_path(
405 path: &Spanned<PathBuf>,
406 append: bool,
407 force: bool,
408) -> Result<(&Path, Span), ShellError> {
409 let span = path.span;
410 let path = &path.item;
411
412 if !(force || append) && path.exists() {
413 Err(ShellError::Generic(
414 GenericError::new(
415 "Destination file already exists",
416 format!(
417 "Destination file '{}' already exists",
418 path.to_string_lossy()
419 ),
420 span,
421 )
422 .with_help("you can use -f, --force to force overwriting the destination"),
423 ))
424 } else {
425 Ok((path, span))
426 }
427}
428
429fn open_file(
430 engine_state: &EngineState,
431 path: &Path,
432 span: Span,
433 append: bool,
434) -> Result<File, ShellError> {
435 let file: std::io::Result<File> = match (append, path.exists() || is_windows_device_path(path))
436 {
437 (true, true) => std::fs::OpenOptions::new().append(true).open(path),
438 _ => {
439 #[cfg(target_os = "windows")]
442 if path.is_dir() {
443 #[allow(
444 deprecated,
445 reason = "we don't get a IsADirectory error, so we need to provide it"
446 )]
447 Err(std::io::ErrorKind::IsADirectory.into())
448 } else {
449 std::fs::File::create(path)
450 }
451 #[cfg(not(target_os = "windows"))]
452 std::fs::File::create(path)
453 }
454 };
455
456 match file {
457 Ok(file) => Ok(file),
458 Err(err) => {
459 if err.kind() == std::io::ErrorKind::NotFound
462 && let Some(missing_component) =
463 path.ancestors().skip(1).filter(|dir| !dir.exists()).last()
464 {
465 let components_to_remove = path
468 .strip_prefix(missing_component)
469 .expect("Stripping ancestor from a path should never fail")
470 .as_os_str()
471 .as_encoded_bytes();
472
473 return Err(ShellError::Io(IoError::new(
474 ErrorKind::DirectoryNotFound,
475 engine_state
476 .span_match_postfix(span, components_to_remove)
477 .map(|(pre, _post)| pre)
478 .unwrap_or(span),
479 PathBuf::from(missing_component),
480 )));
481 }
482
483 Err(ShellError::Io(IoError::new(err, span, PathBuf::from(path))))
484 }
485 }
486}
487
488fn get_files(
490 engine_state: &EngineState,
491 path: &Spanned<PathBuf>,
492 stderr_path: Option<&Spanned<PathBuf>>,
493 append: bool,
494 force: bool,
495) -> Result<(File, Option<File>), ShellError> {
496 let (path, path_span) = prepare_path(path, append, force)?;
498 let stderr_path_and_span = stderr_path
499 .as_ref()
500 .map(|stderr_path| prepare_path(stderr_path, append, force))
501 .transpose()?;
502
503 let file = open_file(engine_state, path, path_span, append)?;
505
506 let stderr_file = stderr_path_and_span
507 .map(|(stderr_path, stderr_path_span)| {
508 if path == stderr_path {
509 Err(ShellError::Generic(
510 GenericError::new(
511 "input and stderr input to same file",
512 "can't save both input and stderr input to the same file",
513 stderr_path_span,
514 )
515 .with_help("you should use `o+e> file` instead"),
516 ))
517 } else {
518 open_file(engine_state, stderr_path, stderr_path_span, append)
519 }
520 })
521 .transpose()?;
522
523 Ok((file, stderr_file))
524}
525
526fn stream_to_file(
527 source: impl Read,
528 known_size: Option<u64>,
529 signals: &Signals,
530 mut file: File,
531 span: Span,
532 progress: bool,
533) -> Result<(), ShellError> {
534 let from_io_error = IoError::factory(span, None);
536
537 if progress {
539 let mut bytes_processed = 0;
540
541 let mut bar = progress_bar::NuProgressBar::new(known_size);
542
543 let mut last_update = Instant::now();
544
545 let mut reader = BufReader::new(source);
546
547 let res = loop {
548 if let Err(err) = signals.check(&span) {
549 bar.abandoned_msg("# Cancelled #".to_owned());
550 return Err(err);
551 }
552
553 match reader.fill_buf() {
554 Ok(&[]) => break Ok(()),
555 Ok(buf) => {
556 file.write_all(buf).map_err(&from_io_error)?;
557 let len = buf.len();
558 reader.consume(len);
559 bytes_processed += len as u64;
560 if last_update.elapsed() >= Duration::from_millis(75) {
561 bar.update_bar(bytes_processed);
562 last_update = Instant::now();
563 }
564 }
565 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
566 Err(e) => break Err(e),
567 }
568 };
569
570 if let Err(err) = res {
572 let _ = file.flush();
573 bar.abandoned_msg("# Error while saving #".to_owned());
574 Err(from_io_error(err).into())
575 } else {
576 file.flush().map_err(&from_io_error)?;
577 Ok(())
578 }
579 } else {
580 copy_with_signals(source, file, span, signals)?;
581 Ok(())
582 }
583}