1use crate::config::FolderPather;
2use anyhow::{Context, Result};
3use futures::TryStreamExt;
4use include_dir::{Dir, DirEntry};
5use opendal::services::Memory;
6use opendal::Operator;
7use std::collections::BTreeMap;
8use std::fmt::Debug;
9
10use crate::store::pinner::Pinner;
11
12pub mod pinner;
13
/// Records which well-known files were found for a single migration.
///
/// The [`Default`] value reports that neither file is present.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MigrationFileStatus {
    /// `true` when an `up.sql` file was found for the migration.
    pub has_up_sql: bool,
    /// `true` when a `lock.toml` file was found for the migration.
    pub has_lock_toml: bool,
}
20
21pub async fn get_migration_fs_status(
24 op: &Operator,
25 pather: &FolderPather,
26 migration_name: &str,
27) -> Result<MigrationFileStatus> {
28 let statuses = list_migration_fs_status(op, pather, Some(migration_name)).await?;
30
31 Ok(statuses
32 .get(migration_name)
33 .cloned()
34 .unwrap_or(MigrationFileStatus {
35 has_up_sql: false,
36 has_lock_toml: false,
37 }))
38}
39
40pub async fn list_migration_fs_status(
47 op: &Operator,
48 pather: &FolderPather,
49 migration_name: Option<&str>,
50) -> Result<BTreeMap<String, MigrationFileStatus>> {
51 let migrations_folder = pather.migrations_folder();
52 let migrations_prefix = if let Some(name) = migration_name {
53 format!("{}/{}/", migrations_folder.trim_start_matches('/'), name)
55 } else {
56 format!("{}/", migrations_folder.trim_start_matches('/'))
58 };
59
60 let mut lister = op
62 .lister_with(&migrations_prefix)
63 .recursive(true)
64 .await
65 .context("listing migrations")?;
66
67 let mut result: BTreeMap<String, MigrationFileStatus> = BTreeMap::new();
68
69 while let Some(entry) = lister.try_next().await? {
70 let path = entry.path().to_string();
71 let relative_path = path.strip_prefix(&migrations_prefix).unwrap_or(&path);
72
73 let (name, filename) = match relative_path.split_once('/') {
77 Some((name, filename)) => (name, filename),
78 None if migration_name.is_some() => (migration_name.unwrap(), relative_path),
79 None => continue,
80 };
81
82 let status = result
83 .entry(name.to_string())
84 .or_insert(MigrationFileStatus {
85 has_up_sql: false,
86 has_lock_toml: false,
87 });
88
89 if filename == "up.sql" {
90 status.has_up_sql = true;
91 } else if filename == "lock.toml" {
92 status.has_lock_toml = true;
93 }
94 }
95
96 Ok(result)
97}
98
/// Bundles an operator with the pinner and path helper used to read
/// components and migrations from it.
pub struct Store {
    /// Pinner that `load_component` delegates to when loading a component.
    pinner: Box<dyn Pinner>,
    /// Operator that migrations are read and listed from.
    fs: Operator,
    /// Supplies the migrations folder used when listing migrations.
    pather: FolderPather,
}
104
105impl Debug for Store {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.debug_struct("Store").field("fs", &self.fs).finish()
108 }
109}
110
111impl Store {
112 pub fn new(pinner: Box<dyn Pinner>, fs: Operator, pather: FolderPather) -> Result<Store> {
113 Ok(Store { pinner, fs, pather })
114 }
115
116 pub async fn load_component(&self, name: &str) -> Result<Option<String>> {
117 let res = self.pinner.load(name, &self.fs).await?;
118
119 Ok(res)
120 }
121
122 pub async fn load_migration(&self, name: &str) -> Result<String> {
123 let result = self.fs.read(&name).await?;
124 let bytes = result.to_bytes();
125 let contents = String::from_utf8(bytes.to_vec())?;
126
127 Ok(contents)
128 }
129
130 pub async fn list_migrations(&self) -> Result<Vec<String>> {
131 let mut migrations: Vec<String> = Vec::new();
132 let mut fs_lister = self
133 .fs
134 .lister(format!("{}/", &self.pather.migrations_folder()).as_ref())
135 .await?;
136 while let Some(entry) = fs_lister.try_next().await? {
137 let path = entry.path().to_string();
138 if path.ends_with("/") {
139 migrations.push(path)
140 }
141 }
142
143 migrations.sort();
145
146 Ok(migrations)
147 }
148}
149
/// Selects the kind of destination operator that [`disk_to_operator`] builds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DesiredOperator {
    /// Copy the files into an in-memory operator.
    Memory,
    /// Copy the files into a filesystem operator rooted at `./testout`.
    FileSystem,
}
154
155pub async fn disk_to_operator(
158 source_folder: &str,
159 dest_prefix: Option<&str>,
160 desired_operator: DesiredOperator,
161) -> Result<Operator> {
162 let dest_op = match desired_operator {
163 DesiredOperator::FileSystem => {
164 let dest_service = opendal::services::Fs::default().root("./testout");
165 Operator::new(dest_service)?.finish()
166 }
167 DesiredOperator::Memory => {
168 let dest_service = Memory::default();
169 Operator::new(dest_service)?.finish()
170 }
171 };
172
173 let fs_service = opendal::services::Fs::default().root(source_folder);
175 let source_store = Operator::new(fs_service)
176 .context("disk_to_mem_operator failed to create operator")?
177 .finish();
178
179 let store_loc = dest_prefix.unwrap_or_default();
181 crate::store::populate_store_from_store(&source_store, &dest_op, "", store_loc)
182 .await
183 .context("call to populate memory fs from object store")?;
184
185 Ok(dest_op)
186}
187
188pub async fn populate_store_from_store(
189 source_store: &Operator,
190 target_store: &Operator,
191 source_prefix: &str,
192 dest_prefix: &str,
193) -> Result<()> {
194 let mut lister = source_store
195 .lister_with(source_prefix)
196 .recursive(true)
197 .await
198 .context("lister call")?;
199 let mut list_result: Vec<opendal::Entry> = Vec::new();
200
201 while let Some(entry) = lister.try_next().await? {
202 if entry.path().ends_with("/") {
203 continue;
204 }
205 list_result.push(entry);
206 }
207
208 for entry in list_result {
209 let dest_object_path = format!("{}{}", dest_prefix, entry.path());
211 let source_object_path = entry.path();
212
213 let bytes = source_store
215 .read(&source_object_path)
216 .await
217 .context(format!("read path {}", &source_object_path))?;
218
219 target_store
221 .write(&dest_object_path, bytes)
222 .await
223 .context("write")?;
224 }
225
226 Ok(())
227}
228
229pub async fn operator_from_includedir(
245 dir: &Dir<'_>,
246 dest_prefix: Option<&str>,
247) -> Result<Operator> {
248 let dest_service = Memory::default();
250 let operator = Operator::new(dest_service)?.finish();
251
252 let prefix = dest_prefix.unwrap_or_default();
253
254 let mut files_to_write = Vec::new();
256 collect_files_from_dir(dir, "", &mut files_to_write);
257
258 for (dest_path, contents) in &files_to_write {
260 let final_path = format!("{}{}", prefix, dest_path);
261 operator
262 .write(&final_path, contents.clone())
263 .await
264 .context(format!("Failed to write file {}", final_path))?;
265 }
266
267 Ok(operator)
268}
269
270fn collect_files_from_dir(dir: &Dir<'_>, current_path: &str, files: &mut Vec<(String, Vec<u8>)>) {
272 for entry in dir.entries() {
273 match entry {
274 DirEntry::Dir(subdir) => {
275 let new_path = if current_path.is_empty() {
276 subdir.path().to_string_lossy().to_string()
277 } else {
278 format!(
279 "{}/{}",
280 current_path,
281 subdir.path().file_name().unwrap().to_string_lossy()
282 )
283 };
284 collect_files_from_dir(subdir, &new_path, files);
285 }
286 DirEntry::File(file) => {
287 let file_path = if current_path.is_empty() {
288 file.path().to_string_lossy().to_string()
289 } else {
290 format!(
291 "{}/{}",
292 current_path,
293 file.path().file_name().unwrap().to_string_lossy()
294 )
295 };
296 files.push((file_path, file.contents().to_vec()));
297 }
298 }
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::pinner::latest::Latest;
    use include_dir::{include_dir, Dir};

    static TEST_DIR: Dir<'_> = include_dir!("./static");

    /// Builds a fresh, empty in-memory operator.
    fn empty_memory_operator() -> Operator {
        Operator::new(Memory::default()).unwrap().finish()
    }

    /// Builds a pather whose spawn folder is the empty string.
    fn root_pather() -> FolderPather {
        FolderPather {
            spawn_folder: String::new(),
        }
    }

    /// Building an operator from the embedded directory with a prefix succeeds.
    #[tokio::test]
    async fn test_operator_from_includedir_with_prefix() {
        let outcome = operator_from_includedir(&TEST_DIR, Some("test-prefix/")).await;
        assert!(
            outcome.is_ok(),
            "Should create operator with prefix successfully"
        );
    }

    /// `Store::list_migrations` finds both migrations of the on-disk fixture.
    #[tokio::test]
    async fn test_list_migrations_returns_two_migrations() {
        let fixture_op = disk_to_operator(
            "./static/tests/two_migrations",
            None,
            DesiredOperator::Memory,
        )
        .await
        .expect("Failed to create operator from disk");

        let pinner = Latest::new("").expect("Failed to create Latest pinner");
        let store = Store::new(Box::new(pinner), fixture_op, root_pather())
            .expect("Failed to create Store");

        let migrations: Vec<String> = store
            .list_migrations()
            .await
            .expect("Failed to list migrations");

        assert_eq!(
            migrations.len(),
            2,
            "Expected 2 migrations, got {:?}",
            migrations
        );

        let migration_names: Vec<&str> = migrations.iter().map(String::as_str).collect();
        let has_initial = migration_names
            .iter()
            .any(|m| m.contains("20240907212659-initial"));
        assert!(
            has_initial,
            "Expected to find 20240907212659-initial migration, got {:?}",
            migration_names
        );
        let has_second = migration_names
            .iter()
            .any(|m| m.contains("20240908123456-second"));
        assert!(
            has_second,
            "Expected to find 20240908123456-second migration, got {:?}",
            migration_names
        );
    }

    /// A migration with both files reports both flags.
    #[tokio::test]
    async fn test_get_migration_fs_status_with_lock() {
        let op = empty_memory_operator();

        op.write("/migrations/20240101000000-test/up.sql", "SELECT 1;")
            .await
            .expect("Failed to write up.sql");
        op.write(
            "/migrations/20240101000000-test/lock.toml",
            "pin = \"abc123\"",
        )
        .await
        .expect("Failed to write lock.toml");

        let pather = root_pather();
        let status = get_migration_fs_status(&op, &pather, "20240101000000-test")
            .await
            .expect("Failed to get migration status");

        assert!(status.has_up_sql, "Migration should have up.sql");
        assert!(status.has_lock_toml, "Migration should have lock.toml");
    }

    /// A migration with only `up.sql` reports no lock file.
    #[tokio::test]
    async fn test_get_migration_fs_status_without_lock() {
        let op = empty_memory_operator();

        op.write("/migrations/20240101000000-test/up.sql", "SELECT 1;")
            .await
            .expect("Failed to write up.sql");

        let pather = root_pather();
        let status = get_migration_fs_status(&op, &pather, "20240101000000-test")
            .await
            .expect("Failed to get migration status");

        assert!(status.has_up_sql, "Migration should have up.sql");
        assert!(!status.has_lock_toml, "Migration should not have lock.toml");
    }

    /// Scoping the listing to one migration returns only that migration.
    #[tokio::test]
    async fn test_list_migration_fs_status_single() {
        let op = empty_memory_operator();

        op.write("/migrations/20240101-first/up.sql", "SELECT 1;")
            .await
            .expect("Failed to write first up.sql");
        op.write("/migrations/20240101-first/lock.toml", "pin = \"abc\"")
            .await
            .expect("Failed to write first lock.toml");

        op.write("/migrations/20240102-second/up.sql", "SELECT 2;")
            .await
            .expect("Failed to write second up.sql");

        let pather = root_pather();
        let statuses = list_migration_fs_status(&op, &pather, Some("20240101-first"))
            .await
            .expect("Failed to list migration status");

        assert_eq!(statuses.len(), 1, "Should return exactly one migration");

        let first = statuses
            .get("20240101-first")
            .expect("Should have first migration");
        assert!(first.has_up_sql, "First migration should have up.sql");
        assert!(
            first.has_lock_toml,
            "First migration should have lock.toml"
        );
    }

    /// An unscoped listing reports every migration with its own flags.
    #[tokio::test]
    async fn test_list_migration_fs_status_all() {
        let op = empty_memory_operator();

        op.write("/migrations/20240101-first/up.sql", "SELECT 1;")
            .await
            .expect("Failed to write first up.sql");
        op.write("/migrations/20240101-first/lock.toml", "pin = \"abc\"")
            .await
            .expect("Failed to write first lock.toml");

        op.write("/migrations/20240102-second/up.sql", "SELECT 2;")
            .await
            .expect("Failed to write second up.sql");

        let pather = root_pather();
        let statuses = list_migration_fs_status(&op, &pather, None)
            .await
            .expect("Failed to list migration status");

        assert_eq!(statuses.len(), 2, "Should return two migrations");

        let first = statuses
            .get("20240101-first")
            .expect("Should have first migration");
        assert!(first.has_up_sql, "First migration should have up.sql");
        assert!(first.has_lock_toml, "First migration should have lock.toml");

        let second = statuses
            .get("20240102-second")
            .expect("Should have second migration");
        assert!(second.has_up_sql, "Second migration should have up.sql");
        assert!(
            !second.has_lock_toml,
            "Second migration should not have lock.toml"
        );
    }
}