1use async_trait::async_trait;
2use braid_http::error::{BraidError, Result};
3use braid_http::traits::BraidStorage;
4use braid_http::types::{Bytes, Version};
5#[cfg(feature = "native")]
6use rusqlite::{params, Connection};
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9#[cfg(feature = "native")]
10use std::path::PathBuf;
11use std::sync::Arc;
12#[cfg(feature = "native")]
13use tokio::fs;
14use tokio::sync::broadcast;
15#[cfg(feature = "native")]
16use tokio::sync::Mutex;
17
18#[derive(Clone, Debug, Serialize, Deserialize)]
19pub struct BlobMetadata {
20 pub key: String,
21 pub version: Vec<Version>,
22 pub content_type: Option<String>,
23 pub parents: Vec<Version>,
24 #[serde(default)]
25 pub content_hash: Option<String>,
26 #[serde(default)]
27 pub size: Option<u64>,
28}
29
30#[derive(Clone, Debug)]
31pub enum StoreEvent {
32 Put {
33 meta: BlobMetadata,
34 data: Bytes,
35 },
36 Delete {
37 key: String,
38 version: Vec<Version>,
39 content_type: Option<String>,
40 },
41}
42
/// Blob store that keeps blob bytes as files and blob metadata in SQLite.
///
/// Cloning is cheap: clones share the same SQLite connection (behind an
/// async mutex) and the same broadcast sender.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct BlobStore {
    /// Directory that holds one file per blob.
    db_path: PathBuf,
    /// Path of the SQLite database that holds the `meta` table.
    meta_db_path: PathBuf,
    /// Shared connection to the metadata database.
    meta_conn: Arc<Mutex<Connection>>,
    /// Sender used to publish [`StoreEvent`]s to subscribers.
    tx: broadcast::Sender<StoreEvent>,
}
51
#[cfg(feature = "native")]
impl BlobStore {
    /// Opens a blob store, creating the on-disk layout if needed.
    ///
    /// Creates `db_path` and the parent directory of `meta_db_path`, opens the
    /// SQLite database at `meta_db_path`, and ensures the `meta` table exists.
    ///
    /// # Errors
    ///
    /// Returns an error if a directory cannot be created or if SQLite fails to
    /// open the database or create the table.
    pub async fn new(db_path: PathBuf, meta_db_path: PathBuf) -> Result<Self> {
        fs::create_dir_all(&db_path).await?;
        if let Some(parent) = meta_db_path.parent() {
            fs::create_dir_all(parent).await?;
        }

        let conn = Connection::open(&meta_db_path).map_err(Self::db_err)?;
        conn.execute(
            "CREATE TABLE IF NOT EXISTS meta (
                key TEXT PRIMARY KEY,
                value JSON
            )",
            [],
        )
        .map_err(Self::db_err)?;

        // The initial receiver is dropped; subscribers attach via `subscribe`.
        let (tx, _) = broadcast::channel(100);

        Ok(Self {
            db_path,
            meta_db_path,
            meta_conn: Arc::new(Mutex::new(conn)),
            tx,
        })
    }

    /// Returns a new receiver for [`StoreEvent`]s published by this store.
    pub fn subscribe(&self) -> broadcast::Receiver<StoreEvent> {
        self.tx.subscribe()
    }

    /// Fetches a blob's bytes together with its metadata.
    ///
    /// Returns `Ok(None)` when there is no metadata row for `key`, or when the
    /// metadata exists but the blob file is missing.
    ///
    /// # Errors
    ///
    /// Returns an error if the metadata lookup fails or if the file cannot be
    /// read for a reason other than not existing.
    pub async fn get(&self, key: &str) -> Result<Option<(Bytes, BlobMetadata)>> {
        let meta = match self.get_meta(key).await? {
            Some(meta) => meta,
            None => return Ok(None),
        };

        // Read directly and treat "not found" as absent, rather than probing
        // for existence first and racing with a concurrent delete.
        match fs::read(self.get_file_path(key)).await {
            Ok(data) => Ok(Some((Bytes::from(data), meta))),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    /// Fetches only the metadata stored for `key`, if any.
    ///
    /// # Errors
    ///
    /// Returns an error if the SQLite query fails or the stored JSON cannot be
    /// deserialized into [`BlobMetadata`].
    pub async fn get_meta(&self, key: &str) -> Result<Option<BlobMetadata>> {
        let conn = self.meta_conn.lock().await;
        let mut stmt = conn
            .prepare("SELECT value FROM meta WHERE key = ?")
            .map_err(Self::db_err)?;
        let mut rows = stmt.query(params![key]).map_err(Self::db_err)?;

        match rows.next().map_err(Self::db_err)? {
            Some(row) => {
                let value_str: String = row.get(0).map_err(Self::db_err)?;
                Ok(Some(serde_json::from_str::<BlobMetadata>(&value_str)?))
            }
            None => Ok(None),
        }
    }

    /// Stores `data` under `key` if `version` is newer than what is stored.
    ///
    /// Only the first entry of `version` (and of the stored version) takes
    /// part in the comparison. When the incoming version does not compare
    /// greater, nothing is written and the currently stored version is
    /// returned. Otherwise the blob file is written, the metadata row is
    /// replaced, a [`StoreEvent::Put`] is published, and `version` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the metadata lookup, the file write, the JSON
    /// serialization, or the SQLite insert fails.
    pub async fn put(
        &self,
        key: &str,
        data: Bytes,
        version: Vec<Version>,
        parents: Vec<Version>,
        content_type: Option<String>,
    ) -> Result<Vec<Version>> {
        let new_ver_str = version.first().map(|v| v.to_string()).unwrap_or_default();
        if let Some(meta) = self.get_meta(key).await? {
            let current_ver_str = meta
                .version
                .first()
                .map(|v| v.to_string())
                .unwrap_or_default();
            if compare_versions(&new_ver_str, &current_ver_str) <= 0 {
                return Ok(meta.version);
            }
        }

        let mut hasher = Sha256::new();
        hasher.update(&data);
        let content_hash = format!("{:x}", hasher.finalize());

        let new_meta = BlobMetadata {
            key: key.to_string(),
            version: version.clone(),
            content_type,
            parents,
            content_hash: Some(content_hash),
            size: Some(data.len() as u64),
        };

        // The blob file is written before the metadata row that refers to it.
        let file_path = self.get_file_path(key);
        let temp_folder = self.db_path.join("tmp");
        atomic_write(&file_path, &data, &temp_folder).await?;

        let val_str = serde_json::to_string(&new_meta)?;
        {
            let conn = self.meta_conn.lock().await;
            conn.execute(
                "INSERT OR REPLACE INTO meta (key, value) VALUES (?, ?)",
                params![key, val_str],
            )
            .map_err(Self::db_err)?;
        }

        // A send error only means there are no subscribers; that is fine.
        let _ = self.tx.send(StoreEvent::Put {
            meta: new_meta,
            data,
        });

        Ok(version)
    }

    /// Removes the metadata row and the blob file for `key`.
    ///
    /// A missing file is not an error. A [`StoreEvent::Delete`] is published
    /// only when a metadata row existed before the delete.
    ///
    /// # Errors
    ///
    /// Returns an error if the metadata lookup or SQLite delete fails, or if
    /// the file cannot be removed for a reason other than not existing.
    pub async fn delete(&self, key: &str) -> Result<()> {
        let current_meta = self.get_meta(key).await?;

        {
            let conn = self.meta_conn.lock().await;
            conn.execute("DELETE FROM meta WHERE key = ?", params![key])
                .map_err(Self::db_err)?;
        }

        // Remove directly and tolerate "not found" instead of checking for
        // existence first, which could race with another delete.
        match fs::remove_file(self.get_file_path(key)).await {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => return Err(e.into()),
        }

        if let Some(meta) = current_meta {
            // A send error only means there are no subscribers; that is fine.
            let _ = self.tx.send(StoreEvent::Delete {
                key: key.to_string(),
                version: meta.version,
                content_type: meta.content_type,
            });
        }
        Ok(())
    }

    /// Path of the file that holds the blob for `key`.
    fn get_file_path(&self, key: &str) -> PathBuf {
        self.db_path.join(encode_filename(key))
    }

    /// Converts a SQLite error into the store's error type.
    fn db_err(e: rusqlite::Error) -> BraidError {
        BraidError::Fs(e.to_string())
    }
}
213
/// Adapter exposing [`BlobStore`] through the string-metadata
/// [`BraidStorage`] interface. Metadata crosses this boundary as JSON text.
#[cfg(feature = "native")]
#[async_trait]
impl BraidStorage for BlobStore {
    /// Parses `meta` as JSON [`BlobMetadata`] and forwards its version,
    /// parents and content type to the inherent [`BlobStore::put`].
    async fn put(&self, key: &str, data: Bytes, meta: String) -> Result<()> {
        let BlobMetadata {
            version,
            parents,
            content_type,
            ..
        } = serde_json::from_str::<BlobMetadata>(&meta)?;

        BlobStore::put(self, key, data, version, parents, content_type).await?;
        Ok(())
    }

    /// Returns the blob bytes plus its metadata serialized as JSON.
    async fn get(&self, key: &str) -> Result<Option<(Bytes, String)>> {
        match BlobStore::get(self, key).await? {
            Some((data, meta)) => Ok(Some((data, serde_json::to_string(&meta)?))),
            None => Ok(None),
        }
    }

    /// Forwards to the inherent [`BlobStore::delete`].
    async fn delete(&self, key: &str) -> Result<()> {
        BlobStore::delete(self, key).await
    }

    /// Lists every key that has a row in the `meta` table.
    async fn list_keys(&self) -> Result<Vec<String>> {
        let conn = self.meta_conn.lock().await;
        let mut stmt = conn
            .prepare("SELECT key FROM meta")
            .map_err(|e| BraidError::Fs(e.to_string()))?;
        let rows = stmt
            .query_map([], |row| row.get::<_, String>(0))
            .map_err(|e| BraidError::Fs(e.to_string()))?;

        // Stop at the first row that fails to decode.
        let keys = rows
            .map(|row| row.map_err(|e| BraidError::Fs(e.to_string())))
            .collect::<Result<Vec<String>>>()?;
        Ok(keys)
    }
}
259
260fn compare_versions(a: &str, b: &str) -> i32 {
261 let seq_a = get_event_seq(a);
262 let seq_b = get_event_seq(b);
263 let c = compare_seqs(seq_a, seq_b);
264 if c != 0 {
265 return c;
266 }
267 a.cmp(b) as i32
268}
269
/// Returns the sequence part of an event id: the text after the last `-`.
///
/// An id without any `-` (including the empty string) is returned unchanged.
fn get_event_seq(e: &str) -> &str {
    e.rsplit_once('-').map_or(e, |(_, seq)| seq)
}
280
/// Compares two sequence strings, shorter first.
///
/// When the byte lengths differ the result is the length difference
/// (`a.len() - b.len()`); otherwise it is the lexicographic ordering of the
/// two strings as -1, 0 or 1.
fn compare_seqs(a: &str, b: &str) -> i32 {
    let (len_a, len_b) = (a.len(), b.len());
    if len_a == len_b {
        a.cmp(b) as i32
    } else {
        (len_a as i32) - (len_b as i32)
    }
}
287
288pub fn encode_filename(s: &str) -> String {
289 let bits: String = s
290 .chars()
291 .filter(|c| c.is_alphabetic())
292 .map(|c| if c.is_uppercase() { "1" } else { "0" })
293 .collect();
294
295 let postfix = if bits.is_empty() {
296 "0".to_string()
297 } else {
298 bits_to_hex(&bits)
299 };
300 let mut s = s
301 .chars()
302 .map(|c| match c {
303 '/' => '!',
304 '!' => '/',
305 _ => c,
306 })
307 .collect::<String>();
308 let mut encoded = String::new();
309 for c in s.chars() {
310 if matches!(
311 c,
312 '<' | '>' | ':' | '"' | '/' | '|' | '\\' | '?' | '*' | '%' | '\x00'..='\x1f' | '\x7f'
313 ) {
314 encoded.push_str(&format!("%{:02X}", c as u8));
315 } else {
316 encoded.push(c);
317 }
318 }
319 s = encoded;
320
321 let is_reserved = {
322 let lower = s.to_lowercase();
323 let name_part = lower.split('.').next().unwrap_or("");
324 matches!(name_part, "con" | "prn" | "aux" | "nul")
325 || (name_part.len() == 4
326 && name_part.starts_with("com")
327 && name_part
328 .chars()
329 .nth(3)
330 .map_or(false, |c| c.is_ascii_digit() && c != '0'))
331 || (name_part.len() == 4
332 && name_part.starts_with("lpt")
333 && name_part
334 .chars()
335 .nth(3)
336 .map_or(false, |c| c.is_ascii_digit() && c != '0'))
337 };
338
339 if is_reserved && s.len() >= 3 {
340 let char_at_2 = s.chars().nth(2).unwrap();
341 let encoded_char = format!("%{:02X}", char_at_2 as u8);
342 let chars: Vec<char> = s.chars().collect();
343 let prefix: String = chars.iter().take(2).collect();
344 let suffix: String = chars.iter().skip(3).collect();
345 s = format!("{}{}{}", prefix, encoded_char, suffix);
346 }
347
348 format!("{}.{}", s, postfix)
349}
350
/// Renders a string of `0`/`1` characters as lowercase hex without leading
/// zeros.
///
/// The bits are left-padded with zeros to a multiple of four before being
/// converted nibble by nibble. An empty or all-zero input yields `"0"`.
///
/// # Panics
///
/// Panics if `bits` contains text that does not parse as binary digits.
fn bits_to_hex(bits: &str) -> String {
    if bits.is_empty() {
        return "0".to_string();
    }

    let pad = (4 - bits.len() % 4) % 4;
    let mut padded = String::with_capacity(bits.len() + pad);
    padded.extend(std::iter::repeat('0').take(pad));
    padded.push_str(bits);

    let hex: String = padded
        .as_bytes()
        .chunks(4)
        .map(|nibble| {
            let text = std::str::from_utf8(nibble).expect("bit string must be ASCII");
            let value = u8::from_str_radix(text, 2).expect("bit string must be binary digits");
            char::from_digit(u32::from(value), 16).expect("a nibble is below 16")
        })
        .collect();

    match hex.trim_start_matches('0') {
        "" => "0".to_string(),
        significant => significant.to_string(),
    }
}
373
/// Reverses `encode_filename`: recovers the blob key from a file name.
///
/// Everything from the last `.` onward (the case suffix) is dropped, `%XX`
/// escapes are decoded to the character with that byte value, and finally
/// `/` and `!` are swapped back.
///
/// A `%` that is not followed by exactly two hexadecimal digits is kept
/// verbatim together with the characters that were read after it. Both
/// characters must be hex digits: `u8::from_str_radix` on its own would also
/// accept a leading `+` and decode `%+1` as byte 0x01.
pub fn decode_filename(s: &str) -> String {
    let stem = s.rfind('.').map_or(s, |idx| &s[..idx]);

    let mut decoded = String::with_capacity(stem.len());
    let mut chars = stem.chars();
    while let Some(c) = chars.next() {
        if c != '%' {
            decoded.push(c);
            continue;
        }

        let hex: String = chars.by_ref().take(2).collect();
        let is_escape = hex.len() == 2 && hex.bytes().all(|b| b.is_ascii_hexdigit());
        match u8::from_str_radix(&hex, 16) {
            Ok(byte) if is_escape => decoded.push(char::from(byte)),
            _ => {
                // Malformed escape: keep the text exactly as it appeared.
                decoded.push('%');
                decoded.push_str(&hex);
            }
        }
    }

    // Undo the `/` <-> `!` swap; this also applies to decoded escapes.
    decoded
        .chars()
        .map(|c| match c {
            '!' => '/',
            '/' => '!',
            other => other,
        })
        .collect()
}
406
/// Writes `data` to `path` by writing a temporary file and renaming it.
///
/// The temporary file is created inside `temp_folder` under a fresh
/// UUID-based name, so concurrent writers do not share a temp file. Both
/// `temp_folder` and the parent directory of `path` are created if missing.
///
/// If any step after choosing the temp name fails, the temporary file is
/// removed on a best-effort basis so failed writes do not pile up in
/// `temp_folder`.
///
/// # Errors
///
/// Returns an error if a directory cannot be created, the temporary file
/// cannot be written, or the rename fails (reported as `BraidError::Fs`).
#[cfg(feature = "native")]
pub async fn atomic_write(
    path: &std::path::Path,
    data: &[u8],
    temp_folder: &std::path::Path,
) -> Result<()> {
    fs::create_dir_all(temp_folder).await?;
    let temp_path = temp_folder.join(format!("tmp_{}", uuid::Uuid::new_v4()));

    let outcome: Result<()> = async {
        fs::write(&temp_path, data).await?;

        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).await?;
        }

        fs::rename(&temp_path, path)
            .await
            .map_err(|e| BraidError::Fs(e.to_string()))
    }
    .await;

    if outcome.is_err() {
        // Best effort: the original error matters more than a cleanup failure.
        let _ = fs::remove_file(&temp_path).await;
    }
    outcome
}