1use crate::config::FolderPather;
2use anyhow::{Context, Result};
3use futures::TryStreamExt;
4use include_dir::{Dir, DirEntry};
5use opendal::services::Memory;
6use opendal::Operator;
7use std::fmt::Debug;
8
9use crate::store::pinner::Pinner;
10
11pub mod pinner;
12
13pub struct Store {
14 pinner: Box<dyn Pinner>,
15 fs: Operator,
16 pather: FolderPather,
17}
18
19impl Debug for Store {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 f.debug_struct("Store").field("fs", &self.fs).finish()
22 }
23}
24
25impl Store {
26 pub fn new(pinner: Box<dyn Pinner>, fs: Operator, pather: FolderPather) -> Result<Store> {
27 Ok(Store { pinner, fs, pather })
28 }
29
30 pub async fn load_component(&self, name: &str) -> Result<Option<String>> {
31 let res = self.pinner.load(name, &self.fs).await?;
32
33 Ok(res)
34 }
35
36 pub async fn load_migration(&self, name: &str) -> Result<String> {
37 let result = self.fs.read(&name).await?;
38 let bytes = result.to_bytes();
39 let contents = String::from_utf8(bytes.to_vec())?;
40
41 Ok(contents)
42 }
43
44 pub async fn list_migrations(&self) -> Result<Vec<String>> {
45 let mut migrations: Vec<String> = Vec::new();
46 let mut fs_lister = self
47 .fs
48 .lister(format!("{}/", &self.pather.migrations_folder()).as_ref())
49 .await?;
50 while let Some(entry) = fs_lister.try_next().await? {
51 let path = entry.path().to_string();
52 if path.ends_with("/") {
53 migrations.push(path)
54 }
55 }
56
57 migrations.sort();
59
60 Ok(migrations)
61 }
62}
63
/// Selects which kind of destination operator `disk_to_operator` builds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DesiredOperator {
    /// Build the destination on the in-memory service.
    Memory,
    /// Build the destination on the filesystem service.
    FileSystem,
}
68
69pub async fn disk_to_operator(
72 source_folder: &str,
73 dest_prefix: Option<&str>,
74 desired_operator: DesiredOperator,
75) -> Result<Operator> {
76 let dest_op = match desired_operator {
77 DesiredOperator::FileSystem => {
78 let dest_service = opendal::services::Fs::default().root("./testout");
79 Operator::new(dest_service)?.finish()
80 }
81 DesiredOperator::Memory => {
82 let dest_service = Memory::default();
83 Operator::new(dest_service)?.finish()
84 }
85 };
86
87 let fs_service = opendal::services::Fs::default().root(source_folder);
89 let source_store = Operator::new(fs_service)
90 .context("disk_to_mem_operator failed to create operator")?
91 .finish();
92
93 let store_loc = dest_prefix.unwrap_or_default();
95 crate::store::populate_store_from_store(&source_store, &dest_op, "", store_loc)
96 .await
97 .context("call to populate memory fs from object store")?;
98
99 Ok(dest_op)
100}
101
102pub async fn populate_store_from_store(
103 source_store: &Operator,
104 target_store: &Operator,
105 source_prefix: &str,
106 dest_prefix: &str,
107) -> Result<()> {
108 let mut lister = source_store
109 .lister_with(source_prefix)
110 .recursive(true)
111 .await
112 .context("lister call")?;
113 let mut list_result: Vec<opendal::Entry> = Vec::new();
114
115 while let Some(entry) = lister.try_next().await? {
116 if entry.path().ends_with("/") {
117 continue;
118 }
119 list_result.push(entry);
120 }
121
122 for entry in list_result {
123 let dest_object_path = format!("{}{}", dest_prefix, entry.path());
125 let source_object_path = entry.path();
126
127 let bytes = source_store
129 .read(&source_object_path)
130 .await
131 .context(format!("read path {}", &source_object_path))?;
132
133 target_store
135 .write(&dest_object_path, bytes)
136 .await
137 .context("write")?;
138 }
139
140 Ok(())
141}
142
143pub async fn operator_from_includedir(
159 dir: &Dir<'_>,
160 dest_prefix: Option<&str>,
161) -> Result<Operator> {
162 let dest_service = Memory::default();
164 let operator = Operator::new(dest_service)?.finish();
165
166 let prefix = dest_prefix.unwrap_or_default();
167
168 let mut files_to_write = Vec::new();
170 collect_files_from_dir(dir, "", &mut files_to_write);
171
172 for (dest_path, contents) in &files_to_write {
174 let final_path = format!("{}{}", prefix, dest_path);
175 operator
176 .write(&final_path, contents.clone())
177 .await
178 .context(format!("Failed to write file {}", final_path))?;
179 }
180
181 Ok(operator)
182}
183
184fn collect_files_from_dir(dir: &Dir<'_>, current_path: &str, files: &mut Vec<(String, Vec<u8>)>) {
186 for entry in dir.entries() {
187 match entry {
188 DirEntry::Dir(subdir) => {
189 let new_path = if current_path.is_empty() {
190 subdir.path().to_string_lossy().to_string()
191 } else {
192 format!(
193 "{}/{}",
194 current_path,
195 subdir.path().file_name().unwrap().to_string_lossy()
196 )
197 };
198 collect_files_from_dir(subdir, &new_path, files);
199 }
200 DirEntry::File(file) => {
201 let file_path = if current_path.is_empty() {
202 file.path().to_string_lossy().to_string()
203 } else {
204 format!(
205 "{}/{}",
206 current_path,
207 file.path().file_name().unwrap().to_string_lossy()
208 )
209 };
210 files.push((file_path, file.contents().to_vec()));
211 }
212 }
213 }
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::pinner::latest::Latest;
    use include_dir::{include_dir, Dir};

    /// Fixture tree embedded at compile time from `./static`.
    static TEST_DIR: Dir<'_> = include_dir!("./static");

    /// Building an in-memory operator from the embedded tree with a prefix
    /// must succeed.
    #[tokio::test]
    async fn test_operator_from_includedir_with_prefix() {
        let outcome = operator_from_includedir(&TEST_DIR, Some("test-prefix/")).await;
        assert!(
            outcome.is_ok(),
            "Should create operator with prefix successfully"
        );
    }

    /// Loading the `two_migrations` fixture into memory and listing it must
    /// yield exactly the two expected migration folders.
    #[tokio::test]
    async fn test_list_migrations_returns_two_migrations() {
        let op = disk_to_operator(
            "./static/tests/two_migrations",
            None,
            DesiredOperator::Memory,
        )
        .await
        .expect("Failed to create operator from disk");

        let pinner = Latest::new("").expect("Failed to create Latest pinner");
        let pather = FolderPather {
            spawn_folder: "".to_string(),
        };
        let store = Store::new(Box::new(pinner), op, pather).expect("Failed to create Store");

        let found = store
            .list_migrations()
            .await
            .expect("Failed to list migrations");

        assert_eq!(found.len(), 2, "Expected 2 migrations, got {:?}", found);

        let has_initial = found.iter().any(|m| m.contains("20240907212659-initial"));
        assert!(
            has_initial,
            "Expected to find 20240907212659-initial migration, got {:?}",
            found
        );

        let has_second = found.iter().any(|m| m.contains("20240908123456-second"));
        assert!(
            has_second,
            "Expected to find 20240908123456-second migration, got {:?}",
            found
        );
    }
}