Skip to main content

spawn_db/store/pinner/
mod.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::TryStreamExt;
4use opendal::Operator;
5
6use serde::{Deserialize, Serialize};
7
8use std::fmt::Debug;
9use twox_hash::xxhash3_128;
10
11pub mod latest;
12pub mod spawn;
13
14#[async_trait]
15pub trait Pinner: Debug + Send + Sync {
16    async fn load_bytes(&self, name: &str, fs: &Operator) -> Result<Option<Vec<u8>>>;
17
18    async fn load(&self, name: &str, fs: &Operator) -> Result<Option<String>> {
19        match self.load_bytes(name, fs).await? {
20            Some(bytes) => Ok(Some(String::from_utf8(bytes)?)),
21            None => Ok(None),
22        }
23    }
24
25    async fn snapshot(&mut self, fs: &Operator) -> Result<String>;
26}
27
28#[derive(Debug, Default, Serialize, Deserialize)]
29pub struct Tree {
30    pub entries: Vec<Entry>,
31}
32
33#[derive(Debug, Deserialize, Serialize)]
34pub enum EntryKind {
35    Blob,
36    Tree,
37}
38
39#[derive(Debug, Deserialize, Serialize)]
40pub struct Entry {
41    pub kind: EntryKind,
42    pub hash: String,
43    pub name: String,
44}
45
46pub(crate) async fn pin_contents(
47    fs: &Operator,
48    store_path: &str,
49    contents: String,
50) -> Result<String> {
51    let hash = xxhash3_128::Hasher::oneshot(contents.as_bytes());
52    let hash = format!("{:032x}", hash);
53    let dir = format!("{}/{}", store_path, hash_to_path(&hash)?);
54
55    fs.write(&dir, contents).await?;
56
57    Ok(hash)
58}
59
60/// Converts a hash string into a relative path like `c6/b8e869fa533155bbf2f0dd8fda9c68`.
61pub(crate) fn hash_to_path(hash: &str) -> Result<String> {
62    if hash.len() < 3 {
63        return Err(anyhow::anyhow!("Hash too short"));
64    }
65
66    let (first_two, rest) = hash.split_at(2);
67    Ok(format!("{}/{}", first_two, rest).to_string())
68}
69
70/// Reads the file corresponding to the hash from the given base path.
71pub(crate) async fn read_hash_file(fs: &Operator, base_path: &str, hash: &str) -> Result<String> {
72    let relative_path = hash_to_path(hash)?;
73    let file_path = format!("{}/{}", base_path, relative_path);
74
75    let get_result = fs.read(&file_path).await?;
76    let bytes = get_result.to_bytes();
77    let contents = String::from_utf8(bytes.to_vec())?;
78
79    Ok(contents)
80}
81
82/// Walks through objects in an ObjectStore, creating pinned entries as appropriate for every
83/// directory and file.  Returns a hash of the object.
84pub(crate) async fn snapshot(fs: &Operator, store_path: &str, mut prefix: &str) -> Result<String> {
85    let fixed;
86    if !prefix.ends_with("/") {
87        fixed = format!("{}/", prefix);
88        prefix = &fixed;
89    }
90
91    let mut fs_lister = fs.lister(prefix).await?;
92    let mut list_result: Vec<opendal::Entry> = Vec::new();
93    while let Some(entry) = fs_lister.try_next().await? {
94        if entry.path() == prefix {
95            continue;
96        }
97        list_result.push(entry);
98    }
99
100    let mut entries = Vec::new();
101
102    for entry in list_result {
103        match entry.path().ends_with("/") {
104            true => {
105                // Folder
106                let branch = Box::pin(snapshot(fs, store_path, entry.path()))
107                    .await
108                    .context("failed to snapshot subfolder")?;
109                entries.push((
110                    entry.name().to_string(),
111                    Entry {
112                        kind: EntryKind::Tree,
113                        name: entry
114                            .name()
115                            .strip_suffix("/")
116                            .unwrap_or(entry.name())
117                            .to_string(),
118                        hash: branch,
119                    },
120                ));
121            }
122            false => {
123                let contents = fs.read(entry.path()).await?;
124                let contents = String::from_utf8(contents.to_bytes().to_vec())?;
125                let hash = pin_contents(fs, store_path, contents).await?;
126
127                entries.push((
128                    entry.name().to_string(),
129                    Entry {
130                        kind: EntryKind::Blob,
131                        name: entry.name().to_string(),
132                        hash,
133                    },
134                ));
135            }
136        }
137    }
138
139    // Sort entries by name for consistent ordering, and then return a hash for
140    // this node.
141    let mut tree = Tree::default();
142    entries.sort_by(|a, b| a.0.cmp(&b.0));
143    tree.entries = entries.into_iter().map(|(_, entry)| entry).collect();
144
145    let contents = toml::to_string(&tree).unwrap();
146    let hash = pin_contents(fs, store_path, contents)
147        .await
148        .context("could not pin contents")?;
149
150    Ok(hash)
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store;

    /// Snapshots the example `components/` folder into an in-memory operator and checks
    /// both the expected root hash and that the stored root object hashes to that value.
    #[tokio::test]
    async fn test_snapshot() -> Result<()> {
        let op = store::disk_to_operator("./static/example", None, store::DesiredOperator::Memory)
            .await?;

        let store_loc = "store/";
        let root = snapshot(&op, store_loc, "components/").await?;

        assert!(!root.is_empty());
        assert_eq!("cb59728fefa959672ef3c8c9f0b6df95", root);

        // Read the pinned root tree back from the store.
        let root_content = read_hash_file(&op, store_loc, &root).await?;

        // The store is content-addressed, so re-hashing the stored root object must
        // reproduce the snapshot hash.
        let digest = xxhash3_128::Hasher::oneshot(root_content.as_bytes());
        let content_hash = format!("{:032x}", digest);
        assert_eq!(
            root, content_hash,
            "Snapshot hash should match content hash"
        );

        Ok(())
    }
}