Skip to main content

spawn_db/store/
mod.rs

1use crate::config::FolderPather;
2use anyhow::{Context, Result};
3use futures::TryStreamExt;
4use include_dir::{Dir, DirEntry};
5use opendal::services::Memory;
6use opendal::Operator;
7use std::collections::BTreeMap;
8use std::fmt::Debug;
9
10use crate::store::pinner::Pinner;
11
12pub mod pinner;
13
/// Filesystem-level status of a single migration.
///
/// `Default` yields a status with neither file present.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct MigrationFileStatus {
    /// Whether the migration folder contains an `up.sql` file.
    pub has_up_sql: bool,
    /// Whether the migration folder contains a `lock.toml` file.
    pub has_lock_toml: bool,
}
20
21/// Get the filesystem status of a single migration.
22/// Returns the status indicating whether up.sql and lock.toml files exist.
23pub async fn get_migration_fs_status(
24    op: &Operator,
25    pather: &FolderPather,
26    migration_name: &str,
27) -> Result<MigrationFileStatus> {
28    // List only this specific migration folder
29    let statuses = list_migration_fs_status(op, pather, Some(migration_name)).await?;
30
31    Ok(statuses
32        .get(migration_name)
33        .cloned()
34        .unwrap_or(MigrationFileStatus {
35            has_up_sql: false,
36            has_lock_toml: false,
37        }))
38}
39
40/// Scan the migrations folder and return the filesystem status of each migration,
41/// keyed by migration name. This does not touch the database.
42/// Performs a single recursive list for efficiency with remote storage.
43///
44/// If `migration_name` is provided, only lists that specific migration folder.
45/// Otherwise lists all migrations.
46pub async fn list_migration_fs_status(
47    op: &Operator,
48    pather: &FolderPather,
49    migration_name: Option<&str>,
50) -> Result<BTreeMap<String, MigrationFileStatus>> {
51    let migrations_folder = pather.migrations_folder();
52    // Normalize the prefix to match opendal's path normalization:
53    // opendal strips leading "./" and "/" from entry.path() results,
54    // so we must do the same for strip_prefix to work correctly.
55    let normalized_folder = migrations_folder
56        .trim_start_matches("./")
57        .trim_start_matches('/');
58    let migrations_prefix = if let Some(name) = migration_name {
59        // List only the specific migration folder
60        format!("{}/{}/", normalized_folder, name)
61    } else {
62        // List all migrations
63        format!("{}/", normalized_folder)
64    };
65
66    // Single recursive list - efficient for remote storage like S3
67    let mut lister = op
68        .lister_with(&migrations_prefix)
69        .recursive(true)
70        .await
71        .context("listing migrations")?;
72
73    let mut result: BTreeMap<String, MigrationFileStatus> = BTreeMap::new();
74
75    while let Some(entry) = lister.try_next().await? {
76        let path = entry.path().to_string();
77        let relative_path = path.strip_prefix(&migrations_prefix).unwrap_or(&path);
78
79        // When listing a specific migration, relative_path is just "up.sql" or "lock.toml".
80        // When listing all migrations, relative_path is "migration-name/up.sql" etc.
81        // Resolve the migration name and filename from the relative path.
82        let (name, filename) = match relative_path.split_once('/') {
83            Some((name, filename)) => (name, filename),
84            None if migration_name.is_some() => (migration_name.unwrap(), relative_path),
85            None => continue,
86        };
87
88        let status = result
89            .entry(name.to_string())
90            .or_insert(MigrationFileStatus {
91                has_up_sql: false,
92                has_lock_toml: false,
93            });
94
95        if filename == "up.sql" {
96            status.has_up_sql = true;
97        } else if filename == "lock.toml" {
98            status.has_lock_toml = true;
99        }
100    }
101
102    Ok(result)
103}
104
105pub struct Store {
106    pinner: Box<dyn Pinner>,
107    fs: Operator,
108    pather: FolderPather,
109}
110
111impl Debug for Store {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("Store").field("fs", &self.fs).finish()
114    }
115}
116
117impl Store {
118    pub fn new(pinner: Box<dyn Pinner>, fs: Operator, pather: FolderPather) -> Result<Store> {
119        Ok(Store { pinner, fs, pather })
120    }
121
122    pub async fn load_component(&self, name: &str) -> Result<Option<String>> {
123        let res = self.pinner.load(name, &self.fs).await?;
124
125        Ok(res)
126    }
127
128    pub async fn load_migration(&self, name: &str) -> Result<String> {
129        let result = self.fs.read(&name).await?;
130        let bytes = result.to_bytes();
131        let contents = String::from_utf8(bytes.to_vec())?;
132
133        Ok(contents)
134    }
135
136    pub async fn list_migrations(&self) -> Result<Vec<String>> {
137        let mut migrations: Vec<String> = Vec::new();
138        let mut fs_lister = self
139            .fs
140            .lister(format!("{}/", &self.pather.migrations_folder()).as_ref())
141            .await?;
142        while let Some(entry) = fs_lister.try_next().await? {
143            let path = entry.path().to_string();
144            if path.ends_with("/") {
145                migrations.push(path)
146            }
147        }
148
149        // Sort migrations by name to ensure oldest to newest order
150        migrations.sort();
151
152        Ok(migrations)
153    }
154}
155
/// Kind of destination operator that `disk_to_operator` should build.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DesiredOperator {
    /// An in-memory operator.
    Memory,
    /// A local-filesystem operator.
    FileSystem,
}
160
161// Handy function for getting a disk based folder of data and return an in
162// memory operator that has the same contents.  Particularly useful for tests.
163pub async fn disk_to_operator(
164    source_folder: &str,
165    dest_prefix: Option<&str>,
166    desired_operator: DesiredOperator,
167) -> Result<Operator> {
168    let dest_op = match desired_operator {
169        DesiredOperator::FileSystem => {
170            let dest_service = opendal::services::Fs::default().root("./testout");
171            Operator::new(dest_service)?.finish()
172        }
173        DesiredOperator::Memory => {
174            let dest_service = Memory::default();
175            Operator::new(dest_service)?.finish()
176        }
177    };
178
179    // Create a LocalFileSystem to read from static/example
180    let fs_service = opendal::services::Fs::default().root(source_folder);
181    let source_store = Operator::new(fs_service)
182        .context("disk_to_mem_operator failed to create operator")?
183        .finish();
184
185    // Populate the in-memory store with contents from static/example
186    let store_loc = dest_prefix.unwrap_or_default();
187    crate::store::populate_store_from_store(&source_store, &dest_op, "", store_loc)
188        .await
189        .context("call to populate memory fs from object store")?;
190
191    Ok(dest_op)
192}
193
194pub async fn populate_store_from_store(
195    source_store: &Operator,
196    target_store: &Operator,
197    source_prefix: &str,
198    dest_prefix: &str,
199) -> Result<()> {
200    let mut lister = source_store
201        .lister_with(source_prefix)
202        .recursive(true)
203        .await
204        .context("lister call")?;
205    let mut list_result: Vec<opendal::Entry> = Vec::new();
206
207    while let Some(entry) = lister.try_next().await? {
208        if entry.path().ends_with("/") {
209            continue;
210        }
211        list_result.push(entry);
212    }
213
214    for entry in list_result {
215        // Print out the file we're writing:
216        let dest_object_path = format!("{}{}", dest_prefix, entry.path());
217        let source_object_path = entry.path();
218
219        // Get the object data
220        let bytes = source_store
221            .read(&source_object_path)
222            .await
223            .context(format!("read path {}", &source_object_path))?;
224
225        // Store in target with the same path
226        target_store
227            .write(&dest_object_path, bytes)
228            .await
229            .context("write")?;
230    }
231
232    Ok(())
233}
234
235/// Creates a memory-based OpenDAL operator from an include_dir bundle.
236///
237/// This function takes a bundled directory (created with include_dir!) and
238/// creates an in-memory OpenDAL operator containing all files from that bundle.
239/// This is useful for embedding migrations, templates, or other static files
240/// directly into the binary while still being able to use OpenDAL's interface.
241///
242/// # Arguments
243///
244/// * `included_dir` - A reference to a Dir created with include_dir! macro
245/// * `dest_prefix` - Optional prefix to add to all file paths in the operator
246///
247/// # Returns
248///
249/// A memory-based OpenDAL operator containing all files from the bundled directory
250pub async fn operator_from_includedir(
251    dir: &Dir<'_>,
252    dest_prefix: Option<&str>,
253) -> Result<Operator> {
254    // Create a memory operator
255    let dest_service = Memory::default();
256    let operator = Operator::new(dest_service)?.finish();
257
258    let prefix = dest_prefix.unwrap_or_default();
259
260    // First collect all file information
261    let mut files_to_write = Vec::new();
262    collect_files_from_dir(dir, "", &mut files_to_write);
263
264    // Then write all files to the operator
265    for (dest_path, contents) in &files_to_write {
266        let final_path = format!("{}{}", prefix, dest_path);
267        operator
268            .write(&final_path, contents.clone())
269            .await
270            .context(format!("Failed to write file {}", final_path))?;
271    }
272
273    Ok(operator)
274}
275
276// Helper function to recursively collect file information
277fn collect_files_from_dir(dir: &Dir<'_>, current_path: &str, files: &mut Vec<(String, Vec<u8>)>) {
278    for entry in dir.entries() {
279        match entry {
280            DirEntry::Dir(subdir) => {
281                let new_path = if current_path.is_empty() {
282                    subdir.path().to_string_lossy().to_string()
283                } else {
284                    format!(
285                        "{}/{}",
286                        current_path,
287                        subdir.path().file_name().unwrap().to_string_lossy()
288                    )
289                };
290                collect_files_from_dir(subdir, &new_path, files);
291            }
292            DirEntry::File(file) => {
293                let file_path = if current_path.is_empty() {
294                    file.path().to_string_lossy().to_string()
295                } else {
296                    format!(
297                        "{}/{}",
298                        current_path,
299                        file.path().file_name().unwrap().to_string_lossy()
300                    )
301                };
302                files.push((file_path, file.contents().to_vec()));
303            }
304        }
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::pinner::latest::Latest;
    use include_dir::{include_dir, Dir};

    // Bundled copy of the repository's `static` folder, used as an include_dir fixture.
    static TEST_DIR: Dir<'_> = include_dir!("./static");

    /// A fresh, empty in-memory operator for a single test.
    fn memory_operator() -> Operator {
        Operator::new(Memory::default()).unwrap().finish()
    }

    /// A `FolderPather` rooted at `spawn_folder`.
    fn pather_at(spawn_folder: &str) -> FolderPather {
        FolderPather {
            spawn_folder: spawn_folder.to_string(),
        }
    }

    #[tokio::test]
    async fn test_operator_from_includedir_with_prefix() {
        let outcome = operator_from_includedir(&TEST_DIR, Some("test-prefix/")).await;
        assert!(
            outcome.is_ok(),
            "Should create operator with prefix successfully"
        );
    }

    #[tokio::test]
    async fn test_list_migrations_returns_two_migrations() {
        // Mirror the two_migrations fixture folder into memory.
        let op = disk_to_operator(
            "./static/tests/two_migrations",
            None,
            DesiredOperator::Memory,
        )
        .await
        .expect("Failed to create operator from disk");

        let pinner = Latest::new("").expect("Failed to create Latest pinner");
        let store =
            Store::new(Box::new(pinner), op, pather_at("")).expect("Failed to create Store");

        let migrations: Vec<String> = store
            .list_migrations()
            .await
            .expect("Failed to list migrations");

        assert_eq!(
            migrations.len(),
            2,
            "Expected 2 migrations, got {:?}",
            migrations
        );

        // Migration folder entries end with "/", so match on substrings.
        let names: Vec<&str> = migrations.iter().map(String::as_str).collect();
        let has = |needle: &str| names.iter().any(|name| name.contains(needle));
        assert!(
            has("20240907212659-initial"),
            "Expected to find 20240907212659-initial migration, got {:?}",
            names
        );
        assert!(
            has("20240908123456-second"),
            "Expected to find 20240908123456-second migration, got {:?}",
            names
        );
    }

    #[tokio::test]
    async fn test_get_migration_fs_status_with_lock() {
        let op = memory_operator();

        // Both up.sql and lock.toml are present.
        op.write("/migrations/20240101000000-test/up.sql", "SELECT 1;")
            .await
            .expect("Failed to write up.sql");
        op.write(
            "/migrations/20240101000000-test/lock.toml",
            "pin = \"abc123\"",
        )
        .await
        .expect("Failed to write lock.toml");

        let pather = pather_at("");
        let status = get_migration_fs_status(&op, &pather, "20240101000000-test")
            .await
            .expect("Failed to get migration status");

        assert!(status.has_up_sql, "Migration should have up.sql");
        assert!(status.has_lock_toml, "Migration should have lock.toml");
    }

    #[tokio::test]
    async fn test_get_migration_fs_status_without_lock() {
        let op = memory_operator();

        // Only up.sql is present; there is no lock.toml.
        op.write("/migrations/20240101000000-test/up.sql", "SELECT 1;")
            .await
            .expect("Failed to write up.sql");

        let pather = pather_at("");
        let status = get_migration_fs_status(&op, &pather, "20240101000000-test")
            .await
            .expect("Failed to get migration status");

        assert!(status.has_up_sql, "Migration should have up.sql");
        assert!(!status.has_lock_toml, "Migration should not have lock.toml");
    }

    #[tokio::test]
    async fn test_list_migration_fs_status() {
        // Listing must work for several spawn_folder spellings, including a
        // "./" prefix that opendal normalizes away from entry.path().
        let spawn_folders = ["", "./database/spawn", "/database/spawn", "./spawn", "/spawn"];

        for spawn_folder in spawn_folders {
            let op = memory_operator();

            let stripped = spawn_folder
                .trim_start_matches("./")
                .trim_start_matches('/');
            let migrations = match stripped {
                "" => "migrations".to_string(),
                root => format!("{}/migrations", root),
            };

            let fixtures = [
                ("20240101-first/up.sql", "SELECT 1;"),
                ("20240101-first/lock.toml", "pin = \"abc\""),
                ("20240102-second/up.sql", "SELECT 2;"),
            ];
            for (relative, body) in fixtures {
                op.write(&format!("{}/{}", migrations, relative), body)
                    .await
                    .unwrap();
            }

            let pather = pather_at(spawn_folder);

            for filter in [None, Some("20240101-first")] {
                let statuses = list_migration_fs_status(&op, &pather, filter)
                    .await
                    .expect("Failed to list migration statuses");

                let first = statuses
                    .get("20240101-first")
                    .expect("Should have first migration");
                assert!(first.has_up_sql);
                assert!(first.has_lock_toml);

                match filter {
                    None => {
                        assert_eq!(statuses.len(), 2);
                        let second = statuses
                            .get("20240102-second")
                            .expect("Should have second migration");
                        assert!(second.has_up_sql);
                        assert!(!second.has_lock_toml);
                    }
                    Some(_) => assert_eq!(statuses.len(), 1),
                }
            }
        }
    }
}