spacetimedb/host/
disk_storage.rs1use async_trait::async_trait;
2use spacetimedb_lib::{hash_bytes, Hash};
3use std::io;
4use std::path::PathBuf;
5use tokio::fs;
6use tokio::io::AsyncWriteExt;
7
8use super::ExternalStorage;
9
10#[derive(Clone, Debug)]
12pub struct DiskStorage {
13 base: PathBuf,
14}
15
16impl DiskStorage {
17 pub async fn new(base: PathBuf) -> io::Result<Self> {
18 fs::create_dir_all(&base).await?;
19 Ok(Self { base })
20 }
21
22 fn object_path(&self, h: &Hash) -> PathBuf {
23 let hex = h.to_hex();
24 let (pre, suf) = hex.split_at(2);
25 let mut path = self.base.clone();
26 path.extend([pre, suf]);
27 path
28 }
29
30 #[tracing::instrument(level = "trace", skip(self))]
31 pub async fn get(&self, key: &Hash) -> io::Result<Option<Box<[u8]>>> {
32 let path = self.object_path(key);
33 match fs::read(path).await {
34 Ok(bytes) => {
35 let actual_hash = hash_bytes(&bytes);
36 if actual_hash == *key {
37 Ok(Some(bytes.into()))
38 } else {
39 log::warn!("hash mismatch: {actual_hash} stored at {key}");
40 if let Err(e) = self.prune(key).await {
41 log::warn!("prune error: {e}");
42 }
43 Ok(None)
44 }
45 }
46 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
47 Err(e) => Err(e),
48 }
49 }
50
51 #[tracing::instrument(level = "trace", skip(self, value))]
52 pub async fn put(&self, value: &[u8]) -> io::Result<Hash> {
53 let h = hash_bytes(value);
54 let path = self.object_path(&h);
55 fs::create_dir_all(path.parent().expect("object path must have a parent")).await?;
56
57 let ts = std::time::UNIX_EPOCH.elapsed().unwrap().as_nanos();
59 let tmp = path.with_extension(format!("tmp{ts}"));
60 {
61 let mut f = fs::File::options().write(true).create_new(true).open(&tmp).await?;
62 f.write_all(value).await?;
63 f.sync_data().await?;
64 }
65
66 fs::rename(tmp, path).await?;
67
68 Ok(h)
69 }
70
71 #[tracing::instrument(level = "trace", skip(self))]
72 pub async fn prune(&self, key: &Hash) -> anyhow::Result<()> {
73 Ok(fs::remove_file(self.object_path(key)).await?)
74 }
75}
76
77#[async_trait]
78impl ExternalStorage for DiskStorage {
79 async fn lookup(&self, program_hash: Hash) -> anyhow::Result<Option<Box<[u8]>>> {
80 self.get(&program_hash).await.map_err(Into::into)
81 }
82}