1use crate::config::FolderPather;
2use anyhow::{Context, Result};
3use futures::TryStreamExt;
4use include_dir::{Dir, DirEntry};
5use opendal::services::Memory;
6use opendal::Operator;
7use std::collections::BTreeMap;
8use std::fmt::Debug;
9
10use crate::store::pinner::Pinner;
11
12pub mod pinner;
13
14#[derive(Debug, Clone)]
16pub struct MigrationFileStatus {
17 pub has_up_sql: bool,
18 pub has_lock_toml: bool,
19}
20
21pub async fn get_migration_fs_status(
24 op: &Operator,
25 pather: &FolderPather,
26 migration_name: &str,
27) -> Result<MigrationFileStatus> {
28 let statuses = list_migration_fs_status(op, pather, Some(migration_name)).await?;
30
31 Ok(statuses
32 .get(migration_name)
33 .cloned()
34 .unwrap_or(MigrationFileStatus {
35 has_up_sql: false,
36 has_lock_toml: false,
37 }))
38}
39
40pub async fn list_migration_fs_status(
47 op: &Operator,
48 pather: &FolderPather,
49 migration_name: Option<&str>,
50) -> Result<BTreeMap<String, MigrationFileStatus>> {
51 let migrations_folder = pather.migrations_folder();
52 let normalized_folder = migrations_folder
56 .trim_start_matches("./")
57 .trim_start_matches('/');
58 let migrations_prefix = if let Some(name) = migration_name {
59 format!("{}/{}/", normalized_folder, name)
61 } else {
62 format!("{}/", normalized_folder)
64 };
65
66 let mut lister = op
68 .lister_with(&migrations_prefix)
69 .recursive(true)
70 .await
71 .context("listing migrations")?;
72
73 let mut result: BTreeMap<String, MigrationFileStatus> = BTreeMap::new();
74
75 while let Some(entry) = lister.try_next().await? {
76 let path = entry.path().to_string();
77 let relative_path = path.strip_prefix(&migrations_prefix).unwrap_or(&path);
78
79 let (name, filename) = match relative_path.split_once('/') {
83 Some((name, filename)) => (name, filename),
84 None if migration_name.is_some() => (migration_name.unwrap(), relative_path),
85 None => continue,
86 };
87
88 let status = result
89 .entry(name.to_string())
90 .or_insert(MigrationFileStatus {
91 has_up_sql: false,
92 has_lock_toml: false,
93 });
94
95 if filename == "up.sql" {
96 status.has_up_sql = true;
97 } else if filename == "lock.toml" {
98 status.has_lock_toml = true;
99 }
100 }
101
102 Ok(result)
103}
104
105pub struct Store {
106 pinner: Box<dyn Pinner>,
107 fs: Operator,
108 pather: FolderPather,
109}
110
111impl Debug for Store {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("Store").field("fs", &self.fs).finish()
114 }
115}
116
117impl Store {
118 pub fn new(pinner: Box<dyn Pinner>, fs: Operator, pather: FolderPather) -> Result<Store> {
119 Ok(Store { pinner, fs, pather })
120 }
121
122 pub async fn load_component(&self, name: &str) -> Result<Option<String>> {
123 let res = self.pinner.load(name, &self.fs).await?;
124
125 Ok(res)
126 }
127
128 pub async fn load_component_bytes(&self, name: &str) -> Result<Option<Vec<u8>>> {
129 self.pinner.load_bytes(name, &self.fs).await
130 }
131
132 pub async fn read_file_bytes(&self, path: &str) -> Result<Vec<u8>> {
133 self.load_component_bytes(path)
134 .await?
135 .ok_or_else(|| anyhow::anyhow!("file not found in components: {}", path))
136 }
137
138 pub async fn load_migration(&self, name: &str) -> Result<String> {
139 let result = self.fs.read(&name).await?;
140 let bytes = result.to_bytes();
141 let contents = String::from_utf8(bytes.to_vec())?;
142
143 Ok(contents)
144 }
145
146 pub async fn list_migrations(&self) -> Result<Vec<String>> {
147 let mut migrations: Vec<String> = Vec::new();
148 let mut fs_lister = self
149 .fs
150 .lister(format!("{}/", &self.pather.migrations_folder()).as_ref())
151 .await?;
152 while let Some(entry) = fs_lister.try_next().await? {
153 let path = entry.path().to_string();
154 if path.ends_with("/") {
155 migrations.push(path)
156 }
157 }
158
159 migrations.sort();
161
162 Ok(migrations)
163 }
164}
165
166pub enum DesiredOperator {
167 Memory,
168 FileSystem,
169}
170
171pub async fn disk_to_operator(
174 source_folder: &str,
175 dest_prefix: Option<&str>,
176 desired_operator: DesiredOperator,
177) -> Result<Operator> {
178 let dest_op = match desired_operator {
179 DesiredOperator::FileSystem => {
180 let dest_service = opendal::services::Fs::default().root("./testout");
181 Operator::new(dest_service)?.finish()
182 }
183 DesiredOperator::Memory => {
184 let dest_service = Memory::default();
185 Operator::new(dest_service)?.finish()
186 }
187 };
188
189 let fs_service = opendal::services::Fs::default().root(source_folder);
191 let source_store = Operator::new(fs_service)
192 .context("disk_to_mem_operator failed to create operator")?
193 .finish();
194
195 let store_loc = dest_prefix.unwrap_or_default();
197 crate::store::populate_store_from_store(&source_store, &dest_op, "", store_loc)
198 .await
199 .context("call to populate memory fs from object store")?;
200
201 Ok(dest_op)
202}
203
204pub async fn populate_store_from_store(
205 source_store: &Operator,
206 target_store: &Operator,
207 source_prefix: &str,
208 dest_prefix: &str,
209) -> Result<()> {
210 let mut lister = source_store
211 .lister_with(source_prefix)
212 .recursive(true)
213 .await
214 .context("lister call")?;
215 let mut list_result: Vec<opendal::Entry> = Vec::new();
216
217 while let Some(entry) = lister.try_next().await? {
218 if entry.path().ends_with("/") {
219 continue;
220 }
221 list_result.push(entry);
222 }
223
224 for entry in list_result {
225 let dest_object_path = format!("{}{}", dest_prefix, entry.path());
227 let source_object_path = entry.path();
228
229 let bytes = source_store
231 .read(&source_object_path)
232 .await
233 .context(format!("read path {}", &source_object_path))?;
234
235 target_store
237 .write(&dest_object_path, bytes)
238 .await
239 .context("write")?;
240 }
241
242 Ok(())
243}
244
245pub async fn operator_from_includedir(
261 dir: &Dir<'_>,
262 dest_prefix: Option<&str>,
263) -> Result<Operator> {
264 let dest_service = Memory::default();
266 let operator = Operator::new(dest_service)?.finish();
267
268 let prefix = dest_prefix.unwrap_or_default();
269
270 let mut files_to_write = Vec::new();
272 collect_files_from_dir(dir, "", &mut files_to_write);
273
274 for (dest_path, contents) in &files_to_write {
276 let final_path = format!("{}{}", prefix, dest_path);
277 operator
278 .write(&final_path, contents.clone())
279 .await
280 .context(format!("Failed to write file {}", final_path))?;
281 }
282
283 Ok(operator)
284}
285
286fn collect_files_from_dir(dir: &Dir<'_>, current_path: &str, files: &mut Vec<(String, Vec<u8>)>) {
288 for entry in dir.entries() {
289 match entry {
290 DirEntry::Dir(subdir) => {
291 let new_path = if current_path.is_empty() {
292 subdir.path().to_string_lossy().to_string()
293 } else {
294 format!(
295 "{}/{}",
296 current_path,
297 subdir.path().file_name().unwrap().to_string_lossy()
298 )
299 };
300 collect_files_from_dir(subdir, &new_path, files);
301 }
302 DirEntry::File(file) => {
303 let file_path = if current_path.is_empty() {
304 file.path().to_string_lossy().to_string()
305 } else {
306 format!(
307 "{}/{}",
308 current_path,
309 file.path().file_name().unwrap().to_string_lossy()
310 )
311 };
312 files.push((file_path, file.contents().to_vec()));
313 }
314 }
315 }
316}
317
318#[cfg(test)]
319mod tests {
320 use super::*;
321 use crate::store::pinner::latest::Latest;
322 use include_dir::{include_dir, Dir};
323
324 static TEST_DIR: Dir<'_> = include_dir!("./static");
326
327 #[tokio::test]
328 async fn test_operator_from_includedir_with_prefix() {
329 let result = operator_from_includedir(&TEST_DIR, Some("test-prefix/")).await;
330 assert!(
331 result.is_ok(),
332 "Should create operator with prefix successfully"
333 );
334 }
335
336 #[tokio::test]
337 async fn test_list_migrations_returns_two_migrations() {
338 let op = disk_to_operator(
340 "./static/tests/two_migrations",
341 None,
342 DesiredOperator::Memory,
343 )
344 .await
345 .expect("Failed to create operator from disk");
346
347 let pinner = Latest::new("").expect("Failed to create Latest pinner");
349 let pather = FolderPather {
350 spawn_folder: "".to_string(),
351 };
352
353 let store = Store::new(Box::new(pinner), op, pather).expect("Failed to create Store");
355
356 let migrations: Vec<String> = store
358 .list_migrations()
359 .await
360 .expect("Failed to list migrations");
361
362 assert_eq!(
364 migrations.len(),
365 2,
366 "Expected 2 migrations, got {:?}",
367 migrations
368 );
369
370 let migration_names: Vec<&str> = migrations.iter().map(|s| s.as_str()).collect();
372 assert!(
373 migration_names
374 .iter()
375 .any(|m| m.contains("20240907212659-initial")),
376 "Expected to find 20240907212659-initial migration, got {:?}",
377 migration_names
378 );
379 assert!(
380 migration_names
381 .iter()
382 .any(|m| m.contains("20240908123456-second")),
383 "Expected to find 20240908123456-second migration, got {:?}",
384 migration_names
385 );
386 }
387
388 #[tokio::test]
389 async fn test_get_migration_fs_status_with_lock() {
390 let mem_service = Memory::default();
392 let op = Operator::new(mem_service).unwrap().finish();
393
394 op.write("/migrations/20240101000000-test/up.sql", "SELECT 1;")
396 .await
397 .expect("Failed to write up.sql");
398 op.write(
399 "/migrations/20240101000000-test/lock.toml",
400 "pin = \"abc123\"",
401 )
402 .await
403 .expect("Failed to write lock.toml");
404
405 let pather = FolderPather {
406 spawn_folder: "".to_string(),
407 };
408
409 let status = get_migration_fs_status(&op, &pather, "20240101000000-test")
411 .await
412 .expect("Failed to get migration status");
413
414 assert!(status.has_up_sql, "Migration should have up.sql");
415 assert!(status.has_lock_toml, "Migration should have lock.toml");
416 }
417
418 #[tokio::test]
419 async fn test_get_migration_fs_status_without_lock() {
420 let mem_service = Memory::default();
422 let op = Operator::new(mem_service).unwrap().finish();
423
424 op.write("/migrations/20240101000000-test/up.sql", "SELECT 1;")
426 .await
427 .expect("Failed to write up.sql");
428
429 let pather = FolderPather {
430 spawn_folder: "".to_string(),
431 };
432
433 let status = get_migration_fs_status(&op, &pather, "20240101000000-test")
434 .await
435 .expect("Failed to get migration status");
436
437 assert!(status.has_up_sql, "Migration should have up.sql");
438 assert!(!status.has_lock_toml, "Migration should not have lock.toml");
439 }
440
441 #[tokio::test]
442 async fn test_list_migration_fs_status() {
443 for spawn_folder in [
446 "",
447 "./database/spawn",
448 "/database/spawn",
449 "./spawn",
450 "/spawn",
451 ] {
452 let mem_service = Memory::default();
453 let op = Operator::new(mem_service).unwrap().finish();
454
455 let prefix = spawn_folder
456 .trim_start_matches("./")
457 .trim_start_matches('/');
458 let migrations = if prefix.is_empty() {
459 "migrations".to_string()
460 } else {
461 format!("{}/migrations", prefix)
462 };
463
464 op.write(
465 &format!("{}/20240101-first/up.sql", migrations),
466 "SELECT 1;",
467 )
468 .await
469 .unwrap();
470 op.write(
471 &format!("{}/20240101-first/lock.toml", migrations),
472 "pin = \"abc\"",
473 )
474 .await
475 .unwrap();
476 op.write(
477 &format!("{}/20240102-second/up.sql", migrations),
478 "SELECT 2;",
479 )
480 .await
481 .unwrap();
482
483 let pather = FolderPather {
484 spawn_folder: spawn_folder.to_string(),
485 };
486
487 for filter in [None, Some("20240101-first")] {
488 let statuses = list_migration_fs_status(&op, &pather, filter)
489 .await
490 .expect("Failed to list migration statuses");
491
492 let first = statuses
493 .get("20240101-first")
494 .expect("Should have first migration");
495 assert!(first.has_up_sql);
496 assert!(first.has_lock_toml);
497
498 if filter.is_none() {
499 assert_eq!(statuses.len(), 2);
500 let second = statuses
501 .get("20240102-second")
502 .expect("Should have second migration");
503 assert!(second.has_up_sql);
504 assert!(!second.has_lock_toml);
505 } else {
506 assert_eq!(statuses.len(), 1);
507 }
508 }
509 }
510 }
511}