1use sha2::Digest;
7
8use rusqlite::{Connection, params};
9use std::path::Path;
10use thiserror::Error;
11
12use crate::types::{BlobRef, BranchProto, HashProto, PatchProto, UserInfo};
13use crate::webhooks::Webhook;
14
15#[derive(Error, Debug)]
16pub enum StorageError {
17 #[error("database error: {0}")]
18 Database(#[from] rusqlite::Error),
19
20 #[error("I/O error: {0}")]
21 Io(#[from] std::io::Error),
22
23 #[error("repo not found: {0}")]
24 RepoNotFound(String),
25
26 #[error("base64 error: {0}")]
27 Base64(String),
28
29 #[error("lock poisoned: {0}")]
30 PoisonedLock(String),
31
32 #[error("webhook not found: {0}")]
33 WebhookNotFound(String),
34
35 #[error("{0}")]
36 Custom(String),
37
38 #[error("blob exceeds maximum allowed size of {0} bytes")]
39 BlobTooLarge(usize),
40}
41
42pub struct HubStorage {
50 conn: std::sync::Mutex<Connection>,
51 max_blob_size: usize,
52 max_page_size: usize,
53}
54
/// One `mirrors` row as returned by `get_mirror` (the row id is not included).
type MirrorRow = (
    String,      // repo_name
    String,      // upstream_url
    String,      // upstream_repo
    Option<i64>, // last_sync
    String,      // status
);
57
/// One `mirrors` row as returned by `list_mirrors`, including the row id.
type MirrorListRow = (
    i64,         // id
    String,      // repo_name
    String,      // upstream_url
    String,      // upstream_repo
    Option<i64>, // last_sync
    String,      // status
);
60
61impl HubStorage {
62 pub fn open(path: &Path) -> Result<Self, StorageError> {
64 Self::open_with_limits(path, 50 * 1024 * 1024, 10000)
65 }
66
67 pub fn open_with_limits(
69 path: &Path,
70 max_blob_size: usize,
71 max_page_size: usize,
72 ) -> Result<Self, StorageError> {
73 if let Some(parent) = path.parent() {
74 std::fs::create_dir_all(parent)?;
75 }
76 let conn = Connection::open(path)?;
77 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
78 let mut store = Self {
79 conn: std::sync::Mutex::new(conn),
80 max_blob_size,
81 max_page_size,
82 };
83 store.migrate()?;
84 Ok(store)
85 }
86
87 pub fn open_in_memory() -> Result<Self, StorageError> {
89 Self::open_in_memory_with_limits(50 * 1024 * 1024, 10000)
90 }
91
92 pub fn open_in_memory_with_limits(
94 max_blob_size: usize,
95 max_page_size: usize,
96 ) -> Result<Self, StorageError> {
97 let conn = Connection::open_in_memory()?;
98 conn.execute_batch("PRAGMA journal_mode=WAL;")?;
99 let mut store = Self {
100 conn: std::sync::Mutex::new(conn),
101 max_blob_size,
102 max_page_size,
103 };
104 store.migrate()?;
105 Ok(store)
106 }
107
108 fn migrate(&mut self) -> Result<(), StorageError> {
109 let conn = self
110 .conn
111 .get_mut()
112 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
113 conn.execute_batch(
114 "CREATE TABLE IF NOT EXISTS repos (
115 repo_id TEXT PRIMARY KEY,
116 created_at TEXT NOT NULL DEFAULT (datetime('now'))
117 );
118
119 CREATE TABLE IF NOT EXISTS patches (
120 repo_id TEXT NOT NULL,
121 patch_id TEXT NOT NULL,
122 operation_type TEXT NOT NULL,
123 touch_set TEXT NOT NULL,
124 target_path TEXT,
125 payload TEXT NOT NULL,
126 parent_ids TEXT NOT NULL,
127 author TEXT NOT NULL,
128 message TEXT NOT NULL,
129 timestamp INTEGER NOT NULL,
130 PRIMARY KEY (repo_id, patch_id)
131 );
132
133 CREATE TABLE IF NOT EXISTS branches (
134 repo_id TEXT NOT NULL,
135 name TEXT NOT NULL,
136 target_patch_id TEXT NOT NULL,
137 PRIMARY KEY (repo_id, name)
138 );
139
140 CREATE TABLE IF NOT EXISTS blobs (
141 repo_id TEXT NOT NULL,
142 blob_hash TEXT NOT NULL,
143 data BLOB NOT NULL,
144 PRIMARY KEY (repo_id, blob_hash)
145 );
146
147 CREATE TABLE IF NOT EXISTS authorized_keys (
148 author TEXT PRIMARY KEY,
149 public_key BLOB NOT NULL,
150 added_at TEXT NOT NULL DEFAULT (datetime('now'))
151 );
152
153 CREATE TABLE IF NOT EXISTS tokens (
154 token TEXT PRIMARY KEY,
155 created_at INTEGER NOT NULL,
156 description TEXT,
157 expires_at INTEGER NOT NULL
158 );
159
160 CREATE TABLE IF NOT EXISTS branch_protection (
161 repo_id TEXT NOT NULL,
162 branch_name TEXT NOT NULL,
163 PRIMARY KEY (repo_id, branch_name)
164 );
165
166 CREATE TABLE IF NOT EXISTS mirrors (
167 id INTEGER PRIMARY KEY,
168 repo_name TEXT NOT NULL,
169 upstream_url TEXT NOT NULL,
170 upstream_repo TEXT NOT NULL,
171 last_sync INTEGER,
172 status TEXT DEFAULT 'idle'
173 );
174
175 CREATE TABLE IF NOT EXISTS users (
176 username TEXT PRIMARY KEY,
177 display_name TEXT NOT NULL,
178 role TEXT NOT NULL DEFAULT 'member',
179 api_token TEXT UNIQUE,
180 created_at INTEGER NOT NULL
181 );
182
183 CREATE INDEX IF NOT EXISTS idx_patches_repo ON patches(repo_id);
184 CREATE INDEX IF NOT EXISTS idx_branches_repo ON branches(repo_id);
185 CREATE INDEX IF NOT EXISTS idx_blobs_repo ON blobs(repo_id);
186 CREATE INDEX IF NOT EXISTS idx_mirrors_repo ON mirrors(repo_name);
187
188 CREATE TABLE IF NOT EXISTS replication_peers (
189 id INTEGER PRIMARY KEY AUTOINCREMENT,
190 peer_url TEXT NOT NULL UNIQUE,
191 role TEXT NOT NULL DEFAULT 'follower',
192 last_sync_seq INTEGER DEFAULT 0,
193 status TEXT NOT NULL DEFAULT 'active',
194 added_at INTEGER NOT NULL
195 );
196
197 CREATE TABLE IF NOT EXISTS replication_log (
198 seq INTEGER PRIMARY KEY AUTOINCREMENT,
199 operation TEXT NOT NULL,
200 table_name TEXT NOT NULL,
201 row_id TEXT NOT NULL,
202 data TEXT,
203 timestamp INTEGER NOT NULL
204 );
205
206 CREATE TABLE IF NOT EXISTS webhooks (
207 id TEXT PRIMARY KEY,
208 repo_id TEXT NOT NULL,
209 url TEXT NOT NULL,
210 events TEXT NOT NULL,
211 secret TEXT,
212 created_at INTEGER NOT NULL,
213 active INTEGER NOT NULL DEFAULT 1,
214 FOREIGN KEY (repo_id) REFERENCES repos(repo_id)
215 );
216
217 CREATE INDEX IF NOT EXISTS idx_webhooks_repo ON webhooks(repo_id);
218
219 CREATE TABLE IF NOT EXISTS sso_providers (
220 provider_name TEXT PRIMARY KEY,
221 config_json TEXT NOT NULL,
222 created_at TEXT NOT NULL DEFAULT (datetime('now')),
223 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
224 );
225
226 CREATE TABLE IF NOT EXISTS audit_log (
227 id INTEGER PRIMARY KEY AUTOINCREMENT,
228 timestamp TEXT NOT NULL DEFAULT (datetime('now')),
229 actor TEXT NOT NULL DEFAULT '',
230 action TEXT NOT NULL,
231 resource_type TEXT NOT NULL DEFAULT '',
232 resource_id TEXT NOT NULL DEFAULT '',
233 status TEXT NOT NULL DEFAULT 'success',
234 details TEXT NOT NULL DEFAULT '',
235 request_id TEXT NOT NULL DEFAULT '',
236 client_ip TEXT NOT NULL DEFAULT ''
237 );
238 CREATE INDEX IF NOT EXISTS idx_audit_log_timestamp ON audit_log(timestamp);
239 CREATE INDEX IF NOT EXISTS idx_audit_log_actor ON audit_log(actor);
240 CREATE INDEX IF NOT EXISTS idx_audit_log_action ON audit_log(action);
241
242 CREATE TABLE IF NOT EXISTS sso_states (
243 state TEXT PRIMARY KEY,
244 provider_name TEXT NOT NULL,
245 nonce TEXT NOT NULL,
246 created_at INTEGER NOT NULL
247 );
248 CREATE INDEX IF NOT EXISTS idx_sso_states_created ON sso_states(created_at);
249
250 CREATE TABLE IF NOT EXISTS sso_user_mappings (
251 provider_name TEXT NOT NULL,
252 provider_sub TEXT NOT NULL,
253 username TEXT NOT NULL,
254 linked_at INTEGER NOT NULL,
255 PRIMARY KEY (provider_name, provider_sub),
256 FOREIGN KEY (username) REFERENCES users(username)
257 );
258 ",
259 )?;
260
261 let has_expires: bool = conn.query_row(
262 "SELECT COUNT(*) > 0 FROM pragma_table_info('tokens') WHERE name = 'expires_at'",
263 [],
264 |row| row.get(0),
265 )?;
266
267 if !has_expires {
268 let now = std::time::SystemTime::now()
269 .duration_since(std::time::UNIX_EPOCH)
270 .unwrap_or_default()
271 .as_secs() as i64;
272 let default_expiry = now + (30 * 24 * 60 * 60);
273 conn.execute_batch(
274 "ALTER TABLE tokens ADD COLUMN expires_at INTEGER NOT NULL DEFAULT 0;",
275 )?;
276 conn.execute(
277 "UPDATE tokens SET expires_at = ?1 WHERE expires_at = 0",
278 params![default_expiry],
279 )?;
280 }
281
282 Ok(())
283 }
284
285 pub fn ensure_repo(&self, repo_id: &str) -> Result<bool, StorageError> {
289 let conn = self
290 .conn
291 .lock()
292 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
293 conn.execute(
294 "INSERT OR IGNORE INTO repos (repo_id) VALUES (?1)",
295 params![repo_id],
296 )?;
297 Ok(conn.changes() > 0)
298 }
299
300 pub fn list_repos(&self) -> Result<Vec<String>, StorageError> {
302 let conn = self
303 .conn
304 .lock()
305 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
306 let mut stmt = conn.prepare("SELECT repo_id FROM repos ORDER BY repo_id")?;
307 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
308 let mut ids = Vec::new();
309 for row in rows {
310 ids.push(row?);
311 }
312 Ok(ids)
313 }
314
315 pub fn repo_exists(&self, repo_id: &str) -> Result<bool, StorageError> {
317 let conn = self
318 .conn
319 .lock()
320 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
321 let count: i64 = conn.query_row(
322 "SELECT COUNT(*) FROM repos WHERE repo_id = ?1",
323 params![repo_id],
324 |row| row.get(0),
325 )?;
326 Ok(count > 0)
327 }
328
329 pub fn insert_patch(&self, repo_id: &str, patch: &PatchProto) -> Result<bool, StorageError> {
333 let id_hex = hash_to_hex(&patch.id);
334 let touch_set_json = serde_json::to_string(&patch.touch_set).unwrap_or_default();
335 let parent_ids_json = serde_json::to_string(
336 &patch
337 .parent_ids
338 .iter()
339 .map(|h| &h.value)
340 .collect::<Vec<_>>(),
341 )
342 .unwrap_or_default();
343
344 let conn = self
345 .conn
346 .lock()
347 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
348 conn.execute(
349 "INSERT OR IGNORE INTO patches (repo_id, patch_id, operation_type, touch_set, target_path, payload, parent_ids, author, message, timestamp)
350 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
351 params![
352 repo_id,
353 id_hex,
354 patch.operation_type,
355 touch_set_json,
356 patch.target_path,
357 patch.payload,
358 parent_ids_json,
359 patch.author,
360 patch.message,
361 patch.timestamp as i64,
362 ],
363 )?;
364 Ok(conn.changes() > 0)
365 }
366
367 pub fn get_patch(
369 &self,
370 repo_id: &str,
371 patch_id_hex: &str,
372 ) -> Result<Option<PatchProto>, StorageError> {
373 let conn = self
374 .conn
375 .lock()
376 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
377 let result = conn.query_row(
378 "SELECT patch_id, operation_type, touch_set, target_path, payload, parent_ids, author, message, timestamp
379 FROM patches WHERE repo_id = ?1 AND patch_id = ?2",
380 params![repo_id, patch_id_hex],
381 |row| {
382 let id_hex: String = row.get(0)?;
383 let operation_type: String = row.get(1)?;
384 let touch_set_json: String = row.get(2)?;
385 let target_path: Option<String> = row.get(3)?;
386 let payload: String = row.get(4)?;
387 let parent_ids_json: String = row.get(5)?;
388 let author: String = row.get(6)?;
389 let message: String = row.get(7)?;
390 let timestamp: i64 = row.get(8)?;
391
392 let touch_set: Vec<String> =
393 serde_json::from_str(&touch_set_json).unwrap_or_default();
394 let parent_ids: Vec<String> =
395 serde_json::from_str(&parent_ids_json).unwrap_or_default();
396
397 Ok(PatchProto {
398 id: HashProto { value: id_hex },
399 operation_type,
400 touch_set,
401 target_path,
402 payload,
403 parent_ids: parent_ids
404 .into_iter()
405 .map(|h| HashProto { value: h })
406 .collect(),
407 author,
408 message,
409 timestamp: timestamp as u64,
410 })
411 },
412 );
413
414 match result {
415 Ok(patch) => Ok(Some(patch)),
416 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
417 Err(e) => Err(StorageError::Database(e)),
418 }
419 }
420
421 pub fn get_all_patches(
424 &self,
425 repo_id: &str,
426 offset: usize,
427 limit: usize,
428 ) -> Result<Vec<PatchProto>, StorageError> {
429 let effective_limit = limit.min(self.max_page_size).max(1);
430 let conn = self
431 .conn
432 .lock()
433 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
434 let mut stmt = conn.prepare(
435 "SELECT patch_id, operation_type, touch_set, target_path, payload, parent_ids, author, message, timestamp
436 FROM patches WHERE repo_id = ?1 ORDER BY timestamp ASC, patch_id ASC LIMIT ?2 OFFSET ?3",
437 )?;
438 let rows = stmt.query_map(params![repo_id, effective_limit as i64, offset as i64], |row| {
439 let id_hex: String = row.get(0)?;
440 let operation_type: String = row.get(1)?;
441 let touch_set_json: String = row.get(2)?;
442 let target_path: Option<String> = row.get(3)?;
443 let payload: String = row.get(4)?;
444 let parent_ids_json: String = row.get(5)?;
445 let author: String = row.get(6)?;
446 let message: String = row.get(7)?;
447 let timestamp: i64 = row.get(8)?;
448
449 let touch_set: Vec<String> = serde_json::from_str(&touch_set_json).unwrap_or_default();
450 let parent_ids: Vec<String> =
451 serde_json::from_str(&parent_ids_json).unwrap_or_default();
452
453 Ok(PatchProto {
454 id: HashProto { value: id_hex },
455 operation_type,
456 touch_set,
457 target_path,
458 payload,
459 parent_ids: parent_ids
460 .into_iter()
461 .map(|h| HashProto { value: h })
462 .collect(),
463 author,
464 message,
465 timestamp: timestamp as u64,
466 })
467 })?;
468
469 let mut patches = Vec::new();
470 for row in rows {
471 patches.push(row?);
472 }
473 Ok(patches)
474 }
475
476 pub fn get_all_patches_unbounded(&self, repo_id: &str) -> Result<Vec<PatchProto>, StorageError> {
480 let conn = self
481 .conn
482 .lock()
483 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
484 let mut stmt = conn.prepare(
485 "SELECT patch_id, operation_type, touch_set, target_path, payload, parent_ids, author, message, timestamp
486 FROM patches WHERE repo_id = ?1 ORDER BY timestamp ASC, patch_id ASC",
487 )?;
488 let rows = stmt.query_map(params![repo_id], |row| {
489 let id_hex: String = row.get(0)?;
490 let operation_type: String = row.get(1)?;
491 let touch_set_json: String = row.get(2)?;
492 let target_path: Option<String> = row.get(3)?;
493 let payload: String = row.get(4)?;
494 let parent_ids_json: String = row.get(5)?;
495 let author: String = row.get(6)?;
496 let message: String = row.get(7)?;
497 let timestamp: i64 = row.get(8)?;
498
499 let touch_set: Vec<String> =
500 serde_json::from_str(&touch_set_json).unwrap_or_default();
501 let parent_ids: Vec<String> =
502 serde_json::from_str(&parent_ids_json).unwrap_or_default();
503
504 Ok(PatchProto {
505 id: HashProto { value: id_hex },
506 operation_type,
507 touch_set,
508 target_path,
509 payload,
510 parent_ids: parent_ids
511 .into_iter()
512 .map(|h| HashProto { value: h })
513 .collect(),
514 author,
515 message,
516 timestamp: timestamp as u64,
517 })
518 })?;
519
520 let mut patches = Vec::new();
521 for row in rows {
522 patches.push(row?);
523 }
524 Ok(patches)
525 }
526
527 pub fn patch_count(&self, repo_id: &str) -> Result<u64, StorageError> {
529 let conn = self
530 .conn
531 .lock()
532 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
533 let count: i64 = conn.query_row(
534 "SELECT COUNT(*) FROM patches WHERE repo_id = ?1",
535 params![repo_id],
536 |row| row.get(0),
537 )?;
538 Ok(count as u64)
539 }
540
541 pub fn set_branch(
545 &self,
546 repo_id: &str,
547 name: &str,
548 target_patch_id: &str,
549 ) -> Result<(), StorageError> {
550 let conn = self
551 .conn
552 .lock()
553 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
554 conn.execute(
555 "INSERT OR REPLACE INTO branches (repo_id, name, target_patch_id) VALUES (?1, ?2, ?3)",
556 params![repo_id, name, target_patch_id],
557 )?;
558 Ok(())
559 }
560
561 pub fn get_branches(&self, repo_id: &str) -> Result<Vec<BranchProto>, StorageError> {
563 let conn = self
564 .conn
565 .lock()
566 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
567 let mut stmt = conn.prepare(
568 "SELECT name, target_patch_id FROM branches WHERE repo_id = ?1 ORDER BY name",
569 )?;
570
571 let rows = stmt.query_map(params![repo_id], |row| {
572 let name: String = row.get(0)?;
573 let target_hex: String = row.get(1)?;
574 Ok((name, target_hex))
575 })?;
576
577 let mut branches = Vec::new();
578 for row in rows {
579 let (name, target_hex) = row?;
580 branches.push(BranchProto {
581 name,
582 target_id: HashProto { value: target_hex },
583 });
584 }
585 Ok(branches)
586 }
587
588 pub fn store_blob(
592 &self,
593 repo_id: &str,
594 hash_hex: &str,
595 data: &[u8],
596 ) -> Result<(), StorageError> {
597 if data.len() > self.max_blob_size {
598 return Err(StorageError::BlobTooLarge(self.max_blob_size));
599 }
600 let conn = self
601 .conn
602 .lock()
603 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
604 conn.execute(
605 "INSERT OR REPLACE INTO blobs (repo_id, blob_hash, data) VALUES (?1, ?2, ?3)",
606 params![repo_id, hash_hex, data],
607 )?;
608 Ok(())
609 }
610
611 pub fn delete_blob(&self, repo_id: &str, hash_hex: &str) -> Result<(), StorageError> {
612 let conn = self
613 .conn
614 .lock()
615 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
616 conn.execute(
617 "DELETE FROM blobs WHERE repo_id = ?1 AND blob_hash = ?2",
618 params![repo_id, hash_hex],
619 )?;
620 Ok(())
621 }
622
623 pub fn get_blob(&self, repo_id: &str, hash_hex: &str) -> Result<Option<Vec<u8>>, StorageError> {
625 let conn = self
626 .conn
627 .lock()
628 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
629 let result = conn.query_row(
630 "SELECT data FROM blobs WHERE repo_id = ?1 AND blob_hash = ?2",
631 params![repo_id, hash_hex],
632 |row| row.get::<_, Vec<u8>>(0),
633 );
634
635 match result {
636 Ok(data) => Ok(Some(data)),
637 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
638 Err(e) => Err(StorageError::Database(e)),
639 }
640 }
641
642 pub fn get_all_blobs(&self, repo_id: &str) -> Result<Vec<BlobRef>, StorageError> {
645 let conn = self
646 .conn
647 .lock()
648 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
649 let mut stmt = conn.prepare("SELECT blob_hash, data FROM blobs WHERE repo_id = ?1")?;
650
651 let rows = stmt.query_map(params![repo_id], |row| {
652 let hash_hex: String = row.get(0)?;
653 let data: Vec<u8> = row.get(1)?;
654 Ok((hash_hex, data))
655 })?;
656
657 let max_blob_size = self.max_blob_size;
658 let mut blobs = Vec::new();
659 for row in rows {
660 let (hash_hex, data) = row?;
661 let (data_b64, truncated) = if data.len() > max_blob_size {
662 (String::new(), true)
663 } else {
664 (base64_encode(&data), false)
665 };
666 blobs.push(BlobRef {
667 hash: HashProto { value: hash_hex },
668 data: data_b64,
669 truncated,
670 });
671 }
672 Ok(blobs)
673 }
674
675 pub fn get_blobs(
677 &self,
678 repo_id: &str,
679 hashes: &std::collections::HashSet<String>,
680 ) -> Result<Vec<BlobRef>, StorageError> {
681 if hashes.is_empty() {
682 return Ok(vec![]);
683 }
684
685 let placeholders: Vec<String> = hashes.iter().map(|_| "?".to_owned()).collect();
686 let sql = format!(
687 "SELECT blob_hash, data FROM blobs WHERE repo_id = ?1 AND blob_hash IN ({})",
688 placeholders.join(",")
689 );
690
691 let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
692 params_vec.push(Box::new(repo_id.to_owned()));
693 for h in hashes {
694 params_vec.push(Box::new(h.clone()));
695 }
696 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
697 params_vec.iter().map(std::convert::AsRef::as_ref).collect();
698
699 let conn = self
700 .conn
701 .lock()
702 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
703 let mut stmt = conn.prepare(&sql)?;
704 let rows = stmt.query_map(param_refs.as_slice(), |row| {
705 let hash_hex: String = row.get(0)?;
706 let data: Vec<u8> = row.get(1)?;
707 Ok((hash_hex, data))
708 })?;
709
710 let mut blobs = Vec::new();
711 for row in rows {
712 let (hash_hex, data) = row?;
713 let (data_b64, truncated) = if data.len() > self.max_blob_size {
714 (String::new(), true)
715 } else {
716 (base64_encode(&data), false)
717 };
718 blobs.push(BlobRef {
719 hash: HashProto { value: hash_hex },
720 data: data_b64,
721 truncated,
722 });
723 }
724 Ok(blobs)
725 }
726
727 pub fn get_branch_target(
729 &self,
730 repo_id: &str,
731 branch_name: &str,
732 ) -> Result<Option<String>, StorageError> {
733 let conn = self
734 .conn
735 .lock()
736 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
737 let result = conn.query_row(
738 "SELECT target_patch_id FROM branches WHERE repo_id = ?1 AND name = ?2",
739 params![repo_id, branch_name],
740 |row| row.get::<_, String>(0),
741 );
742 match result {
743 Ok(hex) => Ok(Some(hex)),
744 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
745 Err(e) => Err(StorageError::Database(e)),
746 }
747 }
748
749 pub fn is_ancestor(
752 &self,
753 repo_id: &str,
754 ancestor_id: &str,
755 descendant_id: &str,
756 ) -> Result<bool, StorageError> {
757 if ancestor_id == descendant_id {
758 return Ok(true);
759 }
760
761 let conn = self
762 .conn
763 .lock()
764 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
765
766 drop(conn);
770
771 self.is_ancestor_batched(repo_id, ancestor_id, descendant_id)
773 }
774
775 fn is_ancestor_batched(
777 &self,
778 repo_id: &str,
779 ancestor_id: &str,
780 descendant_id: &str,
781 ) -> Result<bool, StorageError> {
782 let mut visited = std::collections::HashSet::new();
783 let mut frontier = vec![descendant_id.to_owned()];
784
785 while !frontier.is_empty() {
786 frontier.sort();
788 frontier.dedup();
789 frontier.retain(|id| visited.insert(id.clone()));
790
791 if frontier.is_empty() {
792 break;
793 }
794
795 let patches = self.get_patches_batch(repo_id, &frontier)?;
797 frontier.clear();
798
799 for patch in patches.values() {
800 for parent in &patch.parent_ids {
801 let parent_hex = &parent.value;
802 if parent_hex == ancestor_id {
803 return Ok(true);
804 }
805 if !visited.contains(parent_hex) {
806 frontier.push(parent_hex.clone());
807 }
808 }
809 }
810
811 if visited.len() > 100_000 {
813 return Ok(false);
814 }
815 }
816 Ok(false)
817 }
818
819 fn get_patches_batch(
822 &self,
823 repo_id: &str,
824 ids: &[String],
825 ) -> Result<std::collections::HashMap<String, PatchProto>, StorageError> {
826 if ids.is_empty() {
827 return Ok(std::collections::HashMap::new());
828 }
829
830 let conn = self
831 .conn
832 .lock()
833 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
834
835 let placeholders: Vec<String> = ids.iter().enumerate().map(|(i, _)| format!("?{}", i + 2)).collect();
837 let sql = format!(
838 "SELECT patch_id, operation_type, touch_set, target_path, payload, parent_ids, author, message, timestamp
839 FROM patches WHERE repo_id = ?1 AND patch_id IN ({})",
840 placeholders.join(", ")
841 );
842
843 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(repo_id.to_owned())];
844 for id in ids {
845 params.push(Box::new(id.clone()));
846 }
847 let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
848
849 let mut stmt = conn.prepare(&sql)?;
850 let rows = stmt.query_map(param_refs.as_slice(), |row| {
851 let id_hex: String = row.get(0)?;
852 let operation_type: String = row.get(1)?;
853 let touch_set_json: String = row.get(2)?;
854 let target_path: Option<String> = row.get(3)?;
855 let payload: String = row.get(4)?;
856 let parent_ids_json: String = row.get(5)?;
857 let author: String = row.get(6)?;
858 let message: String = row.get(7)?;
859 let timestamp: i64 = row.get(8)?;
860
861 let touch_set: Vec<String> =
862 serde_json::from_str(&touch_set_json).unwrap_or_default();
863 let parent_ids: Vec<String> =
864 serde_json::from_str(&parent_ids_json).unwrap_or_default();
865
866 Ok((id_hex.clone(), PatchProto {
867 id: HashProto { value: id_hex },
868 operation_type,
869 touch_set,
870 target_path,
871 payload,
872 parent_ids: parent_ids
873 .into_iter()
874 .map(|h| HashProto { value: h })
875 .collect(),
876 author,
877 message,
878 timestamp: timestamp as u64,
879 }))
880 })?;
881
882 let mut result = std::collections::HashMap::with_capacity(ids.len());
883 for row in rows {
884 let (id_hex, patch) = row?;
885 result.insert(id_hex, patch);
886 }
887 Ok(result)
888 }
889
890 pub fn protect_branch(&self, repo_id: &str, branch_name: &str) -> Result<(), StorageError> {
893 let conn = self
894 .conn
895 .lock()
896 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
897 conn.execute(
898 "INSERT OR IGNORE INTO branch_protection (repo_id, branch_name) VALUES (?1, ?2)",
899 params![repo_id, branch_name],
900 )?;
901 Ok(())
902 }
903
904 pub fn unprotect_branch(&self, repo_id: &str, branch_name: &str) -> Result<(), StorageError> {
905 let conn = self
906 .conn
907 .lock()
908 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
909 conn.execute(
910 "DELETE FROM branch_protection WHERE repo_id = ?1 AND branch_name = ?2",
911 params![repo_id, branch_name],
912 )?;
913 Ok(())
914 }
915
916 pub fn is_branch_protected(
917 &self,
918 repo_id: &str,
919 branch_name: &str,
920 ) -> Result<bool, StorageError> {
921 let conn = self
922 .conn
923 .lock()
924 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
925 let count: i64 = conn.query_row(
926 "SELECT COUNT(*) FROM branch_protection WHERE repo_id = ?1 AND branch_name = ?2",
927 params![repo_id, branch_name],
928 |row| row.get(0),
929 )?;
930 Ok(count > 0)
931 }
932
933 pub fn add_authorized_key(
937 &self,
938 author: &str,
939 public_key_bytes: &[u8],
940 ) -> Result<(), StorageError> {
941 let conn = self
942 .conn
943 .lock()
944 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
945 conn.execute(
946 "INSERT OR REPLACE INTO authorized_keys (author, public_key) VALUES (?1, ?2)",
947 params![author, public_key_bytes],
948 )?;
949 Ok(())
950 }
951
952 pub fn get_authorized_key(&self, author: &str) -> Result<Option<Vec<u8>>, StorageError> {
954 let conn = self
955 .conn
956 .lock()
957 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
958 let result = conn.query_row(
959 "SELECT public_key FROM authorized_keys WHERE author = ?1",
960 params![author],
961 |row| row.get::<_, Vec<u8>>(0),
962 );
963
964 match result {
965 Ok(bytes) => Ok(Some(bytes)),
966 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
967 Err(e) => Err(StorageError::Database(e)),
968 }
969 }
970
971 pub fn has_authorized_keys(&self) -> Result<bool, StorageError> {
973 let conn = self
974 .conn
975 .lock()
976 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
977 let count: i64 =
978 conn.query_row("SELECT COUNT(*) FROM authorized_keys", [], |row| row.get(0))?;
979 Ok(count > 0)
980 }
981
982 pub fn store_token(
985 &self,
986 token: &str,
987 created_at: u64,
988 description: &str,
989 expires_at: i64,
990 ) -> Result<(), StorageError> {
991 let token_hash = format!("{:x}", sha2::Sha256::digest(token.as_bytes()));
992 let conn = self
993 .conn
994 .lock()
995 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
996 conn.execute(
997 "INSERT INTO tokens (token, created_at, description, expires_at) VALUES (?1, ?2, ?3, ?4)",
998 params![token_hash, created_at as i64, description, expires_at],
999 )?;
1000 Ok(())
1001 }
1002
1003 pub fn verify_token(&self, token: &str) -> Result<bool, StorageError> {
1004 let token_hash = format!("{:x}", sha2::Sha256::digest(token.as_bytes()));
1005 let now = std::time::SystemTime::now()
1006 .duration_since(std::time::UNIX_EPOCH)
1007 .unwrap_or_default()
1008 .as_secs() as i64;
1009 let conn = self
1010 .conn
1011 .lock()
1012 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1013 let count: i64 = conn.query_row(
1014 "SELECT COUNT(*) FROM tokens WHERE token = ?1 AND expires_at > ?2",
1015 params![token_hash, now],
1016 |row| row.get(0),
1017 )?;
1018 Ok(count > 0)
1019 }
1020
1021 pub fn has_tokens(&self) -> Result<bool, StorageError> {
1022 let conn = self
1023 .conn
1024 .lock()
1025 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1026 let count: i64 = conn.query_row("SELECT COUNT(*) FROM tokens", [], |row| row.get(0))?;
1027 Ok(count > 0)
1028 }
1029
1030 pub fn has_users(&self) -> Result<bool, StorageError> {
1031 let conn = self
1032 .conn
1033 .lock()
1034 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1035 let count: i64 = conn.query_row("SELECT COUNT(*) FROM users", [], |row| row.get(0))?;
1036 Ok(count > 0)
1037 }
1038
1039 pub fn add_mirror(
1042 &self,
1043 repo_name: &str,
1044 upstream_url: &str,
1045 upstream_repo: &str,
1046 ) -> Result<i64, StorageError> {
1047 let conn = self
1048 .conn
1049 .lock()
1050 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1051 conn.execute(
1052 "INSERT INTO mirrors (repo_name, upstream_url, upstream_repo, last_sync, status)
1053 VALUES (?1, ?2, ?3, NULL, 'idle')",
1054 params![repo_name, upstream_url, upstream_repo],
1055 )?;
1056 Ok(conn.last_insert_rowid())
1057 }
1058
1059 pub fn get_mirror(&self, mirror_id: i64) -> Result<Option<MirrorRow>, StorageError> {
1060 let conn = self
1061 .conn
1062 .lock()
1063 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1064 let result = conn.query_row(
1065 "SELECT repo_name, upstream_url, upstream_repo, last_sync, status FROM mirrors WHERE id = ?1",
1066 params![mirror_id],
1067 |row| {
1068 Ok((
1069 row.get::<_, String>(0)?,
1070 row.get::<_, String>(1)?,
1071 row.get::<_, String>(2)?,
1072 row.get::<_, Option<i64>>(3)?,
1073 row.get::<_, String>(4)?,
1074 ))
1075 },
1076 );
1077 match result {
1078 Ok(row) => Ok(Some(row)),
1079 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1080 Err(e) => Err(StorageError::Database(e)),
1081 }
1082 }
1083
1084 pub fn update_mirror_status(
1085 &self,
1086 mirror_id: i64,
1087 status: &str,
1088 last_sync: Option<i64>,
1089 ) -> Result<(), StorageError> {
1090 let conn = self
1091 .conn
1092 .lock()
1093 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1094 conn.execute(
1095 "UPDATE mirrors SET status = ?1, last_sync = COALESCE(?2, last_sync) WHERE id = ?3",
1096 params![status, last_sync, mirror_id],
1097 )?;
1098 Ok(())
1099 }
1100
1101 pub fn list_mirrors(&self) -> Result<Vec<MirrorListRow>, StorageError> {
1102 let conn = self
1103 .conn
1104 .lock()
1105 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1106 let mut stmt = conn.prepare(
1107 "SELECT id, repo_name, upstream_url, upstream_repo, last_sync, status FROM mirrors ORDER BY id",
1108 )?;
1109 let rows = stmt.query_map([], |row| {
1110 Ok((
1111 row.get::<_, i64>(0)?,
1112 row.get::<_, String>(1)?,
1113 row.get::<_, String>(2)?,
1114 row.get::<_, String>(3)?,
1115 row.get::<_, Option<i64>>(4)?,
1116 row.get::<_, String>(5)?,
1117 ))
1118 })?;
1119 let mut mirrors = Vec::new();
1120 for row in rows {
1121 mirrors.push(row?);
1122 }
1123 Ok(mirrors)
1124 }
1125
1126 pub fn get_mirror_by_repo(&self, repo_name: &str) -> Result<Option<i64>, StorageError> {
1127 let conn = self
1128 .conn
1129 .lock()
1130 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1131 let result = conn.query_row(
1132 "SELECT id FROM mirrors WHERE repo_name = ?1",
1133 params![repo_name],
1134 |row| row.get::<_, i64>(0),
1135 );
1136 match result {
1137 Ok(id) => Ok(Some(id)),
1138 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1139 Err(e) => Err(StorageError::Database(e)),
1140 }
1141 }
1142
1143 pub fn create_user(
1146 &self,
1147 username: &str,
1148 display_name: &str,
1149 role: &str,
1150 api_token: &str,
1151 ) -> Result<(), StorageError> {
1152 let token_hash = format!("{:x}", sha2::Sha256::digest(api_token.as_bytes()));
1153 let created_at = std::time::SystemTime::now()
1154 .duration_since(std::time::UNIX_EPOCH)
1155 .unwrap_or_default()
1156 .as_secs() as i64;
1157 let conn = self
1158 .conn
1159 .lock()
1160 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1161 conn.execute(
1162 "INSERT INTO users (username, display_name, role, api_token, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
1163 params![username, display_name, role, token_hash, created_at],
1164 )?;
1165 Ok(())
1166 }
1167
1168 pub fn get_user(&self, username: &str) -> Result<Option<UserInfo>, StorageError> {
1169 let conn = self
1170 .conn
1171 .lock()
1172 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1173 let result = conn.query_row(
1174 "SELECT username, display_name, role, api_token, created_at FROM users WHERE username = ?1",
1175 params![username],
1176 |row| {
1177 Ok(UserInfo {
1178 username: row.get(0)?,
1179 display_name: row.get(1)?,
1180 role: row.get(2)?,
1181 api_token: row.get(3)?,
1182 created_at: row.get(4)?,
1183 })
1184 },
1185 );
1186 match result {
1187 Ok(user) => Ok(Some(user)),
1188 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1189 Err(e) => Err(StorageError::Database(e)),
1190 }
1191 }
1192
1193 pub fn get_user_by_token(&self, token: &str) -> Result<Option<UserInfo>, StorageError> {
1194 let token_hash = format!("{:x}", sha2::Sha256::digest(token.as_bytes()));
1195 let conn = self
1196 .conn
1197 .lock()
1198 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1199 let result = conn.query_row(
1200 "SELECT username, display_name, role, api_token, created_at FROM users WHERE api_token = ?1",
1201 params![token_hash],
1202 |row| {
1203 Ok(UserInfo {
1204 username: row.get(0)?,
1205 display_name: row.get(1)?,
1206 role: row.get(2)?,
1207 api_token: row.get(3)?,
1208 created_at: row.get(4)?,
1209 })
1210 },
1211 );
1212 match result {
1213 Ok(user) => Ok(Some(user)),
1214 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1215 Err(e) => Err(StorageError::Database(e)),
1216 }
1217 }
1218
1219 pub fn list_users(&self) -> Result<Vec<UserInfo>, StorageError> {
1220 let conn = self
1221 .conn
1222 .lock()
1223 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1224 let mut stmt = conn.prepare(
1225 "SELECT username, display_name, role, api_token, created_at FROM users ORDER BY username",
1226 )?;
1227 let rows = stmt.query_map([], |row| {
1228 Ok(UserInfo {
1229 username: row.get(0)?,
1230 display_name: row.get(1)?,
1231 role: row.get(2)?,
1232 api_token: row.get(3)?,
1233 created_at: row.get(4)?,
1234 })
1235 })?;
1236 let mut users = Vec::new();
1237 for row in rows {
1238 users.push(row?);
1239 }
1240 Ok(users)
1241 }
1242
1243 pub fn update_user_role(&self, username: &str, role: &str) -> Result<(), StorageError> {
1244 let conn = self
1245 .conn
1246 .lock()
1247 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1248 conn.execute(
1249 "UPDATE users SET role = ?1 WHERE username = ?2",
1250 params![role, username],
1251 )?;
1252 Ok(())
1253 }
1254
1255 pub fn delete_user(&self, username: &str) -> Result<(), StorageError> {
1256 let conn = self
1257 .conn
1258 .lock()
1259 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1260 conn.execute("DELETE FROM users WHERE username = ?1", params![username])?;
1261 Ok(())
1262 }
1263
1264 pub fn delete_repo(&self, repo_id: &str) -> Result<(), StorageError> {
1265 let conn = self
1266 .conn
1267 .lock()
1268 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1269 conn.execute("DELETE FROM patches WHERE repo_id = ?1", params![repo_id])?;
1270 conn.execute("DELETE FROM branches WHERE repo_id = ?1", params![repo_id])?;
1271 conn.execute("DELETE FROM blobs WHERE repo_id = ?1", params![repo_id])?;
1272 conn.execute(
1273 "DELETE FROM branch_protection WHERE repo_id = ?1",
1274 params![repo_id],
1275 )?;
1276 conn.execute("DELETE FROM repos WHERE repo_id = ?1", params![repo_id])?;
1277 Ok(())
1278 }
1279
1280 pub fn delete_branch(&self, repo_id: &str, branch_name: &str) -> Result<(), StorageError> {
1281 let conn = self
1282 .conn
1283 .lock()
1284 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1285 conn.execute(
1286 "DELETE FROM branches WHERE repo_id = ?1 AND name = ?2",
1287 params![repo_id, branch_name],
1288 )?;
1289 conn.execute(
1290 "DELETE FROM branch_protection WHERE repo_id = ?1 AND branch_name = ?2",
1291 params![repo_id, branch_name],
1292 )?;
1293 Ok(())
1294 }
1295
1296 pub fn delete_mirror(&self, mirror_id: i64) -> Result<(), StorageError> {
1297 let conn = self
1298 .conn
1299 .lock()
1300 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1301 conn.execute("DELETE FROM mirrors WHERE id = ?1", params![mirror_id])?;
1302 Ok(())
1303 }
1304
1305 pub fn search_repos(&self, query: &str) -> Result<Vec<String>, StorageError> {
1306 let pattern = format!("%{query}%");
1307 let conn = self
1308 .conn
1309 .lock()
1310 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1311 let mut stmt =
1312 conn.prepare("SELECT repo_id FROM repos WHERE repo_id LIKE ?1 ORDER BY repo_id")?;
1313 let rows = stmt.query_map(params![pattern], |row| row.get::<_, String>(0))?;
1314 let mut ids = Vec::new();
1315 for row in rows {
1316 ids.push(row?);
1317 }
1318 Ok(ids)
1319 }
1320
1321 pub fn get_patches_at(
1322 &self,
1323 repo_id: &str,
1324 tip_id: &str,
1325 ) -> Result<Vec<PatchProto>, StorageError> {
1326 let mut visited = std::collections::HashSet::new();
1328 let mut frontier = vec![tip_id.to_owned()];
1329
1330 while !frontier.is_empty() {
1331 frontier.sort();
1332 frontier.dedup();
1333 frontier.retain(|id| visited.insert(id.clone()));
1334 if frontier.is_empty() {
1335 break;
1336 }
1337
1338 let patches = self.get_patches_batch(repo_id, &frontier)?;
1339 frontier.clear();
1340
1341 for patch in patches.values() {
1342 for parent in &patch.parent_ids {
1343 if !visited.contains(&parent.value) {
1344 frontier.push(parent.value.clone());
1345 }
1346 }
1347 }
1348
1349 if visited.len() > 100_000 {
1350 break;
1351 }
1352 }
1353
1354 let all_ids: Vec<String> = visited.into_iter().collect();
1356 let patches_map = self.get_patches_batch(repo_id, &all_ids)?;
1357
1358 let mut patches: Vec<PatchProto> = patches_map.into_values().collect();
1360 patches.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.value.cmp(&b.id.value)));
1361 Ok(patches)
1362 }
1363
1364 pub fn get_tree_at_branch(
1365 &self,
1366 repo_id: &str,
1367 branch: &str,
1368 ) -> Result<Vec<crate::types::TreeEntry>, StorageError> {
1369 use crate::types::TreeEntry;
1370
1371 let Some(tip_id) = self.get_branch_target(repo_id, branch)? else { return Ok(Vec::new()) };
1372
1373 let mut patches = self.get_patches_at(repo_id, &tip_id)?;
1374 patches.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.value.cmp(&b.id.value)));
1375
1376 let mut tree: std::collections::HashMap<String, String> = std::collections::HashMap::new();
1377
1378 for patch in &patches {
1379 let path = match &patch.target_path {
1380 Some(p) => p.clone(),
1381 None => continue,
1382 };
1383 match patch.operation_type.as_str() {
1384 "Create" | "Modify" => {
1385 tree.insert(path, patch.payload.clone());
1386 }
1387 "Delete" => {
1388 tree.remove(&path);
1389 }
1390 _ => {}
1391 }
1392 }
1393
1394 let mut entries: Vec<TreeEntry> = tree
1395 .into_iter()
1396 .map(|(path, content_hash)| TreeEntry { path, content_hash })
1397 .collect();
1398 entries.sort_by(|a, b| a.path.cmp(&b.path));
1399 Ok(entries)
1400 }
1401
1402 pub fn search_patches(
1403 &self,
1404 repo_id: &str,
1405 query: &str,
1406 ) -> Result<Vec<PatchProto>, StorageError> {
1407 let pattern = format!("%{query}%");
1408 let conn = self
1409 .conn
1410 .lock()
1411 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1412 let mut stmt = conn.prepare(
1413 "SELECT patch_id, operation_type, touch_set, target_path, payload, parent_ids, author, message, timestamp
1414 FROM patches WHERE repo_id = ?1 AND (author LIKE ?2 OR message LIKE ?3) LIMIT 50",
1415 )?;
1416 let rows = stmt.query_map(params![repo_id, pattern, pattern], |row| {
1417 let id_hex: String = row.get(0)?;
1418 let operation_type: String = row.get(1)?;
1419 let touch_set_json: String = row.get(2)?;
1420 let target_path: Option<String> = row.get(3)?;
1421 let payload: String = row.get(4)?;
1422 let parent_ids_json: String = row.get(5)?;
1423 let author: String = row.get(6)?;
1424 let message: String = row.get(7)?;
1425 let timestamp: i64 = row.get(8)?;
1426
1427 let touch_set: Vec<String> = serde_json::from_str(&touch_set_json).unwrap_or_default();
1428 let parent_ids: Vec<String> =
1429 serde_json::from_str(&parent_ids_json).unwrap_or_default();
1430
1431 Ok(PatchProto {
1432 id: HashProto { value: id_hex },
1433 operation_type,
1434 touch_set,
1435 target_path,
1436 payload,
1437 parent_ids: parent_ids
1438 .into_iter()
1439 .map(|h| HashProto { value: h })
1440 .collect(),
1441 author,
1442 message,
1443 timestamp: timestamp as u64,
1444 })
1445 })?;
1446
1447 let mut patches = Vec::new();
1448 for row in rows {
1449 patches.push(row?);
1450 }
1451 Ok(patches)
1452 }
1453
1454 pub fn add_replication_peer(&self, peer_url: &str, role: &str) -> Result<i64, StorageError> {
1457 let added_at = std::time::SystemTime::now()
1458 .duration_since(std::time::UNIX_EPOCH)
1459 .unwrap_or_default()
1460 .as_secs() as i64;
1461 let conn = self
1462 .conn
1463 .lock()
1464 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1465 conn.execute(
1466 "INSERT INTO replication_peers (peer_url, role, last_sync_seq, status, added_at) VALUES (?1, ?2, 0, 'active', ?3)",
1467 params![peer_url, role, added_at],
1468 )?;
1469 Ok(conn.last_insert_rowid())
1470 }
1471
1472 pub fn remove_replication_peer(&self, id: i64) -> Result<(), StorageError> {
1473 let conn = self
1474 .conn
1475 .lock()
1476 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1477 conn.execute("DELETE FROM replication_peers WHERE id = ?1", params![id])?;
1478 Ok(())
1479 }
1480
1481 pub fn list_replication_peers(&self) -> Result<Vec<ReplicationPeer>, StorageError> {
1482 let conn = self
1483 .conn
1484 .lock()
1485 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1486 let mut stmt = conn.prepare(
1487 "SELECT id, peer_url, role, last_sync_seq, status, added_at FROM replication_peers ORDER BY id",
1488 )?;
1489 let rows = stmt.query_map([], |row| {
1490 Ok(ReplicationPeer {
1491 id: row.get(0)?,
1492 peer_url: row.get(1)?,
1493 role: row.get(2)?,
1494 last_sync_seq: row.get(3)?,
1495 status: row.get(4)?,
1496 added_at: row.get(5)?,
1497 })
1498 })?;
1499 let mut peers = Vec::new();
1500 for row in rows {
1501 peers.push(row?);
1502 }
1503 Ok(peers)
1504 }
1505
1506 pub fn update_peer_sync_seq(&self, peer_id: i64, seq: i64) -> Result<(), StorageError> {
1507 let conn = self
1508 .conn
1509 .lock()
1510 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1511 conn.execute(
1512 "UPDATE replication_peers SET last_sync_seq = ?1 WHERE id = ?2",
1513 params![seq, peer_id],
1514 )?;
1515 Ok(())
1516 }
1517
1518 pub fn get_replication_peer(&self, id: i64) -> Result<Option<ReplicationPeer>, StorageError> {
1519 let conn = self
1520 .conn
1521 .lock()
1522 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1523 let result = conn.query_row(
1524 "SELECT id, peer_url, role, last_sync_seq, status, added_at FROM replication_peers WHERE id = ?1",
1525 params![id],
1526 |row| {
1527 Ok(ReplicationPeer {
1528 id: row.get(0)?,
1529 peer_url: row.get(1)?,
1530 role: row.get(2)?,
1531 last_sync_seq: row.get(3)?,
1532 status: row.get(4)?,
1533 added_at: row.get(5)?,
1534 })
1535 },
1536 );
1537 match result {
1538 Ok(peer) => Ok(Some(peer)),
1539 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1540 Err(e) => Err(StorageError::Database(e)),
1541 }
1542 }
1543
1544 pub fn log_operation(
1545 &self,
1546 operation: &str,
1547 table_name: &str,
1548 row_id: &str,
1549 data: Option<&str>,
1550 ) -> Result<i64, StorageError> {
1551 let timestamp = std::time::SystemTime::now()
1552 .duration_since(std::time::UNIX_EPOCH)
1553 .unwrap_or_default()
1554 .as_secs() as i64;
1555 let conn = self
1556 .conn
1557 .lock()
1558 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1559 conn.execute(
1560 "INSERT INTO replication_log (operation, table_name, row_id, data, timestamp) VALUES (?1, ?2, ?3, ?4, ?5)",
1561 params![operation, table_name, row_id, data, timestamp],
1562 )?;
1563 Ok(conn.last_insert_rowid())
1564 }
1565
1566 pub fn get_replication_log(
1567 &self,
1568 since_seq: i64,
1569 ) -> Result<Vec<ReplicationEntry>, StorageError> {
1570 let conn = self
1571 .conn
1572 .lock()
1573 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1574 let mut stmt = conn.prepare(
1575 "SELECT seq, operation, table_name, row_id, data, timestamp FROM replication_log WHERE seq > ?1 ORDER BY seq",
1576 )?;
1577 let rows = stmt.query_map(params![since_seq], |row| {
1578 Ok(ReplicationEntry {
1579 seq: row.get(0)?,
1580 operation: row.get(1)?,
1581 table_name: row.get(2)?,
1582 row_id: row.get(3)?,
1583 data: row.get(4)?,
1584 timestamp: row.get(5)?,
1585 })
1586 })?;
1587 let mut entries = Vec::new();
1588 for row in rows {
1589 entries.push(row?);
1590 }
1591 Ok(entries)
1592 }
1593
1594 pub fn apply_replication_entries(
1595 &self,
1596 entries: &[ReplicationEntry],
1597 ) -> Result<(), StorageError> {
1598 let conn = self
1599 .conn
1600 .lock()
1601 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1602 for entry in entries {
1603 conn.execute(
1604 "INSERT OR IGNORE INTO replication_log (seq, operation, table_name, row_id, data, timestamp) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1605 params![entry.seq, entry.operation, entry.table_name, entry.row_id, entry.data, entry.timestamp],
1606 )?;
1607 }
1608 Ok(())
1609 }
1610
1611 pub fn get_replication_status(&self) -> Result<ReplicationStatus, StorageError> {
1612 let peers = self.list_replication_peers()?;
1613 let conn = self
1614 .conn
1615 .lock()
1616 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1617 let current_seq: i64 = conn.query_row(
1618 "SELECT COALESCE(MAX(seq), 0) FROM replication_log",
1619 [],
1620 |row| row.get(0),
1621 )?;
1622 Ok(ReplicationStatus {
1623 current_seq,
1624 peer_count: peers.len(),
1625 peers,
1626 })
1627 }
1628
1629 pub fn create_webhook(&self, webhook: &Webhook) -> Result<(), StorageError> {
1632 let events_json = serde_json::to_string(&webhook.events).unwrap_or_default();
1633 let conn = self
1634 .conn
1635 .lock()
1636 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1637 conn.execute(
1638 "INSERT INTO webhooks (id, repo_id, url, events, secret, created_at, active) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1639 params![
1640 webhook.id,
1641 webhook.repo_id,
1642 webhook.url,
1643 events_json,
1644 webhook.secret,
1645 webhook.created_at as i64,
1646 i32::from(webhook.active),
1647 ],
1648 )?;
1649 Ok(())
1650 }
1651
1652 pub fn list_webhooks(&self, repo_id: &str) -> Result<Vec<Webhook>, StorageError> {
1653 let conn = self
1654 .conn
1655 .lock()
1656 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1657 let mut stmt = conn.prepare(
1658 "SELECT id, repo_id, url, events, secret, created_at, active FROM webhooks WHERE repo_id = ?1 ORDER BY created_at",
1659 )?;
1660 let rows = stmt.query_map(params![repo_id], |row| {
1661 let events_json: String = row.get(3)?;
1662 let events: Vec<String> = serde_json::from_str(&events_json).unwrap_or_default();
1663 Ok(Webhook {
1664 id: row.get(0)?,
1665 repo_id: row.get(1)?,
1666 url: row.get(2)?,
1667 events,
1668 secret: row.get(4)?,
1669 created_at: row.get::<_, i64>(5)? as u64,
1670 active: row.get::<_, i32>(6)? != 0,
1671 })
1672 })?;
1673 let mut webhooks = Vec::new();
1674 for row in rows {
1675 webhooks.push(row?);
1676 }
1677 Ok(webhooks)
1678 }
1679
1680 pub fn get_webhook(&self, id: &str) -> Result<Webhook, StorageError> {
1681 let conn = self
1682 .conn
1683 .lock()
1684 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1685 conn.query_row(
1686 "SELECT id, repo_id, url, events, secret, created_at, active FROM webhooks WHERE id = ?1",
1687 params![id],
1688 |row| {
1689 let events_json: String = row.get(3)?;
1690 let events: Vec<String> = serde_json::from_str(&events_json).unwrap_or_default();
1691 Ok(Webhook {
1692 id: row.get(0)?,
1693 repo_id: row.get(1)?,
1694 url: row.get(2)?,
1695 events,
1696 secret: row.get(4)?,
1697 created_at: row.get::<_, i64>(5)? as u64,
1698 active: row.get::<_, i32>(6)? != 0,
1699 })
1700 },
1701 ).map_err(|e| match e {
1702 rusqlite::Error::QueryReturnedNoRows => StorageError::WebhookNotFound(id.to_owned()),
1703 e => StorageError::Database(e),
1704 })
1705 }
1706
1707 pub fn delete_webhook(&self, id: &str) -> Result<(), StorageError> {
1708 let conn = self
1709 .conn
1710 .lock()
1711 .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1712 let changes = conn.execute("DELETE FROM webhooks WHERE id = ?1", params![id])?;
1713 if changes == 0 {
1714 return Err(StorageError::WebhookNotFound(id.to_owned()));
1715 }
1716 Ok(())
1717 }
1718
1719 pub fn set_oidc_config(&self, config: &crate::sso::OidcConfig) -> Result<(), StorageError> {
1723 let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1724 let json = serde_json::to_string(config)
1725 .map_err(|e| StorageError::PoisonedLock(format!("failed to serialize OIDC config: {e}")))?;
1726 conn.execute(
1727 "INSERT OR REPLACE INTO sso_providers (provider_name, config_json, updated_at) VALUES (?1, ?2, datetime('now'))",
1728 params![config.provider_name, json],
1729 )?;
1730 Ok(())
1731 }
1732
1733 pub fn get_oidc_config(&self, provider_name: &str) -> Result<Option<crate::sso::OidcConfig>, StorageError> {
1735 let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1736 let result = conn.query_row(
1737 "SELECT config_json FROM sso_providers WHERE provider_name = ?1",
1738 params![provider_name],
1739 |row| row.get::<_, String>(0),
1740 );
1741 match result {
1742 Ok(json) => {
1743 let config: crate::sso::OidcConfig = serde_json::from_str(&json)
1744 .map_err(|e| StorageError::PoisonedLock(format!("failed to deserialize OIDC config: {e}")))?;
1745 Ok(Some(config))
1746 }
1747 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1748 Err(e) => Err(StorageError::Database(e)),
1749 }
1750 }
1751
1752 pub fn list_oidc_configs(&self) -> Result<Vec<crate::sso::OidcConfig>, StorageError> {
1754 let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1755 let mut stmt = conn.prepare("SELECT config_json FROM sso_providers ORDER BY provider_name")?;
1756 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
1757 let mut configs = Vec::new();
1758 for row in rows {
1759 let json = row?;
1760 let config: crate::sso::OidcConfig = serde_json::from_str(&json)
1761 .map_err(|e| StorageError::PoisonedLock(format!("failed to deserialize OIDC config: {e}")))?;
1762 configs.push(config);
1763 }
1764 Ok(configs)
1765 }
1766
1767 pub fn delete_oidc_config(&self, provider_name: &str) -> Result<bool, StorageError> {
1769 let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1770 let affected = conn.execute(
1771 "DELETE FROM sso_providers WHERE provider_name = ?1",
1772 params![provider_name],
1773 )?;
1774 Ok(affected > 0)
1775 }
1776
1777 pub fn store_sso_state(
1783 &self,
1784 state: &str,
1785 provider_name: &str,
1786 nonce: &str,
1787 ) -> Result<(), StorageError> {
1788 let created_at = std::time::SystemTime::now()
1789 .duration_since(std::time::UNIX_EPOCH)
1790 .unwrap_or_default()
1791 .as_secs() as i64;
1792 let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1793 conn.execute(
1794 "INSERT INTO sso_states (state, provider_name, nonce, created_at) VALUES (?1, ?2, ?3, ?4)",
1795 params![state, provider_name, nonce, created_at],
1796 )?;
1797 Ok(())
1798 }
1799
1800 pub fn consume_sso_state(&self, state: &str) -> Result<Option<(String, String)>, StorageError> {
1806 let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1807 let result = conn.query_row(
1808 "SELECT provider_name, nonce, created_at FROM sso_states WHERE state = ?1",
1809 params![state],
1810 |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?, row.get::<_, i64>(2)?)),
1811 );
1812 match result {
1813 Ok((provider_name, nonce, created_at)) => {
1814 let _ = conn.execute("DELETE FROM sso_states WHERE state = ?1", params![state]);
1816 let now = std::time::SystemTime::now()
1818 .duration_since(std::time::UNIX_EPOCH)
1819 .unwrap_or_default()
1820 .as_secs() as i64;
1821 let max_age = 10 * 60; if now - created_at > max_age {
1823 return Ok(None);
1824 }
1825 Ok(Some((provider_name, nonce)))
1826 }
1827 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1828 Err(e) => Err(StorageError::Database(e)),
1829 }
1830 }
1831
1832 pub fn cleanup_expired_sso_states(&self) -> Result<usize, StorageError> {
1834 let cutoff = std::time::SystemTime::now()
1835 .duration_since(std::time::UNIX_EPOCH)
1836 .unwrap_or_default()
1837 .as_secs() as i64
1838 - (10 * 60);
1839 let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1840 let affected = conn.execute(
1841 "DELETE FROM sso_states WHERE created_at < ?1",
1842 params![cutoff],
1843 )?;
1844 Ok(affected)
1845 }
1846
1847 pub fn get_sso_user_mapping(
1849 &self,
1850 provider_name: &str,
1851 provider_sub: &str,
1852 ) -> Result<Option<String>, StorageError> {
1853 let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1854 let result = conn.query_row(
1855 "SELECT username FROM sso_user_mappings WHERE provider_name = ?1 AND provider_sub = ?2",
1856 params![provider_name, provider_sub],
1857 |row| row.get::<_, String>(0),
1858 );
1859 match result {
1860 Ok(username) => Ok(Some(username)),
1861 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1862 Err(e) => Err(StorageError::Database(e)),
1863 }
1864 }
1865
1866 pub fn upsert_sso_user(
1872 &self,
1873 provider_name: &str,
1874 provider_sub: &str,
1875 username: &str,
1876 display_name: &str,
1877 ) -> Result<String, StorageError> {
1878 let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1879 let created_at = std::time::SystemTime::now()
1880 .duration_since(std::time::UNIX_EPOCH)
1881 .unwrap_or_default()
1882 .as_secs() as i64;
1883
1884 conn.execute(
1886 "INSERT INTO users (username, display_name, role, api_token, created_at) VALUES (?1, ?2, 'member', NULL, ?3)
1887 ON CONFLICT(username) DO UPDATE SET display_name = ?2",
1888 params![username, display_name, created_at],
1889 )?;
1890
1891 conn.execute(
1893 "INSERT INTO sso_user_mappings (provider_name, provider_sub, username, linked_at) VALUES (?1, ?2, ?3, ?4)
1894 ON CONFLICT(provider_name, provider_sub) DO UPDATE SET username = ?3, linked_at = ?4",
1895 params![provider_name, provider_sub, username, created_at],
1896 )?;
1897
1898 Ok(username.to_owned())
1899 }
1900
1901 pub fn get_user_by_email(&self, email: &str) -> Result<Option<UserInfo>, StorageError> {
1905 self.get_user(email)
1906 }
1907
1908 pub fn update_user_token(&self, username: &str, token_hash: &str) -> Result<(), StorageError> {
1910 let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1911 conn.execute(
1912 "UPDATE users SET api_token = ?1 WHERE username = ?2",
1913 params![token_hash, username],
1914 )?;
1915 Ok(())
1916 }
1917
1918 #[allow(clippy::too_many_arguments)]
1922 pub fn write_audit_entry(
1923 &self,
1924 actor: &str,
1925 action: &str,
1926 resource_type: &str,
1927 resource_id: &str,
1928 status: &str,
1929 details: &str,
1930 request_id: &str,
1931 client_ip: &str,
1932 ) -> Result<(), StorageError> {
1933 let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1934 conn.execute(
1935 "INSERT INTO audit_log (actor, action, resource_type, resource_id, status, details, request_id, client_ip) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1936 params![actor, action, resource_type, resource_id, status, details, request_id, client_ip],
1937 )?;
1938 Ok(())
1939 }
1940
1941 pub fn query_audit_log(
1943 &self,
1944 actor: Option<&str>,
1945 action: Option<&str>,
1946 limit: usize,
1947 offset: usize,
1948 ) -> Result<Vec<AuditEntry>, StorageError> {
1949 let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1950 let effective_limit: i64 = limit.clamp(1, 1000) as i64;
1951 let effective_offset: i64 = offset as i64;
1952
1953 let mut sql = String::from(
1954 "SELECT id, timestamp, actor, action, resource_type, resource_id, status, details, request_id, client_ip FROM audit_log WHERE 1=1",
1955 );
1956 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1957
1958 if let Some(a) = actor {
1959 sql.push_str(" AND actor = ?");
1960 param_values.push(Box::new(a.to_owned()));
1961 }
1962 if let Some(a) = action {
1963 sql.push_str(" AND action = ?");
1964 param_values.push(Box::new(a.to_owned()));
1965 }
1966 sql.push_str(" ORDER BY id DESC LIMIT ? OFFSET ?");
1967 param_values.push(Box::new(effective_limit));
1968 param_values.push(Box::new(effective_offset));
1969
1970 let param_refs: Vec<&dyn rusqlite::types::ToSql> = param_values.iter().map(|b| b.as_ref()).collect();
1971
1972 let mut stmt = conn.prepare(&sql)?;
1973 let rows = stmt.query_map(param_refs.as_slice(), |row| {
1974 Ok(AuditEntry {
1975 id: row.get(0)?,
1976 timestamp: row.get(1)?,
1977 actor: row.get(2)?,
1978 action: row.get(3)?,
1979 resource_type: row.get(4)?,
1980 resource_id: row.get(5)?,
1981 status: row.get(6)?,
1982 details: row.get(7)?,
1983 request_id: row.get(8)?,
1984 client_ip: row.get(9)?,
1985 })
1986 })?;
1987
1988 let mut entries = Vec::new();
1989 for row in rows {
1990 entries.push(row?);
1991 }
1992 Ok(entries)
1993 }
1994}
1995
1996#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1997pub struct ReplicationPeer {
1998 pub id: i64,
1999 pub peer_url: String,
2000 pub role: String,
2001 pub last_sync_seq: i64,
2002 pub status: String,
2003 pub added_at: i64,
2004}
2005
2006#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2007pub struct ReplicationEntry {
2008 pub seq: i64,
2009 pub operation: String,
2010 pub table_name: String,
2011 pub row_id: String,
2012 pub data: Option<String>,
2013 pub timestamp: i64,
2014}
2015
2016#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2017pub struct ReplicationStatus {
2018 pub current_seq: i64,
2019 pub peer_count: usize,
2020 pub peers: Vec<ReplicationPeer>,
2021}
2022
2023#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2025pub struct AuditEntry {
2026 pub id: i64,
2027 pub timestamp: String,
2028 pub actor: String,
2029 pub action: String,
2030 pub resource_type: String,
2031 pub resource_id: String,
2032 pub status: String,
2033 pub details: String,
2034 pub request_id: String,
2035 pub client_ip: String,
2036}
2037
2038fn base64_encode(data: &[u8]) -> String {
2039 use base64::Engine;
2040 base64::engine::general_purpose::STANDARD.encode(data)
2041}
2042
2043fn hash_to_hex(h: &HashProto) -> String {
2044 h.value.clone()
2045}
2046
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps a hex string in a `HashProto`.
    fn make_hash_proto(hex: &str) -> HashProto {
        HashProto {
            value: hex.to_owned(),
        }
    }

    /// Builds a patch touching the single file `file_<id_hex>` with an empty
    /// payload and timestamp 0.
    fn make_patch(id_hex: &str, op: &str, parents: &[&str], author: &str) -> PatchProto {
        let file_name = format!("file_{id_hex}");
        PatchProto {
            id: make_hash_proto(id_hex),
            operation_type: op.to_owned(),
            touch_set: vec![file_name.clone()],
            target_path: Some(file_name),
            payload: String::new(),
            parent_ids: parents.iter().copied().map(make_hash_proto).collect(),
            author: author.to_owned(),
            message: format!("patch {id_hex}"),
            timestamp: 0,
        }
    }

    /// Builds a branch named `name` pointing at `target`.
    #[allow(dead_code)]
    fn make_branch(name: &str, target: &str) -> BranchProto {
        BranchProto {
            name: name.to_owned(),
            target_id: make_hash_proto(target),
        }
    }

    /// Data written through one handle must be visible after the database
    /// file is closed and reopened.
    #[test]
    fn test_persistence_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("hub.db");
        let patch_id = "a".repeat(64);
        let blob_hash = "deadbeef".repeat(8);

        {
            let writer = HubStorage::open(&db_path).unwrap();
            writer.ensure_repo("test-repo").unwrap();
            let patch = make_patch(&patch_id, "Create", &[], "alice");
            writer.insert_patch("test-repo", &patch).unwrap();
            writer.set_branch("test-repo", "main", &patch_id).unwrap();
            writer.store_blob("test-repo", &blob_hash, b"hello").unwrap();
            // `writer` is dropped here, closing the connection.
        }

        let reader = HubStorage::open(&db_path).unwrap();
        assert!(reader.repo_exists("test-repo").unwrap());

        let all_patches = reader.get_all_patches("test-repo", 0, 10000).unwrap();
        assert_eq!(all_patches.len(), 1);

        let branches = reader.get_branches("test-repo").unwrap();
        assert_eq!(branches.len(), 1);
        assert_eq!(branches[0].name, "main");

        let blob = reader.get_blob("test-repo", &blob_hash).unwrap().unwrap();
        assert_eq!(blob, b"hello");
    }

    /// Inserting the same patch twice reports `false` the second time and
    /// leaves a single stored patch.
    #[test]
    fn test_duplicate_patch_ignored() {
        let store = HubStorage::open_in_memory().unwrap();
        store.ensure_repo("repo").unwrap();
        let patch = make_patch(&"a".repeat(64), "Create", &[], "alice");

        let first_insert = store.insert_patch("repo", &patch).unwrap();
        let second_insert = store.insert_patch("repo", &patch).unwrap();

        assert!(first_insert);
        assert!(!second_insert);
        assert_eq!(store.patch_count("repo").unwrap(), 1);
    }

    /// Authorized keys round-trip by owner name; unknown owners yield `None`.
    #[test]
    fn test_authorized_keys() {
        let store = HubStorage::open_in_memory().unwrap();
        assert!(!store.has_authorized_keys().unwrap());

        let key = [0u8; 32];
        store.add_authorized_key("alice", &key).unwrap();
        assert!(store.has_authorized_keys().unwrap());

        let retrieved = store.get_authorized_key("alice").unwrap().unwrap();
        assert_eq!(retrieved, key);

        let missing = store.get_authorized_key("bob").unwrap();
        assert!(missing.is_none());
    }

    /// Ensuring an existing repo a second time must not create a duplicate.
    #[test]
    fn test_list_repos() {
        let store = HubStorage::open_in_memory().unwrap();
        for repo in ["repo-1", "repo-2", "repo-1"] {
            store.ensure_repo(repo).unwrap();
        }

        let repos = store.list_repos().unwrap();
        assert_eq!(repos.len(), 2);
    }

    /// Create, list, fetch and delete a webhook.
    #[test]
    fn test_webhook_crud() {
        let store = HubStorage::open_in_memory().unwrap();
        store.ensure_repo("test-repo").unwrap();

        let webhook = Webhook {
            id: "wh-1".to_owned(),
            repo_id: "test-repo".to_owned(),
            url: "https://example.com/hook".to_owned(),
            events: vec!["push".to_owned(), "branch.create".to_owned()],
            secret: Some("secret123".to_owned()),
            created_at: 1000,
            active: true,
        };
        store.create_webhook(&webhook).unwrap();

        let listed = store.list_webhooks("test-repo").unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, "wh-1");
        assert_eq!(listed[0].events.len(), 2);

        let fetched = store.get_webhook("wh-1").unwrap();
        assert_eq!(fetched.url, "https://example.com/hook");
        assert_eq!(fetched.secret, Some("secret123".to_owned()));
        assert!(fetched.active);

        // Webhooks are scoped to their repository.
        assert!(store.list_webhooks("other-repo").unwrap().is_empty());

        store.delete_webhook("wh-1").unwrap();
        assert!(store.list_webhooks("test-repo").unwrap().is_empty());
    }
}