// suture_hub/storage.rs

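//! SQLite-backed persistent storage for the Suture Hub.
//!
//! Stores repositories, patches, branches, blobs, and authorized public keys,
//! together with hub metadata (API tokens, users, branch protection rules,
//! mirrors, webhooks, SSO providers/states/user mappings, replication peers and
//! log, and the audit log), in a single SQLite database. This replaces the
//! earlier in-memory `HashMap` approach.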
5
6use sha2::Digest;
7
8use rusqlite::{Connection, params};
9use std::path::Path;
10use thiserror::Error;
11
12use crate::types::{BlobRef, BranchProto, HashProto, PatchProto, UserInfo};
13use crate::webhooks::Webhook;
14
/// Errors returned by [`HubStorage`] operations.
#[derive(Error, Debug)]
pub enum StorageError {
    /// An underlying SQLite/rusqlite failure; converted automatically by `?`.
    #[error("database error: {0}")]
    Database(#[from] rusqlite::Error),

    /// A filesystem failure, e.g. while creating the database's parent directory.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// A requested repository does not exist (payload presumably the repo ID — confirm at call sites).
    #[error("repo not found: {0}")]
    RepoNotFound(String),

    /// A base64 encoding/decoding failure, carried as a human-readable message.
    #[error("base64 error: {0}")]
    Base64(String),

    /// The connection mutex was poisoned because a thread panicked while holding it.
    #[error("lock poisoned: {0}")]
    PoisonedLock(String),

    /// A requested webhook does not exist (payload presumably the webhook ID — confirm at call sites).
    #[error("webhook not found: {0}")]
    WebhookNotFound(String),

    /// Free-form error whose message is displayed verbatim.
    #[error("{0}")]
    Custom(String),

    /// A blob was rejected for exceeding the configured limit; the payload is that limit in bytes.
    #[error("blob exceeds maximum allowed size of {0} bytes")]
    BlobTooLarge(usize),
}
41
/// Persistent SQLite storage for the hub.
///
/// The SQLite connection is wrapped in a `std::sync::Mutex` to make this type
/// `Send + Sync` without requiring `unsafe impl`. Every `&self` method acquires
/// the mutex lock internally. `migrate` is different: it runs during
/// construction with exclusive access and uses `Mutex::get_mut` instead. The
/// outer synchronization (tokio::sync::RwLock in server.rs) provides
/// async-compatible locking; this inner mutex satisfies the Rust type system's
/// thread-safety requirements.
pub struct HubStorage {
    /// The single SQLite connection used for every operation.
    conn: std::sync::Mutex<Connection>,
    /// Maximum blob size in bytes. `store_blob` rejects larger blobs, and the
    /// blob listing methods return larger stored blobs with `truncated: true`.
    max_blob_size: usize,
    /// Upper bound on the number of rows a single `get_all_patches` page returns.
    max_page_size: usize,
}
54
/// Mirror row from DB: (repo_name, upstream_url, upstream_repo, last_sync, status)
///
/// `last_sync` is nullable in the `mirrors` schema, hence `Option<i64>`.
/// NOTE(review): `mirrors.status` has only `DEFAULT 'idle'` and no `NOT NULL`,
/// but it is read here as `String`. An explicit NULL would fail conversion, so
/// confirm that writers never store NULL.
type MirrorRow = (String, String, String, Option<i64>, String);

/// Mirror list row from DB: (id, repo_name, upstream_url, upstream_repo, last_sync, status)
///
/// Same shape as [`MirrorRow`], prefixed with the `mirrors.id` primary key.
type MirrorListRow = (i64, String, String, String, Option<i64>, String);
60
61impl HubStorage {
62    /// Open or create the hub database at the given path.
63    pub fn open(path: &Path) -> Result<Self, StorageError> {
64        Self::open_with_limits(path, 50 * 1024 * 1024, 10000)
65    }
66
67    /// Open or create the hub database with custom limits.
68    pub fn open_with_limits(
69        path: &Path,
70        max_blob_size: usize,
71        max_page_size: usize,
72    ) -> Result<Self, StorageError> {
73        if let Some(parent) = path.parent() {
74            std::fs::create_dir_all(parent)?;
75        }
76        let conn = Connection::open(path)?;
77        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
78        let mut store = Self {
79            conn: std::sync::Mutex::new(conn),
80            max_blob_size,
81            max_page_size,
82        };
83        store.migrate()?;
84        Ok(store)
85    }
86
87    /// Open an in-memory database (for testing).
88    pub fn open_in_memory() -> Result<Self, StorageError> {
89        Self::open_in_memory_with_limits(50 * 1024 * 1024, 10000)
90    }
91
92    /// Open an in-memory database with custom limits.
93    pub fn open_in_memory_with_limits(
94        max_blob_size: usize,
95        max_page_size: usize,
96    ) -> Result<Self, StorageError> {
97        let conn = Connection::open_in_memory()?;
98        conn.execute_batch("PRAGMA journal_mode=WAL;")?;
99        let mut store = Self {
100            conn: std::sync::Mutex::new(conn),
101            max_blob_size,
102            max_page_size,
103        };
104        store.migrate()?;
105        Ok(store)
106    }
107
108    fn migrate(&mut self) -> Result<(), StorageError> {
109        let conn = self
110            .conn
111            .get_mut()
112            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
113        conn.execute_batch(
114            "CREATE TABLE IF NOT EXISTS repos (
115                repo_id TEXT PRIMARY KEY,
116                created_at TEXT NOT NULL DEFAULT (datetime('now'))
117            );
118
119            CREATE TABLE IF NOT EXISTS patches (
120                repo_id TEXT NOT NULL,
121                patch_id TEXT NOT NULL,
122                operation_type TEXT NOT NULL,
123                touch_set TEXT NOT NULL,
124                target_path TEXT,
125                payload TEXT NOT NULL,
126                parent_ids TEXT NOT NULL,
127                author TEXT NOT NULL,
128                message TEXT NOT NULL,
129                timestamp INTEGER NOT NULL,
130                PRIMARY KEY (repo_id, patch_id)
131            );
132
133            CREATE TABLE IF NOT EXISTS branches (
134                repo_id TEXT NOT NULL,
135                name TEXT NOT NULL,
136                target_patch_id TEXT NOT NULL,
137                PRIMARY KEY (repo_id, name)
138            );
139
140            CREATE TABLE IF NOT EXISTS blobs (
141                repo_id TEXT NOT NULL,
142                blob_hash TEXT NOT NULL,
143                data BLOB NOT NULL,
144                PRIMARY KEY (repo_id, blob_hash)
145            );
146
147            CREATE TABLE IF NOT EXISTS authorized_keys (
148                author TEXT PRIMARY KEY,
149                public_key BLOB NOT NULL,
150                added_at TEXT NOT NULL DEFAULT (datetime('now'))
151            );
152
153            CREATE TABLE IF NOT EXISTS tokens (
154                token TEXT PRIMARY KEY,
155                created_at INTEGER NOT NULL,
156                description TEXT,
157                expires_at INTEGER NOT NULL
158            );
159
160            CREATE TABLE IF NOT EXISTS branch_protection (
161                repo_id TEXT NOT NULL,
162                branch_name TEXT NOT NULL,
163                PRIMARY KEY (repo_id, branch_name)
164            );
165
166            CREATE TABLE IF NOT EXISTS mirrors (
167                id INTEGER PRIMARY KEY,
168                repo_name TEXT NOT NULL,
169                upstream_url TEXT NOT NULL,
170                upstream_repo TEXT NOT NULL,
171                last_sync INTEGER,
172                status TEXT DEFAULT 'idle'
173            );
174
175            CREATE TABLE IF NOT EXISTS users (
176                username TEXT PRIMARY KEY,
177                display_name TEXT NOT NULL,
178                role TEXT NOT NULL DEFAULT 'member',
179                api_token TEXT UNIQUE,
180                created_at INTEGER NOT NULL
181            );
182
183            CREATE INDEX IF NOT EXISTS idx_patches_repo ON patches(repo_id);
184            CREATE INDEX IF NOT EXISTS idx_branches_repo ON branches(repo_id);
185            CREATE INDEX IF NOT EXISTS idx_blobs_repo ON blobs(repo_id);
186            CREATE INDEX IF NOT EXISTS idx_mirrors_repo ON mirrors(repo_name);
187
188            CREATE TABLE IF NOT EXISTS replication_peers (
189                id INTEGER PRIMARY KEY AUTOINCREMENT,
190                peer_url TEXT NOT NULL UNIQUE,
191                role TEXT NOT NULL DEFAULT 'follower',
192                last_sync_seq INTEGER DEFAULT 0,
193                status TEXT NOT NULL DEFAULT 'active',
194                added_at INTEGER NOT NULL
195            );
196
197            CREATE TABLE IF NOT EXISTS replication_log (
198                seq INTEGER PRIMARY KEY AUTOINCREMENT,
199                operation TEXT NOT NULL,
200                table_name TEXT NOT NULL,
201                row_id TEXT NOT NULL,
202                data TEXT,
203                timestamp INTEGER NOT NULL
204            );
205
206            CREATE TABLE IF NOT EXISTS webhooks (
207                id TEXT PRIMARY KEY,
208                repo_id TEXT NOT NULL,
209                url TEXT NOT NULL,
210                events TEXT NOT NULL,
211                secret TEXT,
212                created_at INTEGER NOT NULL,
213                active INTEGER NOT NULL DEFAULT 1,
214                FOREIGN KEY (repo_id) REFERENCES repos(repo_id)
215            );
216
217            CREATE INDEX IF NOT EXISTS idx_webhooks_repo ON webhooks(repo_id);
218
219            CREATE TABLE IF NOT EXISTS sso_providers (
220                provider_name TEXT PRIMARY KEY,
221                config_json TEXT NOT NULL,
222                created_at TEXT NOT NULL DEFAULT (datetime('now')),
223                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
224            );
225
226            CREATE TABLE IF NOT EXISTS audit_log (
227                id INTEGER PRIMARY KEY AUTOINCREMENT,
228                timestamp TEXT NOT NULL DEFAULT (datetime('now')),
229                actor TEXT NOT NULL DEFAULT '',
230                action TEXT NOT NULL,
231                resource_type TEXT NOT NULL DEFAULT '',
232                resource_id TEXT NOT NULL DEFAULT '',
233                status TEXT NOT NULL DEFAULT 'success',
234                details TEXT NOT NULL DEFAULT '',
235                request_id TEXT NOT NULL DEFAULT '',
236                client_ip TEXT NOT NULL DEFAULT ''
237            );
238            CREATE INDEX IF NOT EXISTS idx_audit_log_timestamp ON audit_log(timestamp);
239            CREATE INDEX IF NOT EXISTS idx_audit_log_actor ON audit_log(actor);
240            CREATE INDEX IF NOT EXISTS idx_audit_log_action ON audit_log(action);
241
242            CREATE TABLE IF NOT EXISTS sso_states (
243                state TEXT PRIMARY KEY,
244                provider_name TEXT NOT NULL,
245                nonce TEXT NOT NULL,
246                created_at INTEGER NOT NULL
247            );
248            CREATE INDEX IF NOT EXISTS idx_sso_states_created ON sso_states(created_at);
249
250            CREATE TABLE IF NOT EXISTS sso_user_mappings (
251                provider_name TEXT NOT NULL,
252                provider_sub TEXT NOT NULL,
253                username TEXT NOT NULL,
254                linked_at INTEGER NOT NULL,
255                PRIMARY KEY (provider_name, provider_sub),
256                FOREIGN KEY (username) REFERENCES users(username)
257            );
258            ",
259        )?;
260
261        let has_expires: bool = conn.query_row(
262            "SELECT COUNT(*) > 0 FROM pragma_table_info('tokens') WHERE name = 'expires_at'",
263            [],
264            |row| row.get(0),
265        )?;
266
267        if !has_expires {
268            let now = std::time::SystemTime::now()
269                .duration_since(std::time::UNIX_EPOCH)
270                .unwrap_or_default()
271                .as_secs() as i64;
272            let default_expiry = now + (30 * 24 * 60 * 60);
273            conn.execute_batch(
274                "ALTER TABLE tokens ADD COLUMN expires_at INTEGER NOT NULL DEFAULT 0;",
275            )?;
276            conn.execute(
277                "UPDATE tokens SET expires_at = ?1 WHERE expires_at = 0",
278                params![default_expiry],
279            )?;
280        }
281
282        Ok(())
283    }
284
285    // === Repos ===
286
287    /// Ensure a repo exists. Returns true if it was newly created.
288    pub fn ensure_repo(&self, repo_id: &str) -> Result<bool, StorageError> {
289        let conn = self
290            .conn
291            .lock()
292            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
293        conn.execute(
294            "INSERT OR IGNORE INTO repos (repo_id) VALUES (?1)",
295            params![repo_id],
296        )?;
297        Ok(conn.changes() > 0)
298    }
299
300    /// List all repo IDs.
301    pub fn list_repos(&self) -> Result<Vec<String>, StorageError> {
302        let conn = self
303            .conn
304            .lock()
305            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
306        let mut stmt = conn.prepare("SELECT repo_id FROM repos ORDER BY repo_id")?;
307        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
308        let mut ids = Vec::new();
309        for row in rows {
310            ids.push(row?);
311        }
312        Ok(ids)
313    }
314
315    /// Check if a repo exists.
316    pub fn repo_exists(&self, repo_id: &str) -> Result<bool, StorageError> {
317        let conn = self
318            .conn
319            .lock()
320            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
321        let count: i64 = conn.query_row(
322            "SELECT COUNT(*) FROM repos WHERE repo_id = ?1",
323            params![repo_id],
324            |row| row.get(0),
325        )?;
326        Ok(count > 0)
327    }
328
329    // === Patches ===
330
331    /// Store a patch if it doesn't already exist. Returns true if newly inserted.
332    pub fn insert_patch(&self, repo_id: &str, patch: &PatchProto) -> Result<bool, StorageError> {
333        let id_hex = hash_to_hex(&patch.id);
334        let touch_set_json = serde_json::to_string(&patch.touch_set).unwrap_or_default();
335        let parent_ids_json = serde_json::to_string(
336            &patch
337                .parent_ids
338                .iter()
339                .map(|h| &h.value)
340                .collect::<Vec<_>>(),
341        )
342        .unwrap_or_default();
343
344        let conn = self
345            .conn
346            .lock()
347            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
348        conn.execute(
349            "INSERT OR IGNORE INTO patches (repo_id, patch_id, operation_type, touch_set, target_path, payload, parent_ids, author, message, timestamp)
350             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
351            params![
352                repo_id,
353                id_hex,
354                patch.operation_type,
355                touch_set_json,
356                patch.target_path,
357                patch.payload,
358                parent_ids_json,
359                patch.author,
360                patch.message,
361                patch.timestamp as i64,
362            ],
363        )?;
364        Ok(conn.changes() > 0)
365    }
366
367    /// Get a patch by ID within a repo.
368    pub fn get_patch(
369        &self,
370        repo_id: &str,
371        patch_id_hex: &str,
372    ) -> Result<Option<PatchProto>, StorageError> {
373        let conn = self
374            .conn
375            .lock()
376            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
377        let result = conn.query_row(
378            "SELECT patch_id, operation_type, touch_set, target_path, payload, parent_ids, author, message, timestamp
379             FROM patches WHERE repo_id = ?1 AND patch_id = ?2",
380            params![repo_id, patch_id_hex],
381            |row| {
382                let id_hex: String = row.get(0)?;
383                let operation_type: String = row.get(1)?;
384                let touch_set_json: String = row.get(2)?;
385                let target_path: Option<String> = row.get(3)?;
386                let payload: String = row.get(4)?;
387                let parent_ids_json: String = row.get(5)?;
388                let author: String = row.get(6)?;
389                let message: String = row.get(7)?;
390                let timestamp: i64 = row.get(8)?;
391
392                let touch_set: Vec<String> =
393                    serde_json::from_str(&touch_set_json).unwrap_or_default();
394                let parent_ids: Vec<String> =
395                    serde_json::from_str(&parent_ids_json).unwrap_or_default();
396
397                Ok(PatchProto {
398                    id: HashProto { value: id_hex },
399                    operation_type,
400                    touch_set,
401                    target_path,
402                    payload,
403                    parent_ids: parent_ids
404                        .into_iter()
405                        .map(|h| HashProto { value: h })
406                        .collect(),
407                    author,
408                    message,
409                    timestamp: timestamp as u64,
410                })
411            },
412        );
413
414        match result {
415            Ok(patch) => Ok(Some(patch)),
416            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
417            Err(e) => Err(StorageError::Database(e)),
418        }
419    }
420
421    /// Get patches for a repo with pagination support.
422    /// Returns at most `limit` patches starting from `offset`, ordered by timestamp.
423    pub fn get_all_patches(
424        &self,
425        repo_id: &str,
426        offset: usize,
427        limit: usize,
428    ) -> Result<Vec<PatchProto>, StorageError> {
429        let effective_limit = limit.min(self.max_page_size).max(1);
430        let conn = self
431            .conn
432            .lock()
433            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
434        let mut stmt = conn.prepare(
435            "SELECT patch_id, operation_type, touch_set, target_path, payload, parent_ids, author, message, timestamp
436             FROM patches WHERE repo_id = ?1 ORDER BY timestamp ASC, patch_id ASC LIMIT ?2 OFFSET ?3",
437        )?;
438        let rows = stmt.query_map(params![repo_id, effective_limit as i64, offset as i64], |row| {
439            let id_hex: String = row.get(0)?;
440            let operation_type: String = row.get(1)?;
441            let touch_set_json: String = row.get(2)?;
442            let target_path: Option<String> = row.get(3)?;
443            let payload: String = row.get(4)?;
444            let parent_ids_json: String = row.get(5)?;
445            let author: String = row.get(6)?;
446            let message: String = row.get(7)?;
447            let timestamp: i64 = row.get(8)?;
448
449            let touch_set: Vec<String> = serde_json::from_str(&touch_set_json).unwrap_or_default();
450            let parent_ids: Vec<String> =
451                serde_json::from_str(&parent_ids_json).unwrap_or_default();
452
453            Ok(PatchProto {
454                id: HashProto { value: id_hex },
455                operation_type,
456                touch_set,
457                target_path,
458                payload,
459                parent_ids: parent_ids
460                    .into_iter()
461                    .map(|h| HashProto { value: h })
462                    .collect(),
463                author,
464                message,
465                timestamp: timestamp as u64,
466            })
467        })?;
468
469        let mut patches = Vec::new();
470        for row in rows {
471            patches.push(row?);
472        }
473        Ok(patches)
474    }
475
476    /// Get all patches for a repo without pagination limit.
477    /// Used internally by push/pull handlers that need the full patch set.
478    /// Prefer `get_all_patches()` with pagination for user-facing APIs.
479    pub fn get_all_patches_unbounded(&self, repo_id: &str) -> Result<Vec<PatchProto>, StorageError> {
480        let conn = self
481            .conn
482            .lock()
483            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
484        let mut stmt = conn.prepare(
485            "SELECT patch_id, operation_type, touch_set, target_path, payload, parent_ids, author, message, timestamp
486             FROM patches WHERE repo_id = ?1 ORDER BY timestamp ASC, patch_id ASC",
487        )?;
488        let rows = stmt.query_map(params![repo_id], |row| {
489            let id_hex: String = row.get(0)?;
490            let operation_type: String = row.get(1)?;
491            let touch_set_json: String = row.get(2)?;
492            let target_path: Option<String> = row.get(3)?;
493            let payload: String = row.get(4)?;
494            let parent_ids_json: String = row.get(5)?;
495            let author: String = row.get(6)?;
496            let message: String = row.get(7)?;
497            let timestamp: i64 = row.get(8)?;
498
499            let touch_set: Vec<String> =
500                serde_json::from_str(&touch_set_json).unwrap_or_default();
501            let parent_ids: Vec<String> =
502                serde_json::from_str(&parent_ids_json).unwrap_or_default();
503
504            Ok(PatchProto {
505                id: HashProto { value: id_hex },
506                operation_type,
507                touch_set,
508                target_path,
509                payload,
510                parent_ids: parent_ids
511                    .into_iter()
512                    .map(|h| HashProto { value: h })
513                    .collect(),
514                author,
515                message,
516                timestamp: timestamp as u64,
517            })
518        })?;
519
520        let mut patches = Vec::new();
521        for row in rows {
522            patches.push(row?);
523        }
524        Ok(patches)
525    }
526
527    /// Count patches in a repo.
528    pub fn patch_count(&self, repo_id: &str) -> Result<u64, StorageError> {
529        let conn = self
530            .conn
531            .lock()
532            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
533        let count: i64 = conn.query_row(
534            "SELECT COUNT(*) FROM patches WHERE repo_id = ?1",
535            params![repo_id],
536            |row| row.get(0),
537        )?;
538        Ok(count as u64)
539    }
540
541    // === Branches ===
542
543    /// Set a branch pointer.
544    pub fn set_branch(
545        &self,
546        repo_id: &str,
547        name: &str,
548        target_patch_id: &str,
549    ) -> Result<(), StorageError> {
550        let conn = self
551            .conn
552            .lock()
553            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
554        conn.execute(
555            "INSERT OR REPLACE INTO branches (repo_id, name, target_patch_id) VALUES (?1, ?2, ?3)",
556            params![repo_id, name, target_patch_id],
557        )?;
558        Ok(())
559    }
560
561    /// Get all branches for a repo.
562    pub fn get_branches(&self, repo_id: &str) -> Result<Vec<BranchProto>, StorageError> {
563        let conn = self
564            .conn
565            .lock()
566            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
567        let mut stmt = conn.prepare(
568            "SELECT name, target_patch_id FROM branches WHERE repo_id = ?1 ORDER BY name",
569        )?;
570
571        let rows = stmt.query_map(params![repo_id], |row| {
572            let name: String = row.get(0)?;
573            let target_hex: String = row.get(1)?;
574            Ok((name, target_hex))
575        })?;
576
577        let mut branches = Vec::new();
578        for row in rows {
579            let (name, target_hex) = row?;
580            branches.push(BranchProto {
581                name,
582                target_id: HashProto { value: target_hex },
583            });
584        }
585        Ok(branches)
586    }
587
588    // === Blobs ===
589
590    /// Store a blob. Overwrites if exists (content-addressed, idempotent).
591    pub fn store_blob(
592        &self,
593        repo_id: &str,
594        hash_hex: &str,
595        data: &[u8],
596    ) -> Result<(), StorageError> {
597        if data.len() > self.max_blob_size {
598            return Err(StorageError::BlobTooLarge(self.max_blob_size));
599        }
600        let conn = self
601            .conn
602            .lock()
603            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
604        conn.execute(
605            "INSERT OR REPLACE INTO blobs (repo_id, blob_hash, data) VALUES (?1, ?2, ?3)",
606            params![repo_id, hash_hex, data],
607        )?;
608        Ok(())
609    }
610
611    pub fn delete_blob(&self, repo_id: &str, hash_hex: &str) -> Result<(), StorageError> {
612        let conn = self
613            .conn
614            .lock()
615            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
616        conn.execute(
617            "DELETE FROM blobs WHERE repo_id = ?1 AND blob_hash = ?2",
618            params![repo_id, hash_hex],
619        )?;
620        Ok(())
621    }
622
623    /// Get a blob by hash.
624    pub fn get_blob(&self, repo_id: &str, hash_hex: &str) -> Result<Option<Vec<u8>>, StorageError> {
625        let conn = self
626            .conn
627            .lock()
628            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
629        let result = conn.query_row(
630            "SELECT data FROM blobs WHERE repo_id = ?1 AND blob_hash = ?2",
631            params![repo_id, hash_hex],
632            |row| row.get::<_, Vec<u8>>(0),
633        );
634
635        match result {
636            Ok(data) => Ok(Some(data)),
637            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
638            Err(e) => Err(StorageError::Database(e)),
639        }
640    }
641
642    /// Get all blobs for a repo. Blobs exceeding `max_blob_size` are returned
643    /// with empty data and `truncated: true`.
644    pub fn get_all_blobs(&self, repo_id: &str) -> Result<Vec<BlobRef>, StorageError> {
645        let conn = self
646            .conn
647            .lock()
648            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
649        let mut stmt = conn.prepare("SELECT blob_hash, data FROM blobs WHERE repo_id = ?1")?;
650
651        let rows = stmt.query_map(params![repo_id], |row| {
652            let hash_hex: String = row.get(0)?;
653            let data: Vec<u8> = row.get(1)?;
654            Ok((hash_hex, data))
655        })?;
656
657        let max_blob_size = self.max_blob_size;
658        let mut blobs = Vec::new();
659        for row in rows {
660            let (hash_hex, data) = row?;
661            let (data_b64, truncated) = if data.len() > max_blob_size {
662                (String::new(), true)
663            } else {
664                (base64_encode(&data), false)
665            };
666            blobs.push(BlobRef {
667                hash: HashProto { value: hash_hex },
668                data: data_b64,
669                truncated,
670            });
671        }
672        Ok(blobs)
673    }
674
675    /// Get specific blobs by hash set.
676    pub fn get_blobs(
677        &self,
678        repo_id: &str,
679        hashes: &std::collections::HashSet<String>,
680    ) -> Result<Vec<BlobRef>, StorageError> {
681        if hashes.is_empty() {
682            return Ok(vec![]);
683        }
684
685        let placeholders: Vec<String> = hashes.iter().map(|_| "?".to_owned()).collect();
686        let sql = format!(
687            "SELECT blob_hash, data FROM blobs WHERE repo_id = ?1 AND blob_hash IN ({})",
688            placeholders.join(",")
689        );
690
691        let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
692        params_vec.push(Box::new(repo_id.to_owned()));
693        for h in hashes {
694            params_vec.push(Box::new(h.clone()));
695        }
696        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
697            params_vec.iter().map(std::convert::AsRef::as_ref).collect();
698
699        let conn = self
700            .conn
701            .lock()
702            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
703        let mut stmt = conn.prepare(&sql)?;
704        let rows = stmt.query_map(param_refs.as_slice(), |row| {
705            let hash_hex: String = row.get(0)?;
706            let data: Vec<u8> = row.get(1)?;
707            Ok((hash_hex, data))
708        })?;
709
710        let mut blobs = Vec::new();
711        for row in rows {
712            let (hash_hex, data) = row?;
713            let (data_b64, truncated) = if data.len() > self.max_blob_size {
714                (String::new(), true)
715            } else {
716                (base64_encode(&data), false)
717            };
718            blobs.push(BlobRef {
719                hash: HashProto { value: hash_hex },
720                data: data_b64,
721                truncated,
722            });
723        }
724        Ok(blobs)
725    }
726
727    /// Get the target patch ID for a specific branch, if it exists.
728    pub fn get_branch_target(
729        &self,
730        repo_id: &str,
731        branch_name: &str,
732    ) -> Result<Option<String>, StorageError> {
733        let conn = self
734            .conn
735            .lock()
736            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
737        let result = conn.query_row(
738            "SELECT target_patch_id FROM branches WHERE repo_id = ?1 AND name = ?2",
739            params![repo_id, branch_name],
740            |row| row.get::<_, String>(0),
741        );
742        match result {
743            Ok(hex) => Ok(Some(hex)),
744            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
745            Err(e) => Err(StorageError::Database(e)),
746        }
747    }
748
749    /// Check if `ancestor_id` is an ancestor of `descendant_id` using a recursive CTE.
750    /// Replaces the old N+1 per-hop approach with a single SQL query.
751    pub fn is_ancestor(
752        &self,
753        repo_id: &str,
754        ancestor_id: &str,
755        descendant_id: &str,
756    ) -> Result<bool, StorageError> {
757        if ancestor_id == descendant_id {
758            return Ok(true);
759        }
760
761        let conn = self
762            .conn
763            .lock()
764            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
765
766        // SQLite recursive CTEs require parent_id to be a scalar column,
767        // but our schema stores parent_ids as a JSON array. Use batched
768        // application-level BFS instead.
769        drop(conn);
770
771        // Batch approach: load all reachable patches in batches
772        self.is_ancestor_batched(repo_id, ancestor_id, descendant_id)
773    }
774
775    /// Batched ancestor check: loads patches in chunks to minimize SQL round-trips.
776    fn is_ancestor_batched(
777        &self,
778        repo_id: &str,
779        ancestor_id: &str,
780        descendant_id: &str,
781    ) -> Result<bool, StorageError> {
782        let mut visited = std::collections::HashSet::new();
783        let mut frontier = vec![descendant_id.to_owned()];
784
785        while !frontier.is_empty() {
786            // Deduplicate frontier
787            frontier.sort();
788            frontier.dedup();
789            frontier.retain(|id| visited.insert(id.clone()));
790
791            if frontier.is_empty() {
792                break;
793            }
794
795            // Load all patches in this batch in a single query
796            let patches = self.get_patches_batch(repo_id, &frontier)?;
797            frontier.clear();
798
799            for patch in patches.values() {
800                for parent in &patch.parent_ids {
801                    let parent_hex = &parent.value;
802                    if parent_hex == ancestor_id {
803                        return Ok(true);
804                    }
805                    if !visited.contains(parent_hex) {
806                        frontier.push(parent_hex.clone());
807                    }
808                }
809            }
810
811            // Safety: limit traversal depth to prevent pathological cases
812            if visited.len() > 100_000 {
813                return Ok(false);
814            }
815        }
816        Ok(false)
817    }
818
819    /// Fetch multiple patches by ID in a single SQL query.
820    /// Returns a HashMap keyed by patch_id for O(1) lookup.
821    fn get_patches_batch(
822        &self,
823        repo_id: &str,
824        ids: &[String],
825    ) -> Result<std::collections::HashMap<String, PatchProto>, StorageError> {
826        if ids.is_empty() {
827            return Ok(std::collections::HashMap::new());
828        }
829
830        let conn = self
831            .conn
832            .lock()
833            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
834
835        // Build a query with parameterized IN clause
836        let placeholders: Vec<String> = ids.iter().enumerate().map(|(i, _)| format!("?{}", i + 2)).collect();
837        let sql = format!(
838            "SELECT patch_id, operation_type, touch_set, target_path, payload, parent_ids, author, message, timestamp
839             FROM patches WHERE repo_id = ?1 AND patch_id IN ({})",
840            placeholders.join(", ")
841        );
842
843        let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(repo_id.to_owned())];
844        for id in ids {
845            params.push(Box::new(id.clone()));
846        }
847        let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
848
849        let mut stmt = conn.prepare(&sql)?;
850        let rows = stmt.query_map(param_refs.as_slice(), |row| {
851            let id_hex: String = row.get(0)?;
852            let operation_type: String = row.get(1)?;
853            let touch_set_json: String = row.get(2)?;
854            let target_path: Option<String> = row.get(3)?;
855            let payload: String = row.get(4)?;
856            let parent_ids_json: String = row.get(5)?;
857            let author: String = row.get(6)?;
858            let message: String = row.get(7)?;
859            let timestamp: i64 = row.get(8)?;
860
861            let touch_set: Vec<String> =
862                serde_json::from_str(&touch_set_json).unwrap_or_default();
863            let parent_ids: Vec<String> =
864                serde_json::from_str(&parent_ids_json).unwrap_or_default();
865
866            Ok((id_hex.clone(), PatchProto {
867                id: HashProto { value: id_hex },
868                operation_type,
869                touch_set,
870                target_path,
871                payload,
872                parent_ids: parent_ids
873                    .into_iter()
874                    .map(|h| HashProto { value: h })
875                    .collect(),
876                author,
877                message,
878                timestamp: timestamp as u64,
879            }))
880        })?;
881
882        let mut result = std::collections::HashMap::with_capacity(ids.len());
883        for row in rows {
884            let (id_hex, patch) = row?;
885            result.insert(id_hex, patch);
886        }
887        Ok(result)
888    }
889
890    // === Branch Protection ===
891
892    pub fn protect_branch(&self, repo_id: &str, branch_name: &str) -> Result<(), StorageError> {
893        let conn = self
894            .conn
895            .lock()
896            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
897        conn.execute(
898            "INSERT OR IGNORE INTO branch_protection (repo_id, branch_name) VALUES (?1, ?2)",
899            params![repo_id, branch_name],
900        )?;
901        Ok(())
902    }
903
904    pub fn unprotect_branch(&self, repo_id: &str, branch_name: &str) -> Result<(), StorageError> {
905        let conn = self
906            .conn
907            .lock()
908            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
909        conn.execute(
910            "DELETE FROM branch_protection WHERE repo_id = ?1 AND branch_name = ?2",
911            params![repo_id, branch_name],
912        )?;
913        Ok(())
914    }
915
916    pub fn is_branch_protected(
917        &self,
918        repo_id: &str,
919        branch_name: &str,
920    ) -> Result<bool, StorageError> {
921        let conn = self
922            .conn
923            .lock()
924            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
925        let count: i64 = conn.query_row(
926            "SELECT COUNT(*) FROM branch_protection WHERE repo_id = ?1 AND branch_name = ?2",
927            params![repo_id, branch_name],
928            |row| row.get(0),
929        )?;
930        Ok(count > 0)
931    }
932
933    // === Authorized Keys ===
934
935    /// Add an authorized public key for an author.
936    pub fn add_authorized_key(
937        &self,
938        author: &str,
939        public_key_bytes: &[u8],
940    ) -> Result<(), StorageError> {
941        let conn = self
942            .conn
943            .lock()
944            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
945        conn.execute(
946            "INSERT OR REPLACE INTO authorized_keys (author, public_key) VALUES (?1, ?2)",
947            params![author, public_key_bytes],
948        )?;
949        Ok(())
950    }
951
952    /// Get the public key for an author.
953    pub fn get_authorized_key(&self, author: &str) -> Result<Option<Vec<u8>>, StorageError> {
954        let conn = self
955            .conn
956            .lock()
957            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
958        let result = conn.query_row(
959            "SELECT public_key FROM authorized_keys WHERE author = ?1",
960            params![author],
961            |row| row.get::<_, Vec<u8>>(0),
962        );
963
964        match result {
965            Ok(bytes) => Ok(Some(bytes)),
966            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
967            Err(e) => Err(StorageError::Database(e)),
968        }
969    }
970
971    /// Check if any authorized keys exist (for auth-required vs auth-optional mode).
972    pub fn has_authorized_keys(&self) -> Result<bool, StorageError> {
973        let conn = self
974            .conn
975            .lock()
976            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
977        let count: i64 =
978            conn.query_row("SELECT COUNT(*) FROM authorized_keys", [], |row| row.get(0))?;
979        Ok(count > 0)
980    }
981
982    // === Tokens ===
983
984    pub fn store_token(
985        &self,
986        token: &str,
987        created_at: u64,
988        description: &str,
989        expires_at: i64,
990    ) -> Result<(), StorageError> {
991        let token_hash = format!("{:x}", sha2::Sha256::digest(token.as_bytes()));
992        let conn = self
993            .conn
994            .lock()
995            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
996        conn.execute(
997            "INSERT INTO tokens (token, created_at, description, expires_at) VALUES (?1, ?2, ?3, ?4)",
998            params![token_hash, created_at as i64, description, expires_at],
999        )?;
1000        Ok(())
1001    }
1002
1003    pub fn verify_token(&self, token: &str) -> Result<bool, StorageError> {
1004        let token_hash = format!("{:x}", sha2::Sha256::digest(token.as_bytes()));
1005        let now = std::time::SystemTime::now()
1006            .duration_since(std::time::UNIX_EPOCH)
1007            .unwrap_or_default()
1008            .as_secs() as i64;
1009        let conn = self
1010            .conn
1011            .lock()
1012            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1013        let count: i64 = conn.query_row(
1014            "SELECT COUNT(*) FROM tokens WHERE token = ?1 AND expires_at > ?2",
1015            params![token_hash, now],
1016            |row| row.get(0),
1017        )?;
1018        Ok(count > 0)
1019    }
1020
1021    pub fn has_tokens(&self) -> Result<bool, StorageError> {
1022        let conn = self
1023            .conn
1024            .lock()
1025            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1026        let count: i64 = conn.query_row("SELECT COUNT(*) FROM tokens", [], |row| row.get(0))?;
1027        Ok(count > 0)
1028    }
1029
1030    pub fn has_users(&self) -> Result<bool, StorageError> {
1031        let conn = self
1032            .conn
1033            .lock()
1034            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1035        let count: i64 = conn.query_row("SELECT COUNT(*) FROM users", [], |row| row.get(0))?;
1036        Ok(count > 0)
1037    }
1038
1039    // === Mirrors ===
1040
1041    pub fn add_mirror(
1042        &self,
1043        repo_name: &str,
1044        upstream_url: &str,
1045        upstream_repo: &str,
1046    ) -> Result<i64, StorageError> {
1047        let conn = self
1048            .conn
1049            .lock()
1050            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1051        conn.execute(
1052            "INSERT INTO mirrors (repo_name, upstream_url, upstream_repo, last_sync, status)
1053             VALUES (?1, ?2, ?3, NULL, 'idle')",
1054            params![repo_name, upstream_url, upstream_repo],
1055        )?;
1056        Ok(conn.last_insert_rowid())
1057    }
1058
1059    pub fn get_mirror(&self, mirror_id: i64) -> Result<Option<MirrorRow>, StorageError> {
1060        let conn = self
1061            .conn
1062            .lock()
1063            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1064        let result = conn.query_row(
1065            "SELECT repo_name, upstream_url, upstream_repo, last_sync, status FROM mirrors WHERE id = ?1",
1066            params![mirror_id],
1067            |row| {
1068                Ok((
1069                    row.get::<_, String>(0)?,
1070                    row.get::<_, String>(1)?,
1071                    row.get::<_, String>(2)?,
1072                    row.get::<_, Option<i64>>(3)?,
1073                    row.get::<_, String>(4)?,
1074                ))
1075            },
1076        );
1077        match result {
1078            Ok(row) => Ok(Some(row)),
1079            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1080            Err(e) => Err(StorageError::Database(e)),
1081        }
1082    }
1083
1084    pub fn update_mirror_status(
1085        &self,
1086        mirror_id: i64,
1087        status: &str,
1088        last_sync: Option<i64>,
1089    ) -> Result<(), StorageError> {
1090        let conn = self
1091            .conn
1092            .lock()
1093            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1094        conn.execute(
1095            "UPDATE mirrors SET status = ?1, last_sync = COALESCE(?2, last_sync) WHERE id = ?3",
1096            params![status, last_sync, mirror_id],
1097        )?;
1098        Ok(())
1099    }
1100
1101    pub fn list_mirrors(&self) -> Result<Vec<MirrorListRow>, StorageError> {
1102        let conn = self
1103            .conn
1104            .lock()
1105            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1106        let mut stmt = conn.prepare(
1107            "SELECT id, repo_name, upstream_url, upstream_repo, last_sync, status FROM mirrors ORDER BY id",
1108        )?;
1109        let rows = stmt.query_map([], |row| {
1110            Ok((
1111                row.get::<_, i64>(0)?,
1112                row.get::<_, String>(1)?,
1113                row.get::<_, String>(2)?,
1114                row.get::<_, String>(3)?,
1115                row.get::<_, Option<i64>>(4)?,
1116                row.get::<_, String>(5)?,
1117            ))
1118        })?;
1119        let mut mirrors = Vec::new();
1120        for row in rows {
1121            mirrors.push(row?);
1122        }
1123        Ok(mirrors)
1124    }
1125
1126    pub fn get_mirror_by_repo(&self, repo_name: &str) -> Result<Option<i64>, StorageError> {
1127        let conn = self
1128            .conn
1129            .lock()
1130            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1131        let result = conn.query_row(
1132            "SELECT id FROM mirrors WHERE repo_name = ?1",
1133            params![repo_name],
1134            |row| row.get::<_, i64>(0),
1135        );
1136        match result {
1137            Ok(id) => Ok(Some(id)),
1138            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1139            Err(e) => Err(StorageError::Database(e)),
1140        }
1141    }
1142
1143    // === Users ===
1144
1145    pub fn create_user(
1146        &self,
1147        username: &str,
1148        display_name: &str,
1149        role: &str,
1150        api_token: &str,
1151    ) -> Result<(), StorageError> {
1152        let token_hash = format!("{:x}", sha2::Sha256::digest(api_token.as_bytes()));
1153        let created_at = std::time::SystemTime::now()
1154            .duration_since(std::time::UNIX_EPOCH)
1155            .unwrap_or_default()
1156            .as_secs() as i64;
1157        let conn = self
1158            .conn
1159            .lock()
1160            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1161        conn.execute(
1162            "INSERT INTO users (username, display_name, role, api_token, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
1163            params![username, display_name, role, token_hash, created_at],
1164        )?;
1165        Ok(())
1166    }
1167
1168    pub fn get_user(&self, username: &str) -> Result<Option<UserInfo>, StorageError> {
1169        let conn = self
1170            .conn
1171            .lock()
1172            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1173        let result = conn.query_row(
1174            "SELECT username, display_name, role, api_token, created_at FROM users WHERE username = ?1",
1175            params![username],
1176            |row| {
1177                Ok(UserInfo {
1178                    username: row.get(0)?,
1179                    display_name: row.get(1)?,
1180                    role: row.get(2)?,
1181                    api_token: row.get(3)?,
1182                    created_at: row.get(4)?,
1183                })
1184            },
1185        );
1186        match result {
1187            Ok(user) => Ok(Some(user)),
1188            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1189            Err(e) => Err(StorageError::Database(e)),
1190        }
1191    }
1192
1193    pub fn get_user_by_token(&self, token: &str) -> Result<Option<UserInfo>, StorageError> {
1194        let token_hash = format!("{:x}", sha2::Sha256::digest(token.as_bytes()));
1195        let conn = self
1196            .conn
1197            .lock()
1198            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1199        let result = conn.query_row(
1200            "SELECT username, display_name, role, api_token, created_at FROM users WHERE api_token = ?1",
1201            params![token_hash],
1202            |row| {
1203                Ok(UserInfo {
1204                    username: row.get(0)?,
1205                    display_name: row.get(1)?,
1206                    role: row.get(2)?,
1207                    api_token: row.get(3)?,
1208                    created_at: row.get(4)?,
1209                })
1210            },
1211        );
1212        match result {
1213            Ok(user) => Ok(Some(user)),
1214            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1215            Err(e) => Err(StorageError::Database(e)),
1216        }
1217    }
1218
1219    pub fn list_users(&self) -> Result<Vec<UserInfo>, StorageError> {
1220        let conn = self
1221            .conn
1222            .lock()
1223            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1224        let mut stmt = conn.prepare(
1225            "SELECT username, display_name, role, api_token, created_at FROM users ORDER BY username",
1226        )?;
1227        let rows = stmt.query_map([], |row| {
1228            Ok(UserInfo {
1229                username: row.get(0)?,
1230                display_name: row.get(1)?,
1231                role: row.get(2)?,
1232                api_token: row.get(3)?,
1233                created_at: row.get(4)?,
1234            })
1235        })?;
1236        let mut users = Vec::new();
1237        for row in rows {
1238            users.push(row?);
1239        }
1240        Ok(users)
1241    }
1242
1243    pub fn update_user_role(&self, username: &str, role: &str) -> Result<(), StorageError> {
1244        let conn = self
1245            .conn
1246            .lock()
1247            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1248        conn.execute(
1249            "UPDATE users SET role = ?1 WHERE username = ?2",
1250            params![role, username],
1251        )?;
1252        Ok(())
1253    }
1254
1255    pub fn delete_user(&self, username: &str) -> Result<(), StorageError> {
1256        let conn = self
1257            .conn
1258            .lock()
1259            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1260        conn.execute("DELETE FROM users WHERE username = ?1", params![username])?;
1261        Ok(())
1262    }
1263
1264    pub fn delete_repo(&self, repo_id: &str) -> Result<(), StorageError> {
1265        let conn = self
1266            .conn
1267            .lock()
1268            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1269        conn.execute("DELETE FROM patches WHERE repo_id = ?1", params![repo_id])?;
1270        conn.execute("DELETE FROM branches WHERE repo_id = ?1", params![repo_id])?;
1271        conn.execute("DELETE FROM blobs WHERE repo_id = ?1", params![repo_id])?;
1272        conn.execute(
1273            "DELETE FROM branch_protection WHERE repo_id = ?1",
1274            params![repo_id],
1275        )?;
1276        conn.execute("DELETE FROM repos WHERE repo_id = ?1", params![repo_id])?;
1277        Ok(())
1278    }
1279
1280    pub fn delete_branch(&self, repo_id: &str, branch_name: &str) -> Result<(), StorageError> {
1281        let conn = self
1282            .conn
1283            .lock()
1284            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1285        conn.execute(
1286            "DELETE FROM branches WHERE repo_id = ?1 AND name = ?2",
1287            params![repo_id, branch_name],
1288        )?;
1289        conn.execute(
1290            "DELETE FROM branch_protection WHERE repo_id = ?1 AND branch_name = ?2",
1291            params![repo_id, branch_name],
1292        )?;
1293        Ok(())
1294    }
1295
1296    pub fn delete_mirror(&self, mirror_id: i64) -> Result<(), StorageError> {
1297        let conn = self
1298            .conn
1299            .lock()
1300            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1301        conn.execute("DELETE FROM mirrors WHERE id = ?1", params![mirror_id])?;
1302        Ok(())
1303    }
1304
1305    pub fn search_repos(&self, query: &str) -> Result<Vec<String>, StorageError> {
1306        let pattern = format!("%{query}%");
1307        let conn = self
1308            .conn
1309            .lock()
1310            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1311        let mut stmt =
1312            conn.prepare("SELECT repo_id FROM repos WHERE repo_id LIKE ?1 ORDER BY repo_id")?;
1313        let rows = stmt.query_map(params![pattern], |row| row.get::<_, String>(0))?;
1314        let mut ids = Vec::new();
1315        for row in rows {
1316            ids.push(row?);
1317        }
1318        Ok(ids)
1319    }
1320
1321    pub fn get_patches_at(
1322        &self,
1323        repo_id: &str,
1324        tip_id: &str,
1325    ) -> Result<Vec<PatchProto>, StorageError> {
1326        // BFS to discover all reachable patch IDs, then load in batch
1327        let mut visited = std::collections::HashSet::new();
1328        let mut frontier = vec![tip_id.to_owned()];
1329
1330        while !frontier.is_empty() {
1331            frontier.sort();
1332            frontier.dedup();
1333            frontier.retain(|id| visited.insert(id.clone()));
1334            if frontier.is_empty() {
1335                break;
1336            }
1337
1338            let patches = self.get_patches_batch(repo_id, &frontier)?;
1339            frontier.clear();
1340
1341            for patch in patches.values() {
1342                for parent in &patch.parent_ids {
1343                    if !visited.contains(&parent.value) {
1344                        frontier.push(parent.value.clone());
1345                    }
1346                }
1347            }
1348
1349            if visited.len() > 100_000 {
1350                break;
1351            }
1352        }
1353
1354        // Single batch load of all discovered patches
1355        let all_ids: Vec<String> = visited.into_iter().collect();
1356        let patches_map = self.get_patches_batch(repo_id, &all_ids)?;
1357
1358        // Sort deterministically
1359        let mut patches: Vec<PatchProto> = patches_map.into_values().collect();
1360        patches.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.value.cmp(&b.id.value)));
1361        Ok(patches)
1362    }
1363
1364    pub fn get_tree_at_branch(
1365        &self,
1366        repo_id: &str,
1367        branch: &str,
1368    ) -> Result<Vec<crate::types::TreeEntry>, StorageError> {
1369        use crate::types::TreeEntry;
1370
1371        let Some(tip_id) = self.get_branch_target(repo_id, branch)? else { return Ok(Vec::new()) };
1372
1373        let mut patches = self.get_patches_at(repo_id, &tip_id)?;
1374        patches.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.value.cmp(&b.id.value)));
1375
1376        let mut tree: std::collections::HashMap<String, String> = std::collections::HashMap::new();
1377
1378        for patch in &patches {
1379            let path = match &patch.target_path {
1380                Some(p) => p.clone(),
1381                None => continue,
1382            };
1383            match patch.operation_type.as_str() {
1384                "Create" | "Modify" => {
1385                    tree.insert(path, patch.payload.clone());
1386                }
1387                "Delete" => {
1388                    tree.remove(&path);
1389                }
1390                _ => {}
1391            }
1392        }
1393
1394        let mut entries: Vec<TreeEntry> = tree
1395            .into_iter()
1396            .map(|(path, content_hash)| TreeEntry { path, content_hash })
1397            .collect();
1398        entries.sort_by(|a, b| a.path.cmp(&b.path));
1399        Ok(entries)
1400    }
1401
1402    pub fn search_patches(
1403        &self,
1404        repo_id: &str,
1405        query: &str,
1406    ) -> Result<Vec<PatchProto>, StorageError> {
1407        let pattern = format!("%{query}%");
1408        let conn = self
1409            .conn
1410            .lock()
1411            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1412        let mut stmt = conn.prepare(
1413            "SELECT patch_id, operation_type, touch_set, target_path, payload, parent_ids, author, message, timestamp
1414             FROM patches WHERE repo_id = ?1 AND (author LIKE ?2 OR message LIKE ?3) LIMIT 50",
1415        )?;
1416        let rows = stmt.query_map(params![repo_id, pattern, pattern], |row| {
1417            let id_hex: String = row.get(0)?;
1418            let operation_type: String = row.get(1)?;
1419            let touch_set_json: String = row.get(2)?;
1420            let target_path: Option<String> = row.get(3)?;
1421            let payload: String = row.get(4)?;
1422            let parent_ids_json: String = row.get(5)?;
1423            let author: String = row.get(6)?;
1424            let message: String = row.get(7)?;
1425            let timestamp: i64 = row.get(8)?;
1426
1427            let touch_set: Vec<String> = serde_json::from_str(&touch_set_json).unwrap_or_default();
1428            let parent_ids: Vec<String> =
1429                serde_json::from_str(&parent_ids_json).unwrap_or_default();
1430
1431            Ok(PatchProto {
1432                id: HashProto { value: id_hex },
1433                operation_type,
1434                touch_set,
1435                target_path,
1436                payload,
1437                parent_ids: parent_ids
1438                    .into_iter()
1439                    .map(|h| HashProto { value: h })
1440                    .collect(),
1441                author,
1442                message,
1443                timestamp: timestamp as u64,
1444            })
1445        })?;
1446
1447        let mut patches = Vec::new();
1448        for row in rows {
1449            patches.push(row?);
1450        }
1451        Ok(patches)
1452    }
1453
1454    // === Replication ===
1455
1456    pub fn add_replication_peer(&self, peer_url: &str, role: &str) -> Result<i64, StorageError> {
1457        let added_at = std::time::SystemTime::now()
1458            .duration_since(std::time::UNIX_EPOCH)
1459            .unwrap_or_default()
1460            .as_secs() as i64;
1461        let conn = self
1462            .conn
1463            .lock()
1464            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1465        conn.execute(
1466            "INSERT INTO replication_peers (peer_url, role, last_sync_seq, status, added_at) VALUES (?1, ?2, 0, 'active', ?3)",
1467            params![peer_url, role, added_at],
1468        )?;
1469        Ok(conn.last_insert_rowid())
1470    }
1471
1472    pub fn remove_replication_peer(&self, id: i64) -> Result<(), StorageError> {
1473        let conn = self
1474            .conn
1475            .lock()
1476            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1477        conn.execute("DELETE FROM replication_peers WHERE id = ?1", params![id])?;
1478        Ok(())
1479    }
1480
1481    pub fn list_replication_peers(&self) -> Result<Vec<ReplicationPeer>, StorageError> {
1482        let conn = self
1483            .conn
1484            .lock()
1485            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1486        let mut stmt = conn.prepare(
1487            "SELECT id, peer_url, role, last_sync_seq, status, added_at FROM replication_peers ORDER BY id",
1488        )?;
1489        let rows = stmt.query_map([], |row| {
1490            Ok(ReplicationPeer {
1491                id: row.get(0)?,
1492                peer_url: row.get(1)?,
1493                role: row.get(2)?,
1494                last_sync_seq: row.get(3)?,
1495                status: row.get(4)?,
1496                added_at: row.get(5)?,
1497            })
1498        })?;
1499        let mut peers = Vec::new();
1500        for row in rows {
1501            peers.push(row?);
1502        }
1503        Ok(peers)
1504    }
1505
1506    pub fn update_peer_sync_seq(&self, peer_id: i64, seq: i64) -> Result<(), StorageError> {
1507        let conn = self
1508            .conn
1509            .lock()
1510            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1511        conn.execute(
1512            "UPDATE replication_peers SET last_sync_seq = ?1 WHERE id = ?2",
1513            params![seq, peer_id],
1514        )?;
1515        Ok(())
1516    }
1517
1518    pub fn get_replication_peer(&self, id: i64) -> Result<Option<ReplicationPeer>, StorageError> {
1519        let conn = self
1520            .conn
1521            .lock()
1522            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1523        let result = conn.query_row(
1524            "SELECT id, peer_url, role, last_sync_seq, status, added_at FROM replication_peers WHERE id = ?1",
1525            params![id],
1526            |row| {
1527                Ok(ReplicationPeer {
1528                    id: row.get(0)?,
1529                    peer_url: row.get(1)?,
1530                    role: row.get(2)?,
1531                    last_sync_seq: row.get(3)?,
1532                    status: row.get(4)?,
1533                    added_at: row.get(5)?,
1534                })
1535            },
1536        );
1537        match result {
1538            Ok(peer) => Ok(Some(peer)),
1539            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1540            Err(e) => Err(StorageError::Database(e)),
1541        }
1542    }
1543
1544    pub fn log_operation(
1545        &self,
1546        operation: &str,
1547        table_name: &str,
1548        row_id: &str,
1549        data: Option<&str>,
1550    ) -> Result<i64, StorageError> {
1551        let timestamp = std::time::SystemTime::now()
1552            .duration_since(std::time::UNIX_EPOCH)
1553            .unwrap_or_default()
1554            .as_secs() as i64;
1555        let conn = self
1556            .conn
1557            .lock()
1558            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1559        conn.execute(
1560            "INSERT INTO replication_log (operation, table_name, row_id, data, timestamp) VALUES (?1, ?2, ?3, ?4, ?5)",
1561            params![operation, table_name, row_id, data, timestamp],
1562        )?;
1563        Ok(conn.last_insert_rowid())
1564    }
1565
1566    pub fn get_replication_log(
1567        &self,
1568        since_seq: i64,
1569    ) -> Result<Vec<ReplicationEntry>, StorageError> {
1570        let conn = self
1571            .conn
1572            .lock()
1573            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1574        let mut stmt = conn.prepare(
1575            "SELECT seq, operation, table_name, row_id, data, timestamp FROM replication_log WHERE seq > ?1 ORDER BY seq",
1576        )?;
1577        let rows = stmt.query_map(params![since_seq], |row| {
1578            Ok(ReplicationEntry {
1579                seq: row.get(0)?,
1580                operation: row.get(1)?,
1581                table_name: row.get(2)?,
1582                row_id: row.get(3)?,
1583                data: row.get(4)?,
1584                timestamp: row.get(5)?,
1585            })
1586        })?;
1587        let mut entries = Vec::new();
1588        for row in rows {
1589            entries.push(row?);
1590        }
1591        Ok(entries)
1592    }
1593
1594    pub fn apply_replication_entries(
1595        &self,
1596        entries: &[ReplicationEntry],
1597    ) -> Result<(), StorageError> {
1598        let conn = self
1599            .conn
1600            .lock()
1601            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1602        for entry in entries {
1603            conn.execute(
1604                "INSERT OR IGNORE INTO replication_log (seq, operation, table_name, row_id, data, timestamp) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1605                params![entry.seq, entry.operation, entry.table_name, entry.row_id, entry.data, entry.timestamp],
1606            )?;
1607        }
1608        Ok(())
1609    }
1610
1611    pub fn get_replication_status(&self) -> Result<ReplicationStatus, StorageError> {
1612        let peers = self.list_replication_peers()?;
1613        let conn = self
1614            .conn
1615            .lock()
1616            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1617        let current_seq: i64 = conn.query_row(
1618            "SELECT COALESCE(MAX(seq), 0) FROM replication_log",
1619            [],
1620            |row| row.get(0),
1621        )?;
1622        Ok(ReplicationStatus {
1623            current_seq,
1624            peer_count: peers.len(),
1625            peers,
1626        })
1627    }
1628
1629    // === Webhooks ===
1630
1631    pub fn create_webhook(&self, webhook: &Webhook) -> Result<(), StorageError> {
1632        let events_json = serde_json::to_string(&webhook.events).unwrap_or_default();
1633        let conn = self
1634            .conn
1635            .lock()
1636            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1637        conn.execute(
1638            "INSERT INTO webhooks (id, repo_id, url, events, secret, created_at, active) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1639            params![
1640                webhook.id,
1641                webhook.repo_id,
1642                webhook.url,
1643                events_json,
1644                webhook.secret,
1645                webhook.created_at as i64,
1646                i32::from(webhook.active),
1647            ],
1648        )?;
1649        Ok(())
1650    }
1651
1652    pub fn list_webhooks(&self, repo_id: &str) -> Result<Vec<Webhook>, StorageError> {
1653        let conn = self
1654            .conn
1655            .lock()
1656            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1657        let mut stmt = conn.prepare(
1658            "SELECT id, repo_id, url, events, secret, created_at, active FROM webhooks WHERE repo_id = ?1 ORDER BY created_at",
1659        )?;
1660        let rows = stmt.query_map(params![repo_id], |row| {
1661            let events_json: String = row.get(3)?;
1662            let events: Vec<String> = serde_json::from_str(&events_json).unwrap_or_default();
1663            Ok(Webhook {
1664                id: row.get(0)?,
1665                repo_id: row.get(1)?,
1666                url: row.get(2)?,
1667                events,
1668                secret: row.get(4)?,
1669                created_at: row.get::<_, i64>(5)? as u64,
1670                active: row.get::<_, i32>(6)? != 0,
1671            })
1672        })?;
1673        let mut webhooks = Vec::new();
1674        for row in rows {
1675            webhooks.push(row?);
1676        }
1677        Ok(webhooks)
1678    }
1679
1680    pub fn get_webhook(&self, id: &str) -> Result<Webhook, StorageError> {
1681        let conn = self
1682            .conn
1683            .lock()
1684            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1685        conn.query_row(
1686            "SELECT id, repo_id, url, events, secret, created_at, active FROM webhooks WHERE id = ?1",
1687            params![id],
1688            |row| {
1689                let events_json: String = row.get(3)?;
1690                let events: Vec<String> = serde_json::from_str(&events_json).unwrap_or_default();
1691                Ok(Webhook {
1692                    id: row.get(0)?,
1693                    repo_id: row.get(1)?,
1694                    url: row.get(2)?,
1695                    events,
1696                    secret: row.get(4)?,
1697                    created_at: row.get::<_, i64>(5)? as u64,
1698                    active: row.get::<_, i32>(6)? != 0,
1699                })
1700            },
1701        ).map_err(|e| match e {
1702            rusqlite::Error::QueryReturnedNoRows => StorageError::WebhookNotFound(id.to_owned()),
1703            e => StorageError::Database(e),
1704        })
1705    }
1706
1707    pub fn delete_webhook(&self, id: &str) -> Result<(), StorageError> {
1708        let conn = self
1709            .conn
1710            .lock()
1711            .map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1712        let changes = conn.execute("DELETE FROM webhooks WHERE id = ?1", params![id])?;
1713        if changes == 0 {
1714            return Err(StorageError::WebhookNotFound(id.to_owned()));
1715        }
1716        Ok(())
1717    }
1718
1719    // === SSO / OIDC Configuration ===
1720
1721    /// Store an OIDC provider configuration.
1722    pub fn set_oidc_config(&self, config: &crate::sso::OidcConfig) -> Result<(), StorageError> {
1723        let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1724        let json = serde_json::to_string(config)
1725            .map_err(|e| StorageError::PoisonedLock(format!("failed to serialize OIDC config: {e}")))?;
1726        conn.execute(
1727            "INSERT OR REPLACE INTO sso_providers (provider_name, config_json, updated_at) VALUES (?1, ?2, datetime('now'))",
1728            params![config.provider_name, json],
1729        )?;
1730        Ok(())
1731    }
1732
1733    /// Get an OIDC provider configuration by name.
1734    pub fn get_oidc_config(&self, provider_name: &str) -> Result<Option<crate::sso::OidcConfig>, StorageError> {
1735        let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1736        let result = conn.query_row(
1737            "SELECT config_json FROM sso_providers WHERE provider_name = ?1",
1738            params![provider_name],
1739            |row| row.get::<_, String>(0),
1740        );
1741        match result {
1742            Ok(json) => {
1743                let config: crate::sso::OidcConfig = serde_json::from_str(&json)
1744                    .map_err(|e| StorageError::PoisonedLock(format!("failed to deserialize OIDC config: {e}")))?;
1745                Ok(Some(config))
1746            }
1747            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1748            Err(e) => Err(StorageError::Database(e)),
1749        }
1750    }
1751
1752    /// List all configured OIDC providers.
1753    pub fn list_oidc_configs(&self) -> Result<Vec<crate::sso::OidcConfig>, StorageError> {
1754        let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1755        let mut stmt = conn.prepare("SELECT config_json FROM sso_providers ORDER BY provider_name")?;
1756        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
1757        let mut configs = Vec::new();
1758        for row in rows {
1759            let json = row?;
1760            let config: crate::sso::OidcConfig = serde_json::from_str(&json)
1761                .map_err(|e| StorageError::PoisonedLock(format!("failed to deserialize OIDC config: {e}")))?;
1762            configs.push(config);
1763        }
1764        Ok(configs)
1765    }
1766
1767    /// Delete an OIDC provider configuration.
1768    pub fn delete_oidc_config(&self, provider_name: &str) -> Result<bool, StorageError> {
1769        let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1770        let affected = conn.execute(
1771            "DELETE FROM sso_providers WHERE provider_name = ?1",
1772            params![provider_name],
1773        )?;
1774        Ok(affected > 0)
1775    }
1776
1777    // === SSO State Management ===
1778
1779    /// Store an SSO authorization state for CSRF validation.
1780    ///
1781    /// Returns `Err` if the state already exists (unlikely collision).
1782    pub fn store_sso_state(
1783        &self,
1784        state: &str,
1785        provider_name: &str,
1786        nonce: &str,
1787    ) -> Result<(), StorageError> {
1788        let created_at = std::time::SystemTime::now()
1789            .duration_since(std::time::UNIX_EPOCH)
1790            .unwrap_or_default()
1791            .as_secs() as i64;
1792        let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1793        conn.execute(
1794            "INSERT INTO sso_states (state, provider_name, nonce, created_at) VALUES (?1, ?2, ?3, ?4)",
1795            params![state, provider_name, nonce, created_at],
1796        )?;
1797        Ok(())
1798    }
1799
1800    /// Consume an SSO authorization state.
1801    ///
1802    /// Returns the stored provider name and nonce if the state is valid.
1803    /// The state is deleted after retrieval (one-time use).
1804    /// Returns `None` if the state does not exist or has expired (10 minutes).
1805    pub fn consume_sso_state(&self, state: &str) -> Result<Option<(String, String)>, StorageError> {
1806        let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1807        let result = conn.query_row(
1808            "SELECT provider_name, nonce, created_at FROM sso_states WHERE state = ?1",
1809            params![state],
1810            |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?, row.get::<_, i64>(2)?)),
1811        );
1812        match result {
1813            Ok((provider_name, nonce, created_at)) => {
1814                // Delete the state (one-time use).
1815                let _ = conn.execute("DELETE FROM sso_states WHERE state = ?1", params![state]);
1816                // Check expiry (10 minutes).
1817                let now = std::time::SystemTime::now()
1818                    .duration_since(std::time::UNIX_EPOCH)
1819                    .unwrap_or_default()
1820                    .as_secs() as i64;
1821                let max_age = 10 * 60; // 10 minutes
1822                if now - created_at > max_age {
1823                    return Ok(None);
1824                }
1825                Ok(Some((provider_name, nonce)))
1826            }
1827            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1828            Err(e) => Err(StorageError::Database(e)),
1829        }
1830    }
1831
1832    /// Clean up expired SSO states older than 10 minutes.
1833    pub fn cleanup_expired_sso_states(&self) -> Result<usize, StorageError> {
1834        let cutoff = std::time::SystemTime::now()
1835            .duration_since(std::time::UNIX_EPOCH)
1836            .unwrap_or_default()
1837            .as_secs() as i64
1838            - (10 * 60);
1839        let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1840        let affected = conn.execute(
1841            "DELETE FROM sso_states WHERE created_at < ?1",
1842            params![cutoff],
1843        )?;
1844        Ok(affected)
1845    }
1846
1847    /// Look up a local username by SSO provider + subject.
1848    pub fn get_sso_user_mapping(
1849        &self,
1850        provider_name: &str,
1851        provider_sub: &str,
1852    ) -> Result<Option<String>, StorageError> {
1853        let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1854        let result = conn.query_row(
1855            "SELECT username FROM sso_user_mappings WHERE provider_name = ?1 AND provider_sub = ?2",
1856            params![provider_name, provider_sub],
1857            |row| row.get::<_, String>(0),
1858        );
1859        match result {
1860            Ok(username) => Ok(Some(username)),
1861            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1862            Err(e) => Err(StorageError::Database(e)),
1863        }
1864    }
1865
1866    /// Create or update a user from SSO authentication.
1867    ///
1868    /// If a user already exists with the same username, updates their display name.
1869    /// If the user doesn't exist, creates a new one with the "member" role.
1870    /// Also creates/updates the SSO user mapping.
1871    pub fn upsert_sso_user(
1872        &self,
1873        provider_name: &str,
1874        provider_sub: &str,
1875        username: &str,
1876        display_name: &str,
1877    ) -> Result<String, StorageError> {
1878        let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1879        let created_at = std::time::SystemTime::now()
1880            .duration_since(std::time::UNIX_EPOCH)
1881            .unwrap_or_default()
1882            .as_secs() as i64;
1883
1884        // Upsert the user (create if not exists, update display name if exists).
1885        conn.execute(
1886            "INSERT INTO users (username, display_name, role, api_token, created_at) VALUES (?1, ?2, 'member', NULL, ?3)
1887             ON CONFLICT(username) DO UPDATE SET display_name = ?2",
1888            params![username, display_name, created_at],
1889        )?;
1890
1891        // Upsert the SSO mapping.
1892        conn.execute(
1893            "INSERT INTO sso_user_mappings (provider_name, provider_sub, username, linked_at) VALUES (?1, ?2, ?3, ?4)
1894             ON CONFLICT(provider_name, provider_sub) DO UPDATE SET username = ?3, linked_at = ?4",
1895            params![provider_name, provider_sub, username, created_at],
1896        )?;
1897
1898        Ok(username.to_owned())
1899    }
1900
1901    /// Look up a local user by SSO provider + email.
1902    ///
1903    /// Falls back to searching by username if the email matches a username.
1904    pub fn get_user_by_email(&self, email: &str) -> Result<Option<UserInfo>, StorageError> {
1905        self.get_user(email)
1906    }
1907
1908    /// Update a user's API token.
1909    pub fn update_user_token(&self, username: &str, token_hash: &str) -> Result<(), StorageError> {
1910        let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1911        conn.execute(
1912            "UPDATE users SET api_token = ?1 WHERE username = ?2",
1913            params![token_hash, username],
1914        )?;
1915        Ok(())
1916    }
1917
1918    // === Audit Logging ===
1919
1920    /// Write an audit log entry.
1921    #[allow(clippy::too_many_arguments)]
1922    pub fn write_audit_entry(
1923        &self,
1924        actor: &str,
1925        action: &str,
1926        resource_type: &str,
1927        resource_id: &str,
1928        status: &str,
1929        details: &str,
1930        request_id: &str,
1931        client_ip: &str,
1932    ) -> Result<(), StorageError> {
1933        let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1934        conn.execute(
1935            "INSERT INTO audit_log (actor, action, resource_type, resource_id, status, details, request_id, client_ip) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1936            params![actor, action, resource_type, resource_id, status, details, request_id, client_ip],
1937        )?;
1938        Ok(())
1939    }
1940
1941    /// Query audit log entries with optional filters.
1942    pub fn query_audit_log(
1943        &self,
1944        actor: Option<&str>,
1945        action: Option<&str>,
1946        limit: usize,
1947        offset: usize,
1948    ) -> Result<Vec<AuditEntry>, StorageError> {
1949        let conn = self.conn.lock().map_err(|e| StorageError::PoisonedLock(e.to_string()))?;
1950        let effective_limit: i64 = limit.clamp(1, 1000) as i64;
1951        let effective_offset: i64 = offset as i64;
1952
1953        let mut sql = String::from(
1954            "SELECT id, timestamp, actor, action, resource_type, resource_id, status, details, request_id, client_ip FROM audit_log WHERE 1=1",
1955        );
1956        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1957
1958        if let Some(a) = actor {
1959            sql.push_str(" AND actor = ?");
1960            param_values.push(Box::new(a.to_owned()));
1961        }
1962        if let Some(a) = action {
1963            sql.push_str(" AND action = ?");
1964            param_values.push(Box::new(a.to_owned()));
1965        }
1966        sql.push_str(" ORDER BY id DESC LIMIT ? OFFSET ?");
1967        param_values.push(Box::new(effective_limit));
1968        param_values.push(Box::new(effective_offset));
1969
1970        let param_refs: Vec<&dyn rusqlite::types::ToSql> = param_values.iter().map(|b| b.as_ref()).collect();
1971
1972        let mut stmt = conn.prepare(&sql)?;
1973        let rows = stmt.query_map(param_refs.as_slice(), |row| {
1974            Ok(AuditEntry {
1975                id: row.get(0)?,
1976                timestamp: row.get(1)?,
1977                actor: row.get(2)?,
1978                action: row.get(3)?,
1979                resource_type: row.get(4)?,
1980                resource_id: row.get(5)?,
1981                status: row.get(6)?,
1982                details: row.get(7)?,
1983                request_id: row.get(8)?,
1984                client_ip: row.get(9)?,
1985            })
1986        })?;
1987
1988        let mut entries = Vec::new();
1989        for row in rows {
1990            entries.push(row?);
1991        }
1992        Ok(entries)
1993    }
1994}
1995
/// A replication peer known to this hub (serializable via serde).
///
/// NOTE(review): the code that reads and writes peer rows is outside this
/// view, so the per-field notes below are inferred from names. Confirm
/// them against the peer-table schema.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReplicationPeer {
    /// Row identifier (presumably the peer table's primary key).
    pub id: i64,
    /// URL used to reach the peer.
    pub peer_url: String,
    /// Peer role as a free-form string (allowed values not visible here).
    pub role: String,
    /// Presumably the highest `replication_log.seq` already synced with this peer.
    pub last_sync_seq: i64,
    /// Peer status as a free-form string (allowed values not visible here).
    pub status: String,
    /// When the peer was added; presumably Unix seconds — confirm.
    pub added_at: i64,
}
2005
/// One change record from the replication log (serializable via serde).
///
/// NOTE(review): field semantics are inferred from names; the producer of
/// these rows is outside this view.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReplicationEntry {
    /// Sequence number; presumably `replication_log.seq`, whose maximum is
    /// reported as `ReplicationStatus::current_seq`.
    pub seq: i64,
    /// Kind of change as a string (allowed values not visible here).
    pub operation: String,
    /// Name of the table the change applies to.
    pub table_name: String,
    /// Identifier of the affected row, stored as a string.
    pub row_id: String,
    /// Optional payload for the change; its encoding is not visible here.
    pub data: Option<String>,
    /// When the change was logged; presumably Unix seconds — confirm.
    pub timestamp: i64,
}
2015
/// Summary of the hub's replication state (serializable via serde).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReplicationStatus {
    /// Highest `seq` in `replication_log`, or `0` when the log is empty
    /// (computed with `COALESCE(MAX(seq), 0)`).
    pub current_seq: i64,
    /// Number of entries in `peers`.
    pub peer_count: usize,
    /// The known replication peers.
    pub peers: Vec<ReplicationPeer>,
}
2022
/// A single audit log entry.
///
/// `query_audit_log` populates each field, in declaration order, from the
/// same-named column of the `audit_log` table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AuditEntry {
    /// Row id. `query_audit_log` orders results by this, descending.
    pub id: i64,
    /// Timestamp column read as text (its exact format is set by the schema,
    /// which is not visible here).
    pub timestamp: String,
    /// Who performed the action.
    pub actor: String,
    /// What was done.
    pub action: String,
    /// Kind of resource acted on.
    pub resource_type: String,
    /// Identifier of the resource acted on.
    pub resource_id: String,
    /// Outcome status string.
    pub status: String,
    /// Free-form details.
    pub details: String,
    /// Request correlation id.
    pub request_id: String,
    /// Client IP address as recorded by the caller.
    pub client_ip: String,
}
2037
2038fn base64_encode(data: &[u8]) -> String {
2039    use base64::Engine;
2040    base64::engine::general_purpose::STANDARD.encode(data)
2041}
2042
2043fn hash_to_hex(h: &HashProto) -> String {
2044    h.value.clone()
2045}
2046
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `HashProto` from a hex string.
    fn make_hash_proto(hex: &str) -> HashProto {
        HashProto {
            value: hex.to_string(),
        }
    }

    /// Build a minimal patch touching a single file named after its id.
    fn make_patch(id_hex: &str, op: &str, parents: &[&str], author: &str) -> PatchProto {
        PatchProto {
            id: make_hash_proto(id_hex),
            operation_type: op.to_string(),
            touch_set: vec![format!("file_{id_hex}")],
            target_path: Some(format!("file_{id_hex}")),
            payload: String::new(),
            parent_ids: parents.iter().map(|p| make_hash_proto(p)).collect(),
            author: author.to_string(),
            message: format!("patch {id_hex}"),
            timestamp: 0,
        }
    }

    // Helper kept for branch-focused tests; no current test uses it.
    #[allow(dead_code)]
    fn make_branch(name: &str, target: &str) -> BranchProto {
        BranchProto {
            name: name.to_string(),
            target_id: make_hash_proto(target),
        }
    }

    #[test]
    fn test_persistence_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("hub.db");

        // Write
        let store = HubStorage::open(&db_path).unwrap();
        store.ensure_repo("test-repo").unwrap();
        let patch = make_patch(&"a".repeat(64), "Create", &[], "alice");
        store.insert_patch("test-repo", &patch).unwrap();
        store
            .set_branch("test-repo", "main", &"a".repeat(64))
            .unwrap();
        store
            .store_blob("test-repo", &"deadbeef".repeat(8), b"hello")
            .unwrap();
        drop(store);

        // Read back
        let store2 = HubStorage::open(&db_path).unwrap();
        assert!(store2.repo_exists("test-repo").unwrap());
        let all_patches = store2.get_all_patches("test-repo", 0, 10000).unwrap();
        assert_eq!(all_patches.len(), 1);
        let branches = store2.get_branches("test-repo").unwrap();
        assert_eq!(branches.len(), 1);
        assert_eq!(branches[0].name, "main");
        let blob = store2
            .get_blob("test-repo", &"deadbeef".repeat(8))
            .unwrap()
            .unwrap();
        assert_eq!(blob, b"hello");
    }

    #[test]
    fn test_duplicate_patch_ignored() {
        let store = HubStorage::open_in_memory().unwrap();
        store.ensure_repo("repo").unwrap();
        let patch = make_patch(&"a".repeat(64), "Create", &[], "alice");

        assert!(store.insert_patch("repo", &patch).unwrap());
        assert!(!store.insert_patch("repo", &patch).unwrap());
        assert_eq!(store.patch_count("repo").unwrap(), 1);
    }

    #[test]
    fn test_authorized_keys() {
        let store = HubStorage::open_in_memory().unwrap();
        assert!(!store.has_authorized_keys().unwrap());

        let key = [0u8; 32];
        store.add_authorized_key("alice", &key).unwrap();
        assert!(store.has_authorized_keys().unwrap());

        let retrieved = store.get_authorized_key("alice").unwrap().unwrap();
        assert_eq!(retrieved, key);

        assert!(store.get_authorized_key("bob").unwrap().is_none());
    }

    #[test]
    fn test_list_repos() {
        let store = HubStorage::open_in_memory().unwrap();
        store.ensure_repo("repo-1").unwrap();
        store.ensure_repo("repo-2").unwrap();
        store.ensure_repo("repo-1").unwrap(); // duplicate

        let repos = store.list_repos().unwrap();
        assert_eq!(repos.len(), 2);
    }

    #[test]
    fn test_webhook_crud() {
        let store = HubStorage::open_in_memory().unwrap();
        store.ensure_repo("test-repo").unwrap();

        let webhook = Webhook {
            id: "wh-1".to_string(),
            repo_id: "test-repo".to_string(),
            url: "https://example.com/hook".to_string(),
            events: vec!["push".to_string(), "branch.create".to_string()],
            secret: Some("secret123".to_string()),
            created_at: 1000,
            active: true,
        };
        store.create_webhook(&webhook).unwrap();

        let listed = store.list_webhooks("test-repo").unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, "wh-1");
        assert_eq!(listed[0].events.len(), 2);

        let fetched = store.get_webhook("wh-1").unwrap();
        assert_eq!(fetched.url, "https://example.com/hook");
        assert_eq!(fetched.secret, Some("secret123".to_string()));
        assert!(fetched.active);

        assert!(store.list_webhooks("other-repo").unwrap().is_empty());

        store.delete_webhook("wh-1").unwrap();
        assert!(store.list_webhooks("test-repo").unwrap().is_empty());
    }

    /// Missing webhooks surface as `WebhookNotFound` from both get and delete.
    #[test]
    fn test_webhook_not_found() {
        let store = HubStorage::open_in_memory().unwrap();

        assert!(matches!(
            store.get_webhook("missing"),
            Err(StorageError::WebhookNotFound(ref id)) if id == "missing"
        ));
        assert!(matches!(
            store.delete_webhook("missing"),
            Err(StorageError::WebhookNotFound(ref id)) if id == "missing"
        ));
    }

    /// SSO/OIDC lookups on an empty store return "absent" values, not errors.
    #[test]
    fn test_sso_lookups_on_empty_store() {
        let store = HubStorage::open_in_memory().unwrap();

        assert!(store.get_oidc_config("missing").unwrap().is_none());
        assert!(store.list_oidc_configs().unwrap().is_empty());
        assert!(!store.delete_oidc_config("missing").unwrap());
        assert!(store.consume_sso_state("never-stored").unwrap().is_none());
        assert_eq!(store.cleanup_expired_sso_states().unwrap(), 0);
        assert!(store
            .get_sso_user_mapping("provider", "subject")
            .unwrap()
            .is_none());
    }
}