common/
rm.rs

1use anyhow::{anyhow, Context};
2use async_recursion::async_recursion;
3use std::os::unix::fs::PermissionsExt;
4use tracing::instrument;
5
6use crate::progress;
7
8#[derive(Debug, thiserror::Error)]
9#[error("{source}")]
10pub struct Error {
11    #[source]
12    pub source: anyhow::Error,
13    pub summary: Summary,
14}
15
16impl Error {
17    fn new(source: anyhow::Error, summary: Summary) -> Self {
18        Error { source, summary }
19    }
20}
21
22#[derive(Debug, Clone)]
23pub struct Settings {
24    pub fail_early: bool,
25}
26
27#[derive(Copy, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
28pub struct Summary {
29    pub files_removed: usize,
30    pub symlinks_removed: usize,
31    pub directories_removed: usize,
32}
33
34impl std::ops::Add for Summary {
35    type Output = Self;
36    fn add(self, other: Self) -> Self {
37        Self {
38            files_removed: self.files_removed + other.files_removed,
39            symlinks_removed: self.symlinks_removed + other.symlinks_removed,
40            directories_removed: self.directories_removed + other.directories_removed,
41        }
42    }
43}
44
45impl std::fmt::Display for Summary {
46    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
47        write!(
48            f,
49            "files removed: {}\n\
50            symlinks removed: {}\n\
51            directories removed: {}\n",
52            self.files_removed, self.symlinks_removed, self.directories_removed
53        )
54    }
55}
56
57#[instrument(skip(prog_track))]
58#[async_recursion]
59pub async fn rm(
60    prog_track: &'static progress::Progress,
61    path: &std::path::Path,
62    settings: &Settings,
63) -> Result<Summary, Error> {
64    let _ops_guard = prog_track.ops.guard();
65    tracing::debug!("read path metadata");
66    let src_metadata = tokio::fs::symlink_metadata(path)
67        .await
68        .with_context(|| format!("failed reading metadata from {:?}", &path))
69        .map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))?;
70    if !src_metadata.is_dir() {
71        tracing::debug!("not a directory, just remove");
72        tokio::fs::remove_file(path)
73            .await
74            .with_context(|| format!("failed removing {:?}", &path))
75            .map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))?;
76        if src_metadata.file_type().is_symlink() {
77            prog_track.symlinks_removed.inc();
78            return Ok(Summary {
79                symlinks_removed: 1,
80                ..Default::default()
81            });
82        }
83        prog_track.files_removed.inc();
84        return Ok(Summary {
85            files_removed: 1,
86            ..Default::default()
87        });
88    }
89    tracing::debug!("remove contents of the directory first");
90    if src_metadata.permissions().readonly() {
91        tracing::debug!("directory is read-only - change the permissions");
92        tokio::fs::set_permissions(path, std::fs::Permissions::from_mode(0o777))
93            .await
94            .with_context(|| {
95                format!(
96                    "failed to make '{:?}' directory readable and writeable",
97                    &path
98                )
99            })
100            .map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))?;
101    }
102    let mut entries = tokio::fs::read_dir(path)
103        .await
104        .map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))?;
105    let mut join_set = tokio::task::JoinSet::new();
106    let mut success = true;
107    while let Some(entry) = entries
108        .next_entry()
109        .await
110        .with_context(|| format!("failed traversing directory {:?}", &path))
111        .map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))?
112    {
113        // it's better to await the token here so that we throttle the syscalls generated by the
114        // DirEntry call. the ops-throttle will never cause a deadlock (unlike max-open-files limit)
115        // so it's safe to do here.
116        throttle::get_ops_token().await;
117        let entry_path = entry.path();
118        let settings = settings.clone();
119        let do_rm = || async move { rm(prog_track, &entry_path, &settings).await };
120        join_set.spawn(do_rm());
121    }
122    // unfortunately ReadDir is opening file-descriptors and there's not a good way to limit this,
123    // one thing we CAN do however is to drop it as soon as we're done with it
124    drop(entries);
125    let mut rm_summary = Summary {
126        directories_removed: 0,
127        ..Default::default()
128    };
129    while let Some(res) = join_set.join_next().await {
130        match res.map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))? {
131            Ok(summary) => rm_summary = rm_summary + summary,
132            Err(error) => {
133                tracing::error!("remove: {:?} failed with: {:?}", path, &error);
134                rm_summary = rm_summary + error.summary;
135                if settings.fail_early {
136                    return Err(Error::new(error.source, rm_summary));
137                }
138                success = false;
139            }
140        }
141    }
142    if !success {
143        return Err(Error::new(anyhow!("rm: {:?} failed!", &path), rm_summary));
144    }
145    tracing::debug!("finally remove the empty directory");
146    tokio::fs::remove_dir(path)
147        .await
148        .with_context(|| format!("failed removing directory {:?}", &path))
149        .map_err(|err| Error::new(anyhow::Error::msg(err), rm_summary))?;
150    prog_track.directories_removed.inc();
151    rm_summary.directories_removed += 1;
152    Ok(rm_summary)
153}
154
155#[cfg(test)]
156mod tests {
157    use super::*;
158    use crate::testutils;
159    use tracing_test::traced_test;
160
161    lazy_static! {
162        static ref PROGRESS: progress::Progress = progress::Progress::new();
163    }
164
165    #[tokio::test]
166    #[traced_test]
167    async fn no_write_permission() -> Result<(), anyhow::Error> {
168        let tmp_dir = testutils::setup_test_dir().await?;
169        let test_path = tmp_dir.as_path();
170        let filepaths = vec![
171            test_path.join("foo").join("0.txt"),
172            test_path.join("foo").join("bar").join("2.txt"),
173            test_path.join("foo").join("baz").join("4.txt"),
174            test_path.join("foo").join("baz"),
175        ];
176        for fpath in &filepaths {
177            // change file permissions to not readable and not writable
178            tokio::fs::set_permissions(&fpath, std::fs::Permissions::from_mode(0o555)).await?;
179        }
180        let summary = rm(
181            &PROGRESS,
182            &test_path.join("foo"),
183            &Settings { fail_early: false },
184        )
185        .await?;
186        assert!(!test_path.join("foo").exists());
187        assert_eq!(summary.files_removed, 5);
188        assert_eq!(summary.symlinks_removed, 2);
189        assert_eq!(summary.directories_removed, 3);
190        Ok(())
191    }
192}