1use crate::core::traits::BraidStorage;
2use crate::core::types::Bytes;
3use crate::core::{Result, Version};
4use async_trait::async_trait;
5#[cfg(feature = "native")]
6use rusqlite::{params, Connection};
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9#[cfg(feature = "native")]
10use std::path::PathBuf;
11use std::sync::Arc;
12#[cfg(feature = "native")]
13use tokio::fs;
14use tokio::sync::broadcast;
15#[cfg(feature = "native")]
16use tokio::sync::Mutex;
17
18#[derive(Clone, Debug, Serialize, Deserialize)]
19pub struct BlobMetadata {
20 pub key: String,
21 pub version: Vec<Version>,
22 pub content_type: Option<String>,
23 pub parents: Vec<Version>,
24 #[serde(default)]
26 pub content_hash: Option<String>,
27 #[serde(default)]
29 pub size: Option<u64>,
30}
31
32#[derive(Clone, Debug)]
33pub enum StoreEvent {
34 Put {
35 meta: BlobMetadata,
36 data: Bytes,
37 },
38 Delete {
39 key: String,
40 version: Vec<Version>,
41 content_type: Option<String>,
42 },
43}
44
/// Blob store that keeps content in one file per key and metadata in SQLite.
///
/// Only available with the `native` feature. Cloning is cheap: clones share
/// the same SQLite connection and the same event channel.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct BlobStore {
    /// Directory that holds the content files.
    db_path: PathBuf,
    /// Location of the SQLite database file that holds the metadata.
    meta_db_path: PathBuf,
    /// Shared connection to the metadata database.
    meta_conn: Arc<Mutex<Connection>>,
    /// Sender side of the channel on which [`StoreEvent`]s are published.
    tx: broadcast::Sender<StoreEvent>,
}
53
#[cfg(feature = "native")]
impl BlobStore {
    /// Opens a blob store, creating its on-disk layout if necessary.
    ///
    /// `db_path` is the directory that holds one content file per key and
    /// `meta_db_path` is the SQLite database file that holds the metadata.
    ///
    /// # Errors
    ///
    /// Fails if a directory cannot be created, or if the SQLite database
    /// cannot be opened or its `meta` table cannot be created.
    pub async fn new(db_path: PathBuf, meta_db_path: PathBuf) -> Result<Self> {
        fs::create_dir_all(&db_path).await?;
        if let Some(parent) = meta_db_path.parent() {
            fs::create_dir_all(parent).await?;
        }

        let conn = Connection::open(&meta_db_path).map_err(Self::db_err)?;
        conn.execute(
            "CREATE TABLE IF NOT EXISTS meta (
                key TEXT PRIMARY KEY,
                value JSON
            )",
            [],
        )
        .map_err(Self::db_err)?;

        let (tx, _) = broadcast::channel(100);

        Ok(Self {
            db_path,
            meta_db_path,
            meta_conn: Arc::new(Mutex::new(conn)),
            tx,
        })
    }

    /// Returns a new receiver on the store's event channel.
    pub fn subscribe(&self) -> broadcast::Receiver<StoreEvent> {
        self.tx.subscribe()
    }

    /// Fetches the content and metadata stored under `key`.
    ///
    /// Returns `Ok(None)` when the key has no metadata row or when its
    /// content file does not exist.
    ///
    /// # Errors
    ///
    /// Fails on SQLite errors, on malformed stored metadata, and on I/O
    /// errors other than "not found".
    pub async fn get(&self, key: &str) -> Result<Option<(Bytes, BlobMetadata)>> {
        let meta = match self.get_meta(key).await? {
            Some(meta) => meta,
            None => return Ok(None),
        };

        // Read directly instead of probing with `try_exists` first: a
        // concurrent delete between probe and read would otherwise surface as
        // an error rather than as "absent".
        match fs::read(self.get_file_path(key)).await {
            Ok(data) => Ok(Some((Bytes::from(data), meta))),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(err.into()),
        }
    }

    /// Fetches only the metadata stored under `key`.
    ///
    /// Returns `Ok(None)` when the key has no metadata row.
    ///
    /// # Errors
    ///
    /// Fails on SQLite errors and when the stored JSON cannot be parsed.
    pub async fn get_meta(&self, key: &str) -> Result<Option<BlobMetadata>> {
        let conn = self.meta_conn.lock().await;
        let mut stmt = conn
            .prepare("SELECT value FROM meta WHERE key = ?")
            .map_err(Self::db_err)?;
        let mut rows = stmt.query(params![key]).map_err(Self::db_err)?;

        let row = match rows.next().map_err(Self::db_err)? {
            Some(row) => row,
            None => return Ok(None),
        };
        let value_str: String = row.get(0).map_err(Self::db_err)?;
        Ok(Some(serde_json::from_str::<BlobMetadata>(&value_str)?))
    }

    /// Stores `data` under `key` and returns the version now recorded for it.
    ///
    /// When the key already has metadata and the first element of `version`
    /// does not compare greater than the first element of the stored version
    /// (see `compare_versions`), nothing is written and the stored version is
    /// returned. Otherwise the content file and the metadata row are
    /// replaced, a [`StoreEvent::Put`] is published, and `version` is
    /// returned.
    ///
    /// # Errors
    ///
    /// Fails on SQLite, serialization, or file-system errors.
    pub async fn put(
        &self,
        key: &str,
        data: Bytes,
        version: Vec<Version>,
        parents: Vec<Version>,
        content_type: Option<String>,
    ) -> Result<Vec<Version>> {
        let current_meta = self.get_meta(key).await?;
        let new_ver_str = version.first().map(|v| v.to_string()).unwrap_or_default();
        if let Some(current) = current_meta {
            let current_ver_str = current
                .version
                .first()
                .map(|v| v.to_string())
                .unwrap_or_default();
            if compare_versions(&new_ver_str, &current_ver_str) <= 0 {
                return Ok(current.version);
            }
        }

        let mut hasher = Sha256::new();
        hasher.update(&data);
        let content_hash = format!("{:x}", hasher.finalize());

        let new_meta = BlobMetadata {
            key: key.to_string(),
            version: version.clone(),
            content_type,
            parents,
            content_hash: Some(content_hash),
            size: Some(data.len() as u64),
        };
        // Serialize before touching the disk so a serialization failure
        // leaves neither a content file nor a metadata row behind.
        let val_str = serde_json::to_string(&new_meta)?;

        fs::write(self.get_file_path(key), &data).await?;

        {
            let conn = self.meta_conn.lock().await;
            conn.execute(
                "INSERT OR REPLACE INTO meta (key, value) VALUES (?, ?)",
                params![key, val_str],
            )
            .map_err(Self::db_err)?;
        }

        // A send error only means nobody is subscribed; that is not a failure.
        let _ = self.tx.send(StoreEvent::Put {
            meta: new_meta,
            data,
        });

        Ok(version)
    }

    /// Removes the metadata row and the content file for `key`.
    ///
    /// A [`StoreEvent::Delete`] is published even when the key was unknown;
    /// in that case it carries an empty version and no content type.
    ///
    /// # Errors
    ///
    /// Fails on SQLite errors and on I/O errors other than "not found".
    pub async fn delete(&self, key: &str) -> Result<()> {
        let current_meta = self.get_meta(key).await?;

        {
            let conn = self.meta_conn.lock().await;
            conn.execute("DELETE FROM meta WHERE key = ?", params![key])
                .map_err(Self::db_err)?;
        }

        // Remove unconditionally and tolerate "not found" rather than probing
        // with `try_exists`, which races with concurrent deletes.
        match fs::remove_file(self.get_file_path(key)).await {
            Ok(()) => {}
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => return Err(err.into()),
        }

        let (version, content_type) = match current_meta {
            Some(meta) => (meta.version, meta.content_type),
            None => (Vec::new(), None),
        };
        // A send error only means nobody is subscribed; that is not a failure.
        let _ = self.tx.send(StoreEvent::Delete {
            key: key.to_string(),
            version,
            content_type,
        });

        Ok(())
    }

    /// Maps `key` to the path of its content file inside `db_path`.
    fn get_file_path(&self, key: &str) -> PathBuf {
        self.db_path.join(encode_filename(key))
    }

    /// Converts a SQLite error into the crate's `Fs` error variant.
    fn db_err(err: rusqlite::Error) -> crate::core::BraidError {
        crate::core::BraidError::Fs(err.to_string())
    }
}
236
#[cfg(feature = "native")]
#[async_trait]
impl BraidStorage for BlobStore {
    /// Stores `data` under `key`; `meta` is a JSON-encoded [`BlobMetadata`]
    /// from which the version, parents and content type are taken.
    async fn put(&self, key: &str, data: crate::core::types::Bytes, meta: String) -> Result<()> {
        let BlobMetadata {
            version,
            parents,
            content_type,
            ..
        } = serde_json::from_str(&meta)?;
        self.put(key, data, version, parents, content_type).await?;
        Ok(())
    }

    /// Fetches the content stored under `key` together with its metadata
    /// encoded as JSON, or `None` when the store has nothing for the key.
    async fn get(&self, key: &str) -> Result<Option<(crate::core::types::Bytes, String)>> {
        match self.get(key).await? {
            Some((data, meta)) => Ok(Some((data, serde_json::to_string(&meta)?))),
            None => Ok(None),
        }
    }

    /// Removes `key` from the store.
    async fn delete(&self, key: &str) -> Result<()> {
        self.delete(key).await
    }

    /// Lists every key that has a row in the `meta` table.
    async fn list_keys(&self) -> Result<Vec<String>> {
        let conn = self.meta_conn.lock().await;
        let mut stmt = conn
            .prepare("SELECT key FROM meta")
            .map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;

        // Collecting into a `Result` stops at the first failing row.
        let keys = stmt
            .query_map([], |row| row.get(0))
            .map_err(|e| crate::core::BraidError::Fs(e.to_string()))?
            .collect::<std::result::Result<Vec<String>, _>>()
            .map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
        Ok(keys)
    }
}
282
283fn compare_versions(a: &str, b: &str) -> i32 {
284 let seq_a = get_event_seq(a);
285 let seq_b = get_event_seq(b);
286
287 let c = compare_seqs(seq_a, seq_b);
288 if c != 0 {
289 return c;
290 }
291
292 if a < b {
293 -1
294 } else if a > b {
295 1
296 } else {
297 0
298 }
299}
300
/// Returns the part of `e` after its last `-`, or all of `e` when it
/// contains no `-` (including when it is empty).
fn get_event_seq(e: &str) -> &str {
    // `rsplit` always yields at least one piece, and the first piece is the
    // text after the final separator.
    e.rsplit('-').next().unwrap_or(e)
}
311
/// Orders two sequence strings: the shorter one sorts first, and strings of
/// equal length are ordered lexicographically.
///
/// Returns the length difference when the lengths differ, otherwise
/// `-1`, `0`, or `1`.
fn compare_seqs(a: &str, b: &str) -> i32 {
    if a.len() == b.len() {
        // `Ordering as i32` is -1 / 0 / 1.
        a.cmp(b) as i32
    } else {
        (a.len() as i32) - (b.len() as i32)
    }
}
324
325pub fn encode_filename(s: &str) -> String {
326 let bits: String = s
331 .chars()
332 .filter(|c| c.is_alphabetic())
333 .map(|c| if c.is_uppercase() { "1" } else { "0" })
334 .collect();
335
336 let postfix = if bits.is_empty() {
338 "0".to_string()
339 } else {
340 bits_to_hex(&bits)
347 };
348
349 let mut s = s
352 .chars()
353 .map(|c| match c {
354 '/' => '!',
355 '!' => '/',
356 _ => c,
357 })
358 .collect::<String>();
359
360 let mut encoded = String::new();
370 for c in s.chars() {
371 if matches!(
372 c,
373 '<' | '>' | ':' | '"' | '/' | '|' | '\\' | '?' | '*' | '%' | '\x00'..='\x1f' | '\x7f'
374 ) {
375 encoded.push_str(&format!("%{:02X}", c as u8));
376 } else {
377 encoded.push(c);
378 }
379 }
380 s = encoded;
381
382 let is_reserved = {
387 let lower = s.to_lowercase();
388 let name_part = lower.split('.').next().unwrap_or("");
389 matches!(name_part, "con" | "prn" | "aux" | "nul")
390 || (name_part.len() == 4
391 && name_part.starts_with("com")
392 && name_part
393 .chars()
394 .nth(3)
395 .map_or(false, |c| c.is_ascii_digit() && c != '0'))
396 || (name_part.len() == 4
397 && name_part.starts_with("lpt")
398 && name_part
399 .chars()
400 .nth(3)
401 .map_or(false, |c| c.is_ascii_digit() && c != '0'))
402 };
403
404 if is_reserved {
405 if s.len() >= 3 {
408 let char_at_2 = s.chars().nth(2).unwrap();
409 let encoded_char = format!("%{:02X}", char_at_2 as u8);
410 let mut chars: Vec<char> = s.chars().collect();
413 let prefix: String = chars.iter().take(2).collect();
414 let suffix: String = chars.iter().skip(3).collect();
415 s = format!("{}{}{}", prefix, encoded_char, suffix);
416 }
417 }
418
419 format!("{}.{}", s, postfix)
421}
422
/// Converts a string of `0`/`1` characters, read as a big-endian binary
/// number, to lowercase hex without leading zeros.
///
/// An empty string and an all-zero string both yield `"0"`.
///
/// # Panics
///
/// Panics if `bits` contains anything that does not parse as binary digits.
fn bits_to_hex(bits: &str) -> String {
    // Left-pad to a whole number of 4-bit groups.
    let pad = (4 - bits.len() % 4) % 4;
    let mut padded = "0".repeat(pad);
    padded.push_str(bits);

    let hex: String = padded
        .as_bytes()
        .chunks(4)
        .map(|nibble| {
            let text = std::str::from_utf8(nibble).unwrap();
            let value = u8::from_str_radix(text, 2).unwrap();
            std::char::from_digit(u32::from(value), 16).unwrap()
        })
        .collect();

    match hex.trim_start_matches('0') {
        "" => "0".to_string(),
        significant => significant.to_string(),
    }
}
456
/// Reverses [`encode_filename`]: recovers the key from an encoded file name.
///
/// Everything from the last `.` onwards (the case postfix) is dropped, `%XX`
/// escapes are decoded, and `/` and `!` are swapped back.
///
/// A `%` that is not followed by exactly two ASCII hex digits is kept
/// verbatim together with the (up to two) characters that follow it.
pub fn decode_filename(s: &str) -> String {
    // Strip the `.postfix` appended by the encoder, if present.
    let stem = s.rfind('.').map_or(s, |dot| &s[..dot]);

    let mut decoded = String::with_capacity(stem.len());
    let mut chars = stem.chars();
    while let Some(c) = chars.next() {
        if c != '%' {
            decoded.push(c);
            continue;
        }

        let hex: String = chars.by_ref().take(2).collect();
        // `from_str_radix` also accepts a leading '+', so "%+1" would decode
        // to byte 0x01 unless both characters are checked to be hex digits.
        let is_escape = hex.len() == 2 && hex.bytes().all(|b| b.is_ascii_hexdigit());
        match u8::from_str_radix(&hex, 16) {
            Ok(byte) if is_escape => decoded.push(byte as char),
            _ => {
                decoded.push('%');
                decoded.push_str(&hex);
            }
        }
    }

    // Undo the '/' <-> '!' swap applied by the encoder.
    decoded
        .chars()
        .map(|c| match c {
            '!' => '/',
            '/' => '!',
            other => other,
        })
        .collect()
}
496
/// Adds one to a decimal sequence string.
///
/// An empty string yields `"1"`. Trailing `9`s roll over to `0` and the
/// digit before them is incremented; when there is no such digit the result
/// is prefixed with `1` (`"99"` becomes `"100"`).
///
/// NOTE(review): when the rollover is stopped by a non-digit character the
/// `1` is still prepended to the whole string (`"a9"` becomes `"1a0"`);
/// callers are presumably expected to pass digits only.
pub fn increment_seq(s: &str) -> String {
    if s.is_empty() {
        return "1".to_string();
    }

    let mut digits: Vec<char> = s.chars().collect();

    // Roll every trailing '9' over to '0'.
    let mut idx = digits.len();
    while idx > 0 && digits[idx - 1] == '9' {
        digits[idx - 1] = '0';
        idx -= 1;
    }

    // Bump the digit left of the rolled-over run, if there is one.
    let carried_out = match idx.checked_sub(1) {
        Some(pos) if digits[pos].is_ascii_digit() => {
            digits[pos] = (digits[pos] as u8 + 1) as char;
            false
        }
        _ => true,
    };

    let body: String = digits.into_iter().collect();
    if carried_out {
        format!("1{}", body)
    } else {
        body
    }
}
532
533pub fn max_seq<'a>(a: &'a str, b: &'a str) -> &'a str {
536 if compare_seqs(a, b) >= 0 {
537 a
538 } else {
539 b
540 }
541}
542
543pub async fn atomic_write(
547 dest: &std::path::Path,
548 data: &[u8],
549 temp_folder: &std::path::Path,
550) -> Result<std::fs::Metadata> {
551 use tokio::fs;
552
553 fs::create_dir_all(temp_folder).await?;
555
556 let temp_name = format!("tmp_{}", uuid::Uuid::new_v4());
558 let temp_path = temp_folder.join(temp_name);
559
560 fs::write(&temp_path, data).await?;
562
563 if let Some(parent) = dest.parent() {
565 fs::create_dir_all(parent).await?;
566 }
567
568 fs::rename(&temp_path, dest).await?;
570
571 let metadata = std::fs::metadata(dest).map_err(|e| crate::core::BraidError::Io(e))?;
573 Ok(metadata)
574}
575
#[cfg(test)]
mod tests {
    use super::*;

    /// Encoding a key and decoding the result must give the key back.
    #[test]
    fn test_encode_decode_roundtrip() {
        let original = "path/to/file.txt";
        assert_eq!(decode_filename(&encode_filename(original)), original);
    }

    /// Covers the empty string, a plain increment, and carry propagation.
    #[test]
    fn test_increment_seq() {
        let cases = [
            ("", "1"),
            ("0", "1"),
            ("9", "10"),
            ("99", "100"),
            ("123", "124"),
        ];
        for &(input, expected) in cases.iter() {
            assert_eq!(increment_seq(input), expected);
        }
    }

    /// Longer sequences win; equal lengths fall back to text order.
    #[test]
    fn test_max_seq() {
        let cases = [("1", "2", "2"), ("10", "2", "10"), ("99", "100", "100")];
        for &(a, b, expected) in cases.iter() {
            assert_eq!(max_seq(a, b), expected);
        }
    }
}