amadeus_node/utils/
archiver.rs1use once_cell::sync::OnceCell;
2use std::borrow::Cow;
3use tokio::fs::{OpenOptions, create_dir_all, read_dir};
4use tokio::io::AsyncWriteExt;
5
6static ARCHIVER_DIR: OnceCell<String> = OnceCell::new();
7
8#[derive(Debug, thiserror::Error)]
9pub enum Error {
10 #[error(transparent)]
11 TokioIo(#[from] tokio::io::Error),
12 #[error("once cell {0}")]
13 OnceCell(&'static str),
14}
15
16pub async fn init_storage(base: &str) -> Result<(), Error> {
17 if let Some(_) = ARCHIVER_DIR.get() {
19 return Ok(());
20 }
21
22 let path = format!("{}/log", base);
24
25 let _ = ARCHIVER_DIR.set(path);
28
29 let chosen = ARCHIVER_DIR.get().ok_or(Error::OnceCell("archiver_dir_get"))?;
31 create_dir_all(chosen).await?;
32
33 Ok(())
34}
35
36pub async fn store<'a>(
37 data: impl Into<Cow<'a, [u8]>>,
38 subdir: impl AsRef<str>,
39 name: impl AsRef<str>,
40) -> Result<(), Error> {
41 let bin: Cow<[u8]> = data.into();
42 let base = ARCHIVER_DIR.get().ok_or(Error::OnceCell("archiver_dir_get"))?;
43
44 let path = if subdir.as_ref().is_empty() {
45 format!("{}/{}", base, name.as_ref())
46 } else {
47 create_dir_all(&format!("{}/{}", base, subdir.as_ref())).await?;
48 format!("{}/{}/{}", base, subdir.as_ref(), name.as_ref())
49 };
50
51 let mut file = OpenOptions::new().create(true).append(true).open(&path).await?;
52 file.write_all(&bin).await?;
53 file.flush().await?;
54
55 Ok(())
56}
57
58pub async fn get_archived_filenames() -> Result<Vec<(String, u64)>, Error> {
60 let base = ARCHIVER_DIR.get().ok_or(Error::OnceCell("archiver_dir_get"))?;
61 let mut filenames = Vec::new();
62 collect_filenames_recursive(base, "", &mut filenames).await?;
63 Ok(filenames)
64}
65
66fn collect_filenames_recursive<'a>(
68 base_path: &'a str,
69 current_subdir: &'a str,
70 filenames: &'a mut Vec<(String, u64)>,
71) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Error>> + 'a>> {
72 Box::pin(async move {
73 let dir_path =
74 if current_subdir.is_empty() { base_path.to_string() } else { format!("{}/{}", base_path, current_subdir) };
75
76 let mut entries = read_dir(&dir_path).await?;
77 while let Some(entry) = entries.next_entry().await? {
78 let file_type = entry.file_type().await?;
79 let file_name = entry.file_name().to_string_lossy().to_string();
80
81 if file_type.is_dir() {
82 let new_subdir = if current_subdir.is_empty() {
84 file_name.clone()
85 } else {
86 format!("{}/{}", current_subdir, file_name)
87 };
88 collect_filenames_recursive(base_path, &new_subdir, filenames).await?;
89 } else if file_type.is_file() {
90 let metadata = entry.metadata().await?;
92 let file_size = metadata.len();
93
94 let full_path =
96 if current_subdir.is_empty() { file_name } else { format!("{}/{}", current_subdir, file_name) };
97 filenames.push((full_path, file_size));
98 }
99 }
100
101 Ok(())
102 })
103}
104
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::fs::read_to_string;

    /// Builds a base directory path under the system temp dir that embeds the
    /// process id and a nanosecond timestamp, so separate runs do not collide.
    fn unique_base() -> String {
        let ts = crate::utils::misc::get_unix_nanos_now();
        let pid = std::process::id();
        format!("{}/rs_node_archiver_test_{}_{}", std::env::temp_dir().display(), pid, ts)
    }

    /// Exercises the archiver end to end in a single test, because the archive
    /// root is a process-wide cell that can only be initialized once.
    #[tokio::test]
    async fn archiver_end_to_end_single_test() {
        use tokio::fs::read;

        if ARCHIVER_DIR.get().is_some() {
            eprintln!("Skipping archiver test - already initialized by another test");
            return;
        }

        // Storing before initialization must fail with the "not initialized"
        // error. The check stays conditional because another test may
        // initialize the cell concurrently, in which case the store succeeds.
        let err = store(b"x", "", "a.bin").await.err();
        if let Some(err) = err {
            assert!(
                matches!(err, Error::OnceCell(_)),
                "unexpected error before init: {:?}",
                err
            );
        }

        let base = unique_base();
        init_storage(&base).await.expect("init ok");
        init_storage(&base).await.expect("init idempotent");

        // A first store creates the file with exactly the given bytes.
        store(b"hello", "", "one.txt").await.expect("store ok");
        let content = read(format!("{}/log/one.txt", base)).await.expect("read file");
        assert_eq!(content, b"hello");

        // A second store to the same name appends rather than truncates.
        store(b" world", "", "one.txt").await.expect("append ok");
        let s = read_to_string(format!("{}/log/one.txt", base)).await.expect("read string");
        assert_eq!(s, "hello world");

        // A non-empty subdir is created on demand.
        store(b"sub", "subd", "two.bin").await.expect("subdir store");
        let content2 = read(format!("{}/log/subd/two.bin", base)).await.expect("read file2");
        assert_eq!(content2, b"sub");
    }
}