Skip to main content

transferred_files/
destination.rs

1use std::path::{Path, PathBuf};
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::Instant;
5
6use async_trait::async_trait;
7use futures::StreamExt;
8use tokio::fs::File;
9use tracing::warn;
10use transferred_core::{BatchStream, Destination, RunReport, TransferredError};
11
12use crate::formats::FormatWrite;
13
14/// Local files destination. Writes `part-NNNNN.{extension}` files to a directory,
15/// or one `{dir}.{extension}` when `single_file`.
16/// Write is atomic via tmp dir + rename.
17/// Written paths land in `RunReport.written_objects`.
18#[derive(Clone)]
19pub struct FilesDestination {
20    path: PathBuf,
21    format: Arc<dyn FormatWrite>,
22    single_file: bool,
23}
24
25#[async_trait]
26impl Destination for FilesDestination {
27    async fn write_partitions(
28        self: Box<Self>,
29        partitions: Vec<BatchStream>,
30    ) -> Result<RunReport, TransferredError> {
31        let start = Instant::now();
32        let tmp_dir = make_tmp(&self.path);
33        tokio::fs::create_dir_all(&tmp_dir).await?;
34
35        let writtens = match self.write_files(&tmp_dir, partitions).await {
36            Ok(written) => written,
37            Err(err) => {
38                cleanup(&tmp_dir).await;
39                return Err(err);
40            }
41        };
42
43        if let Err(err) = self.atomic_replace(&tmp_dir).await {
44            cleanup(&tmp_dir).await;
45            return Err(err);
46        }
47
48        let mut bytes_written = 0;
49        for written in &writtens {
50            bytes_written += tokio::fs::metadata(&written.path).await?.len();
51        }
52
53        Ok(RunReport {
54            rows: writtens.iter().map(|w| w.rows).sum(),
55            bytes_written,
56            written_objects: writtens
57                .iter()
58                .map(|w| w.path.display().to_string())
59                .collect(),
60            duration: start.elapsed(),
61            coercions: vec![],
62        })
63    }
64}
65
66impl FilesDestination {
67    /// Build a destination. No I/O performed.
68    #[must_use]
69    pub fn new(path: PathBuf, format: Arc<dyn FormatWrite>, single_file: bool) -> Self {
70        Self {
71            path,
72            format,
73            single_file,
74        }
75    }
76
77    /// Pick a filename: `{dir}.{ext}` if `single_file`, else `part-NNNNN.{ext}`.
78    fn output_filename(&self, part: usize) -> String {
79        let ext = self.format.file_extension();
80        let base_name = self
81            .path
82            .file_name()
83            .map_or("data".to_string(), |n| n.to_string_lossy().to_string());
84
85        if self.single_file {
86            format!("{base_name}.{ext}")
87        } else {
88            format!("part-{part:05}.{ext}")
89        }
90    }
91
92    /// Create and write files into `tmp_dir`; returns the final paths and row counts.
93    async fn write_files(
94        &self,
95        tmp_dir: &Path,
96        partitions: Vec<BatchStream>,
97    ) -> Result<Vec<Written>, TransferredError> {
98        let streams: Vec<BatchStream> = if self.single_file {
99            vec![Box::pin(futures::stream::iter(partitions).flatten())]
100        } else {
101            partitions
102        };
103
104        let mut written = Vec::new();
105        for stream in streams {
106            let mut stream = stream.peekable();
107            if Pin::new(&mut stream).peek().await.is_none() {
108                continue; // skip empty partitions — no stray part file
109            }
110
111            let name = self.output_filename(written.len() + 1);
112            let file = File::create(tmp_dir.join(&name)).await?;
113
114            let rows = self.format.write(Box::new(file), Box::pin(stream)).await?;
115            written.push(Written {
116                path: self.path.join(&name),
117                rows,
118            });
119        }
120
121        if written.is_empty() {
122            return Err(TransferredError::EmptySource);
123        }
124
125        Ok(written)
126    }
127
128    /// Atomically overwrite `path` dir with `tmp_dir`, removing any existing output first.
129    async fn atomic_replace(&self, tmp_dir: &Path) -> Result<(), TransferredError> {
130        match tokio::fs::metadata(&self.path).await {
131            Ok(meta) if meta.is_dir() => tokio::fs::remove_dir_all(&self.path).await?,
132            Ok(_) => tokio::fs::remove_file(&self.path).await?,
133            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
134            Err(err) => return Err(err.into()),
135        }
136
137        tokio::fs::rename(tmp_dir, &self.path).await?;
138
139        Ok(())
140    }
141}
142
/// One output file produced by a run: where it ended up and how many rows it holds.
struct Written {
    /// Row count reported by the format writer for this file.
    rows: u64,
    /// Final location of the file under the destination directory (not the staging copy).
    path: PathBuf,
}
148
149/// Remove a leftover tmp directory, logging non-`NotFound` failures.
150async fn cleanup(tmp_dir: &Path) {
151    if let Err(err) = tokio::fs::remove_dir_all(tmp_dir).await
152        && err.kind() != std::io::ErrorKind::NotFound
153    {
154        warn!(path = %tmp_dir.display(), error = %err, "failed to remove tmp dir");
155    }
156}
157
/// Compute the staging path for `final_path`: a sibling named `{file_name}.tmp`.
///
/// This is a pure path computation; nothing is created on disk. If `final_path` has no
/// final component (e.g. `/`), the staging name is just `.tmp`.
fn make_tmp(final_path: &Path) -> PathBuf {
    let mut staged = std::ffi::OsString::new();
    if let Some(file_name) = final_path.file_name() {
        staged.push(file_name);
    }
    staged.push(".tmp");
    final_path.with_file_name(staged)
}