use crate::api::paths;
use crate::error::{Error, Result};
use crate::merkle::{EntryBundle, HashTile};
use crate::types::{PartialSize, TileIndex, TileLevel};
use opendal::{services::Fs, services::S3, Operator};
use std::sync::Arc;
/// Handle used to read and write checkpoint, hash-tile and entry-bundle
/// objects through an OpenDAL [`Operator`].
///
/// The operator is kept behind an [`Arc`], so cloning this handle clones the
/// `Arc` and every clone refers to the same operator.
#[derive(Clone)]
pub struct TileStorage {
/// Operator through which every read and write of this type is issued.
op: Arc<Operator>,
}
impl TileStorage {
pub fn new_s3(
endpoint: &str,
bucket: &str,
access_key: &str,
secret_key: &str,
region: &str,
) -> Result<Self> {
let mut builder = S3::default()
.endpoint(endpoint)
.bucket(bucket)
.access_key_id(access_key)
.secret_access_key(secret_key)
.region(region);
builder = builder.disable_config_load();
let op = Operator::new(builder)?.finish();
Ok(Self { op: Arc::new(op) })
}
pub fn new_fs(root: &str) -> Result<Self> {
let builder = Fs::default().root(root);
let op = Operator::new(builder)?.finish();
Ok(Self { op: Arc::new(op) })
}
pub fn new(op: Operator) -> Self {
Self { op: Arc::new(op) }
}
pub async fn read_checkpoint(&self) -> Result<Option<CheckpointData>> {
match self.op.read(paths::CHECKPOINT_PATH).await {
Ok(data) => Ok(Some(CheckpointData::new(data.to_vec()))),
Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
Err(e) => Err(e.into()),
}
}
pub async fn write_checkpoint(&self, data: &CheckpointData) -> Result<()> {
self.op
.write(paths::CHECKPOINT_PATH, data.as_bytes().to_vec())
.await
.map_err(Into::into)
}
pub async fn read_tile(
&self,
level: TileLevel,
index: TileIndex,
partial: PartialSize,
) -> Result<Option<HashTile>> {
let path = paths::tile_path(level.value(), index.value(), partial.value());
match self.op.read(&path).await {
Ok(data) => {
let bytes = data.to_vec();
let tile = HashTile::from_bytes(&bytes)?;
Ok(Some(tile))
}
Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
Err(e) => Err(e.into()),
}
}
pub async fn write_tile(
&self,
level: TileLevel,
index: TileIndex,
partial: PartialSize,
tile: &HashTile,
) -> Result<()> {
let path = paths::tile_path(level.value(), index.value(), partial.value());
let data = tile.to_bytes();
self.op.write(&path, data).await.map_err(Into::into)
}
pub async fn read_entry_bundle(
&self,
index: TileIndex,
partial: PartialSize,
) -> Result<Option<EntryBundle>> {
let path = paths::entries_path(index.value(), partial.value());
match self.op.read(&path).await {
Ok(data) => {
let bytes = data.to_vec();
let bundle = EntryBundle::from_bytes(&bytes)?;
Ok(Some(bundle))
}
Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
Err(e) => Err(e.into()),
}
}
pub async fn write_entry_bundle(
&self,
index: TileIndex,
partial: PartialSize,
bundle: &EntryBundle,
) -> Result<()> {
let path = paths::entries_path(index.value(), partial.value());
let data = bundle.to_bytes();
self.op.write(&path, data).await.map_err(Into::into)
}
pub async fn read_raw(&self, path: &str) -> Result<Option<Vec<u8>>> {
match self.op.read(path).await {
Ok(data) => Ok(Some(data.to_vec())),
Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
Err(e) => Err(e.into()),
}
}
}
/// Owned byte buffer holding checkpoint contents.
///
/// The bytes are stored as given; UTF-8 validity is only checked when
/// `as_str` is called.
#[derive(Debug, Clone)]
pub struct CheckpointData(Vec<u8>);
impl CheckpointData {
pub fn new(data: Vec<u8>) -> Self {
Self(data)
}
pub fn as_bytes(&self) -> &[u8] {
&self.0
}
pub fn into_bytes(self) -> Vec<u8> {
self.0
}
pub fn as_str(&self) -> Result<&str> {
std::str::from_utf8(&self.0).map_err(|e| Error::InvalidEntry(e.to_string()))
}
}
impl From<Vec<u8>> for CheckpointData {
fn from(data: Vec<u8>) -> Self {
Self::new(data)
}
}
impl From<String> for CheckpointData {
fn from(s: String) -> Self {
Self::new(s.into_bytes())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use opendal::services::Memory;

    /// Creates a `TileStorage` on top of OpenDAL's `Memory` service.
    async fn create_test_storage() -> TileStorage {
        let op = Operator::new(Memory::default()).unwrap().finish();
        TileStorage::new(op)
    }

    /// A checkpoint is absent at first and reads back unchanged once written.
    #[tokio::test]
    async fn test_checkpoint_roundtrip() {
        let storage = create_test_storage().await;

        let initial = storage.read_checkpoint().await.unwrap();
        assert!(initial.is_none());

        let written = CheckpointData::from(String::from("test checkpoint\n"));
        storage.write_checkpoint(&written).await.unwrap();

        let stored = storage.read_checkpoint().await.unwrap().unwrap();
        assert_eq!(stored.as_str().unwrap(), "test checkpoint\n");
    }

    /// A hash tile is absent at first and reads back with the same nodes once
    /// written.
    #[tokio::test]
    async fn test_tile_roundtrip() {
        use sigstore_types::Sha256Hash;

        let storage = create_test_storage().await;
        let level = TileLevel::new(0);
        let index = TileIndex::new(0);
        let partial = PartialSize::full();

        let initial = storage.read_tile(level, index, partial).await.unwrap();
        assert!(initial.is_none());

        let nodes: Vec<Sha256Hash> = [1u8, 2u8]
            .iter()
            .map(|&fill| Sha256Hash::from_bytes([fill; 32]))
            .collect();
        let tile = HashTile::with_nodes(nodes.clone());
        storage
            .write_tile(level, index, partial, &tile)
            .await
            .unwrap();

        let stored = storage.read_tile(level, index, partial).await.unwrap();
        let read = stored.unwrap();
        assert_eq!(read.nodes, nodes);
    }

    /// An entry bundle is absent at first and reads back with the same
    /// entries, in order, once written.
    #[tokio::test]
    async fn test_entry_bundle_roundtrip() {
        use crate::types::EntryData;

        let storage = create_test_storage().await;
        let index = TileIndex::new(0);
        let partial = PartialSize::new(2);

        let initial = storage.read_entry_bundle(index, partial).await.unwrap();
        assert!(initial.is_none());

        let entries: Vec<EntryData> = ["entry 1", "entry 2"]
            .iter()
            .map(|&text| EntryData::from(text))
            .collect();
        let bundle = EntryBundle::with_entries(entries);
        storage
            .write_entry_bundle(index, partial, &bundle)
            .await
            .unwrap();

        let stored = storage.read_entry_bundle(index, partial).await.unwrap();
        let read = stored.unwrap();
        assert_eq!(read.entries.len(), 2);
        assert_eq!(read.entries[0].as_bytes(), b"entry 1");
        assert_eq!(read.entries[1].as_bytes(), b"entry 2");
    }
}