// braid_blob/store.rs

1use async_trait::async_trait;
2use braid_http::error::{BraidError, Result};
3use braid_http::traits::BraidStorage;
4use braid_http::types::{Bytes, Version};
5#[cfg(feature = "native")]
6use rusqlite::{params, Connection};
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9#[cfg(feature = "native")]
10use std::path::PathBuf;
11use std::sync::Arc;
12#[cfg(feature = "native")]
13use tokio::fs;
14use tokio::sync::broadcast;
15#[cfg(feature = "native")]
16use tokio::sync::Mutex;
17
18#[derive(Clone, Debug, Serialize, Deserialize)]
19pub struct BlobMetadata {
20    pub key: String,
21    pub version: Vec<Version>,
22    pub content_type: Option<String>,
23    pub parents: Vec<Version>,
24    #[serde(default)]
25    pub content_hash: Option<String>,
26    #[serde(default)]
27    pub size: Option<u64>,
28}
29
30#[derive(Clone, Debug)]
31pub enum StoreEvent {
32    Put {
33        meta: BlobMetadata,
34        data: Bytes,
35    },
36    Delete {
37        key: String,
38        version: Vec<Version>,
39        content_type: Option<String>,
40    },
41}
42
/// Handle to a blob store backed by a directory of files plus a SQLite
/// metadata database.
///
/// Cloning yields another handle that shares the same database connection and
/// event channel.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct BlobStore {
    /// Directory that holds the blob files.
    db_path: PathBuf,
    /// Path of the SQLite metadata database.
    meta_db_path: PathBuf,
    /// Shared, mutex-guarded connection to the metadata database.
    meta_conn: Arc<Mutex<Connection>>,
    /// Sending half of the store-event broadcast channel.
    tx: broadcast::Sender<StoreEvent>,
}
51
#[cfg(feature = "native")]
impl BlobStore {
    /// Maps a SQLite error onto the crate's `BraidError::Fs` variant.
    fn db_err(e: rusqlite::Error) -> BraidError {
        BraidError::Fs(e.to_string())
    }

    /// Opens a store, creating its directories and metadata table if needed.
    ///
    /// `db_path` is the directory that will hold blob files; `meta_db_path` is
    /// the SQLite database file that holds one JSON metadata row per key.
    ///
    /// # Errors
    ///
    /// Fails if a directory cannot be created, or (as `BraidError::Fs`) if the
    /// database cannot be opened or the `meta` table cannot be created.
    pub async fn new(db_path: PathBuf, meta_db_path: PathBuf) -> Result<Self> {
        fs::create_dir_all(&db_path).await?;
        if let Some(parent) = meta_db_path.parent() {
            fs::create_dir_all(parent).await?;
        }

        let conn = Connection::open(&meta_db_path).map_err(Self::db_err)?;
        conn.execute(
            "CREATE TABLE IF NOT EXISTS meta (
                key TEXT PRIMARY KEY,
                value JSON
            )",
            [],
        )
        .map_err(Self::db_err)?;

        // Events are broadcast through a channel created with capacity 100.
        let (tx, _) = broadcast::channel(100);

        Ok(Self {
            db_path,
            meta_db_path,
            meta_conn: Arc::new(Mutex::new(conn)),
            tx,
        })
    }

    /// Returns a new receiver for the store's `StoreEvent` broadcast channel.
    pub fn subscribe(&self) -> broadcast::Receiver<StoreEvent> {
        self.tx.subscribe()
    }

    /// Fetches the content and metadata stored under `key`.
    ///
    /// Returns `Ok(None)` when there is no metadata row for `key` or when the
    /// blob file does not exist.
    ///
    /// # Errors
    ///
    /// Fails on database errors, on undecodable metadata, or on any file read
    /// error other than "not found".
    pub async fn get(&self, key: &str) -> Result<Option<(Bytes, BlobMetadata)>> {
        let meta = match self.get_meta(key).await? {
            Some(meta) => meta,
            None => return Ok(None),
        };

        // Read directly instead of probing for existence first: a file removed
        // between a probe and the read would otherwise surface as an error.
        match fs::read(self.get_file_path(key)).await {
            Ok(data) => Ok(Some((Bytes::from(data), meta))),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    /// Fetches only the metadata stored under `key`, or `Ok(None)` if there is
    /// no row for it.
    ///
    /// # Errors
    ///
    /// Fails (as `BraidError::Fs`) on database errors, or if the stored JSON
    /// cannot be decoded into `BlobMetadata`.
    pub async fn get_meta(&self, key: &str) -> Result<Option<BlobMetadata>> {
        let conn = self.meta_conn.lock().await;
        let mut stmt = conn
            .prepare("SELECT value FROM meta WHERE key = ?")
            .map_err(Self::db_err)?;
        let mut rows = stmt.query(params![key]).map_err(Self::db_err)?;

        let row = rows.next().map_err(Self::db_err)?;
        match row {
            Some(row) => {
                let value_str: String = row.get(0).map_err(Self::db_err)?;
                Ok(Some(serde_json::from_str::<BlobMetadata>(&value_str)?))
            }
            None => Ok(None),
        }
    }

    /// Stores `data` under `key` if `version` is newer than what is stored.
    ///
    /// Only the first entry of each version list is compared (via
    /// `compare_versions`). When a blob already exists and the incoming version
    /// does not compare greater, nothing is written and the stored version list
    /// is returned. Otherwise the content is written atomically, the metadata
    /// row is replaced, a `StoreEvent::Put` is broadcast, and `version` is
    /// returned.
    ///
    /// NOTE(review): the version check and the subsequent write are not done
    /// under a single lock, so concurrent `put`s for the same key can
    /// interleave.
    ///
    /// # Errors
    ///
    /// Fails on database errors, metadata (de)serialization errors, or file
    /// system errors while writing the blob.
    pub async fn put(
        &self,
        key: &str,
        data: Bytes,
        version: Vec<Version>,
        parents: Vec<Version>,
        content_type: Option<String>,
    ) -> Result<Vec<Version>> {
        if let Some(existing) = self.get_meta(key).await? {
            let incoming = version.first().map(|v| v.to_string()).unwrap_or_default();
            let stored = existing
                .version
                .first()
                .map(|v| v.to_string())
                .unwrap_or_default();
            if compare_versions(&incoming, &stored) <= 0 {
                return Ok(existing.version);
            }
        }

        let new_meta = BlobMetadata {
            key: key.to_string(),
            version: version.clone(),
            content_type,
            parents,
            // Lowercase hex SHA-256 of the content.
            content_hash: Some(format!("{:x}", Sha256::digest(&data))),
            size: Some(data.len() as u64),
        };
        // Serialize before touching the file system so a serialization failure
        // cannot leave a blob file that is newer than its metadata row.
        let val_str = serde_json::to_string(&new_meta)?;

        let file_path = self.get_file_path(key);
        let temp_folder = self.db_path.join("tmp");
        atomic_write(&file_path, &data, &temp_folder).await?;

        {
            let conn = self.meta_conn.lock().await;
            conn.execute(
                "INSERT OR REPLACE INTO meta (key, value) VALUES (?, ?)",
                params![key, val_str],
            )
            .map_err(Self::db_err)?;
        }

        // A send error only means there are no subscribers; that is not a failure.
        let _ = self.tx.send(StoreEvent::Put {
            meta: new_meta,
            data,
        });

        Ok(version)
    }

    /// Removes the metadata row and blob file for `key`.
    ///
    /// A missing blob file is not an error. A `StoreEvent::Delete` is broadcast
    /// only when a metadata row existed before the call.
    ///
    /// # Errors
    ///
    /// Fails on database errors, undecodable existing metadata, or any file
    /// removal error other than "not found".
    pub async fn delete(&self, key: &str) -> Result<()> {
        let current_meta = self.get_meta(key).await?;

        {
            let conn = self.meta_conn.lock().await;
            conn.execute("DELETE FROM meta WHERE key = ?", params![key])
                .map_err(Self::db_err)?;
        }

        // Remove unconditionally and tolerate "not found": probing first would
        // fail if another task removed the file in between.
        match fs::remove_file(self.get_file_path(key)).await {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => return Err(e.into()),
        }

        if let Some(meta) = current_meta {
            // A send error only means there are no subscribers.
            let _ = self.tx.send(StoreEvent::Delete {
                key: key.to_string(),
                version: meta.version,
                content_type: meta.content_type,
            });
        }
        Ok(())
    }

    /// Returns the path of the blob file for `key`: the encoded key inside
    /// `db_path`.
    fn get_file_path(&self, key: &str) -> PathBuf {
        self.db_path.join(encode_filename(key))
    }
}
213
#[cfg(feature = "native")]
#[async_trait]
impl BraidStorage for BlobStore {
    /// Decodes `meta` as JSON `BlobMetadata` and forwards its version, parents
    /// and content type to the inherent `BlobStore::put`, discarding the
    /// returned version list.
    async fn put(&self, key: &str, data: Bytes, meta: String) -> Result<()> {
        let parsed: BlobMetadata = serde_json::from_str(&meta)?;
        let BlobMetadata {
            version,
            parents,
            content_type,
            ..
        } = parsed;
        BlobStore::put(self, key, data, version, parents, content_type).await?;
        Ok(())
    }

    /// Looks the blob up through the inherent `BlobStore::get` and returns its
    /// metadata re-encoded as a JSON string.
    async fn get(&self, key: &str) -> Result<Option<(Bytes, String)>> {
        match BlobStore::get(self, key).await? {
            Some((data, meta)) => {
                let meta_str = serde_json::to_string(&meta)?;
                Ok(Some((data, meta_str)))
            }
            None => Ok(None),
        }
    }

    /// Delegates to the inherent `BlobStore::delete`.
    async fn delete(&self, key: &str) -> Result<()> {
        BlobStore::delete(self, key).await
    }

    /// Returns every key present in the `meta` table.
    async fn list_keys(&self) -> Result<Vec<String>> {
        let conn = self.meta_conn.lock().await;
        let mut stmt = conn
            .prepare("SELECT key FROM meta")
            .map_err(|e| BraidError::Fs(e.to_string()))?;
        let rows = stmt
            .query_map([], |row| row.get::<_, String>(0))
            .map_err(|e| BraidError::Fs(e.to_string()))?;

        // Stops at the first row that fails to decode, like a manual loop with `?`.
        let keys = rows
            .collect::<std::result::Result<Vec<String>, _>>()
            .map_err(|e| BraidError::Fs(e.to_string()))?;
        Ok(keys)
    }
}
259
260fn compare_versions(a: &str, b: &str) -> i32 {
261    let seq_a = get_event_seq(a);
262    let seq_b = get_event_seq(b);
263    let c = compare_seqs(seq_a, seq_b);
264    if c != 0 {
265        return c;
266    }
267    a.cmp(b) as i32
268}
269
/// Returns the part of `e` after its last `'-'`, or all of `e` when it
/// contains no `'-'` (which includes the empty string).
fn get_event_seq(e: &str) -> &str {
    match e.rfind('-') {
        Some(dash) => &e[dash + 1..],
        None => e,
    }
}
280
/// Compares two sequence strings, shorter first.
///
/// When the byte lengths differ the result is their difference
/// (`a.len() - b.len()`); otherwise it is -1, 0 or 1 from a lexicographic
/// comparison.
fn compare_seqs(a: &str, b: &str) -> i32 {
    if a.len() == b.len() {
        match a.cmp(b) {
            std::cmp::Ordering::Less => -1,
            std::cmp::Ordering::Equal => 0,
            std::cmp::Ordering::Greater => 1,
        }
    } else {
        (a.len() as i32) - (b.len() as i32)
    }
}
287
288pub fn encode_filename(s: &str) -> String {
289    let bits: String = s
290        .chars()
291        .filter(|c| c.is_alphabetic())
292        .map(|c| if c.is_uppercase() { "1" } else { "0" })
293        .collect();
294
295    let postfix = if bits.is_empty() {
296        "0".to_string()
297    } else {
298        bits_to_hex(&bits)
299    };
300    let mut s = s
301        .chars()
302        .map(|c| match c {
303            '/' => '!',
304            '!' => '/',
305            _ => c,
306        })
307        .collect::<String>();
308    let mut encoded = String::new();
309    for c in s.chars() {
310        if matches!(
311            c,
312            '<' | '>' | ':' | '"' | '/' | '|' | '\\' | '?' | '*' | '%' | '\x00'..='\x1f' | '\x7f'
313        ) {
314            encoded.push_str(&format!("%{:02X}", c as u8));
315        } else {
316            encoded.push(c);
317        }
318    }
319    s = encoded;
320
321    let is_reserved = {
322        let lower = s.to_lowercase();
323        let name_part = lower.split('.').next().unwrap_or("");
324        matches!(name_part, "con" | "prn" | "aux" | "nul")
325            || (name_part.len() == 4
326                && name_part.starts_with("com")
327                && name_part
328                    .chars()
329                    .nth(3)
330                    .map_or(false, |c| c.is_ascii_digit() && c != '0'))
331            || (name_part.len() == 4
332                && name_part.starts_with("lpt")
333                && name_part
334                    .chars()
335                    .nth(3)
336                    .map_or(false, |c| c.is_ascii_digit() && c != '0'))
337    };
338
339    if is_reserved && s.len() >= 3 {
340        let char_at_2 = s.chars().nth(2).unwrap();
341        let encoded_char = format!("%{:02X}", char_at_2 as u8);
342        let chars: Vec<char> = s.chars().collect();
343        let prefix: String = chars.iter().take(2).collect();
344        let suffix: String = chars.iter().skip(3).collect();
345        s = format!("{}{}{}", prefix, encoded_char, suffix);
346    }
347
348    format!("{}.{}", s, postfix)
349}
350
/// Converts a string of `'0'`/`'1'` characters into lowercase hex without
/// leading zeros; an empty or all-zero input yields `"0"`.
///
/// # Panics
///
/// Panics if a four-character group cannot be parsed as a binary number.
fn bits_to_hex(bits: &str) -> String {
    if bits.is_empty() {
        return String::from("0");
    }

    // Left-pad with zeros so the length is a whole number of 4-bit groups.
    let missing = (4 - bits.len() % 4) % 4;
    let mut padded = String::with_capacity(bits.len() + missing);
    padded.push_str(&"0".repeat(missing));
    padded.push_str(bits);

    let hex: String = padded
        .as_bytes()
        .chunks(4)
        .map(|nibble| {
            let text = std::str::from_utf8(nibble).unwrap();
            let value = u8::from_str_radix(text, 2).unwrap();
            format!("{:x}", value)
        })
        .collect();

    match hex.trim_start_matches('0') {
        "" => String::from("0"),
        significant => significant.to_string(),
    }
}
373
/// Decodes a file name produced by `encode_filename` back into its key.
///
/// Everything from the last `'.'` onward (the case postfix) is dropped, each
/// `%XX` escape is replaced by the character with that byte value, and finally
/// `'!'` and `'/'` are swapped back.
///
/// An escape is only decoded when `%` is followed by exactly two ASCII hex
/// digits; anything else (for example `%+5`, which integer parsing alone would
/// accept as a signed number) is kept literally.
pub fn decode_filename(s: &str) -> String {
    let stem = s.rfind('.').map_or(s, |idx| &s[..idx]);

    let mut decoded = String::with_capacity(stem.len());
    let mut chars = stem.chars();
    // `while let` rather than `for`: the body pulls the escape digits from the
    // same iterator.
    while let Some(c) = chars.next() {
        if c != '%' {
            decoded.push(c);
            continue;
        }

        let hex: String = chars.by_ref().take(2).collect();
        let is_hex_pair = hex.len() == 2 && hex.bytes().all(|b| b.is_ascii_hexdigit());
        match u8::from_str_radix(&hex, 16) {
            // The byte value maps directly to the code point of the same number.
            Ok(byte) if is_hex_pair => decoded.push(byte as char),
            // Malformed or truncated escape: keep the text as it was.
            _ => {
                decoded.push('%');
                decoded.push_str(&hex);
            }
        }
    }

    decoded
        .chars()
        .map(|c| match c {
            '!' => '/',
            '/' => '!',
            other => other,
        })
        .collect()
}
406
/// Writes `data` to a staging file and renames it onto `path`, creating the
/// parent directory of `path` first if it has one.
#[cfg(feature = "native")]
async fn write_then_rename(
    temp_path: &std::path::Path,
    path: &std::path::Path,
    data: &[u8],
) -> Result<()> {
    fs::write(temp_path, data).await?;

    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent).await?;
    }

    fs::rename(temp_path, path)
        .await
        .map_err(|e| BraidError::Fs(e.to_string()))
}

/// Writes `data` to `path` by way of a uniquely named temporary file.
///
/// The data is first written to `temp_folder/tmp_<uuid>` (creating
/// `temp_folder` if needed) and then renamed onto `path`, so `path` is only
/// ever replaced by a fully written file. If any step after choosing the
/// temporary name fails, the temporary file is removed on a best-effort basis
/// before the error is returned.
///
/// # Errors
///
/// Fails if a directory cannot be created, the temporary file cannot be
/// written, or the rename fails (reported as `BraidError::Fs`).
#[cfg(feature = "native")]
pub async fn atomic_write(
    path: &std::path::Path,
    data: &[u8],
    temp_folder: &std::path::Path,
) -> Result<()> {
    fs::create_dir_all(temp_folder).await?;
    let temp_path = temp_folder.join(format!("tmp_{}", uuid::Uuid::new_v4()));

    let outcome = write_then_rename(&temp_path, path, data).await;
    if outcome.is_err() {
        // Do not leave a stale staging file behind. The original error is what
        // the caller needs, so a cleanup failure is deliberately ignored.
        let _ = fs::remove_file(&temp_path).await;
    }
    outcome
}