1use crate::progress_bar;
2use nu_engine::{command_prelude::*, get_eval_block};
3use nu_path::{expand_path_with, is_windows_device_path};
4use nu_protocol::{
5 ByteStreamSource, DataSource, OutDest, PipelineMetadata, Signals, ast,
6 byte_stream::copy_with_signals, process::ChildPipe, shell_error::io::IoError,
7};
8use std::{
9 borrow::Cow,
10 fs::File,
11 io::{self, BufRead, BufReader, Read, Write},
12 path::{Path, PathBuf},
13 thread,
14 time::{Duration, Instant},
15};
16
17#[derive(Clone)]
18pub struct Save;
19
20impl Command for Save {
21 fn name(&self) -> &str {
22 "save"
23 }
24
25 fn description(&self) -> &str {
26 "Save a file."
27 }
28
29 fn search_terms(&self) -> Vec<&str> {
30 vec![
31 "write",
32 "write_file",
33 "append",
34 "redirection",
35 "file",
36 "io",
37 ">",
38 ">>",
39 ]
40 }
41
42 fn signature(&self) -> nu_protocol::Signature {
43 Signature::build("save")
44 .input_output_types(vec![(Type::Any, Type::Nothing)])
45 .required("filename", SyntaxShape::Filepath, "The filename to use.")
46 .named(
47 "stderr",
48 SyntaxShape::Filepath,
49 "the filename used to save stderr, only works with `-r` flag",
50 Some('e'),
51 )
52 .switch("raw", "save file as raw binary", Some('r'))
53 .switch("append", "append input to the end of the file", Some('a'))
54 .switch("force", "overwrite the destination", Some('f'))
55 .switch("progress", "enable progress bar", Some('p'))
56 .category(Category::FileSystem)
57 }
58
59 fn run(
60 &self,
61 engine_state: &EngineState,
62 stack: &mut Stack,
63 call: &Call,
64 input: PipelineData,
65 ) -> Result<PipelineData, ShellError> {
66 let raw = call.has_flag(engine_state, stack, "raw")?;
67 let append = call.has_flag(engine_state, stack, "append")?;
68 let force = call.has_flag(engine_state, stack, "force")?;
69 let progress = call.has_flag(engine_state, stack, "progress")?;
70
71 let span = call.head;
72 let cwd = engine_state.cwd(Some(stack))?.into_std_path_buf();
73
74 let path_arg = call.req::<Spanned<PathBuf>>(engine_state, stack, 0)?;
75 let path = Spanned {
76 item: expand_path_with(path_arg.item, &cwd, true),
77 span: path_arg.span,
78 };
79
80 let stderr_path = call
81 .get_flag::<Spanned<PathBuf>>(engine_state, stack, "stderr")?
82 .map(|arg| Spanned {
83 item: expand_path_with(arg.item, cwd, true),
84 span: arg.span,
85 });
86
87 let from_io_error = IoError::factory(span, path.item.as_path());
88 match input {
89 PipelineData::ByteStream(stream, metadata) => {
90 check_saving_to_source_file(metadata.as_ref(), &path, stderr_path.as_ref())?;
91
92 let (file, stderr_file) =
93 get_files(engine_state, &path, stderr_path.as_ref(), append, force)?;
94
95 let size = stream.known_size();
96 let signals = engine_state.signals();
97
98 match stream.into_source() {
99 ByteStreamSource::Read(read) => {
100 stream_to_file(read, size, signals, file, span, progress)?;
101 }
102 ByteStreamSource::File(source) => {
103 stream_to_file(source, size, signals, file, span, progress)?;
104 }
105 #[cfg(feature = "os")]
106 ByteStreamSource::Child(mut child) => {
107 fn write_or_consume_stderr(
108 stderr: ChildPipe,
109 file: Option<File>,
110 span: Span,
111 signals: &Signals,
112 progress: bool,
113 ) -> Result<(), ShellError> {
114 if let Some(file) = file {
115 match stderr {
116 ChildPipe::Pipe(pipe) => {
117 stream_to_file(pipe, None, signals, file, span, progress)
118 }
119 ChildPipe::Tee(tee) => {
120 stream_to_file(tee, None, signals, file, span, progress)
121 }
122 }?
123 } else {
124 match stderr {
125 ChildPipe::Pipe(mut pipe) => {
126 io::copy(&mut pipe, &mut io::stderr())
127 }
128 ChildPipe::Tee(mut tee) => {
129 io::copy(&mut tee, &mut io::stderr())
130 }
131 }
132 .map_err(|err| IoError::new(err, span, None))?;
133 }
134 Ok(())
135 }
136
137 match (child.stdout.take(), child.stderr.take()) {
138 (Some(stdout), stderr) => {
139 let handler = stderr
141 .map(|stderr| {
142 let signals = signals.clone();
143 thread::Builder::new().name("stderr saver".into()).spawn(
144 move || {
145 write_or_consume_stderr(
146 stderr,
147 stderr_file,
148 span,
149 &signals,
150 progress,
151 )
152 },
153 )
154 })
155 .transpose()
156 .map_err(&from_io_error)?;
157
158 let res = match stdout {
159 ChildPipe::Pipe(pipe) => {
160 stream_to_file(pipe, None, signals, file, span, progress)
161 }
162 ChildPipe::Tee(tee) => {
163 stream_to_file(tee, None, signals, file, span, progress)
164 }
165 };
166 if let Some(h) = handler {
167 h.join().map_err(|err| ShellError::ExternalCommand {
168 label: "Fail to receive external commands stderr message"
169 .to_string(),
170 help: format!("{err:?}"),
171 span,
172 })??;
173 }
174 res?;
175 }
176 (None, Some(stderr)) => {
177 write_or_consume_stderr(
178 stderr,
179 stderr_file,
180 span,
181 signals,
182 progress,
183 )?;
184 }
185 (None, None) => {}
186 };
187
188 child.wait()?;
189 }
190 }
191
192 Ok(PipelineData::empty())
193 }
194 PipelineData::ListStream(ls, pipeline_metadata)
195 if raw || prepare_path(&path, append, force)?.0.extension().is_none() =>
196 {
197 check_saving_to_source_file(
198 pipeline_metadata.as_ref(),
199 &path,
200 stderr_path.as_ref(),
201 )?;
202
203 let (mut file, _) =
204 get_files(engine_state, &path, stderr_path.as_ref(), append, force)?;
205 for val in ls {
206 file.write_all(&value_to_bytes(val)?)
207 .map_err(&from_io_error)?;
208 file.write_all("\n".as_bytes()).map_err(&from_io_error)?;
209 }
210 file.flush().map_err(&from_io_error)?;
211
212 Ok(PipelineData::empty())
213 }
214 input => {
215 if !matches!(input, PipelineData::Value(..) | PipelineData::Empty) {
218 check_saving_to_source_file(
219 input.metadata().as_ref(),
220 &path,
221 stderr_path.as_ref(),
222 )?;
223 }
224
225 let ext = extract_extension(&input, &path.item, raw);
227 let converted = match ext {
228 None => input,
229 Some(ext) => convert_to_extension(engine_state, &ext, stack, input, span)?,
230 };
231
232 if let PipelineData::Value(v @ Value::Custom { .. }, ..) = converted {
234 let val_span = v.span();
235 let val = v.into_custom_value()?;
236 return val
237 .save(
238 Spanned {
239 item: &path.item,
240 span: path.span,
241 },
242 val_span,
243 span,
244 )
245 .map(|()| PipelineData::empty());
246 }
247
248 let bytes = value_to_bytes(converted.into_value(span)?)?;
249
250 let (mut file, _) =
252 get_files(engine_state, &path, stderr_path.as_ref(), append, force)?;
253
254 file.write_all(&bytes).map_err(&from_io_error)?;
255 file.flush().map_err(&from_io_error)?;
256
257 Ok(PipelineData::empty())
258 }
259 }
260 }
261
262 fn examples(&self) -> Vec<Example<'_>> {
263 vec![
264 Example {
265 description: "Save a string to foo.txt in the current directory",
266 example: r#"'save me' | save foo.txt"#,
267 result: None,
268 },
269 Example {
270 description: "Append a string to the end of foo.txt",
271 example: r#"'append me' | save --append foo.txt"#,
272 result: None,
273 },
274 Example {
275 description: "Save a record to foo.json in the current directory",
276 example: r#"{ a: 1, b: 2 } | save foo.json"#,
277 result: None,
278 },
279 Example {
280 description: "Save a running program's stderr to foo.txt",
281 example: r#"do -i {} | save foo.txt --stderr foo.txt"#,
282 result: None,
283 },
284 Example {
285 description: "Save a running program's stderr to separate file",
286 example: r#"do -i {} | save foo.txt --stderr bar.txt"#,
287 result: None,
288 },
289 Example {
290 description: "Show the extensions for which the `save` command will automatically serialize",
291 example: r#"scope commands
292 | where name starts-with "to "
293 | insert extension { get name | str replace -r "^to " "" | $"*.($in)" }
294 | select extension name
295 | rename extension command
296"#,
297 result: None,
298 },
299 ]
300 }
301
302 fn pipe_redirection(&self) -> (Option<OutDest>, Option<OutDest>) {
303 (Some(OutDest::PipeSeparate), Some(OutDest::PipeSeparate))
304 }
305}
306
307fn saving_to_source_file_error(dest: &Spanned<PathBuf>) -> ShellError {
308 ShellError::GenericError {
309 error: "pipeline input and output are the same file".into(),
310 msg: format!(
311 "can't save output to '{}' while it's being read",
312 dest.item.display()
313 ),
314 span: Some(dest.span),
315 help: Some(
316 "insert a `collect` command in the pipeline before `save` (see `help collect`).".into(),
317 ),
318 inner: vec![],
319 }
320}
321
322fn check_saving_to_source_file(
323 metadata: Option<&PipelineMetadata>,
324 dest: &Spanned<PathBuf>,
325 stderr_dest: Option<&Spanned<PathBuf>>,
326) -> Result<(), ShellError> {
327 let Some(DataSource::FilePath(source)) = metadata.map(|meta| &meta.data_source) else {
328 return Ok(());
329 };
330
331 if &dest.item == source {
332 return Err(saving_to_source_file_error(dest));
333 }
334
335 if let Some(dest) = stderr_dest
336 && &dest.item == source
337 {
338 return Err(saving_to_source_file_error(dest));
339 }
340
341 Ok(())
342}
343
344fn extract_extension<'e>(input: &PipelineData, path: &'e Path, raw: bool) -> Option<Cow<'e, str>> {
346 match (raw, input) {
347 (true, _)
348 | (_, PipelineData::ByteStream(..))
349 | (_, PipelineData::Value(Value::String { .. }, ..)) => None,
350 _ => path.extension().map(|name| name.to_string_lossy()),
351 }
352}
353
354fn convert_to_extension(
358 engine_state: &EngineState,
359 extension: &str,
360 stack: &mut Stack,
361 input: PipelineData,
362 span: Span,
363) -> Result<PipelineData, ShellError> {
364 if let Some(decl_id) = engine_state.find_decl(format!("to {extension}").as_bytes(), &[]) {
365 let decl = engine_state.get_decl(decl_id);
366 if let Some(block_id) = decl.block_id() {
367 let block = engine_state.get_block(block_id);
368 let eval_block = get_eval_block(engine_state);
369 eval_block(engine_state, stack, block, input).map(|p| p.body)
370 } else {
371 let call = ast::Call::new(span);
372 decl.run(engine_state, stack, &(&call).into(), input)
373 }
374 } else {
375 Ok(input)
376 }
377}
378
379fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
383 match value {
384 Value::String { val, .. } => Ok(val.into_bytes()),
385 Value::Binary { val, .. } => Ok(val),
386 Value::List { vals, .. } => {
387 let val = vals
388 .into_iter()
389 .map(Value::coerce_into_string)
390 .collect::<Result<Vec<String>, ShellError>>()?
391 .join("\n")
392 + "\n";
393
394 Ok(val.into_bytes())
395 }
396 Value::Error { error, .. } => Err(*error),
398 other => Ok(other.coerce_into_string()?.into_bytes()),
399 }
400}
401
402fn prepare_path(
405 path: &Spanned<PathBuf>,
406 append: bool,
407 force: bool,
408) -> Result<(&Path, Span), ShellError> {
409 let span = path.span;
410 let path = &path.item;
411
412 if !(force || append) && path.exists() {
413 Err(ShellError::GenericError {
414 error: "Destination file already exists".into(),
415 msg: format!(
416 "Destination file '{}' already exists",
417 path.to_string_lossy()
418 ),
419 span: Some(span),
420 help: Some("you can use -f, --force to force overwriting the destination".into()),
421 inner: vec![],
422 })
423 } else {
424 Ok((path, span))
425 }
426}
427
428fn open_file(
429 engine_state: &EngineState,
430 path: &Path,
431 span: Span,
432 append: bool,
433) -> Result<File, ShellError> {
434 let file: std::io::Result<File> = match (append, path.exists() || is_windows_device_path(path))
435 {
436 (true, true) => std::fs::OpenOptions::new().append(true).open(path),
437 _ => {
438 #[cfg(target_os = "windows")]
441 if path.is_dir() {
442 #[allow(
443 deprecated,
444 reason = "we don't get a IsADirectory error, so we need to provide it"
445 )]
446 Err(std::io::ErrorKind::IsADirectory.into())
447 } else {
448 std::fs::File::create(path)
449 }
450 #[cfg(not(target_os = "windows"))]
451 std::fs::File::create(path)
452 }
453 };
454
455 match file {
456 Ok(file) => Ok(file),
457 Err(err) => {
458 if err.kind() == std::io::ErrorKind::NotFound
461 && let Some(missing_component) =
462 path.ancestors().skip(1).filter(|dir| !dir.exists()).last()
463 {
464 let components_to_remove = path
467 .strip_prefix(missing_component)
468 .expect("Stripping ancestor from a path should never fail")
469 .as_os_str()
470 .as_encoded_bytes();
471
472 return Err(ShellError::Io(IoError::new(
473 ErrorKind::DirectoryNotFound,
474 engine_state
475 .span_match_postfix(span, components_to_remove)
476 .map(|(pre, _post)| pre)
477 .unwrap_or(span),
478 PathBuf::from(missing_component),
479 )));
480 }
481
482 Err(ShellError::Io(IoError::new(err, span, PathBuf::from(path))))
483 }
484 }
485}
486
487fn get_files(
489 engine_state: &EngineState,
490 path: &Spanned<PathBuf>,
491 stderr_path: Option<&Spanned<PathBuf>>,
492 append: bool,
493 force: bool,
494) -> Result<(File, Option<File>), ShellError> {
495 let (path, path_span) = prepare_path(path, append, force)?;
497 let stderr_path_and_span = stderr_path
498 .as_ref()
499 .map(|stderr_path| prepare_path(stderr_path, append, force))
500 .transpose()?;
501
502 let file = open_file(engine_state, path, path_span, append)?;
504
505 let stderr_file = stderr_path_and_span
506 .map(|(stderr_path, stderr_path_span)| {
507 if path == stderr_path {
508 Err(ShellError::GenericError {
509 error: "input and stderr input to same file".into(),
510 msg: "can't save both input and stderr input to the same file".into(),
511 span: Some(stderr_path_span),
512 help: Some("you should use `o+e> file` instead".into()),
513 inner: vec![],
514 })
515 } else {
516 open_file(engine_state, stderr_path, stderr_path_span, append)
517 }
518 })
519 .transpose()?;
520
521 Ok((file, stderr_file))
522}
523
524fn stream_to_file(
525 source: impl Read,
526 known_size: Option<u64>,
527 signals: &Signals,
528 mut file: File,
529 span: Span,
530 progress: bool,
531) -> Result<(), ShellError> {
532 let from_io_error = IoError::factory(span, None);
534
535 if progress {
537 let mut bytes_processed = 0;
538
539 let mut bar = progress_bar::NuProgressBar::new(known_size);
540
541 let mut last_update = Instant::now();
542
543 let mut reader = BufReader::new(source);
544
545 let res = loop {
546 if let Err(err) = signals.check(&span) {
547 bar.abandoned_msg("# Cancelled #".to_owned());
548 return Err(err);
549 }
550
551 match reader.fill_buf() {
552 Ok(&[]) => break Ok(()),
553 Ok(buf) => {
554 file.write_all(buf).map_err(&from_io_error)?;
555 let len = buf.len();
556 reader.consume(len);
557 bytes_processed += len as u64;
558 if last_update.elapsed() >= Duration::from_millis(75) {
559 bar.update_bar(bytes_processed);
560 last_update = Instant::now();
561 }
562 }
563 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
564 Err(e) => break Err(e),
565 }
566 };
567
568 if let Err(err) = res {
570 let _ = file.flush();
571 bar.abandoned_msg("# Error while saving #".to_owned());
572 Err(from_io_error(err).into())
573 } else {
574 file.flush().map_err(&from_io_error)?;
575 Ok(())
576 }
577 } else {
578 copy_with_signals(source, file, span, signals)?;
579 Ok(())
580 }
581}