// spawn_db/store/mod.rs
1use crate::config::FolderPather;
2use anyhow::{Context, Result};
3use futures::TryStreamExt;
4use include_dir::{Dir, DirEntry};
5use opendal::services::Memory;
6use opendal::Operator;
7use std::collections::BTreeMap;
8use std::fmt::Debug;
9
10use crate::store::pinner::Pinner;
11
12pub mod pinner;
13
/// Filesystem-level status of a single migration.
///
/// `Default` yields "nothing on disk" (both flags false), which is the natural
/// fallback when a migration folder does not exist at all.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MigrationFileStatus {
    /// True when the migration folder contains an `up.sql` file.
    pub has_up_sql: bool,
    /// True when the migration folder contains a `lock.toml` file.
    pub has_lock_toml: bool,
}
20
21/// Get the filesystem status of a single migration.
22/// Returns the status indicating whether up.sql and lock.toml files exist.
23pub async fn get_migration_fs_status(
24    op: &Operator,
25    pather: &FolderPather,
26    migration_name: &str,
27) -> Result<MigrationFileStatus> {
28    // List only this specific migration folder
29    let statuses = list_migration_fs_status(op, pather, Some(migration_name)).await?;
30
31    Ok(statuses
32        .get(migration_name)
33        .cloned()
34        .unwrap_or(MigrationFileStatus {
35            has_up_sql: false,
36            has_lock_toml: false,
37        }))
38}
39
40/// Scan the migrations folder and return the filesystem status of each migration,
41/// keyed by migration name. This does not touch the database.
42/// Performs a single recursive list for efficiency with remote storage.
43///
44/// If `migration_name` is provided, only lists that specific migration folder.
45/// Otherwise lists all migrations.
46pub async fn list_migration_fs_status(
47    op: &Operator,
48    pather: &FolderPather,
49    migration_name: Option<&str>,
50) -> Result<BTreeMap<String, MigrationFileStatus>> {
51    let migrations_folder = pather.migrations_folder();
52    let migrations_prefix = if let Some(name) = migration_name {
53        // List only the specific migration folder
54        format!("{}/{}/", migrations_folder.trim_start_matches('/'), name)
55    } else {
56        // List all migrations
57        format!("{}/", migrations_folder.trim_start_matches('/'))
58    };
59
60    // Single recursive list - efficient for remote storage like S3
61    let mut lister = op
62        .lister_with(&migrations_prefix)
63        .recursive(true)
64        .await
65        .context("listing migrations")?;
66
67    let mut result: BTreeMap<String, MigrationFileStatus> = BTreeMap::new();
68
69    while let Some(entry) = lister.try_next().await? {
70        let path = entry.path().to_string();
71        let relative_path = path.strip_prefix(&migrations_prefix).unwrap_or(&path);
72
73        // When listing a specific migration, relative_path is just "up.sql" or "lock.toml".
74        // When listing all migrations, relative_path is "migration-name/up.sql" etc.
75        // Resolve the migration name and filename from the relative path.
76        let (name, filename) = match relative_path.split_once('/') {
77            Some((name, filename)) => (name, filename),
78            None if migration_name.is_some() => (migration_name.unwrap(), relative_path),
79            None => continue,
80        };
81
82        let status = result
83            .entry(name.to_string())
84            .or_insert(MigrationFileStatus {
85                has_up_sql: false,
86                has_lock_toml: false,
87            });
88
89        if filename == "up.sql" {
90            status.has_up_sql = true;
91        } else if filename == "lock.toml" {
92            status.has_lock_toml = true;
93        }
94    }
95
96    Ok(result)
97}
98
/// Storage facade combining a pinning strategy, an OpenDAL operator, and
/// path resolution for the store's folder layout.
pub struct Store {
    // Strategy for resolving a component name to its pinned contents.
    pinner: Box<dyn Pinner>,
    // OpenDAL operator backing all reads and listings (memory, local fs, S3, ...).
    fs: Operator,
    // Computes folder paths within the store (e.g. the migrations folder).
    pather: FolderPather,
}
104
impl Debug for Store {
    // Manual impl showing only `fs`; `pinner` is skipped (presumably because
    // `dyn Pinner` carries no `Debug` bound — confirm) and `pather` is
    // omitted as well.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Store").field("fs", &self.fs).finish()
    }
}
110
111impl Store {
112    pub fn new(pinner: Box<dyn Pinner>, fs: Operator, pather: FolderPather) -> Result<Store> {
113        Ok(Store { pinner, fs, pather })
114    }
115
116    pub async fn load_component(&self, name: &str) -> Result<Option<String>> {
117        let res = self.pinner.load(name, &self.fs).await?;
118
119        Ok(res)
120    }
121
122    pub async fn load_migration(&self, name: &str) -> Result<String> {
123        let result = self.fs.read(&name).await?;
124        let bytes = result.to_bytes();
125        let contents = String::from_utf8(bytes.to_vec())?;
126
127        Ok(contents)
128    }
129
130    pub async fn list_migrations(&self) -> Result<Vec<String>> {
131        let mut migrations: Vec<String> = Vec::new();
132        let mut fs_lister = self
133            .fs
134            .lister(format!("{}/", &self.pather.migrations_folder()).as_ref())
135            .await?;
136        while let Some(entry) = fs_lister.try_next().await? {
137            let path = entry.path().to_string();
138            if path.ends_with("/") {
139                migrations.push(path)
140            }
141        }
142
143        // Sort migrations by name to ensure oldest to newest order
144        migrations.sort();
145
146        Ok(migrations)
147    }
148}
149
/// Which backing service `disk_to_operator` should create for the destination.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DesiredOperator {
    /// In-memory operator; contents vanish when the operator is dropped.
    Memory,
    /// Local filesystem operator rooted at "./testout".
    FileSystem,
}
154
155// Handy function for getting a disk based folder of data and return an in
156// memory operator that has the same contents.  Particularly useful for tests.
157pub async fn disk_to_operator(
158    source_folder: &str,
159    dest_prefix: Option<&str>,
160    desired_operator: DesiredOperator,
161) -> Result<Operator> {
162    let dest_op = match desired_operator {
163        DesiredOperator::FileSystem => {
164            let dest_service = opendal::services::Fs::default().root("./testout");
165            Operator::new(dest_service)?.finish()
166        }
167        DesiredOperator::Memory => {
168            let dest_service = Memory::default();
169            Operator::new(dest_service)?.finish()
170        }
171    };
172
173    // Create a LocalFileSystem to read from static/example
174    let fs_service = opendal::services::Fs::default().root(source_folder);
175    let source_store = Operator::new(fs_service)
176        .context("disk_to_mem_operator failed to create operator")?
177        .finish();
178
179    // Populate the in-memory store with contents from static/example
180    let store_loc = dest_prefix.unwrap_or_default();
181    crate::store::populate_store_from_store(&source_store, &dest_op, "", store_loc)
182        .await
183        .context("call to populate memory fs from object store")?;
184
185    Ok(dest_op)
186}
187
188pub async fn populate_store_from_store(
189    source_store: &Operator,
190    target_store: &Operator,
191    source_prefix: &str,
192    dest_prefix: &str,
193) -> Result<()> {
194    let mut lister = source_store
195        .lister_with(source_prefix)
196        .recursive(true)
197        .await
198        .context("lister call")?;
199    let mut list_result: Vec<opendal::Entry> = Vec::new();
200
201    while let Some(entry) = lister.try_next().await? {
202        if entry.path().ends_with("/") {
203            continue;
204        }
205        list_result.push(entry);
206    }
207
208    for entry in list_result {
209        // Print out the file we're writing:
210        let dest_object_path = format!("{}{}", dest_prefix, entry.path());
211        let source_object_path = entry.path();
212
213        // Get the object data
214        let bytes = source_store
215            .read(&source_object_path)
216            .await
217            .context(format!("read path {}", &source_object_path))?;
218
219        // Store in target with the same path
220        target_store
221            .write(&dest_object_path, bytes)
222            .await
223            .context("write")?;
224    }
225
226    Ok(())
227}
228
229/// Creates a memory-based OpenDAL operator from an include_dir bundle.
230///
231/// This function takes a bundled directory (created with include_dir!) and
232/// creates an in-memory OpenDAL operator containing all files from that bundle.
233/// This is useful for embedding migrations, templates, or other static files
234/// directly into the binary while still being able to use OpenDAL's interface.
235///
236/// # Arguments
237///
238/// * `included_dir` - A reference to a Dir created with include_dir! macro
239/// * `dest_prefix` - Optional prefix to add to all file paths in the operator
240///
241/// # Returns
242///
243/// A memory-based OpenDAL operator containing all files from the bundled directory
244pub async fn operator_from_includedir(
245    dir: &Dir<'_>,
246    dest_prefix: Option<&str>,
247) -> Result<Operator> {
248    // Create a memory operator
249    let dest_service = Memory::default();
250    let operator = Operator::new(dest_service)?.finish();
251
252    let prefix = dest_prefix.unwrap_or_default();
253
254    // First collect all file information
255    let mut files_to_write = Vec::new();
256    collect_files_from_dir(dir, "", &mut files_to_write);
257
258    // Then write all files to the operator
259    for (dest_path, contents) in &files_to_write {
260        let final_path = format!("{}{}", prefix, dest_path);
261        operator
262            .write(&final_path, contents.clone())
263            .await
264            .context(format!("Failed to write file {}", final_path))?;
265    }
266
267    Ok(operator)
268}
269
270// Helper function to recursively collect file information
271fn collect_files_from_dir(dir: &Dir<'_>, current_path: &str, files: &mut Vec<(String, Vec<u8>)>) {
272    for entry in dir.entries() {
273        match entry {
274            DirEntry::Dir(subdir) => {
275                let new_path = if current_path.is_empty() {
276                    subdir.path().to_string_lossy().to_string()
277                } else {
278                    format!(
279                        "{}/{}",
280                        current_path,
281                        subdir.path().file_name().unwrap().to_string_lossy()
282                    )
283                };
284                collect_files_from_dir(subdir, &new_path, files);
285            }
286            DirEntry::File(file) => {
287                let file_path = if current_path.is_empty() {
288                    file.path().to_string_lossy().to_string()
289                } else {
290                    format!(
291                        "{}/{}",
292                        current_path,
293                        file.path().file_name().unwrap().to_string_lossy()
294                    )
295                };
296                files.push((file_path, file.contents().to_vec()));
297            }
298        }
299    }
300}
301
302#[cfg(test)]
303mod tests {
304    use super::*;
305    use crate::store::pinner::latest::Latest;
306    use include_dir::{include_dir, Dir};
307
    // Create a test directory structure for testing
    // NOTE(review): path appears to resolve relative to the crate root
    // (CARGO_MANIFEST_DIR) — confirm against the include_dir version in use.
    static TEST_DIR: Dir<'_> = include_dir!("./static");
310
311    #[tokio::test]
312    async fn test_operator_from_includedir_with_prefix() {
313        let result = operator_from_includedir(&TEST_DIR, Some("test-prefix/")).await;
314        assert!(
315            result.is_ok(),
316            "Should create operator with prefix successfully"
317        );
318    }
319
    // End-to-end check: load a fixture folder from disk into a memory
    // operator, wrap it in a Store, and verify both fixture migrations are
    // listed. Depends on the ./static/tests/two_migrations fixture layout.
    #[tokio::test]
    async fn test_list_migrations_returns_two_migrations() {
        // Load the two_migrations test folder into a memory operator
        let op = disk_to_operator(
            "./static/tests/two_migrations",
            None,
            DesiredOperator::Memory,
        )
        .await
        .expect("Failed to create operator from disk");

        // Create a pinner and FolderPather
        let pinner = Latest::new("").expect("Failed to create Latest pinner");
        let pather = FolderPather {
            spawn_folder: "".to_string(),
        };

        // Create the Store
        let store = Store::new(Box::new(pinner), op, pather).expect("Failed to create Store");

        // Call list_migrations and verify the result
        let migrations: Vec<String> = store
            .list_migrations()
            .await
            .expect("Failed to list migrations");

        // Should have exactly 2 migrations
        assert_eq!(
            migrations.len(),
            2,
            "Expected 2 migrations, got {:?}",
            migrations
        );

        // Verify the migration names are present (they end with /)
        let migration_names: Vec<&str> = migrations.iter().map(|s| s.as_str()).collect();
        assert!(
            migration_names
                .iter()
                .any(|m| m.contains("20240907212659-initial")),
            "Expected to find 20240907212659-initial migration, got {:?}",
            migration_names
        );
        assert!(
            migration_names
                .iter()
                .any(|m| m.contains("20240908123456-second")),
            "Expected to find 20240908123456-second migration, got {:?}",
            migration_names
        );
    }
371
372    #[tokio::test]
373    async fn test_get_migration_fs_status_with_lock() {
374        // Create an in-memory operator with both up.sql and lock.toml
375        let mem_service = Memory::default();
376        let op = Operator::new(mem_service).unwrap().finish();
377
378        // Write both up.sql and lock.toml
379        op.write("/migrations/20240101000000-test/up.sql", "SELECT 1;")
380            .await
381            .expect("Failed to write up.sql");
382        op.write(
383            "/migrations/20240101000000-test/lock.toml",
384            "pin = \"abc123\"",
385        )
386        .await
387        .expect("Failed to write lock.toml");
388
389        let pather = FolderPather {
390            spawn_folder: "".to_string(),
391        };
392
393        // This migration should have both up.sql and lock.toml
394        let status = get_migration_fs_status(&op, &pather, "20240101000000-test")
395            .await
396            .expect("Failed to get migration status");
397
398        assert!(status.has_up_sql, "Migration should have up.sql");
399        assert!(status.has_lock_toml, "Migration should have lock.toml");
400    }
401
402    #[tokio::test]
403    async fn test_get_migration_fs_status_without_lock() {
404        // Create an in-memory operator with just an up.sql file
405        let mem_service = Memory::default();
406        let op = Operator::new(mem_service).unwrap().finish();
407
408        // Write only up.sql, no lock.toml
409        op.write("/migrations/20240101000000-test/up.sql", "SELECT 1;")
410            .await
411            .expect("Failed to write up.sql");
412
413        let pather = FolderPather {
414            spawn_folder: "".to_string(),
415        };
416
417        let status = get_migration_fs_status(&op, &pather, "20240101000000-test")
418            .await
419            .expect("Failed to get migration status");
420
421        assert!(status.has_up_sql, "Migration should have up.sql");
422        assert!(!status.has_lock_toml, "Migration should not have lock.toml");
423    }
424
425    #[tokio::test]
426    async fn test_list_migration_fs_status_single() {
427        // Create an in-memory operator with multiple migrations
428        let mem_service = Memory::default();
429        let op = Operator::new(mem_service).unwrap().finish();
430
431        // Create two migrations
432        op.write("/migrations/20240101-first/up.sql", "SELECT 1;")
433            .await
434            .expect("Failed to write first up.sql");
435        op.write("/migrations/20240101-first/lock.toml", "pin = \"abc\"")
436            .await
437            .expect("Failed to write first lock.toml");
438
439        op.write("/migrations/20240102-second/up.sql", "SELECT 2;")
440            .await
441            .expect("Failed to write second up.sql");
442
443        let pather = FolderPather {
444            spawn_folder: "".to_string(),
445        };
446
447        // List only the first migration
448        let statuses = list_migration_fs_status(&op, &pather, Some("20240101-first"))
449            .await
450            .expect("Failed to list migration status");
451
452        // Should only return one migration
453        assert_eq!(statuses.len(), 1, "Should return exactly one migration");
454
455        let status = statuses
456            .get("20240101-first")
457            .expect("Should have first migration");
458        assert!(status.has_up_sql, "First migration should have up.sql");
459        assert!(
460            status.has_lock_toml,
461            "First migration should have lock.toml"
462        );
463    }
464
465    #[tokio::test]
466    async fn test_list_migration_fs_status_all() {
467        // Create an in-memory operator with multiple migrations
468        let mem_service = Memory::default();
469        let op = Operator::new(mem_service).unwrap().finish();
470
471        // Create two migrations
472        op.write("/migrations/20240101-first/up.sql", "SELECT 1;")
473            .await
474            .expect("Failed to write first up.sql");
475        op.write("/migrations/20240101-first/lock.toml", "pin = \"abc\"")
476            .await
477            .expect("Failed to write first lock.toml");
478
479        op.write("/migrations/20240102-second/up.sql", "SELECT 2;")
480            .await
481            .expect("Failed to write second up.sql");
482        // Note: second migration has no lock.toml
483
484        let pather = FolderPather {
485            spawn_folder: "".to_string(),
486        };
487
488        // List all migrations
489        let statuses = list_migration_fs_status(&op, &pather, None)
490            .await
491            .expect("Failed to list migration status");
492
493        // Should return both migrations
494        assert_eq!(statuses.len(), 2, "Should return two migrations");
495
496        let first = statuses
497            .get("20240101-first")
498            .expect("Should have first migration");
499        assert!(first.has_up_sql, "First migration should have up.sql");
500        assert!(first.has_lock_toml, "First migration should have lock.toml");
501
502        let second = statuses
503            .get("20240102-second")
504            .expect("Should have second migration");
505        assert!(second.has_up_sql, "Second migration should have up.sql");
506        assert!(
507            !second.has_lock_toml,
508            "Second migration should not have lock.toml"
509        );
510    }
511}