amadeus_node/utils/
archiver.rs

1use once_cell::sync::OnceCell;
2use std::borrow::Cow;
3use tokio::fs::{OpenOptions, create_dir_all, read_dir};
4use tokio::io::AsyncWriteExt;
5
/// Archive root directory shared by every function in this module.
/// Written at most once (by `init_storage`); readers get `None` until then.
static ARCHIVER_DIR: OnceCell<String> = OnceCell::new();
7
8#[derive(Debug, thiserror::Error)]
9pub enum Error {
10    #[error(transparent)]
11    TokioIo(#[from] tokio::io::Error),
12    #[error("once cell {0}")]
13    OnceCell(&'static str),
14}
15
16pub async fn init_storage(base: &str) -> Result<(), Error> {
17    // Fast path if already initialized
18    if let Some(_) = ARCHIVER_DIR.get() {
19        return Ok(());
20    }
21
22    // Compute desired path
23    let path = format!("{}/log", base);
24
25    // Try to set the OnceCell but do not treat "already set" as an error.
26    // This avoids races when multiple tests/contexts initialize concurrently.
27    let _ = ARCHIVER_DIR.set(path);
28
29    // Ensure the chosen path exists
30    let chosen = ARCHIVER_DIR.get().ok_or(Error::OnceCell("archiver_dir_get"))?;
31    create_dir_all(chosen).await?;
32
33    Ok(())
34}
35
36pub async fn store<'a>(
37    data: impl Into<Cow<'a, [u8]>>,
38    subdir: impl AsRef<str>,
39    name: impl AsRef<str>,
40) -> Result<(), Error> {
41    let bin: Cow<[u8]> = data.into();
42    let base = ARCHIVER_DIR.get().ok_or(Error::OnceCell("archiver_dir_get"))?;
43
44    let path = if subdir.as_ref().is_empty() {
45        format!("{}/{}", base, name.as_ref())
46    } else {
47        create_dir_all(&format!("{}/{}", base, subdir.as_ref())).await?;
48        format!("{}/{}/{}", base, subdir.as_ref(), name.as_ref())
49    };
50
51    let mut file = OpenOptions::new().create(true).append(true).open(&path).await?;
52    file.write_all(&bin).await?;
53    file.flush().await?;
54
55    Ok(())
56}
57
58/// Recursively get all archived filenames with their sizes from the archiver directory
59pub async fn get_archived_filenames() -> Result<Vec<(String, u64)>, Error> {
60    let base = ARCHIVER_DIR.get().ok_or(Error::OnceCell("archiver_dir_get"))?;
61    let mut filenames = Vec::new();
62    collect_filenames_recursive(base, "", &mut filenames).await?;
63    Ok(filenames)
64}
65
66/// Recursively collect filenames with sizes from a directory
67fn collect_filenames_recursive<'a>(
68    base_path: &'a str,
69    current_subdir: &'a str,
70    filenames: &'a mut Vec<(String, u64)>,
71) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Error>> + 'a>> {
72    Box::pin(async move {
73        let dir_path =
74            if current_subdir.is_empty() { base_path.to_string() } else { format!("{}/{}", base_path, current_subdir) };
75
76        let mut entries = read_dir(&dir_path).await?;
77        while let Some(entry) = entries.next_entry().await? {
78            let file_type = entry.file_type().await?;
79            let file_name = entry.file_name().to_string_lossy().to_string();
80
81            if file_type.is_dir() {
82                // Recursively process subdirectories
83                let new_subdir = if current_subdir.is_empty() {
84                    file_name.clone()
85                } else {
86                    format!("{}/{}", current_subdir, file_name)
87                };
88                collect_filenames_recursive(base_path, &new_subdir, filenames).await?;
89            } else if file_type.is_file() {
90                // Get file size
91                let metadata = entry.metadata().await?;
92                let file_size = metadata.len();
93
94                // Add file to the list with full path relative to base and its size
95                let full_path =
96                    if current_subdir.is_empty() { file_name } else { format!("{}/{}", current_subdir, file_name) };
97                filenames.push((full_path, file_size));
98            }
99        }
100
101        Ok(())
102    })
103}
104
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::fs::read_to_string;

    /// Builds a temp-dir path made unique by the process id and a nanosecond timestamp.
    fn unique_base() -> String {
        let ts = crate::utils::misc::get_unix_nanos_now();
        let pid = std::process::id();
        format!("{}/rs_node_archiver_test_{}_{}", std::env::temp_dir().display(), pid, ts)
    }

    /// Exercises the uninitialized error, init, plain store, append and subdir store
    /// in one test, because the archive directory is process-global and can only be
    /// set once.
    #[tokio::test]
    async fn archiver_end_to_end_single_test() {
        use tokio::fs::read;

        // The directory is global: if another test (running in parallel) already
        // set it, the paths asserted below would not match, so bail out.
        if ARCHIVER_DIR.get().is_some() {
            eprintln!("Skipping archiver test - already initialized by another test");
            return;
        }

        // store before init must fail with the OnceCell error. A success is
        // tolerated only because another test may have initialized in between.
        if let Some(err) = store(b"x", "", "a.bin").await.err() {
            assert!(matches!(err, Error::OnceCell(_)), "unexpected error before init: {:?}", err);
        }

        // init creates base/log and is idempotent
        let base = unique_base();
        init_storage(&base).await.expect("init ok");
        init_storage(&base).await.expect("init idempotent");

        // store without subdir
        store(b"hello", "", "one.txt").await.expect("store ok");
        let content = read(format!("{}/log/one.txt", base)).await.expect("read file");
        assert_eq!(content, b"hello");

        // append
        store(b" world", "", "one.txt").await.expect("append ok");
        let s = read_to_string(format!("{}/log/one.txt", base)).await.expect("read string");
        assert_eq!(s, "hello world");

        // subdir write
        store(b"sub", "subd", "two.bin").await.expect("subdir store");
        let content2 = read(format!("{}/log/subd/two.bin", base)).await.expect("read file2");
        assert_eq!(content2, b"sub");
    }
}