Skip to main content

spawn_db/store/
mod.rs

1use crate::config::FolderPather;
2use anyhow::{Context, Result};
3use futures::TryStreamExt;
4use include_dir::{Dir, DirEntry};
5use opendal::services::Memory;
6use opendal::Operator;
7use std::fmt::Debug;
8
9use crate::store::pinner::Pinner;
10
11pub mod pinner;
12
13pub struct Store {
14    pinner: Box<dyn Pinner>,
15    fs: Operator,
16    pather: FolderPather,
17}
18
19impl Debug for Store {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        f.debug_struct("Store").field("fs", &self.fs).finish()
22    }
23}
24
25impl Store {
26    pub fn new(pinner: Box<dyn Pinner>, fs: Operator, pather: FolderPather) -> Result<Store> {
27        Ok(Store { pinner, fs, pather })
28    }
29
30    pub async fn load_component(&self, name: &str) -> Result<Option<String>> {
31        let res = self.pinner.load(name, &self.fs).await?;
32
33        Ok(res)
34    }
35
36    pub async fn load_migration(&self, name: &str) -> Result<String> {
37        let result = self.fs.read(&name).await?;
38        let bytes = result.to_bytes();
39        let contents = String::from_utf8(bytes.to_vec())?;
40
41        Ok(contents)
42    }
43
44    pub async fn list_migrations(&self) -> Result<Vec<String>> {
45        let mut migrations: Vec<String> = Vec::new();
46        let mut fs_lister = self
47            .fs
48            .lister(format!("{}/", &self.pather.migrations_folder()).as_ref())
49            .await?;
50        while let Some(entry) = fs_lister.try_next().await? {
51            let path = entry.path().to_string();
52            if path.ends_with("/") {
53                migrations.push(path)
54            }
55        }
56
57        // Sort migrations by name to ensure oldest to newest order
58        migrations.sort();
59
60        Ok(migrations)
61    }
62}
63
64pub enum DesiredOperator {
65    Memory,
66    FileSystem,
67}
68
69// Handy function for getting a disk based folder of data and return an in
70// memory operator that has the same contents.  Particularly useful for tests.
71pub async fn disk_to_operator(
72    source_folder: &str,
73    dest_prefix: Option<&str>,
74    desired_operator: DesiredOperator,
75) -> Result<Operator> {
76    let dest_op = match desired_operator {
77        DesiredOperator::FileSystem => {
78            let dest_service = opendal::services::Fs::default().root("./testout");
79            Operator::new(dest_service)?.finish()
80        }
81        DesiredOperator::Memory => {
82            let dest_service = Memory::default();
83            Operator::new(dest_service)?.finish()
84        }
85    };
86
87    // Create a LocalFileSystem to read from static/example
88    let fs_service = opendal::services::Fs::default().root(source_folder);
89    let source_store = Operator::new(fs_service)
90        .context("disk_to_mem_operator failed to create operator")?
91        .finish();
92
93    // Populate the in-memory store with contents from static/example
94    let store_loc = dest_prefix.unwrap_or_default();
95    crate::store::populate_store_from_store(&source_store, &dest_op, "", store_loc)
96        .await
97        .context("call to populate memory fs from object store")?;
98
99    Ok(dest_op)
100}
101
102pub async fn populate_store_from_store(
103    source_store: &Operator,
104    target_store: &Operator,
105    source_prefix: &str,
106    dest_prefix: &str,
107) -> Result<()> {
108    let mut lister = source_store
109        .lister_with(source_prefix)
110        .recursive(true)
111        .await
112        .context("lister call")?;
113    let mut list_result: Vec<opendal::Entry> = Vec::new();
114
115    while let Some(entry) = lister.try_next().await? {
116        if entry.path().ends_with("/") {
117            continue;
118        }
119        list_result.push(entry);
120    }
121
122    for entry in list_result {
123        // Print out the file we're writing:
124        let dest_object_path = format!("{}{}", dest_prefix, entry.path());
125        let source_object_path = entry.path();
126
127        // Get the object data
128        let bytes = source_store
129            .read(&source_object_path)
130            .await
131            .context(format!("read path {}", &source_object_path))?;
132
133        // Store in target with the same path
134        target_store
135            .write(&dest_object_path, bytes)
136            .await
137            .context("write")?;
138    }
139
140    Ok(())
141}
142
143/// Creates a memory-based OpenDAL operator from an include_dir bundle.
144///
145/// This function takes a bundled directory (created with include_dir!) and
146/// creates an in-memory OpenDAL operator containing all files from that bundle.
147/// This is useful for embedding migrations, templates, or other static files
148/// directly into the binary while still being able to use OpenDAL's interface.
149///
150/// # Arguments
151///
152/// * `included_dir` - A reference to a Dir created with include_dir! macro
153/// * `dest_prefix` - Optional prefix to add to all file paths in the operator
154///
155/// # Returns
156///
157/// A memory-based OpenDAL operator containing all files from the bundled directory
158pub async fn operator_from_includedir(
159    dir: &Dir<'_>,
160    dest_prefix: Option<&str>,
161) -> Result<Operator> {
162    // Create a memory operator
163    let dest_service = Memory::default();
164    let operator = Operator::new(dest_service)?.finish();
165
166    let prefix = dest_prefix.unwrap_or_default();
167
168    // First collect all file information
169    let mut files_to_write = Vec::new();
170    collect_files_from_dir(dir, "", &mut files_to_write);
171
172    // Then write all files to the operator
173    for (dest_path, contents) in &files_to_write {
174        let final_path = format!("{}{}", prefix, dest_path);
175        operator
176            .write(&final_path, contents.clone())
177            .await
178            .context(format!("Failed to write file {}", final_path))?;
179    }
180
181    Ok(operator)
182}
183
184// Helper function to recursively collect file information
185fn collect_files_from_dir(dir: &Dir<'_>, current_path: &str, files: &mut Vec<(String, Vec<u8>)>) {
186    for entry in dir.entries() {
187        match entry {
188            DirEntry::Dir(subdir) => {
189                let new_path = if current_path.is_empty() {
190                    subdir.path().to_string_lossy().to_string()
191                } else {
192                    format!(
193                        "{}/{}",
194                        current_path,
195                        subdir.path().file_name().unwrap().to_string_lossy()
196                    )
197                };
198                collect_files_from_dir(subdir, &new_path, files);
199            }
200            DirEntry::File(file) => {
201                let file_path = if current_path.is_empty() {
202                    file.path().to_string_lossy().to_string()
203                } else {
204                    format!(
205                        "{}/{}",
206                        current_path,
207                        file.path().file_name().unwrap().to_string_lossy()
208                    )
209                };
210                files.push((file_path, file.contents().to_vec()));
211            }
212        }
213    }
214}
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219    use crate::store::pinner::latest::Latest;
220    use include_dir::{include_dir, Dir};
221
222    // Create a test directory structure for testing
223    static TEST_DIR: Dir<'_> = include_dir!("./static");
224
225    #[tokio::test]
226    async fn test_operator_from_includedir_with_prefix() {
227        let result = operator_from_includedir(&TEST_DIR, Some("test-prefix/")).await;
228        assert!(
229            result.is_ok(),
230            "Should create operator with prefix successfully"
231        );
232    }
233
234    #[tokio::test]
235    async fn test_list_migrations_returns_two_migrations() {
236        // Load the two_migrations test folder into a memory operator
237        let op = disk_to_operator(
238            "./static/tests/two_migrations",
239            None,
240            DesiredOperator::Memory,
241        )
242        .await
243        .expect("Failed to create operator from disk");
244
245        // Create a pinner and FolderPather
246        let pinner = Latest::new("").expect("Failed to create Latest pinner");
247        let pather = FolderPather {
248            spawn_folder: "".to_string(),
249        };
250
251        // Create the Store
252        let store = Store::new(Box::new(pinner), op, pather).expect("Failed to create Store");
253
254        // Call list_migrations and verify the result
255        let migrations: Vec<String> = store
256            .list_migrations()
257            .await
258            .expect("Failed to list migrations");
259
260        // Should have exactly 2 migrations
261        assert_eq!(
262            migrations.len(),
263            2,
264            "Expected 2 migrations, got {:?}",
265            migrations
266        );
267
268        // Verify the migration names are present (they end with /)
269        let migration_names: Vec<&str> = migrations.iter().map(|s| s.as_str()).collect();
270        assert!(
271            migration_names
272                .iter()
273                .any(|m| m.contains("20240907212659-initial")),
274            "Expected to find 20240907212659-initial migration, got {:?}",
275            migration_names
276        );
277        assert!(
278            migration_names
279                .iter()
280                .any(|m| m.contains("20240908123456-second")),
281            "Expected to find 20240908123456-second migration, got {:?}",
282            migration_names
283        );
284    }
285}