Skip to main content

spawn_db/store/
mod.rs

1use crate::config::FolderPather;
2use anyhow::{Context, Result};
3use futures::TryStreamExt;
4use include_dir::{Dir, DirEntry};
5use opendal::services::Memory;
6use opendal::Operator;
7use std::collections::BTreeMap;
8use std::fmt::Debug;
9
10use crate::store::pinner::Pinner;
11
12pub mod pinner;
13
/// Filesystem-level status of a single migration.
///
/// Each flag records whether the corresponding file was seen while listing
/// the migration's folder. The `Default` value has both flags set to `false`,
/// i.e. "nothing found".
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MigrationFileStatus {
    /// `true` when an `up.sql` file was found for the migration.
    pub has_up_sql: bool,
    /// `true` when a `lock.toml` file was found for the migration.
    pub has_lock_toml: bool,
}
20
21/// Get the filesystem status of a single migration.
22/// Returns the status indicating whether up.sql and lock.toml files exist.
23pub async fn get_migration_fs_status(
24    op: &Operator,
25    pather: &FolderPather,
26    migration_name: &str,
27) -> Result<MigrationFileStatus> {
28    // List only this specific migration folder
29    let statuses = list_migration_fs_status(op, pather, Some(migration_name)).await?;
30
31    Ok(statuses
32        .get(migration_name)
33        .cloned()
34        .unwrap_or(MigrationFileStatus {
35            has_up_sql: false,
36            has_lock_toml: false,
37        }))
38}
39
40/// Scan the migrations folder and return the filesystem status of each migration,
41/// keyed by migration name. This does not touch the database.
42/// Performs a single recursive list for efficiency with remote storage.
43///
44/// If `migration_name` is provided, only lists that specific migration folder.
45/// Otherwise lists all migrations.
46pub async fn list_migration_fs_status(
47    op: &Operator,
48    pather: &FolderPather,
49    migration_name: Option<&str>,
50) -> Result<BTreeMap<String, MigrationFileStatus>> {
51    let migrations_folder = pather.migrations_folder();
52    // Normalize the prefix to match opendal's path normalization:
53    // opendal strips leading "./" and "/" from entry.path() results,
54    // so we must do the same for strip_prefix to work correctly.
55    let normalized_folder = migrations_folder
56        .trim_start_matches("./")
57        .trim_start_matches('/');
58    let migrations_prefix = if let Some(name) = migration_name {
59        // List only the specific migration folder
60        format!("{}/{}/", normalized_folder, name)
61    } else {
62        // List all migrations
63        format!("{}/", normalized_folder)
64    };
65
66    // Single recursive list - efficient for remote storage like S3
67    let mut lister = op
68        .lister_with(&migrations_prefix)
69        .recursive(true)
70        .await
71        .context("listing migrations")?;
72
73    let mut result: BTreeMap<String, MigrationFileStatus> = BTreeMap::new();
74
75    while let Some(entry) = lister.try_next().await? {
76        let path = entry.path().to_string();
77        let relative_path = path.strip_prefix(&migrations_prefix).unwrap_or(&path);
78
79        // When listing a specific migration, relative_path is just "up.sql" or "lock.toml".
80        // When listing all migrations, relative_path is "migration-name/up.sql" etc.
81        // Resolve the migration name and filename from the relative path.
82        let (name, filename) = match relative_path.split_once('/') {
83            Some((name, filename)) => (name, filename),
84            None if migration_name.is_some() => (migration_name.unwrap(), relative_path),
85            None => continue,
86        };
87
88        let status = result
89            .entry(name.to_string())
90            .or_insert(MigrationFileStatus {
91                has_up_sql: false,
92                has_lock_toml: false,
93            });
94
95        if filename == "up.sql" {
96            status.has_up_sql = true;
97        } else if filename == "lock.toml" {
98            status.has_lock_toml = true;
99        }
100    }
101
102    Ok(result)
103}
104
105pub struct Store {
106    pinner: Box<dyn Pinner>,
107    fs: Operator,
108    pather: FolderPather,
109}
110
111impl Debug for Store {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("Store").field("fs", &self.fs).finish()
114    }
115}
116
117impl Store {
118    pub fn new(pinner: Box<dyn Pinner>, fs: Operator, pather: FolderPather) -> Result<Store> {
119        Ok(Store { pinner, fs, pather })
120    }
121
122    pub async fn load_component(&self, name: &str) -> Result<Option<String>> {
123        let res = self.pinner.load(name, &self.fs).await?;
124
125        Ok(res)
126    }
127
128    pub async fn load_component_bytes(&self, name: &str) -> Result<Option<Vec<u8>>> {
129        self.pinner.load_bytes(name, &self.fs).await
130    }
131
132    pub async fn read_file_bytes(&self, path: &str) -> Result<Vec<u8>> {
133        self.load_component_bytes(path)
134            .await?
135            .ok_or_else(|| anyhow::anyhow!("file not found in components: {}", path))
136    }
137
138    pub async fn load_migration(&self, name: &str) -> Result<String> {
139        let result = self.fs.read(&name).await?;
140        let bytes = result.to_bytes();
141        let contents = String::from_utf8(bytes.to_vec())?;
142
143        Ok(contents)
144    }
145
146    pub async fn list_migrations(&self) -> Result<Vec<String>> {
147        let mut migrations: Vec<String> = Vec::new();
148        let mut fs_lister = self
149            .fs
150            .lister(format!("{}/", &self.pather.migrations_folder()).as_ref())
151            .await?;
152        while let Some(entry) = fs_lister.try_next().await? {
153            let path = entry.path().to_string();
154            if path.ends_with("/") {
155                migrations.push(path)
156            }
157        }
158
159        // Sort migrations by name to ensure oldest to newest order
160        migrations.sort();
161
162        Ok(migrations)
163    }
164}
165
/// Selects which kind of destination operator `disk_to_operator` builds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DesiredOperator {
    /// An in-memory operator.
    Memory,
    /// A filesystem-backed operator.
    FileSystem,
}
170
171// Handy function for getting a disk based folder of data and return an in
172// memory operator that has the same contents.  Particularly useful for tests.
173pub async fn disk_to_operator(
174    source_folder: &str,
175    dest_prefix: Option<&str>,
176    desired_operator: DesiredOperator,
177) -> Result<Operator> {
178    let dest_op = match desired_operator {
179        DesiredOperator::FileSystem => {
180            let dest_service = opendal::services::Fs::default().root("./testout");
181            Operator::new(dest_service)?.finish()
182        }
183        DesiredOperator::Memory => {
184            let dest_service = Memory::default();
185            Operator::new(dest_service)?.finish()
186        }
187    };
188
189    // Create a LocalFileSystem to read from static/example
190    let fs_service = opendal::services::Fs::default().root(source_folder);
191    let source_store = Operator::new(fs_service)
192        .context("disk_to_mem_operator failed to create operator")?
193        .finish();
194
195    // Populate the in-memory store with contents from static/example
196    let store_loc = dest_prefix.unwrap_or_default();
197    crate::store::populate_store_from_store(&source_store, &dest_op, "", store_loc)
198        .await
199        .context("call to populate memory fs from object store")?;
200
201    Ok(dest_op)
202}
203
204pub async fn populate_store_from_store(
205    source_store: &Operator,
206    target_store: &Operator,
207    source_prefix: &str,
208    dest_prefix: &str,
209) -> Result<()> {
210    let mut lister = source_store
211        .lister_with(source_prefix)
212        .recursive(true)
213        .await
214        .context("lister call")?;
215    let mut list_result: Vec<opendal::Entry> = Vec::new();
216
217    while let Some(entry) = lister.try_next().await? {
218        if entry.path().ends_with("/") {
219            continue;
220        }
221        list_result.push(entry);
222    }
223
224    for entry in list_result {
225        // Print out the file we're writing:
226        let dest_object_path = format!("{}{}", dest_prefix, entry.path());
227        let source_object_path = entry.path();
228
229        // Get the object data
230        let bytes = source_store
231            .read(&source_object_path)
232            .await
233            .context(format!("read path {}", &source_object_path))?;
234
235        // Store in target with the same path
236        target_store
237            .write(&dest_object_path, bytes)
238            .await
239            .context("write")?;
240    }
241
242    Ok(())
243}
244
245/// Creates a memory-based OpenDAL operator from an include_dir bundle.
246///
247/// This function takes a bundled directory (created with include_dir!) and
248/// creates an in-memory OpenDAL operator containing all files from that bundle.
249/// This is useful for embedding migrations, templates, or other static files
250/// directly into the binary while still being able to use OpenDAL's interface.
251///
252/// # Arguments
253///
254/// * `included_dir` - A reference to a Dir created with include_dir! macro
255/// * `dest_prefix` - Optional prefix to add to all file paths in the operator
256///
257/// # Returns
258///
259/// A memory-based OpenDAL operator containing all files from the bundled directory
260pub async fn operator_from_includedir(
261    dir: &Dir<'_>,
262    dest_prefix: Option<&str>,
263) -> Result<Operator> {
264    // Create a memory operator
265    let dest_service = Memory::default();
266    let operator = Operator::new(dest_service)?.finish();
267
268    let prefix = dest_prefix.unwrap_or_default();
269
270    // First collect all file information
271    let mut files_to_write = Vec::new();
272    collect_files_from_dir(dir, "", &mut files_to_write);
273
274    // Then write all files to the operator
275    for (dest_path, contents) in &files_to_write {
276        let final_path = format!("{}{}", prefix, dest_path);
277        operator
278            .write(&final_path, contents.clone())
279            .await
280            .context(format!("Failed to write file {}", final_path))?;
281    }
282
283    Ok(operator)
284}
285
286// Helper function to recursively collect file information
287fn collect_files_from_dir(dir: &Dir<'_>, current_path: &str, files: &mut Vec<(String, Vec<u8>)>) {
288    for entry in dir.entries() {
289        match entry {
290            DirEntry::Dir(subdir) => {
291                let new_path = if current_path.is_empty() {
292                    subdir.path().to_string_lossy().to_string()
293                } else {
294                    format!(
295                        "{}/{}",
296                        current_path,
297                        subdir.path().file_name().unwrap().to_string_lossy()
298                    )
299                };
300                collect_files_from_dir(subdir, &new_path, files);
301            }
302            DirEntry::File(file) => {
303                let file_path = if current_path.is_empty() {
304                    file.path().to_string_lossy().to_string()
305                } else {
306                    format!(
307                        "{}/{}",
308                        current_path,
309                        file.path().file_name().unwrap().to_string_lossy()
310                    )
311                };
312                files.push((file_path, file.contents().to_vec()));
313            }
314        }
315    }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::pinner::latest::Latest;
    use include_dir::{include_dir, Dir};

    // Bundle of the `./static` folder used as input for the include_dir test.
    static TEST_DIR: Dir<'_> = include_dir!("./static");

    /// Builds an empty in-memory operator.
    fn memory_operator() -> Operator {
        Operator::new(Memory::default()).unwrap().finish()
    }

    /// Builds a `FolderPather` for the given spawn folder.
    fn pather_for(spawn_folder: &str) -> FolderPather {
        FolderPather {
            spawn_folder: spawn_folder.to_string(),
        }
    }

    #[tokio::test]
    async fn test_operator_from_includedir_with_prefix() {
        let outcome = operator_from_includedir(&TEST_DIR, Some("test-prefix/")).await;
        assert!(
            outcome.is_ok(),
            "Should create operator with prefix successfully"
        );
    }

    #[tokio::test]
    async fn test_list_migrations_returns_two_migrations() {
        // Mirror the two_migrations fixture folder into a memory operator.
        let op = disk_to_operator(
            "./static/tests/two_migrations",
            None,
            DesiredOperator::Memory,
        )
        .await
        .expect("Failed to create operator from disk");

        // Assemble the store from a Latest pinner and an empty spawn folder.
        let pinner = Latest::new("").expect("Failed to create Latest pinner");
        let store =
            Store::new(Box::new(pinner), op, pather_for("")).expect("Failed to create Store");

        let migrations: Vec<String> = store
            .list_migrations()
            .await
            .expect("Failed to list migrations");

        // Exactly two migrations are expected.
        assert_eq!(
            migrations.len(),
            2,
            "Expected 2 migrations, got {:?}",
            migrations
        );

        // Both migration names must show up (the listed paths end with /).
        let migration_names: Vec<&str> = migrations.iter().map(String::as_str).collect();
        let has_initial = migration_names
            .iter()
            .any(|m| m.contains("20240907212659-initial"));
        assert!(
            has_initial,
            "Expected to find 20240907212659-initial migration, got {:?}",
            migration_names
        );
        let has_second = migration_names
            .iter()
            .any(|m| m.contains("20240908123456-second"));
        assert!(
            has_second,
            "Expected to find 20240908123456-second migration, got {:?}",
            migration_names
        );
    }

    #[tokio::test]
    async fn test_get_migration_fs_status_with_lock() {
        // In-memory operator holding both up.sql and lock.toml.
        let op = memory_operator();

        op.write("/migrations/20240101000000-test/up.sql", "SELECT 1;")
            .await
            .expect("Failed to write up.sql");
        op.write(
            "/migrations/20240101000000-test/lock.toml",
            "pin = \"abc123\"",
        )
        .await
        .expect("Failed to write lock.toml");

        let pather = pather_for("");

        // Both files exist, so both flags must be set.
        let status = get_migration_fs_status(&op, &pather, "20240101000000-test")
            .await
            .expect("Failed to get migration status");

        assert!(status.has_up_sql, "Migration should have up.sql");
        assert!(status.has_lock_toml, "Migration should have lock.toml");
    }

    #[tokio::test]
    async fn test_get_migration_fs_status_without_lock() {
        // In-memory operator holding only up.sql.
        let op = memory_operator();

        op.write("/migrations/20240101000000-test/up.sql", "SELECT 1;")
            .await
            .expect("Failed to write up.sql");

        let pather = pather_for("");

        let status = get_migration_fs_status(&op, &pather, "20240101000000-test")
            .await
            .expect("Failed to get migration status");

        assert!(status.has_up_sql, "Migration should have up.sql");
        assert!(!status.has_lock_toml, "Migration should not have lock.toml");
    }

    #[tokio::test]
    async fn test_list_migration_fs_status() {
        // Listing must work for every spawn_folder spelling, including the
        // "./" prefix which opendal normalizes away from entry.path().
        let spawn_folders = [
            "",
            "./database/spawn",
            "/database/spawn",
            "./spawn",
            "/spawn",
        ];

        for spawn_folder in spawn_folders {
            let op = memory_operator();

            let prefix = spawn_folder
                .trim_start_matches("./")
                .trim_start_matches('/');
            let migrations = match prefix {
                "" => "migrations".to_string(),
                other => format!("{}/migrations", other),
            };

            // Fixture: first migration has both files, second only up.sql.
            for (file, contents) in [
                ("20240101-first/up.sql", "SELECT 1;"),
                ("20240101-first/lock.toml", "pin = \"abc\""),
                ("20240102-second/up.sql", "SELECT 2;"),
            ] {
                op.write(&format!("{}/{}", migrations, file), contents)
                    .await
                    .unwrap();
            }

            let pather = pather_for(spawn_folder);

            for filter in [None, Some("20240101-first")] {
                let statuses = list_migration_fs_status(&op, &pather, filter)
                    .await
                    .expect("Failed to list migration statuses");

                let first = statuses
                    .get("20240101-first")
                    .expect("Should have first migration");
                assert!(first.has_up_sql);
                assert!(first.has_lock_toml);

                match filter {
                    None => {
                        assert_eq!(statuses.len(), 2);
                        let second = statuses
                            .get("20240102-second")
                            .expect("Should have second migration");
                        assert!(second.has_up_sql);
                        assert!(!second.has_lock_toml);
                    }
                    Some(_) => assert_eq!(statuses.len(), 1),
                }
            }
        }
    }
}