Skip to main content

spawn_db/store/pinner/
mod.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::TryStreamExt;
4use opendal::Operator;
5
6use serde::{Deserialize, Serialize};
7
8use std::fmt::Debug;
9use twox_hash::xxhash3_128;
10
11pub mod latest;
12pub mod spawn;
13
14#[async_trait]
15pub trait Pinner: Debug + Send + Sync {
16    async fn load(&self, name: &str, fs: &Operator) -> Result<Option<String>>;
17    async fn snapshot(&mut self, fs: &Operator) -> Result<String>;
18}
19
20#[derive(Debug, Default, Serialize, Deserialize)]
21pub struct Tree {
22    pub entries: Vec<Entry>,
23}
24
25#[derive(Debug, Deserialize, Serialize)]
26pub enum EntryKind {
27    Blob,
28    Tree,
29}
30
31#[derive(Debug, Deserialize, Serialize)]
32pub struct Entry {
33    pub kind: EntryKind,
34    pub hash: String,
35    pub name: String,
36}
37
38pub(crate) async fn pin_contents(
39    fs: &Operator,
40    store_path: &str,
41    contents: String,
42) -> Result<String> {
43    let hash = xxhash3_128::Hasher::oneshot(contents.as_bytes());
44    let hash = format!("{:032x}", hash);
45    let dir = format!("{}/{}", store_path, hash_to_path(&hash)?);
46
47    fs.write(&dir, contents).await?;
48
49    Ok(hash)
50}
51
52/// Converts a hash string into a relative path like `c6/b8e869fa533155bbf2f0dd8fda9c68`.
53pub(crate) fn hash_to_path(hash: &str) -> Result<String> {
54    if hash.len() < 3 {
55        return Err(anyhow::anyhow!("Hash too short"));
56    }
57
58    let (first_two, rest) = hash.split_at(2);
59    Ok(format!("{}/{}", first_two, rest).to_string())
60}
61
62/// Reads the file corresponding to the hash from the given base path.
63pub(crate) async fn read_hash_file(fs: &Operator, base_path: &str, hash: &str) -> Result<String> {
64    let relative_path = hash_to_path(hash)?;
65    let file_path = format!("{}/{}", base_path, relative_path);
66
67    let get_result = fs.read(&file_path).await?;
68    let bytes = get_result.to_bytes();
69    let contents = String::from_utf8(bytes.to_vec())?;
70
71    Ok(contents)
72}
73
74/// Walks through objects in an ObjectStore, creating pinned entries as appropriate for every
75/// directory and file.  Returns a hash of the object.
76pub(crate) async fn snapshot(fs: &Operator, store_path: &str, mut prefix: &str) -> Result<String> {
77    let fixed;
78    if !prefix.ends_with("/") {
79        fixed = format!("{}/", prefix);
80        prefix = &fixed;
81    }
82
83    let mut fs_lister = fs.lister(prefix).await?;
84    let mut list_result: Vec<opendal::Entry> = Vec::new();
85    while let Some(entry) = fs_lister.try_next().await? {
86        if entry.path() == prefix {
87            continue;
88        }
89        list_result.push(entry);
90    }
91
92    let mut entries = Vec::new();
93
94    for entry in list_result {
95        match entry.path().ends_with("/") {
96            true => {
97                // Folder
98                let branch = Box::pin(snapshot(fs, store_path, entry.path()))
99                    .await
100                    .context("failed to snapshot subfolder")?;
101                entries.push((
102                    entry.name().to_string(),
103                    Entry {
104                        kind: EntryKind::Tree,
105                        name: entry
106                            .name()
107                            .strip_suffix("/")
108                            .unwrap_or(entry.name())
109                            .to_string(),
110                        hash: branch,
111                    },
112                ));
113            }
114            false => {
115                let contents = fs.read(entry.path()).await?;
116                let contents = String::from_utf8(contents.to_bytes().to_vec())?;
117                let hash = pin_contents(fs, store_path, contents).await?;
118
119                entries.push((
120                    entry.name().to_string(),
121                    Entry {
122                        kind: EntryKind::Blob,
123                        name: entry.name().to_string(),
124                        hash,
125                    },
126                ));
127            }
128        }
129    }
130
131    // Sort entries by name for consistent ordering, and then return a hash for
132    // this node.
133    let mut tree = Tree::default();
134    entries.sort_by(|a, b| a.0.cmp(&b.0));
135    tree.entries = entries.into_iter().map(|(_, entry)| entry).collect();
136
137    let contents = toml::to_string(&tree).unwrap();
138    let hash = pin_contents(fs, store_path, contents)
139        .await
140        .context("could not pin contents")?;
141
142    Ok(hash)
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store;

    /// Snapshots `components/` from an operator built out of `./static/example`
    /// (requested as `DesiredOperator::Memory`), checks the expected root hash,
    /// and verifies that the stored root object hashes back to that same value.
    #[tokio::test]
    async fn test_snapshot() -> Result<()> {
        let op =
            store::disk_to_operator("./static/example", None, store::DesiredOperator::Memory)
                .await?;

        let store_loc = "store/";
        let root = snapshot(&op, store_loc, "components/").await?;

        assert!(!root.is_empty());
        assert_eq!("cb59728fefa959672ef3c8c9f0b6df95", root);

        // Read the root level object back from the store.
        let stored_root = read_hash_file(&op, store_loc, &root).await?;

        // Re-hash what was stored: it must reproduce the snapshot hash.
        let digest = xxhash3_128::Hasher::oneshot(stored_root.as_bytes());
        let rehashed = format!("{:032x}", digest);
        assert_eq!(
            root, rehashed,
            "Snapshot hash should match content hash"
        );

        Ok(())
    }
}