spacetimedb/host/
disk_storage.rs

1use async_trait::async_trait;
2use spacetimedb_lib::{hash_bytes, Hash};
3use std::io;
4use std::path::PathBuf;
5use tokio::fs;
6use tokio::io::AsyncWriteExt;
7
8use super::ExternalStorage;
9
/// Filesystem-backed [`ExternalStorage`] implementation.
///
/// Objects are kept as individual files below a single root directory.
#[derive(Debug, Clone)]
pub struct DiskStorage {
    /// Root directory under which all objects are stored.
    base: PathBuf,
}
15
16impl DiskStorage {
17    pub async fn new(base: PathBuf) -> io::Result<Self> {
18        fs::create_dir_all(&base).await?;
19        Ok(Self { base })
20    }
21
22    fn object_path(&self, h: &Hash) -> PathBuf {
23        let hex = h.to_hex();
24        let (pre, suf) = hex.split_at(2);
25        let mut path = self.base.clone();
26        path.extend([pre, suf]);
27        path
28    }
29
30    #[tracing::instrument(level = "trace", skip(self))]
31    pub async fn get(&self, key: &Hash) -> io::Result<Option<Box<[u8]>>> {
32        let path = self.object_path(key);
33        match fs::read(path).await {
34            Ok(bytes) => {
35                let actual_hash = hash_bytes(&bytes);
36                if actual_hash == *key {
37                    Ok(Some(bytes.into()))
38                } else {
39                    log::warn!("hash mismatch: {actual_hash} stored at {key}");
40                    if let Err(e) = self.prune(key).await {
41                        log::warn!("prune error: {e}");
42                    }
43                    Ok(None)
44                }
45            }
46            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
47            Err(e) => Err(e),
48        }
49    }
50
51    #[tracing::instrument(level = "trace", skip(self, value))]
52    pub async fn put(&self, value: &[u8]) -> io::Result<Hash> {
53        let h = hash_bytes(value);
54        let path = self.object_path(&h);
55        fs::create_dir_all(path.parent().expect("object path must have a parent")).await?;
56
57        // to ensure it doesn't conflict with a concurrent call to put() - suffix with nanosecond timestamp
58        let ts = std::time::UNIX_EPOCH.elapsed().unwrap().as_nanos();
59        let tmp = path.with_extension(format!("tmp{ts}"));
60        {
61            let mut f = fs::File::options().write(true).create_new(true).open(&tmp).await?;
62            f.write_all(value).await?;
63            f.sync_data().await?;
64        }
65
66        fs::rename(tmp, path).await?;
67
68        Ok(h)
69    }
70
71    #[tracing::instrument(level = "trace", skip(self))]
72    pub async fn prune(&self, key: &Hash) -> anyhow::Result<()> {
73        Ok(fs::remove_file(self.object_path(key)).await?)
74    }
75}
76
77#[async_trait]
78impl ExternalStorage for DiskStorage {
79    async fn lookup(&self, program_hash: Hash) -> anyhow::Result<Option<Box<[u8]>>> {
80        self.get(&program_hash).await.map_err(Into::into)
81    }
82}