spawn_db/store/pinner/
mod.rs1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::TryStreamExt;
4use opendal::Operator;
5
6use serde::{Deserialize, Serialize};
7
8use std::fmt::Debug;
9use twox_hash::xxhash3_128;
10
11pub mod latest;
12pub mod spawn;
13
14#[async_trait]
15pub trait Pinner: Debug + Send + Sync {
16 async fn load(&self, name: &str, fs: &Operator) -> Result<Option<String>>;
17 async fn snapshot(&mut self, fs: &Operator) -> Result<String>;
18}
19
20#[derive(Debug, Default, Serialize, Deserialize)]
21pub struct Tree {
22 pub entries: Vec<Entry>,
23}
24
25#[derive(Debug, Deserialize, Serialize)]
26pub enum EntryKind {
27 Blob,
28 Tree,
29}
30
31#[derive(Debug, Deserialize, Serialize)]
32pub struct Entry {
33 pub kind: EntryKind,
34 pub hash: String,
35 pub name: String,
36}
37
38pub(crate) async fn pin_contents(
39 fs: &Operator,
40 store_path: &str,
41 contents: String,
42) -> Result<String> {
43 let hash = xxhash3_128::Hasher::oneshot(contents.as_bytes());
44 let hash = format!("{:032x}", hash);
45 let dir = format!("{}/{}", store_path, hash_to_path(&hash)?);
46
47 fs.write(&dir, contents).await?;
48
49 Ok(hash)
50}
51
52pub(crate) fn hash_to_path(hash: &str) -> Result<String> {
54 if hash.len() < 3 {
55 return Err(anyhow::anyhow!("Hash too short"));
56 }
57
58 let (first_two, rest) = hash.split_at(2);
59 Ok(format!("{}/{}", first_two, rest).to_string())
60}
61
62pub(crate) async fn read_hash_file(fs: &Operator, base_path: &str, hash: &str) -> Result<String> {
64 let relative_path = hash_to_path(hash)?;
65 let file_path = format!("{}/{}", base_path, relative_path);
66
67 let get_result = fs.read(&file_path).await?;
68 let bytes = get_result.to_bytes();
69 let contents = String::from_utf8(bytes.to_vec())?;
70
71 Ok(contents)
72}
73
74pub(crate) async fn snapshot(fs: &Operator, store_path: &str, mut prefix: &str) -> Result<String> {
77 let fixed;
78 if !prefix.ends_with("/") {
79 fixed = format!("{}/", prefix);
80 prefix = &fixed;
81 }
82
83 let mut fs_lister = fs.lister(prefix).await?;
84 let mut list_result: Vec<opendal::Entry> = Vec::new();
85 while let Some(entry) = fs_lister.try_next().await? {
86 if entry.path() == prefix {
87 continue;
88 }
89 list_result.push(entry);
90 }
91
92 let mut entries = Vec::new();
93
94 for entry in list_result {
95 match entry.path().ends_with("/") {
96 true => {
97 let branch = Box::pin(snapshot(fs, store_path, entry.path()))
99 .await
100 .context("failed to snapshot subfolder")?;
101 entries.push((
102 entry.name().to_string(),
103 Entry {
104 kind: EntryKind::Tree,
105 name: entry
106 .name()
107 .strip_suffix("/")
108 .unwrap_or(entry.name())
109 .to_string(),
110 hash: branch,
111 },
112 ));
113 }
114 false => {
115 let contents = fs.read(entry.path()).await?;
116 let contents = String::from_utf8(contents.to_bytes().to_vec())?;
117 let hash = pin_contents(fs, store_path, contents).await?;
118
119 entries.push((
120 entry.name().to_string(),
121 Entry {
122 kind: EntryKind::Blob,
123 name: entry.name().to_string(),
124 hash,
125 },
126 ));
127 }
128 }
129 }
130
131 let mut tree = Tree::default();
134 entries.sort_by(|a, b| a.0.cmp(&b.0));
135 tree.entries = entries.into_iter().map(|(_, entry)| entry).collect();
136
137 let contents = toml::to_string(&tree).unwrap();
138 let hash = pin_contents(fs, store_path, contents)
139 .await
140 .context("could not pin contents")?;
141
142 Ok(hash)
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store;

    /// Snapshots the example `components/` directory into an in-memory
    /// operator and checks that the root hash is the expected one and that it
    /// equals the hash of the stored root tree contents.
    #[tokio::test]
    async fn test_snapshot() -> Result<()> {
        let store_loc = "store/";
        let op =
            store::disk_to_operator("./static/example", None, store::DesiredOperator::Memory)
                .await?;

        let root = snapshot(&op, store_loc, "components/").await?;
        assert!(!root.is_empty());
        assert_eq!("cb59728fefa959672ef3c8c9f0b6df95", root);

        // Re-hash what was stored under the root hash; it must round-trip.
        let stored = read_hash_file(&op, store_loc, &root).await?;
        let digest = xxhash3_128::Hasher::oneshot(stored.as_bytes());
        let rehashed = format!("{:032x}", digest);
        assert_eq!(
            root, rehashed,
            "Snapshot hash should match content hash"
        );

        Ok(())
    }
}