use crate::error::{DbxError, DbxResult};
use reed_solomon_erasure::galois_8::ReedSolomon;
use std::fs;
use std::path::{Path, PathBuf};
/// Filesystem-backed object store that protects each object with
/// Reed-Solomon erasure coding.
///
/// Every object is split into `k` data shards and extended with `m` parity
/// shards; the shards and a small length record are written to a per-key
/// directory underneath `base_dir`.
#[derive(Debug, Clone)]
pub struct ErasureCodingStore {
    /// Root directory; each object key gets its own sub-directory here.
    base_dir: PathBuf,
    /// Number of data shards an object is split into.
    k: usize,
    /// Number of parity shards computed on top of the data shards.
    m: usize,
}
impl ErasureCodingStore {
pub fn new<P: AsRef<Path>>(base_dir: P, k: u8, m: u8) -> Self {
let base_dir = base_dir.as_ref().to_path_buf();
fs::create_dir_all(&base_dir).unwrap_or_default();
Self {
base_dir,
k: k as usize,
m: m as usize,
}
}
/// Returns `k`, the number of data shards each object is split into.
pub fn k(&self) -> usize {
self.k
}
/// Returns `m`, the number of parity shards generated for each object.
pub fn m(&self) -> usize {
self.m
}
/// Splits `data` into `k` equally sized data shards (zero-padded at the
/// tail) and computes `m` parity shards.
///
/// The returned vector holds `k + m` shards: data shards first, parity
/// shards after. Empty input is supported; it is encoded as one-byte,
/// all-zero shards so that the codec never sees a zero-length shard. The
/// caller is expected to remember the original length in order to strip
/// the padding again when decoding.
///
/// # Errors
///
/// Returns [`DbxError::Storage`] when the codec cannot be built for the
/// configured `(k, m)` or when parity computation fails.
pub fn encode(&self, data: &[u8]) -> DbxResult<Vec<Vec<u8>>> {
    // Building the codec first also validates `k`, so the division below
    // is never reached with `k == 0`.
    let rs = ReedSolomon::new(self.k, self.m).map_err(|e| DbxError::Storage(e.to_string()))?;

    // Clamp to one byte: the codec rejects zero-length shards, which would
    // otherwise make empty objects impossible to store.
    let chunk_size = data.len().div_ceil(self.k).max(1);
    let mut shards = vec![vec![0u8; chunk_size]; self.k + self.m];

    // `chunks` yields at most `k` pieces because `chunk_size * k >= data.len()`;
    // shards that receive no (or a short) piece keep their zero padding.
    for (shard, chunk) in shards.iter_mut().zip(data.chunks(chunk_size)) {
        shard[..chunk.len()].copy_from_slice(chunk);
    }

    rs.encode(&mut shards)
        .map_err(|e| DbxError::Storage(e.to_string()))?;
    Ok(shards)
}
/// Writes one shard of the object `key` to `<base_dir>/<key>/shard_<id>.blk`,
/// creating the object's directory when it does not exist yet.
///
/// # Errors
///
/// Propagates any I/O error from creating the directory or writing the file.
pub fn store_shard(&self, key: &str, shard_id: usize, data: &[u8]) -> DbxResult<()> {
    let dir = self.base_dir.join(key);
    fs::create_dir_all(&dir)?;
    let file_name = format!("shard_{}.blk", shard_id);
    fs::write(dir.join(file_name), data)?;
    Ok(())
}
/// Reads a single shard of the object `key`.
///
/// Returns `Ok(None)` when the shard file does not exist and `Ok(Some(bytes))`
/// with the full file contents otherwise.
///
/// # Errors
///
/// Propagates the I/O error if the file exists but cannot be read.
pub fn fetch_shard(&self, key: &str, shard_id: usize) -> DbxResult<Option<Vec<u8>>> {
    let mut path = self.base_dir.join(key);
    path.push(format!("shard_{}.blk", shard_id));
    if !path.exists() {
        return Ok(None);
    }
    let bytes = fs::read(path)?;
    Ok(Some(bytes))
}
/// Persists every shard of an object together with its length record.
///
/// Shards are written in order, using their position in `shards` as the
/// shard id. Afterwards `metadata.json` is written into the object's
/// directory, recording `original_len` so that padding can be removed when
/// the object is read back.
///
/// # Errors
///
/// Stops at, and returns, the first I/O error encountered.
pub fn store_shards(
    &self,
    key: &str,
    shards: &[Vec<u8>],
    original_len: usize,
) -> DbxResult<()> {
    shards
        .iter()
        .enumerate()
        .try_for_each(|(shard_id, shard)| self.store_shard(key, shard_id, shard))?;

    let metadata = format!("{{\"length\":{}}}", original_len);
    let metadata_path = self.base_dir.join(key).join("metadata.json");
    fs::write(metadata_path, metadata)?;
    Ok(())
}
/// Erasure-codes `data` and writes the resulting shards plus the length
/// record under `key`.
///
/// # Errors
///
/// Returns the error from encoding, or otherwise the first error hit while
/// writing to disk.
pub fn encode_and_store(&self, key: &str, data: &[u8]) -> DbxResult<()> {
    self.encode(data)
        .and_then(|shards| self.store_shards(key, &shards, data.len()))
}
/// Rebuilds an object from its shards.
///
/// `shards` holds one slot per shard index, with `None` marking a shard
/// that is unavailable. Missing shards are reconstructed by the codec, then
/// the first `k` (data) shards are concatenated and cut off after
/// `original_len` bytes to drop the encoding padding.
///
/// # Errors
///
/// Returns [`DbxError::Storage`] when the codec cannot be built for the
/// configured `(k, m)` or when reconstruction fails.
pub fn decode(
    &self,
    mut shards: Vec<Option<Vec<u8>>>,
    original_len: usize,
) -> DbxResult<Vec<u8>> {
    let codec =
        ReedSolomon::new(self.k, self.m).map_err(|err| DbxError::Storage(err.to_string()))?;
    if let Err(err) = codec.reconstruct(&mut shards) {
        return Err(DbxError::Storage(err.to_string()));
    }

    // Never reserve more than the data shards can actually provide, even if
    // the caller hands in an oversized `original_len`.
    let shard_len = shards.iter().flatten().next().map_or(0, Vec::len);
    let mut output = Vec::with_capacity(original_len.min(shard_len * self.k));

    for shard in shards.into_iter().take(self.k).flatten() {
        // `output.len()` is the number of bytes emitted so far.
        let wanted = original_len.saturating_sub(output.len()).min(shard.len());
        output.extend_from_slice(&shard[..wanted]);
    }
    Ok(output)
}
pub fn retrieve_and_decode(&self, key: &str) -> DbxResult<Option<Vec<u8>>> {
let object_dir = self.base_dir.join(key);
if !object_dir.exists() {
return Ok(None);
}
let meta_str = fs::read_to_string(object_dir.join("metadata.json"))?;
let length_str = meta_str
.split(':')
.nth(1)
.unwrap_or("0")
.trim_end_matches('}')
.trim();
let original_len: usize = length_str.parse().unwrap_or(0);
let rs = ReedSolomon::new(self.k, self.m).map_err(|e| DbxError::Storage(e.to_string()))?;
let mut shards: Vec<Option<Vec<u8>>> = vec![None; self.k + self.m];
for (i, shard) in shards.iter_mut().enumerate().take(self.k + self.m) {
let shard_path = object_dir.join(format!("shard_{}.blk", i));
if let Ok(data) = fs::read(&shard_path) {
*shard = Some(data);
}
}
rs.reconstruct(&mut shards)
.map_err(|e| DbxError::Storage(e.to_string()))?;
let mut output = Vec::with_capacity(original_len);
let mut bytes_written = 0;
for data in shards.into_iter().take(self.k).flatten() {
let remaining = original_len - bytes_written;
let take = std::cmp::min(remaining, data.len());
output.extend_from_slice(&data[..take]);
bytes_written += take;
}
Ok(Some(output))
}
/// Removes the directory of the object `key`, including all shards and the
/// length record.
///
/// Returns `Ok(true)` when a directory was removed and `Ok(false)` when
/// nothing existed for `key`.
///
/// NOTE(review): `key` is joined onto the base directory without any
/// validation, so callers should make sure it names a single object.
///
/// # Errors
///
/// Propagates the I/O error if the directory exists but cannot be removed.
pub fn delete(&self, key: &str) -> DbxResult<bool> {
    let object_dir = self.base_dir.join(key);
    if !object_dir.exists() {
        return Ok(false);
    }
    fs::remove_dir_all(object_dir)?;
    Ok(true)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Round-trips an object through a 4+2 store after deleting two of its
    /// shards, which is the maximum loss this configuration can repair.
    #[test]
    fn test_erasure_coding_store() {
        let tmp = tempdir().unwrap();
        let store = ErasureCodingStore::new(tmp.path(), 4, 2);
        let key = "test_object_1";
        let payload = b"Hello World Erasure Coding Testing!";
        store.encode_and_store(key, payload).unwrap();

        // Simulate the loss of two data shards.
        let object_dir = tmp.path().join(key);
        for lost in ["shard_0.blk", "shard_3.blk"] {
            fs::remove_file(object_dir.join(lost)).unwrap();
        }

        let recovered = store.retrieve_and_decode(key).unwrap().unwrap();
        assert_eq!(recovered, payload);
    }
}