spawn_db/store/pinner/
mod.rs1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::TryStreamExt;
4use opendal::Operator;
5
6use serde::{Deserialize, Serialize};
7
8use std::fmt::Debug;
9use twox_hash::xxhash3_128;
10
11pub mod latest;
12pub mod spawn;
13
14#[async_trait]
15pub trait Pinner: Debug + Send + Sync {
16 async fn load_bytes(&self, name: &str, fs: &Operator) -> Result<Option<Vec<u8>>>;
17
18 async fn load(&self, name: &str, fs: &Operator) -> Result<Option<String>> {
19 match self.load_bytes(name, fs).await? {
20 Some(bytes) => Ok(Some(String::from_utf8(bytes)?)),
21 None => Ok(None),
22 }
23 }
24
25 async fn snapshot(&mut self, fs: &Operator) -> Result<String>;
26}
27
28#[derive(Debug, Default, Serialize, Deserialize)]
29pub struct Tree {
30 pub entries: Vec<Entry>,
31}
32
33#[derive(Debug, Deserialize, Serialize)]
34pub enum EntryKind {
35 Blob,
36 Tree,
37}
38
39#[derive(Debug, Deserialize, Serialize)]
40pub struct Entry {
41 pub kind: EntryKind,
42 pub hash: String,
43 pub name: String,
44}
45
46pub(crate) async fn pin_contents(
47 fs: &Operator,
48 store_path: &str,
49 contents: String,
50) -> Result<String> {
51 let hash = xxhash3_128::Hasher::oneshot(contents.as_bytes());
52 let hash = format!("{:032x}", hash);
53 let dir = format!("{}/{}", store_path, hash_to_path(&hash)?);
54
55 fs.write(&dir, contents).await?;
56
57 Ok(hash)
58}
59
60pub(crate) fn hash_to_path(hash: &str) -> Result<String> {
62 if hash.len() < 3 {
63 return Err(anyhow::anyhow!("Hash too short"));
64 }
65
66 let (first_two, rest) = hash.split_at(2);
67 Ok(format!("{}/{}", first_two, rest).to_string())
68}
69
70pub(crate) async fn read_hash_file(fs: &Operator, base_path: &str, hash: &str) -> Result<String> {
72 let relative_path = hash_to_path(hash)?;
73 let file_path = format!("{}/{}", base_path, relative_path);
74
75 let get_result = fs.read(&file_path).await?;
76 let bytes = get_result.to_bytes();
77 let contents = String::from_utf8(bytes.to_vec())?;
78
79 Ok(contents)
80}
81
82pub(crate) async fn snapshot(fs: &Operator, store_path: &str, mut prefix: &str) -> Result<String> {
85 let fixed;
86 if !prefix.ends_with("/") {
87 fixed = format!("{}/", prefix);
88 prefix = &fixed;
89 }
90
91 let mut fs_lister = fs.lister(prefix).await?;
92 let mut list_result: Vec<opendal::Entry> = Vec::new();
93 while let Some(entry) = fs_lister.try_next().await? {
94 if entry.path() == prefix {
95 continue;
96 }
97 list_result.push(entry);
98 }
99
100 let mut entries = Vec::new();
101
102 for entry in list_result {
103 match entry.path().ends_with("/") {
104 true => {
105 let branch = Box::pin(snapshot(fs, store_path, entry.path()))
107 .await
108 .context("failed to snapshot subfolder")?;
109 entries.push((
110 entry.name().to_string(),
111 Entry {
112 kind: EntryKind::Tree,
113 name: entry
114 .name()
115 .strip_suffix("/")
116 .unwrap_or(entry.name())
117 .to_string(),
118 hash: branch,
119 },
120 ));
121 }
122 false => {
123 let contents = fs.read(entry.path()).await?;
124 let contents = String::from_utf8(contents.to_bytes().to_vec())?;
125 let hash = pin_contents(fs, store_path, contents).await?;
126
127 entries.push((
128 entry.name().to_string(),
129 Entry {
130 kind: EntryKind::Blob,
131 name: entry.name().to_string(),
132 hash,
133 },
134 ));
135 }
136 }
137 }
138
139 let mut tree = Tree::default();
142 entries.sort_by(|a, b| a.0.cmp(&b.0));
143 tree.entries = entries.into_iter().map(|(_, entry)| entry).collect();
144
145 let contents = toml::to_string(&tree).unwrap();
146 let hash = pin_contents(fs, store_path, contents)
147 .await
148 .context("could not pin contents")?;
149
150 Ok(hash)
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store;

    /// Checks two things for the example `components/` directory:
    /// - snapshotting it yields the expected tree hash;
    /// - the pinned tree object hashes back to that same value.
    #[tokio::test]
    async fn test_snapshot() -> Result<()> {
        let op =
            store::disk_to_operator("./static/example", None, store::DesiredOperator::Memory)
                .await?;
        let store_root = "store/";

        let tree_hash = snapshot(&op, store_root, "components/").await?;
        assert!(!tree_hash.is_empty());
        assert_eq!("cb59728fefa959672ef3c8c9f0b6df95", tree_hash);

        // The pinned object must be addressed by its own content hash.
        let stored_tree = read_hash_file(&op, store_root, &tree_hash).await?;
        let recomputed = format!(
            "{:032x}",
            xxhash3_128::Hasher::oneshot(stored_tree.as_bytes())
        );
        assert_eq!(
            tree_hash, recomputed,
            "Snapshot hash should match content hash"
        );

        Ok(())
    }
}