use crate::core::traits::BraidStorage;
use crate::core::types::Bytes;
use crate::core::{Result, Version};
use async_trait::async_trait;
#[cfg(feature = "native")]
use rusqlite::{params, Connection};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
#[cfg(feature = "native")]
use std::path::PathBuf;
use std::sync::Arc;
#[cfg(feature = "native")]
use tokio::fs;
use tokio::sync::broadcast;
#[cfg(feature = "native")]
use tokio::sync::Mutex;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlobMetadata {
pub key: String,
pub version: Vec<Version>,
pub content_type: Option<String>,
pub parents: Vec<Version>,
#[serde(default)]
pub content_hash: Option<String>,
#[serde(default)]
pub size: Option<u64>,
}
#[derive(Clone, Debug)]
pub enum StoreEvent {
Put {
meta: BlobMetadata,
data: Bytes,
},
Delete {
key: String,
version: Vec<Version>,
content_type: Option<String>,
},
}
#[derive(Clone, Debug)]
#[cfg(feature = "native")]
pub struct BlobStore {
db_path: PathBuf,
meta_db_path: PathBuf,
meta_conn: Arc<Mutex<Connection>>,
tx: broadcast::Sender<StoreEvent>,
}
#[cfg(feature = "native")]
impl BlobStore {
pub async fn new(db_path: PathBuf, meta_db_path: PathBuf) -> Result<Self> {
fs::create_dir_all(&db_path).await?;
if let Some(parent) = meta_db_path.parent() {
fs::create_dir_all(parent).await?;
}
let conn = Connection::open(&meta_db_path)
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
conn.execute(
"CREATE TABLE IF NOT EXISTS meta (
key TEXT PRIMARY KEY,
value JSON
)",
[],
)
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
let (tx, _) = broadcast::channel(100);
Ok(Self {
db_path,
meta_db_path,
meta_conn: Arc::new(Mutex::new(conn)),
tx,
})
}
pub fn subscribe(&self) -> broadcast::Receiver<StoreEvent> {
self.tx.subscribe()
}
pub async fn get(&self, key: &str) -> Result<Option<(Bytes, BlobMetadata)>> {
let meta = {
let conn = self.meta_conn.lock().await;
let mut stmt = conn
.prepare("SELECT value FROM meta WHERE key = ?")
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
let mut rows = stmt
.query(params![key])
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
if let Some(row) = rows
.next()
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?
{
let value_str: String = row
.get(0)
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
serde_json::from_str::<BlobMetadata>(&value_str)?
} else {
return Ok(None);
}
};
let file_path = self.get_file_path(key);
if fs::try_exists(&file_path).await? {
let data = fs::read(&file_path).await?;
Ok(Some((Bytes::from(data), meta)))
} else {
Ok(None)
}
}
pub async fn get_meta(&self, key: &str) -> Result<Option<BlobMetadata>> {
let conn = self.meta_conn.lock().await;
let mut stmt = conn
.prepare("SELECT value FROM meta WHERE key = ?")
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
let mut rows = stmt
.query(params![key])
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
if let Some(row) = rows
.next()
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?
{
let value_str: String = row
.get(0)
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
Ok(Some(serde_json::from_str::<BlobMetadata>(&value_str)?))
} else {
Ok(None)
}
}
pub async fn put(
&self,
key: &str,
data: Bytes,
version: Vec<Version>,
parents: Vec<Version>,
content_type: Option<String>,
) -> Result<Vec<Version>> {
let current_meta = self.get_meta(key).await?;
let new_ver_str = version.first().map(|v| v.to_string()).unwrap_or_default();
if let Some(meta) = ¤t_meta {
let current_ver_str = meta
.version
.first()
.map(|v| v.to_string())
.unwrap_or_default();
if compare_versions(&new_ver_str, ¤t_ver_str) <= 0 {
return Ok(meta.version.clone());
}
}
let mut hasher = Sha256::new();
hasher.update(&data);
let hash_bytes = hasher.finalize();
let content_hash = format!("{:x}", hash_bytes);
let new_meta = BlobMetadata {
key: key.to_string(),
version: version.clone(),
content_type,
parents,
content_hash: Some(content_hash.clone()),
size: Some(data.len() as u64),
};
let file_path = self.get_file_path(key);
fs::write(&file_path, &data).await?;
{
let conn = self.meta_conn.lock().await;
let val_str = serde_json::to_string(&new_meta)?;
conn.execute(
"INSERT OR REPLACE INTO meta (key, value) VALUES (?, ?)",
params![key, val_str],
)
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
}
let _ = self.tx.send(StoreEvent::Put {
meta: new_meta.clone(),
data: data.clone(),
});
Ok(version)
}
pub async fn delete(&self, key: &str) -> Result<()> {
let current_meta = self.get_meta(key).await?;
{
let conn = self.meta_conn.lock().await;
conn.execute("DELETE FROM meta WHERE key = ?", params![key])
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
}
let file_path = self.get_file_path(key);
if fs::try_exists(&file_path).await? {
fs::remove_file(&file_path).await?;
}
if let Some(meta) = current_meta {
let _ = self.tx.send(StoreEvent::Delete {
key: key.to_string(),
version: meta.version,
content_type: meta.content_type,
});
} else {
let _ = self.tx.send(StoreEvent::Delete {
key: key.to_string(),
version: vec![],
content_type: None,
});
}
Ok(())
}
fn get_file_path(&self, key: &str) -> PathBuf {
self.db_path.join(encode_filename(key))
}
}
#[cfg(feature = "native")]
#[async_trait]
impl BraidStorage for BlobStore {
async fn put(&self, key: &str, data: crate::core::types::Bytes, meta: String) -> Result<()> {
let metadata: BlobMetadata = serde_json::from_str(&meta)?;
self.put(
key,
data,
metadata.version,
metadata.parents,
metadata.content_type,
)
.await
.map(|_| ())
}
async fn get(&self, key: &str) -> Result<Option<(crate::core::types::Bytes, String)>> {
if let Some((data, meta)) = self.get(key).await? {
let meta_str = serde_json::to_string(&meta)?;
Ok(Some((data, meta_str)))
} else {
Ok(None)
}
}
async fn delete(&self, key: &str) -> Result<()> {
self.delete(key).await
}
async fn list_keys(&self) -> Result<Vec<String>> {
let conn = self.meta_conn.lock().await;
let mut stmt = conn
.prepare("SELECT key FROM meta")
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
let rows = stmt
.query_map([], |row| row.get(0))
.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
let mut keys = Vec::new();
for key in rows {
keys.push(key.map_err(|e| crate::core::BraidError::Fs(e.to_string()))?);
}
Ok(keys)
}
}
fn compare_versions(a: &str, b: &str) -> i32 {
let seq_a = get_event_seq(a);
let seq_b = get_event_seq(b);
let c = compare_seqs(seq_a, seq_b);
if c != 0 {
return c;
}
if a < b {
-1
} else if a > b {
1
} else {
0
}
}
fn get_event_seq(e: &str) -> &str {
if e.is_empty() {
return "";
}
if let Some(idx) = e.rfind('-') {
&e[idx + 1..]
} else {
e
}
}
fn compare_seqs(a: &str, b: &str) -> i32 {
if a.len() != b.len() {
return (a.len() as i32) - (b.len() as i32);
}
if a < b {
-1
} else if a > b {
1
} else {
0
}
}
pub fn encode_filename(s: &str) -> String {
let bits: String = s
.chars()
.filter(|c| c.is_alphabetic())
.map(|c| if c.is_uppercase() { "1" } else { "0" })
.collect();
let postfix = if bits.is_empty() {
"0".to_string()
} else {
bits_to_hex(&bits)
};
let mut s = s
.chars()
.map(|c| match c {
'/' => '!',
'!' => '/',
_ => c,
})
.collect::<String>();
let mut encoded = String::new();
for c in s.chars() {
if matches!(
c,
'<' | '>' | ':' | '"' | '/' | '|' | '\\' | '?' | '*' | '%' | '\x00'..='\x1f' | '\x7f'
) {
encoded.push_str(&format!("%{:02X}", c as u8));
} else {
encoded.push(c);
}
}
s = encoded;
let is_reserved = {
let lower = s.to_lowercase();
let name_part = lower.split('.').next().unwrap_or("");
matches!(name_part, "con" | "prn" | "aux" | "nul")
|| (name_part.len() == 4
&& name_part.starts_with("com")
&& name_part
.chars()
.nth(3)
.map_or(false, |c| c.is_ascii_digit() && c != '0'))
|| (name_part.len() == 4
&& name_part.starts_with("lpt")
&& name_part
.chars()
.nth(3)
.map_or(false, |c| c.is_ascii_digit() && c != '0'))
};
if is_reserved {
if s.len() >= 3 {
let char_at_2 = s.chars().nth(2).unwrap();
let encoded_char = format!("%{:02X}", char_at_2 as u8);
let mut chars: Vec<char> = s.chars().collect();
let prefix: String = chars.iter().take(2).collect();
let suffix: String = chars.iter().skip(3).collect();
s = format!("{}{}{}", prefix, encoded_char, suffix);
}
}
format!("{}.{}", s, postfix)
}
fn bits_to_hex(bits: &str) -> String {
if bits.is_empty() {
return "0".to_string();
}
let rem = bits.len() % 4;
let padded = if rem == 0 {
bits.to_string()
} else {
format!("{}{}", "0".repeat(4 - rem), bits)
};
let mut hex = String::new();
for chunk in padded.as_bytes().chunks(4) {
let chunk_str = std::str::from_utf8(chunk).unwrap();
let val = u8::from_str_radix(chunk_str, 2).unwrap();
hex.push_str(&format!("{:x}", val));
}
let trimmed = hex.trim_start_matches('0');
if trimmed.is_empty() {
"0".to_string()
} else {
trimmed.to_string()
}
}
pub fn decode_filename(s: &str) -> String {
let s = if let Some(idx) = s.rfind('.') {
&s[..idx]
} else {
s
};
let mut decoded = String::new();
let mut chars = s.chars().peekable();
while let Some(c) = chars.next() {
if c == '%' {
let hex: String = chars.by_ref().take(2).collect();
if hex.len() == 2 {
if let Ok(byte) = u8::from_str_radix(&hex, 16) {
decoded.push(byte as char);
continue;
}
}
decoded.push('%');
decoded.push_str(&hex);
} else {
decoded.push(c);
}
}
decoded
.chars()
.map(|c| match c {
'!' => '/',
'/' => '!',
_ => c,
})
.collect()
}
pub fn increment_seq(s: &str) -> String {
if s.is_empty() {
return "1".to_string();
}
let mut chars: Vec<char> = s.chars().collect();
let mut carry = true;
for i in (0..chars.len()).rev() {
if !carry {
break;
}
let c = chars[i];
if c == '9' {
chars[i] = '0';
carry = true;
} else if c.is_ascii_digit() {
chars[i] = (c as u8 + 1) as char;
carry = false;
} else {
break;
}
}
if carry {
format!("1{}", chars.iter().collect::<String>())
} else {
chars.iter().collect()
}
}
pub fn max_seq<'a>(a: &'a str, b: &'a str) -> &'a str {
if compare_seqs(a, b) >= 0 {
a
} else {
b
}
}
pub async fn atomic_write(
dest: &std::path::Path,
data: &[u8],
temp_folder: &std::path::Path,
) -> Result<std::fs::Metadata> {
use tokio::fs;
fs::create_dir_all(temp_folder).await?;
let temp_name = format!("tmp_{}", uuid::Uuid::new_v4());
let temp_path = temp_folder.join(temp_name);
fs::write(&temp_path, data).await?;
if let Some(parent) = dest.parent() {
fs::create_dir_all(parent).await?;
}
fs::rename(&temp_path, dest).await?;
let metadata = std::fs::metadata(dest).map_err(|e| crate::core::BraidError::Io(e))?;
Ok(metadata)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_encode_decode_roundtrip() {
let original = "path/to/file.txt";
let encoded = encode_filename(original);
let decoded = decode_filename(&encoded);
assert_eq!(decoded, original);
}
#[test]
fn test_increment_seq() {
assert_eq!(increment_seq(""), "1");
assert_eq!(increment_seq("0"), "1");
assert_eq!(increment_seq("9"), "10");
assert_eq!(increment_seq("99"), "100");
assert_eq!(increment_seq("123"), "124");
}
#[test]
fn test_max_seq() {
assert_eq!(max_seq("1", "2"), "2");
assert_eq!(max_seq("10", "2"), "10");
assert_eq!(max_seq("99", "100"), "100");
}
}