Skip to main content

spawn_db/store/
mod.rs

1use crate::config::FolderPather;
2use anyhow::{Context, Result};
3use futures::TryStreamExt;
4use include_dir::{Dir, DirEntry};
5use opendal::services::Memory;
6use opendal::Operator;
7use std::collections::BTreeMap;
8use std::fmt::Debug;
9
10use crate::store::pinner::Pinner;
11
12pub mod pinner;
13
/// Filesystem-level status of a single migration.
///
/// Records which of the well-known files exist inside one migration folder.
/// `MigrationFileStatus::default()` is the "nothing found" status (both flags
/// `false`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct MigrationFileStatus {
    /// Whether the migration folder contains an `up.sql` file.
    pub has_up_sql: bool,
    /// Whether the migration folder contains a `lock.toml` file.
    pub has_lock_toml: bool,
}
20
21/// Get the filesystem status of a single migration.
22/// Returns the status indicating whether up.sql and lock.toml files exist.
23pub async fn get_migration_fs_status(
24    op: &Operator,
25    pather: &FolderPather,
26    migration_name: &str,
27) -> Result<MigrationFileStatus> {
28    // List only this specific migration folder
29    let statuses = list_migration_fs_status(op, pather, Some(migration_name)).await?;
30
31    Ok(statuses
32        .get(migration_name)
33        .cloned()
34        .unwrap_or(MigrationFileStatus {
35            has_up_sql: false,
36            has_lock_toml: false,
37        }))
38}
39
40/// Scan the migrations folder and return the filesystem status of each migration,
41/// keyed by migration name. This does not touch the database.
42/// Performs a single recursive list for efficiency with remote storage.
43///
44/// If `migration_name` is provided, only lists that specific migration folder.
45/// Otherwise lists all migrations.
46pub async fn list_migration_fs_status(
47    op: &Operator,
48    pather: &FolderPather,
49    migration_name: Option<&str>,
50) -> Result<BTreeMap<String, MigrationFileStatus>> {
51    let migrations_folder = pather.migrations_folder();
52    // Normalize the prefix to match opendal's path normalization:
53    // opendal strips leading "./" and "/" from entry.path() results,
54    // so we must do the same for strip_prefix to work correctly.
55    let normalized_folder = migrations_folder
56        .trim_start_matches("./")
57        .trim_start_matches('/');
58    let migrations_prefix = if let Some(name) = migration_name {
59        // List only the specific migration folder
60        format!("{}/{}/", normalized_folder, name)
61    } else {
62        // List all migrations
63        format!("{}/", normalized_folder)
64    };
65
66    // Single recursive list - efficient for remote storage like S3
67    let mut lister = op
68        .lister_with(&migrations_prefix)
69        .recursive(true)
70        .await
71        .context("listing migrations")?;
72
73    let mut result: BTreeMap<String, MigrationFileStatus> = BTreeMap::new();
74
75    while let Some(entry) = lister.try_next().await? {
76        let path = entry.path().to_string();
77        let relative_path = path.strip_prefix(&migrations_prefix).unwrap_or(&path);
78
79        // When listing a specific migration, relative_path is just "up.sql" or "lock.toml".
80        // When listing all migrations, relative_path is "migration-name/up.sql" etc.
81        // Resolve the migration name and filename from the relative path.
82        let (name, filename) = match relative_path.split_once('/') {
83            Some((name, filename)) => (name, filename),
84            None if migration_name.is_some() => (migration_name.unwrap(), relative_path),
85            None => continue,
86        };
87
88        let status = result
89            .entry(name.to_string())
90            .or_insert(MigrationFileStatus {
91                has_up_sql: false,
92                has_lock_toml: false,
93            });
94
95        if filename == "up.sql" {
96            status.has_up_sql = true;
97        } else if filename == "lock.toml" {
98            status.has_lock_toml = true;
99        }
100    }
101
102    Ok(result)
103}
104
105pub struct Store {
106    pinner: Box<dyn Pinner>,
107    fs: Operator,
108    pather: FolderPather,
109}
110
111impl Debug for Store {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("Store").field("fs", &self.fs).finish()
114    }
115}
116
117impl Store {
118    pub fn new(pinner: Box<dyn Pinner>, fs: Operator, pather: FolderPather) -> Result<Store> {
119        Ok(Store { pinner, fs, pather })
120    }
121
122    pub async fn load_component(&self, name: &str) -> Result<Option<String>> {
123        let res = self.pinner.load(name, &self.fs).await?;
124
125        Ok(res)
126    }
127
128    pub async fn read_file_bytes(&self, path: &str) -> Result<Vec<u8>> {
129        let full_path = format!("{}/{}", self.pather.components_folder(), path);
130        let result = self.fs.read(&full_path).await?;
131        Ok(result.to_bytes().to_vec())
132    }
133
134    pub async fn load_migration(&self, name: &str) -> Result<String> {
135        let result = self.fs.read(&name).await?;
136        let bytes = result.to_bytes();
137        let contents = String::from_utf8(bytes.to_vec())?;
138
139        Ok(contents)
140    }
141
142    pub async fn list_migrations(&self) -> Result<Vec<String>> {
143        let mut migrations: Vec<String> = Vec::new();
144        let mut fs_lister = self
145            .fs
146            .lister(format!("{}/", &self.pather.migrations_folder()).as_ref())
147            .await?;
148        while let Some(entry) = fs_lister.try_next().await? {
149            let path = entry.path().to_string();
150            if path.ends_with("/") {
151                migrations.push(path)
152            }
153        }
154
155        // Sort migrations by name to ensure oldest to newest order
156        migrations.sort();
157
158        Ok(migrations)
159    }
160}
161
/// Which kind of opendal backend `disk_to_operator` creates as its destination.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DesiredOperator {
    /// An in-memory store (`opendal::services::Memory`).
    Memory,
    /// A local filesystem store rooted at `./testout`.
    FileSystem,
}
166
167// Handy function for getting a disk based folder of data and return an in
168// memory operator that has the same contents.  Particularly useful for tests.
169pub async fn disk_to_operator(
170    source_folder: &str,
171    dest_prefix: Option<&str>,
172    desired_operator: DesiredOperator,
173) -> Result<Operator> {
174    let dest_op = match desired_operator {
175        DesiredOperator::FileSystem => {
176            let dest_service = opendal::services::Fs::default().root("./testout");
177            Operator::new(dest_service)?.finish()
178        }
179        DesiredOperator::Memory => {
180            let dest_service = Memory::default();
181            Operator::new(dest_service)?.finish()
182        }
183    };
184
185    // Create a LocalFileSystem to read from static/example
186    let fs_service = opendal::services::Fs::default().root(source_folder);
187    let source_store = Operator::new(fs_service)
188        .context("disk_to_mem_operator failed to create operator")?
189        .finish();
190
191    // Populate the in-memory store with contents from static/example
192    let store_loc = dest_prefix.unwrap_or_default();
193    crate::store::populate_store_from_store(&source_store, &dest_op, "", store_loc)
194        .await
195        .context("call to populate memory fs from object store")?;
196
197    Ok(dest_op)
198}
199
200pub async fn populate_store_from_store(
201    source_store: &Operator,
202    target_store: &Operator,
203    source_prefix: &str,
204    dest_prefix: &str,
205) -> Result<()> {
206    let mut lister = source_store
207        .lister_with(source_prefix)
208        .recursive(true)
209        .await
210        .context("lister call")?;
211    let mut list_result: Vec<opendal::Entry> = Vec::new();
212
213    while let Some(entry) = lister.try_next().await? {
214        if entry.path().ends_with("/") {
215            continue;
216        }
217        list_result.push(entry);
218    }
219
220    for entry in list_result {
221        // Print out the file we're writing:
222        let dest_object_path = format!("{}{}", dest_prefix, entry.path());
223        let source_object_path = entry.path();
224
225        // Get the object data
226        let bytes = source_store
227            .read(&source_object_path)
228            .await
229            .context(format!("read path {}", &source_object_path))?;
230
231        // Store in target with the same path
232        target_store
233            .write(&dest_object_path, bytes)
234            .await
235            .context("write")?;
236    }
237
238    Ok(())
239}
240
241/// Creates a memory-based OpenDAL operator from an include_dir bundle.
242///
243/// This function takes a bundled directory (created with include_dir!) and
244/// creates an in-memory OpenDAL operator containing all files from that bundle.
245/// This is useful for embedding migrations, templates, or other static files
246/// directly into the binary while still being able to use OpenDAL's interface.
247///
248/// # Arguments
249///
250/// * `included_dir` - A reference to a Dir created with include_dir! macro
251/// * `dest_prefix` - Optional prefix to add to all file paths in the operator
252///
253/// # Returns
254///
255/// A memory-based OpenDAL operator containing all files from the bundled directory
256pub async fn operator_from_includedir(
257    dir: &Dir<'_>,
258    dest_prefix: Option<&str>,
259) -> Result<Operator> {
260    // Create a memory operator
261    let dest_service = Memory::default();
262    let operator = Operator::new(dest_service)?.finish();
263
264    let prefix = dest_prefix.unwrap_or_default();
265
266    // First collect all file information
267    let mut files_to_write = Vec::new();
268    collect_files_from_dir(dir, "", &mut files_to_write);
269
270    // Then write all files to the operator
271    for (dest_path, contents) in &files_to_write {
272        let final_path = format!("{}{}", prefix, dest_path);
273        operator
274            .write(&final_path, contents.clone())
275            .await
276            .context(format!("Failed to write file {}", final_path))?;
277    }
278
279    Ok(operator)
280}
281
282// Helper function to recursively collect file information
283fn collect_files_from_dir(dir: &Dir<'_>, current_path: &str, files: &mut Vec<(String, Vec<u8>)>) {
284    for entry in dir.entries() {
285        match entry {
286            DirEntry::Dir(subdir) => {
287                let new_path = if current_path.is_empty() {
288                    subdir.path().to_string_lossy().to_string()
289                } else {
290                    format!(
291                        "{}/{}",
292                        current_path,
293                        subdir.path().file_name().unwrap().to_string_lossy()
294                    )
295                };
296                collect_files_from_dir(subdir, &new_path, files);
297            }
298            DirEntry::File(file) => {
299                let file_path = if current_path.is_empty() {
300                    file.path().to_string_lossy().to_string()
301                } else {
302                    format!(
303                        "{}/{}",
304                        current_path,
305                        file.path().file_name().unwrap().to_string_lossy()
306                    )
307                };
308                files.push((file_path, file.contents().to_vec()));
309            }
310        }
311    }
312}
313
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::pinner::latest::Latest;
    use include_dir::{include_dir, Dir};

    // Bundled copy of ./static used by the include_dir tests.
    static TEST_DIR: Dir<'_> = include_dir!("./static");

    /// A fresh, empty in-memory operator.
    fn memory_operator() -> Operator {
        Operator::new(Memory::default()).unwrap().finish()
    }

    /// A `FolderPather` rooted at `spawn_folder`.
    fn pather_for(spawn_folder: &str) -> FolderPather {
        FolderPather {
            spawn_folder: spawn_folder.to_string(),
        }
    }

    #[tokio::test]
    async fn test_operator_from_includedir_with_prefix() {
        let outcome = operator_from_includedir(&TEST_DIR, Some("test-prefix/")).await;
        assert!(
            outcome.is_ok(),
            "Should create operator with prefix successfully"
        );
    }

    #[tokio::test]
    async fn test_list_migrations_returns_two_migrations() {
        // Mirror the on-disk two_migrations fixture into memory.
        let op = disk_to_operator(
            "./static/tests/two_migrations",
            None,
            DesiredOperator::Memory,
        )
        .await
        .expect("Failed to create operator from disk");

        let pinner = Latest::new("").expect("Failed to create Latest pinner");
        let store =
            Store::new(Box::new(pinner), op, pather_for("")).expect("Failed to create Store");

        let migrations: Vec<String> = store
            .list_migrations()
            .await
            .expect("Failed to list migrations");

        assert_eq!(
            migrations.len(),
            2,
            "Expected 2 migrations, got {:?}",
            migrations
        );

        // Listed paths end with '/', so look for each name as a substring.
        let names: Vec<&str> = migrations.iter().map(String::as_str).collect();
        for expected in ["20240907212659-initial", "20240908123456-second"] {
            assert!(
                names.iter().any(|m| m.contains(expected)),
                "Expected to find {} migration, got {:?}",
                expected,
                names
            );
        }
    }

    #[tokio::test]
    async fn test_get_migration_fs_status_with_lock() {
        let op = memory_operator();

        for (path, body, failure) in [
            (
                "/migrations/20240101000000-test/up.sql",
                "SELECT 1;",
                "Failed to write up.sql",
            ),
            (
                "/migrations/20240101000000-test/lock.toml",
                "pin = \"abc123\"",
                "Failed to write lock.toml",
            ),
        ] {
            op.write(path, body).await.expect(failure);
        }

        let status = get_migration_fs_status(&op, &pather_for(""), "20240101000000-test")
            .await
            .expect("Failed to get migration status");

        assert!(status.has_up_sql, "Migration should have up.sql");
        assert!(status.has_lock_toml, "Migration should have lock.toml");
    }

    #[tokio::test]
    async fn test_get_migration_fs_status_without_lock() {
        let op = memory_operator();

        // Only up.sql is written; lock.toml is deliberately absent.
        op.write("/migrations/20240101000000-test/up.sql", "SELECT 1;")
            .await
            .expect("Failed to write up.sql");

        let status = get_migration_fs_status(&op, &pather_for(""), "20240101000000-test")
            .await
            .expect("Failed to get migration status");

        assert!(status.has_up_sql, "Migration should have up.sql");
        assert!(!status.has_lock_toml, "Migration should not have lock.toml");
    }

    #[tokio::test]
    async fn test_list_migration_fs_status() {
        // opendal normalizes "./" and "/" prefixes away from entry.path(), so
        // exercise each spelling of spawn_folder.
        let spawn_folders = [
            "",
            "./database/spawn",
            "/database/spawn",
            "./spawn",
            "/spawn",
        ];

        for spawn_folder in spawn_folders {
            let op = memory_operator();

            let trimmed = spawn_folder
                .trim_start_matches("./")
                .trim_start_matches('/');
            let migrations = match trimmed {
                "" => "migrations".to_string(),
                root => format!("{}/migrations", root),
            };

            let fixtures = [
                ("20240101-first/up.sql", "SELECT 1;"),
                ("20240101-first/lock.toml", "pin = \"abc\""),
                ("20240102-second/up.sql", "SELECT 2;"),
            ];
            for (relative, body) in fixtures {
                op.write(&format!("{}/{}", migrations, relative), body)
                    .await
                    .unwrap();
            }

            let pather = pather_for(spawn_folder);

            for filter in [None, Some("20240101-first")] {
                let statuses = list_migration_fs_status(&op, &pather, filter)
                    .await
                    .expect("Failed to list migration statuses");

                let first = statuses
                    .get("20240101-first")
                    .expect("Should have first migration");
                assert!(first.has_up_sql);
                assert!(first.has_lock_toml);

                match filter {
                    None => {
                        assert_eq!(statuses.len(), 2);
                        let second = statuses
                            .get("20240102-second")
                            .expect("Should have second migration");
                        assert!(second.has_up_sql);
                        assert!(!second.has_lock_toml);
                    }
                    Some(_) => assert_eq!(statuses.len(), 1),
                }
            }
        }
    }
}