transferred_files/
destination.rs1use std::path::{Path, PathBuf};
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::Instant;
5
6use async_trait::async_trait;
7use futures::StreamExt;
8use tokio::fs::File;
9use tracing::warn;
10use transferred_core::{BatchStream, Destination, RunReport, TransferredError};
11
12use crate::formats::FormatWrite;
13
14#[derive(Clone)]
19pub struct FilesDestination {
20 path: PathBuf,
21 format: Arc<dyn FormatWrite>,
22 single_file: bool,
23}
24
25#[async_trait]
26impl Destination for FilesDestination {
27 async fn write_partitions(
28 self: Box<Self>,
29 partitions: Vec<BatchStream>,
30 ) -> Result<RunReport, TransferredError> {
31 let start = Instant::now();
32 let tmp_dir = make_tmp(&self.path);
33 tokio::fs::create_dir_all(&tmp_dir).await?;
34
35 let writtens = match self.write_files(&tmp_dir, partitions).await {
36 Ok(written) => written,
37 Err(err) => {
38 cleanup(&tmp_dir).await;
39 return Err(err);
40 }
41 };
42
43 if let Err(err) = self.atomic_replace(&tmp_dir).await {
44 cleanup(&tmp_dir).await;
45 return Err(err);
46 }
47
48 let mut bytes_written = 0;
49 for written in &writtens {
50 bytes_written += tokio::fs::metadata(&written.path).await?.len();
51 }
52
53 Ok(RunReport {
54 rows: writtens.iter().map(|w| w.rows).sum(),
55 bytes_written,
56 written_objects: writtens
57 .iter()
58 .map(|w| w.path.display().to_string())
59 .collect(),
60 duration: start.elapsed(),
61 coercions: vec![],
62 })
63 }
64}
65
66impl FilesDestination {
67 #[must_use]
69 pub fn new(path: PathBuf, format: Arc<dyn FormatWrite>, single_file: bool) -> Self {
70 Self {
71 path,
72 format,
73 single_file,
74 }
75 }
76
77 fn output_filename(&self, part: usize) -> String {
79 let ext = self.format.file_extension();
80 let base_name = self
81 .path
82 .file_name()
83 .map_or("data".to_string(), |n| n.to_string_lossy().to_string());
84
85 if self.single_file {
86 format!("{base_name}.{ext}")
87 } else {
88 format!("part-{part:05}.{ext}")
89 }
90 }
91
92 async fn write_files(
94 &self,
95 tmp_dir: &Path,
96 partitions: Vec<BatchStream>,
97 ) -> Result<Vec<Written>, TransferredError> {
98 let streams: Vec<BatchStream> = if self.single_file {
99 vec![Box::pin(futures::stream::iter(partitions).flatten())]
100 } else {
101 partitions
102 };
103
104 let mut written = Vec::new();
105 for stream in streams {
106 let mut stream = stream.peekable();
107 if Pin::new(&mut stream).peek().await.is_none() {
108 continue; }
110
111 let name = self.output_filename(written.len() + 1);
112 let file = File::create(tmp_dir.join(&name)).await?;
113
114 let rows = self.format.write(Box::new(file), Box::pin(stream)).await?;
115 written.push(Written {
116 path: self.path.join(&name),
117 rows,
118 });
119 }
120
121 if written.is_empty() {
122 return Err(TransferredError::EmptySource);
123 }
124
125 Ok(written)
126 }
127
128 async fn atomic_replace(&self, tmp_dir: &Path) -> Result<(), TransferredError> {
130 match tokio::fs::metadata(&self.path).await {
131 Ok(meta) if meta.is_dir() => tokio::fs::remove_dir_all(&self.path).await?,
132 Ok(_) => tokio::fs::remove_file(&self.path).await?,
133 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
134 Err(err) => return Err(err.into()),
135 }
136
137 tokio::fs::rename(tmp_dir, &self.path).await?;
138
139 Ok(())
140 }
141}
142
/// Outcome of writing one output file.
struct Written {
    /// Location of the file once the staging directory has been moved into place.
    path: PathBuf,
    /// Row count reported by the format writer for this file.
    rows: u64,
}
148
149async fn cleanup(tmp_dir: &Path) {
151 if let Err(err) = tokio::fs::remove_dir_all(tmp_dir).await
152 && err.kind() != std::io::ErrorKind::NotFound
153 {
154 warn!(path = %tmp_dir.display(), error = %err, "failed to remove tmp dir");
155 }
156}
157
/// Returns the staging path for `final_path`.
///
/// The result is `final_path` with its last component suffixed by `.tmp`, so
/// it normally sits next to `final_path` in the same parent directory. When
/// `final_path` has no last component (e.g. `/`), the file name is just `.tmp`.
fn make_tmp(final_path: &Path) -> PathBuf {
    let tmp_name: std::ffi::OsString = match final_path.file_name() {
        Some(name) => {
            let mut owned = name.to_os_string();
            owned.push(".tmp");
            owned
        }
        None => ".tmp".into(),
    };
    final_path.with_file_name(tmp_name)
}