Skip to main content

suture_hub/
server.rs

1use sha2::Digest;
2use axum::{
3    Json,
4    extract::{ConnectInfo, Path, Query, State},
5    http::{HeaderMap, StatusCode},
6    response::{Html, IntoResponse},
7    routing::get,
8};
9use ed25519_dalek::{Signature, Verifier, VerifyingKey};
10use std::collections::HashSet;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
14use crate::blob_backend::BlobBackend;
15use crate::middleware::request_id_layer;
16use crate::storage::HubStorage;
17use crate::storage::{ReplicationEntry, ReplicationStatus};
18pub use crate::types::*;
19use crate::webhooks::{Webhook, WebhookManager};
20
21#[derive(Debug, Clone, PartialEq, Eq)]
22pub enum Role {
23    Admin,
24    Member,
25    Reader,
26}
27
28impl Role {
29    #[must_use] 
30    pub fn parse(s: &str) -> Self {
31        match s {
32            "admin" => Self::Admin,
33            "member" => Self::Member,
34            _ => Self::Reader,
35        }
36    }
37
38    #[must_use] 
39    pub fn as_str(&self) -> &'static str {
40        match self {
41            Self::Admin => "admin",
42            Self::Member => "member",
43            Self::Reader => "reader",
44        }
45    }
46}
47
48impl PartialOrd for Role {
49    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
50        fn rank(r: &Role) -> u8 {
51            match r {
52                Role::Admin => 3,
53                Role::Member => 2,
54                Role::Reader => 1,
55            }
56        }
57        rank(self).partial_cmp(&rank(other))
58    }
59}
60
61#[derive(serde::Deserialize)]
62pub struct PaginationParams {
63    pub offset: Option<u32>,
64    pub limit: Option<u32>,
65    pub cursor: Option<String>,
66}
67
68#[derive(serde::Deserialize)]
69pub struct AuditQueryParams {
70    pub actor: Option<String>,
71    pub action: Option<String>,
72    pub limit: Option<usize>,
73    pub offset: Option<usize>,
74}
75
76#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
77struct CursorData {
78    offset: u64,
79}
80
81fn decode_cursor(cursor: &str) -> Option<u64> {
82    let bytes = base64_decode(cursor).ok()?;
83    let data: CursorData = serde_json::from_slice(&bytes).ok()?;
84    Some(data.offset)
85}
86
87fn encode_cursor(offset: u64) -> String {
88    let data = CursorData { offset };
89    let json = serde_json::to_vec(&data).unwrap_or_default();
90    base64_encode(&json)
91}
92
93pub struct SutureHubServer {
94    pub(crate) storage: Arc<RwLock<HubStorage>>,
95    blob_backend: Option<Arc<dyn BlobBackend>>,
96    no_auth: bool,
97    rate_limits:
98        Arc<std::sync::RwLock<std::collections::HashMap<String, (u32, std::time::Instant)>>>,
99    max_pushes_per_hour: u32,
100    max_pulls_per_hour: u32,
101    max_token_creates_per_minute: u32,
102    rate_limit_window: std::time::Duration,
103    replication_role: Arc<std::sync::RwLock<String>>,
104    webhook_manager: Arc<WebhookManager>,
105    #[allow(dead_code)]
106    rate_limit_db: Option<Arc<tokio::sync::Mutex<rusqlite::Connection>>>,
107    lfs_data_dir: Option<std::path::PathBuf>,
108    #[cfg(feature = "raft-cluster")]
109    raft_node: Arc<tokio::sync::Mutex<suture_raft::RaftNode>>,
110    #[cfg(feature = "raft-cluster")]
111    raft_node_id: u64,
112}
113
114impl Default for SutureHubServer {
115    fn default() -> Self {
116        Self::new_in_memory()
117    }
118}
119
120impl SutureHubServer {
121    #[must_use] 
122    pub fn new() -> Self {
123        Self::new_in_memory()
124    }
125
126    #[must_use] 
127    pub fn new_in_memory() -> Self {
128        Self {
129            storage: Arc::new(RwLock::new(
130                HubStorage::open_in_memory().expect("in-memory storage must open"),
131            )),
132            blob_backend: None,
133            no_auth: false,
134            rate_limits: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
135            max_pushes_per_hour: 100,
136            max_pulls_per_hour: 1000,
137            max_token_creates_per_minute: 5,
138            rate_limit_window: std::time::Duration::from_secs(60),
139            replication_role: Arc::new(std::sync::RwLock::new("standalone".to_owned())),
140            webhook_manager: Arc::new(WebhookManager::new()),
141            rate_limit_db: None,
142            lfs_data_dir: None,
143            #[cfg(feature = "raft-cluster")]
144            raft_node: Arc::new(tokio::sync::Mutex::new(suture_raft::RaftNode::new(1, vec![]))),
145            #[cfg(feature = "raft-cluster")]
146            raft_node_id: 1,
147        }
148    }
149
150    pub fn with_db(path: &std::path::Path) -> Result<Self, crate::storage::StorageError> {
151        let rate_limit_db_path = path.with_extension("rate.db");
152        let rate_limit_conn = rusqlite::Connection::open(&rate_limit_db_path)?;
153        rate_limit_conn.execute_batch("PRAGMA journal_mode=WAL;")?;
154        rate_limit_conn.execute_batch(
155            "CREATE TABLE IF NOT EXISTS rate_limits (
156                key TEXT PRIMARY KEY,
157                count INTEGER NOT NULL DEFAULT 0,
158                window_start INTEGER NOT NULL
159            );",
160        )?;
161        Ok(Self {
162            storage: Arc::new(RwLock::new(HubStorage::open(path)?)),
163            blob_backend: None,
164            no_auth: false,
165            rate_limits: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
166            max_pushes_per_hour: 100,
167            max_pulls_per_hour: 1000,
168            max_token_creates_per_minute: 5,
169            rate_limit_window: std::time::Duration::from_secs(60),
170            replication_role: Arc::new(std::sync::RwLock::new("standalone".to_owned())),
171            webhook_manager: Arc::new(WebhookManager::new()),
172            rate_limit_db: Some(Arc::new(tokio::sync::Mutex::new(rate_limit_conn))),
173            lfs_data_dir: None,
174            #[cfg(feature = "raft-cluster")]
175            raft_node: Arc::new(tokio::sync::Mutex::new(suture_raft::RaftNode::new(1, vec![]))),
176            #[cfg(feature = "raft-cluster")]
177            raft_node_id: 1,
178        })
179    }
180
181    pub fn set_no_auth(&mut self, no_auth: bool) {
182        self.no_auth = no_auth;
183    }
184
185    #[must_use] 
186    pub fn is_no_auth(&self) -> bool {
187        self.no_auth
188    }
189
190    #[must_use] 
191    pub fn storage(&self) -> &Arc<RwLock<HubStorage>> {
192        &self.storage
193    }
194
195    pub fn shutdown(&self) {
196        tracing::info!("Hub server shutting down");
197    }
198
199    /// Check if this node is the Raft leader (or standalone).
200    ///
201    /// Returns `true` if:
202    /// - Raft is not enabled (standalone mode), or
203    /// - Raft is enabled and this node is the leader
204    #[cfg(feature = "raft-cluster")]
205    pub async fn is_leader(&self) -> bool {
206        let raft = self.raft_node.lock().await;
207        matches!(raft.state(), suture_raft::NodeState::Leader)
208    }
209
210    /// Check if this node is the leader (no-op when Raft is disabled).
211    #[cfg(not(feature = "raft-cluster"))]
212    pub async fn is_leader(&self) -> bool {
213        true // standalone mode — always leader
214    }
215
216    /// Get the current Raft leader ID, if known.
217    #[cfg(feature = "raft-cluster")]
218    pub async fn raft_leader(&self) -> Option<u64> {
219        self.raft_node.lock().await.leader()
220    }
221
222    /// Get this node's Raft state.
223    #[cfg(feature = "raft-cluster")]
224    pub async fn raft_state(&self) -> String {
225        let raft = self.raft_node.lock().await;
226        match raft.state() {
227            suture_raft::NodeState::Leader => "leader".to_owned(),
228            suture_raft::NodeState::Follower => "follower".to_owned(),
229            suture_raft::NodeState::Candidate => "candidate".to_owned(),
230            suture_raft::NodeState::PreCandidate => "pre-candidate".to_owned(),
231        }
232    }
233
234    /// Propose a Raft command (must be called on leader).
235    #[cfg(feature = "raft-cluster")]
236    pub async fn raft_propose(&self, command: Vec<u8>) -> Result<(), suture_raft::RaftError> {
237        let mut raft = self.raft_node.lock().await;
238        raft.propose(command)
239    }
240
241    /// Get committed but unapplied Raft entries.
242    #[cfg(feature = "raft-cluster")]
243    pub async fn raft_committed_entries(&self) -> Vec<suture_raft::LogEntry> {
244        let raft = self.raft_node.lock().await;
245        raft.committed_entries().to_vec()
246    }
247
248    /// Advance the Raft applied index.
249    #[cfg(feature = "raft-cluster")]
250    pub async fn raft_advance_applied(&self, count: usize) {
251        let mut raft = self.raft_node.lock().await;
252        raft.advance_applied(count);
253    }
254
255    pub fn set_rate_limit_config(&mut self, pushes: u32, pulls: u32, window: std::time::Duration) {
256        self.max_pushes_per_hour = pushes;
257        self.max_pulls_per_hour = pulls;
258        self.rate_limit_window = window;
259    }
260
261    #[must_use]
262    pub fn with_lfs_dir(mut self, path: std::path::PathBuf) -> Self {
263        if let Err(e) = std::fs::create_dir_all(&path) {
264            tracing::warn!("Failed to create directory {}: {}", path.display(), e);
265        }
266        self.lfs_data_dir = Some(path);
267        self
268    }
269
270    pub fn set_replication_role(&self, role: &str) {
271        *self.replication_role.write().unwrap_or_else(std::sync::PoisonError::into_inner) = role.to_owned();
272    }
273
274    #[must_use] 
275    pub fn get_replication_role(&self) -> String {
276        self.replication_role.read().unwrap_or_else(std::sync::PoisonError::into_inner).clone()
277    }
278
279    pub fn set_blob_backend(&mut self, backend: Arc<dyn BlobBackend>) {
280        self.blob_backend = Some(backend);
281    }
282
283    fn blob_store(
284        &self,
285        store: &HubStorage,
286        repo_id: &str,
287        hash_hex: &str,
288        data: &[u8],
289    ) -> Result<(), String> {
290        self.blob_backend.as_ref().map_or_else(
291            || store
292                .store_blob(repo_id, hash_hex, data)
293                .map_err(|e| e.to_string()),
294            |backend| backend.store_blob(repo_id, hash_hex, data),
295        )
296    }
297
298    fn blob_get(
299        &self,
300        store: &HubStorage,
301        repo_id: &str,
302        hash_hex: &str,
303    ) -> Result<Option<Vec<u8>>, String> {
304        self.blob_backend.as_ref().map_or_else(
305            || store.get_blob(repo_id, hash_hex).map_err(|e| e.to_string()),
306            |backend| backend.get_blob(repo_id, hash_hex),
307        )
308    }
309
310    pub async fn log_write(
311        &self,
312        operation: &str,
313        table_name: &str,
314        row_id: &str,
315        data: Option<&str>,
316    ) -> Result<i64, crate::storage::StorageError> {
317        let store = self.storage.write().await;
318        store.log_operation(operation, table_name, row_id, data)
319    }
320
321    pub async fn handle_add_peer(&self, req: AddPeerRequest) -> AddPeerResponse {
322        let store = self.storage.write().await;
323        match store.add_replication_peer(&req.peer_url, &req.role) {
324            Ok(peer_id) => AddPeerResponse {
325                success: true,
326                peer_id: Some(peer_id),
327                error: None,
328            },
329            Err(e) => AddPeerResponse {
330                success: false,
331                peer_id: None,
332                error: Some(format!("{e}")),
333            },
334        }
335    }
336
337    pub async fn handle_remove_peer(&self, id: i64) -> RemovePeerResponse {
338        let store = self.storage.write().await;
339        match store.remove_replication_peer(id) {
340            Ok(()) => RemovePeerResponse {
341                success: true,
342                error: None,
343            },
344            Err(e) => RemovePeerResponse {
345                success: false,
346                error: Some(format!("{e}")),
347            },
348        }
349    }
350
351    pub async fn handle_list_peers(&self) -> ListPeersResponse {
352        let store = self.storage.read().await;
353        ListPeersResponse {
354            peers: store.list_replication_peers().unwrap_or_else(|e| {
355                tracing::warn!("store list_replication_peers failed: {e}");
356                Default::default()
357            }),
358        }
359    }
360
361    pub async fn handle_replication_status(&self) -> ReplicationStatusResponse {
362        let store = self.storage.read().await;
363        let status = match store.get_replication_status() {
364            Ok(s) => s,
365            Err(e) => {
366                tracing::error!("Failed to get replication status: {e}");
367                return ReplicationStatusResponse {
368                    status: ReplicationStatus {
369                        current_seq: 0,
370                        peer_count: 0,
371                        peers: vec![],
372                    },
373                };
374            }
375        };
376        ReplicationStatusResponse { status }
377    }
378
379    pub async fn handle_replication_sync(&self, entries: Vec<ReplicationEntry>) -> SyncResponse {
380        let store = self.storage.write().await;
381        match store.apply_replication_entries(&entries) {
382            Ok(()) => SyncResponse {
383                success: true,
384                applied: entries.len(),
385                error: None,
386            },
387            Err(e) => SyncResponse {
388                success: false,
389                applied: 0,
390                error: Some(format!("{e}")),
391            },
392        }
393    }
394
395    pub fn check_rate_limit(&self, ip: &str, key: &str) -> Result<(), u64> {
396        let window = self.rate_limit_window;
397        if window.is_zero() {
398            return Ok(());
399        }
400
401        let full_key = format!("{key}:{ip}");
402        let now = std::time::Instant::now();
403        let mut limits = self.rate_limits.write().unwrap_or_else(std::sync::PoisonError::into_inner);
404
405        limits.retain(|_, (_, start)| now.duration_since(*start) < window);
406
407        let limit = match key {
408            "push" => self.max_pushes_per_hour,
409            "pull" => self.max_pulls_per_hour,
410            "token_create" => self.max_token_creates_per_minute,
411            _ => return Ok(()),
412        };
413
414        if let Some(&(count, window_start)) = limits.get(&full_key) {
415            if count >= limit {
416                let elapsed = now.duration_since(window_start);
417                let remaining = window.saturating_sub(elapsed);
418                let retry_after = remaining.as_secs().max(1);
419                return Err(retry_after);
420            }
421            limits.insert(full_key, (count + 1, window_start));
422        } else {
423            limits.insert(full_key, (1, now));
424        }
425
426        Ok(())
427    }
428
429    pub async fn handle_repo_patches_cursor(
430        &self,
431        repo_id: &str,
432        offset: u64,
433        limit: u32,
434    ) -> (Vec<PatchProto>, Option<String>) {
435        let store = self.storage.read().await;
436        let effective_limit = limit.min(200) as usize;
437        let offset = offset as usize;
438        let patches = store.get_all_patches(repo_id, offset, effective_limit + 1).unwrap_or_else(|e| {
439            tracing::warn!("store get_all_patches failed: {e}");
440            Default::default()
441        });
442        let has_more = patches.len() > effective_limit;
443        let mut collected = patches;
444        if has_more {
445            collected.truncate(effective_limit);
446        }
447        let next_cursor = if has_more {
448            Some(encode_cursor(offset as u64 + limit as u64))
449        } else {
450            None
451        };
452        (collected, next_cursor)
453    }
454
455    pub async fn add_authorized_key(
456        &self,
457        author: &str,
458        public_key_bytes: &[u8],
459    ) -> Result<(), crate::storage::StorageError> {
460        let store = self.storage.write().await;
461        store.add_authorized_key(author, public_key_bytes)
462    }
463
464    pub async fn handle_push(
465        &self,
466        req: PushRequest,
467    ) -> Result<PushResponse, (StatusCode, PushResponse)> {
468        if let Some(ref sig_bytes) = req.signature {
469            let store = self.storage.read().await;
470            let has_keys = match store.has_authorized_keys() {
471                Ok(v) => v,
472                Err(e) => {
473                    tracing::error!("Failed to check authorized keys: {e}");
474                    return Err((
475                        StatusCode::INTERNAL_SERVER_ERROR,
476                        PushResponse {
477                            success: false,
478                            error: Some("database error".to_owned()),
479                            existing_patches: vec![],
480                        },
481                    ));
482                }
483            };
484            if has_keys
485                && let Err(e) = verify_push_signature(&store, &req, sig_bytes)
486            {
487                return Err((
488                    StatusCode::FORBIDDEN,
489                    PushResponse {
490                        success: false,
491                        error: Some(format!("authentication failed: {e}")),
492                        existing_patches: vec![],
493                    },
494                ));
495            }
496        } else if !self.no_auth {
497            let store = self.storage.read().await;
498            let has_keys = store.has_authorized_keys().unwrap_or_else(|e| {
499                tracing::error!("Failed to check authorized keys: {e}");
500                true
501            });
502            let has_tokens = store.has_tokens().unwrap_or_else(|e| {
503                tracing::error!("Failed to check tokens: {e}");
504                true
505            });
506            if has_keys || has_tokens {
507                return Err((
508                    StatusCode::FORBIDDEN,
509                    PushResponse {
510                        success: false,
511                        error: Some("authentication required: no signature provided".to_owned()),
512                        existing_patches: vec![],
513                    },
514                ));
515            }
516        }
517
518        let mut existing_patches = Vec::new();
519
520        let store = self.storage.write().await;
521        if let Err(e) = store.ensure_repo(&req.repo_id) {
522            return Err((
523                StatusCode::INTERNAL_SERVER_ERROR,
524                PushResponse {
525                    success: false,
526                    error: Some(format!("storage error: {e}")),
527                    existing_patches: vec![],
528                },
529            ));
530        }
531
532        for blob in &req.blobs {
533            let hex = hash_to_hex(&blob.hash);
534            let data = match base64_decode(&blob.data) {
535                Ok(d) => d,
536                Err(e) => {
537                    return Err((
538                        StatusCode::BAD_REQUEST,
539                        PushResponse {
540                            success: false,
541                            error: Some(format!("invalid base64 in blob: {e}")),
542                            existing_patches: vec![],
543                        },
544                    ));
545                }
546            };
547            if let Err(e) = self.blob_store(&store, &req.repo_id, &hex, &data) {
548                return Err((
549                    StatusCode::INTERNAL_SERVER_ERROR,
550                    PushResponse {
551                        success: false,
552                        error: Some(format!("storage error: {e}")),
553                        existing_patches: vec![],
554                    },
555                ));
556            }
557        }
558
559        for patch in &req.patches {
560            let inserted = match store.insert_patch(&req.repo_id, patch) {
561                Ok(i) => i,
562                Err(e) => {
563                    return Err((
564                        StatusCode::INTERNAL_SERVER_ERROR,
565                        PushResponse {
566                            success: false,
567                            error: Some(format!("storage error: {e}")),
568                            existing_patches: vec![],
569                        },
570                    ));
571                }
572            };
573            if !inserted {
574                existing_patches.push(patch.id.clone());
575            }
576        }
577
578        for patch in &req.patches {
579            if let Err(e) = store.log_operation("insert", "patches", &hash_to_hex(&patch.id), None) {
580                tracing::warn!("Failed to log operation: {}", e);
581            }
582        }
583
584        for branch in &req.branches {
585            let target_hex = hash_to_hex(&branch.target_id);
586
587            if !req.force
588                && let Some(ref known) = req.known_branches
589                && let Some(known_branch) = known.iter().find(|kb| kb.name == branch.name)
590            {
591                let known_target = hash_to_hex(&known_branch.target_id);
592                if known_target != target_hex
593                    && let Ok(Some(current_target)) =
594                        store.get_branch_target(&req.repo_id, &branch.name)
595                    && !store
596                        .is_ancestor(&req.repo_id, &current_target, &target_hex)
597                        .unwrap_or_else(|e| {
598                            tracing::warn!("Failed to check ancestry: {e}");
599                            true
600                        })
601                {
602                    return Err((
603                        StatusCode::CONFLICT,
604                        PushResponse {
605                            success: false,
606                            error: Some(format!(
607                                "branch '{}' rejected: non-fast-forward push (use --force to override)",
608                                branch.name
609                            )),
610                            existing_patches: vec![],
611                        },
612                    ));
613                }
614            }
615
616            if store
617                .is_branch_protected(&req.repo_id, &branch.name)
618                .unwrap_or_else(|e| {
619                    tracing::warn!("Failed to check branch protection: {e}");
620                    false
621                })
622            {
623                let push_authors: std::collections::HashSet<&str> =
624                    req.patches.iter().map(|p| p.author.as_str()).collect();
625                let is_owner =
626                    push_authors.len() == 1 && push_authors.contains(branch.name.as_str());
627                if !is_owner {
628                    return Err((
629                        StatusCode::FORBIDDEN,
630                        PushResponse {
631                            success: false,
632                            error: Some(format!(
633                                "branch '{}' is protected and can only be updated by its owner",
634                                branch.name
635                            )),
636                            existing_patches: vec![],
637                        },
638                    ));
639                }
640            }
641
642            if let Err(e) = store.set_branch(&req.repo_id, &branch.name, &target_hex) {
643                return Err((
644                    StatusCode::INTERNAL_SERVER_ERROR,
645                    PushResponse {
646                        success: false,
647                        error: Some(format!("storage error: {e}")),
648                        existing_patches: vec![],
649                    },
650                ));
651            }
652        }
653
654        for branch in &req.branches {
655            let target_hex = hash_to_hex(&branch.target_id);
656            if let Err(e) = store.log_operation(
657                "set",
658                "branches",
659                &format!("{}:{}", req.repo_id, branch.name),
660                Some(&target_hex),
661            ) {
662                tracing::warn!("Failed to log operation: {}", e);
663            }
664        }
665
666        let repo_id = req.repo_id.clone();
667        let patch_data = serde_json::json!({
668            "patch_count": req.patches.len(),
669            "branch_count": req.branches.len(),
670            "existing_patches": existing_patches.clone(),
671        });
672        let manager = Arc::clone(&self.webhook_manager);
673        let storage = Arc::clone(&self.storage);
674        tokio::spawn(async move {
675            let hooks = {
676                let store = storage.read().await;
677                store.list_webhooks(&repo_id).unwrap_or_else(|e| {
678                    tracing::warn!("store list_webhooks failed: {e}");
679                    Default::default()
680                })
681            };
682            if !hooks.is_empty() {
683                let result = manager.trigger(&hooks, "push", &repo_id, patch_data).await;
684                if result.failed > 0 {
685                    tracing::warn!("Hook trigger failed: {} of {} webhooks failed", result.failed, result.triggered);
686                }
687            }
688        });
689
690        Ok(PushResponse {
691            success: true,
692            error: None,
693            existing_patches,
694        })
695    }
696
697    pub async fn handle_pull(&self, req: PullRequest) -> PullResponse {
698        let store = self.storage.read().await;
699
700        let exists = match store.repo_exists(&req.repo_id) {
701            Ok(e) => e,
702            Err(e) => {
703                tracing::error!("Failed to check repo existence: {e}");
704                return PullResponse {
705                    success: false,
706                    error: Some(format!("database error: {e}")),
707                    patches: vec![],
708                    branches: vec![],
709                    blobs: vec![],
710                };
711            }
712        };
713        if !exists {
714            return PullResponse {
715                success: false,
716                error: Some(format!("repo not found: {}", req.repo_id)),
717                patches: vec![],
718                branches: vec![],
719                blobs: vec![],
720            };
721        }
722
723        let all_patches = store.get_all_patches_unbounded(&req.repo_id).unwrap_or_else(|e| {
724            tracing::warn!("store get_all_patches failed: {e}");
725            Default::default()
726        });
727        let client_ancestors = collect_ancestors(&all_patches, &req.known_branches);
728        let mut new_patches = collect_new_patches(&all_patches, &client_ancestors);
729
730        if let Some(depth) = req.max_depth {
731            new_patches.truncate(depth as usize);
732        }
733
734        let branches = store.get_branches(&req.repo_id).unwrap_or_else(|e| {
735            tracing::warn!("store get_branches failed: {e}");
736            Default::default()
737        });
738
739        let mut needed_hashes: std::collections::HashSet<String> = std::collections::HashSet::new();
740        for patch in &new_patches {
741            if patch.operation_type == "batch" {
742                if let Ok(decoded) = base64_decode(&patch.payload)
743                    && let Ok(changes) = serde_json::from_str::<Vec<serde_json::Value>>(
744                        &String::from_utf8_lossy(&decoded),
745                    )
746                {
747                    for change in &changes {
748                        if let Some(payload_val) = change.get("payload").and_then(|v| v.as_array())
749                        {
750                            let hex_bytes: Vec<u8> = payload_val
751                                .iter()
752                                .filter_map(|v| v.as_u64().map(|n| n as u8))
753                                .collect();
754                            if let Ok(hex) = String::from_utf8(hex_bytes) {
755                                needed_hashes.insert(hex);
756                            }
757                        }
758                    }
759                }
760            } else if !patch.payload.is_empty() {
761                // Payload may be raw hex (from tests) or base64-encoded (from CLI).
762                // Try raw hex first — if it looks like a hex hash, use it directly.
763                // Otherwise try base64 decode.
764                let hex = if patch.payload.chars().all(|c| c.is_ascii_hexdigit()) {
765                    patch.payload.clone()
766                } else if let Ok(decoded) = base64_decode(&patch.payload) {
767                    String::from_utf8_lossy(&decoded).to_string()
768                } else {
769                    patch.payload.clone()
770                };
771                needed_hashes.insert(hex);
772            }
773        }
774        let blobs = store
775            .get_blobs(&req.repo_id, &needed_hashes)
776            .unwrap_or_else(|e| {
777                tracing::warn!("store get_blobs failed: {e}");
778                Default::default()
779            });
780
781        PullResponse {
782            success: true,
783            error: None,
784            patches: new_patches,
785            branches,
786            blobs,
787        }
788    }
789
790    pub async fn handle_list_repos(&self) -> ListReposResponse {
791        let store = self.storage.read().await;
792        let repo_ids = match store.list_repos() {
793            Ok(r) => r,
794            Err(e) => {
795                tracing::error!("Failed to list repos: {e}");
796                return ListReposResponse { repo_ids: vec![] };
797            }
798        };
799        ListReposResponse { repo_ids }
800    }
801
802    pub async fn handle_repo_info(&self, repo_id: &str) -> RepoInfoResponse {
803        let store = self.storage.read().await;
804
805        let exists = match store.repo_exists(repo_id) {
806            Ok(e) => e,
807            Err(e) => {
808                tracing::error!("Failed to check repo existence: {e}");
809                return RepoInfoResponse {
810                    repo_id: repo_id.to_owned(),
811                    patch_count: 0,
812                    branches: vec![],
813                    success: false,
814                    error: Some(format!("database error: {e}")),
815                };
816            }
817        };
818        if !exists {
819            return RepoInfoResponse {
820                repo_id: repo_id.to_owned(),
821                patch_count: 0,
822                branches: vec![],
823                success: false,
824                error: Some(format!("repo not found: {repo_id}")),
825            };
826        }
827
828        let patch_count = store.patch_count(repo_id).unwrap_or_else(|e| {
829            tracing::error!("Failed to get patch count: {e}");
830            0
831        });
832        let branches = store.get_branches(repo_id).unwrap_or_else(|e| {
833            tracing::warn!("store get_branches failed: {e}");
834            Default::default()
835        });
836
837        RepoInfoResponse {
838            repo_id: repo_id.to_owned(),
839            patch_count,
840            branches,
841            success: true,
842            error: None,
843        }
844    }
845
846    pub async fn handle_mirror_setup(
847        &self,
848        req: crate::types::MirrorSetupRequest,
849    ) -> crate::types::MirrorSetupResponse {
850        if let Err(e) = validate_mirror_url(&req.upstream_url) {
851            return crate::types::MirrorSetupResponse {
852                success: false,
853                error: Some(format!("invalid upstream URL: {e}")),
854                mirror_id: None,
855            };
856        }
857
858        let store = self.storage.write().await;
859
860        match store.add_mirror(&req.repo_name, &req.upstream_url, &req.upstream_repo) {
861            Ok(mirror_id) => {
862                if let Err(e) = store.ensure_repo(&req.repo_name) {
863                    return crate::types::MirrorSetupResponse {
864                        success: false,
865                        error: Some(format!("failed to create repo: {e}")),
866                        mirror_id: None,
867                    };
868                }
869                crate::types::MirrorSetupResponse {
870                    success: true,
871                    error: None,
872                    mirror_id: Some(mirror_id),
873                }
874            }
875            Err(e) => crate::types::MirrorSetupResponse {
876                success: false,
877                error: Some(format!("failed to register mirror: {e}")),
878                mirror_id: None,
879            },
880        }
881    }
882
883    pub async fn handle_mirror_sync(
884        &self,
885        req: crate::types::MirrorSyncRequest,
886    ) -> crate::types::MirrorSyncResponse {
887        let store = self.storage.write().await;
888
889        let mirror_info = match store.get_mirror(req.mirror_id) {
890            Ok(Some(info)) => info,
891            Ok(None) => {
892                return crate::types::MirrorSyncResponse {
893                    success: false,
894                    error: Some(format!("mirror {} not found", req.mirror_id)),
895                    patches_synced: 0,
896                    branches_synced: 0,
897                };
898            }
899            Err(e) => {
900                return crate::types::MirrorSyncResponse {
901                    success: false,
902                    error: Some(format!("database error: {e}")),
903                    patches_synced: 0,
904                    branches_synced: 0,
905                };
906            }
907        };
908
909        let (local_repo, upstream_url, upstream_repo, _, _) = mirror_info;
910
911        if let Err(e) = validate_mirror_url(&upstream_url) {
912            return crate::types::MirrorSyncResponse {
913                success: false,
914                error: Some(format!("invalid upstream URL: {e}")),
915                patches_synced: 0,
916                branches_synced: 0,
917            };
918        }
919
920        if let Err(e) = store.update_mirror_status(req.mirror_id, "syncing", None) {
921            return crate::types::MirrorSyncResponse {
922                success: false,
923                error: Some(format!("failed to update status: {e}")),
924                patches_synced: 0,
925                branches_synced: 0,
926            };
927        }
928
929        drop(store);
930
931        let upstream_pull = crate::types::PullRequest {
932            repo_id: upstream_repo,
933            known_branches: vec![],
934            max_depth: None,
935        };
936
937        let client = reqwest::Client::new();
938        let pull_resp = match client
939            .post(format!("{upstream_url}/pull"))
940            .json(&upstream_pull)
941            .send()
942            .await
943        {
944            Ok(resp) => resp,
945            Err(e) => {
946                let store = self.storage.write().await;
947                if let Err(e) = store.update_mirror_status(req.mirror_id, "error", None) {
948                    tracing::warn!("Failed to update mirror status: {}", e);
949                }
950                return crate::types::MirrorSyncResponse {
951                    success: false,
952                    error: Some(format!("failed to reach upstream: {e}")),
953                    patches_synced: 0,
954                    branches_synced: 0,
955                };
956            }
957        };
958
959        let pull_result: crate::types::PullResponse = match pull_resp.json().await {
960            Ok(r) => r,
961            Err(e) => {
962                let store = self.storage.write().await;
963                if let Err(e) = store.update_mirror_status(req.mirror_id, "error", None) {
964                    tracing::warn!("Failed to update mirror status: {}", e);
965                }
966                return crate::types::MirrorSyncResponse {
967                    success: false,
968                    error: Some(format!("failed to parse upstream response: {e}")),
969                    patches_synced: 0,
970                    branches_synced: 0,
971                };
972            }
973        };
974
975        if !pull_result.success {
976            let store = self.storage.write().await;
977            if let Err(e) = store.update_mirror_status(req.mirror_id, "error", None) {
978                tracing::warn!("Failed to update mirror status: {}", e);
979            }
980            return crate::types::MirrorSyncResponse {
981                success: false,
982                error: pull_result.error,
983                patches_synced: 0,
984                branches_synced: 0,
985            };
986        }
987
988        let store = self.storage.write().await;
989        let mut patches_synced = 0u64;
990
991        for blob in &pull_result.blobs {
992            let hex = hash_to_hex(&blob.hash);
993            let Ok(data) = base64_decode(&blob.data) else { continue };
994            if let Err(e) = self.blob_store(&store, &local_repo, &hex, &data) {
995                tracing::warn!("Failed to store blob during mirror sync: {}", e);
996            }
997        }
998
999        for patch in &pull_result.patches {
1000            let inserted = store.insert_patch(&local_repo, patch).unwrap_or_else(|e| {
1001                tracing::warn!("Failed to insert patch during mirror sync: {e}");
1002                false
1003            });
1004            if inserted {
1005                patches_synced += 1;
1006            }
1007        }
1008
1009        let branches_synced = pull_result.branches.len() as u64;
1010        for branch in &pull_result.branches {
1011            let target_hex = hash_to_hex(&branch.target_id);
1012            if let Err(e) = store.set_branch(&local_repo, &branch.name, &target_hex) {
1013                tracing::warn!("Failed to update branch during mirror sync: {}", e);
1014            }
1015        }
1016
1017        let now = std::time::SystemTime::now()
1018            .duration_since(std::time::UNIX_EPOCH)
1019            .unwrap_or_default()
1020            .as_secs() as i64;
1021        if let Err(e) = store.update_mirror_status(req.mirror_id, "idle", Some(now)) {
1022            tracing::warn!("Failed to update mirror status: {}", e);
1023        }
1024
1025        crate::types::MirrorSyncResponse {
1026            success: true,
1027            error: None,
1028            patches_synced,
1029            branches_synced,
1030        }
1031    }
1032
1033    pub async fn handle_mirror_status(
1034        &self,
1035        req: crate::types::MirrorStatusRequest,
1036    ) -> crate::types::MirrorStatusResponse {
1037        let store = self.storage.read().await;
1038
1039        let mirrors = store.list_mirrors().unwrap_or_else(|e| {
1040            tracing::warn!("store list_mirrors failed: {e}");
1041            Default::default()
1042        });
1043
1044        let entries: Vec<crate::types::MirrorStatusEntry> = mirrors
1045            .into_iter()
1046            .filter(|m| {
1047                if let Some(mid) = req.mirror_id
1048                    && m.0 != mid
1049                {
1050                    return false;
1051                }
1052                if let Some(ref name) = req.repo_name
1053                    && &m.1 != name
1054                {
1055                    return false;
1056                }
1057                true
1058            })
1059            .map(|m| crate::types::MirrorStatusEntry {
1060                mirror_id: m.0,
1061                repo_name: m.1,
1062                upstream_url: m.2,
1063                upstream_repo: m.3,
1064                last_sync: m.4.map(|v| v as u64),
1065                status: m.5,
1066            })
1067            .collect();
1068
1069        crate::types::MirrorStatusResponse {
1070            success: true,
1071            error: None,
1072            mirrors: entries,
1073        }
1074    }
1075
1076    pub async fn handle_pull_v2(
1077        &self,
1078        req: crate::types::PullRequestV2,
1079    ) -> crate::types::PullResponseV2 {
1080        let store = self.storage.read().await;
1081
1082        let exists = match store.repo_exists(&req.repo_id) {
1083            Ok(e) => e,
1084            Err(e) => {
1085                tracing::error!("Failed to check repo existence: {e}");
1086                return crate::types::PullResponseV2 {
1087                    success: false,
1088                    error: Some(format!("database error: {e}")),
1089                    patches: vec![],
1090                    branches: vec![],
1091                    blobs: vec![],
1092                    deltas: vec![],
1093                    protocol_version: crate::types::PROTOCOL_VERSION_V2,
1094                };
1095            }
1096        };
1097        if !exists {
1098            return crate::types::PullResponseV2 {
1099                success: false,
1100                error: Some(format!("repo not found: {}", req.repo_id)),
1101                patches: vec![],
1102                branches: vec![],
1103                blobs: vec![],
1104                deltas: vec![],
1105                protocol_version: crate::types::PROTOCOL_VERSION_V2,
1106            };
1107        }
1108
1109        let all_patches = store.get_all_patches_unbounded(&req.repo_id).unwrap_or_else(|e| {
1110            tracing::warn!("store get_all_patches failed: {e}");
1111            Default::default()
1112        });
1113        let client_ancestors = collect_ancestors(&all_patches, &req.known_branches);
1114        let mut new_patches = collect_new_patches(&all_patches, &client_ancestors);
1115
1116        if let Some(depth) = req.max_depth {
1117            new_patches.truncate(depth as usize);
1118        }
1119
1120        let branches = store.get_branches(&req.repo_id).unwrap_or_else(|e| {
1121            tracing::warn!("store get_branches failed: {e}");
1122            Default::default()
1123        });
1124
1125        let mut needed_hashes: std::collections::HashSet<String> = std::collections::HashSet::new();
1126        for patch in &new_patches {
1127            if patch.operation_type == "batch" {
1128                if let Ok(decoded) = base64_decode(&patch.payload)
1129                    && let Ok(changes) = serde_json::from_str::<Vec<serde_json::Value>>(
1130                        &String::from_utf8_lossy(&decoded),
1131                    )
1132                {
1133                    for change in &changes {
1134                        if let Some(payload_val) = change.get("payload").and_then(|v| v.as_array())
1135                        {
1136                            let hex_bytes: Vec<u8> = payload_val
1137                                .iter()
1138                                .filter_map(|v| v.as_u64().map(|n| n as u8))
1139                                .collect();
1140                            if let Ok(hex) = String::from_utf8(hex_bytes) {
1141                                needed_hashes.insert(hex);
1142                            }
1143                        }
1144                    }
1145                }
1146            } else if !patch.payload.is_empty() {
1147                let hex = if patch.payload.chars().all(|c| c.is_ascii_hexdigit()) {
1148                    patch.payload.clone()
1149                } else if let Ok(decoded) = base64_decode(&patch.payload) {
1150                    String::from_utf8_lossy(&decoded).to_string()
1151                } else {
1152                    patch.payload.clone()
1153                };
1154                needed_hashes.insert(hex);
1155            }
1156        }
1157
1158        let known_hash_set: std::collections::HashSet<String> = req
1159            .known_blob_hashes
1160            .iter()
1161            .map(|h| h.value.clone())
1162            .collect();
1163
1164        let mut blobs = Vec::new();
1165        let mut deltas = Vec::new();
1166
1167        if req.capabilities.supports_delta {
1168            for needed_hash in &needed_hashes {
1169                let Ok(Some(target_data)) = self.blob_get(&store, &req.repo_id, needed_hash) else {
1170                        if let Ok(b) = store.get_blobs(
1171                            &req.repo_id,
1172                            &std::collections::HashSet::from([needed_hash.clone()]),
1173                        ) && let Some(blob) = b.into_iter().next()
1174                        {
1175                            blobs.push(blob);
1176                        }
1177                        continue;
1178                    };
1179
1180                if known_hash_set.contains(needed_hash) {
1181                    let Ok(Some(base_data)) = self.blob_get(&store, &req.repo_id, needed_hash) else {
1182                            blobs.push(BlobRef {
1183                                hash: HashProto {
1184                                    value: needed_hash.clone(),
1185                                },
1186                                data: base64_encode(&target_data),
1187                                truncated: false,
1188                            });
1189                            continue;
1190                        };
1191
1192                    if base_data == target_data {
1193                        continue;
1194                    }
1195
1196                    let (_base_copy, delta_bytes) =
1197                        suture_protocol::compute_delta(&base_data, &target_data);
1198
1199                    if delta_bytes.len() < target_data.len() {
1200                        deltas.push(BlobDelta {
1201                            base_hash: HashProto {
1202                                value: needed_hash.clone(),
1203                            },
1204                            target_hash: HashProto {
1205                                value: needed_hash.clone(),
1206                            },
1207                            encoding: DeltaEncoding::BinaryPatch,
1208                            delta_data: base64_encode(&delta_bytes),
1209                        });
1210                    } else {
1211                        blobs.push(BlobRef {
1212                            hash: HashProto {
1213                                value: needed_hash.clone(),
1214                            },
1215                            data: base64_encode(&target_data),
1216                            truncated: false,
1217                        });
1218                    }
1219                } else {
1220                    blobs.push(BlobRef {
1221                        hash: HashProto {
1222                            value: needed_hash.clone(),
1223                        },
1224                        data: base64_encode(&target_data),
1225                        truncated: false,
1226                    });
1227                }
1228            }
1229        } else {
1230            blobs = store
1231                .get_blobs(&req.repo_id, &needed_hashes)
1232                .unwrap_or_else(|e| {
1233                    tracing::error!("Failed to get blobs for repo {}: {e}", req.repo_id);
1234                    Default::default()
1235                });
1236        }
1237
1238        crate::types::PullResponseV2 {
1239            success: true,
1240            error: None,
1241            patches: new_patches,
1242            branches,
1243            blobs,
1244            deltas,
1245            protocol_version: crate::types::PROTOCOL_VERSION_V2,
1246        }
1247    }
1248
1249    pub async fn handle_push_v2(
1250        &self,
1251        req: crate::types::PushRequestV2,
1252    ) -> Result<PushResponse, (StatusCode, PushResponse)> {
1253        if let Some(ref sig_bytes) = req.signature {
1254            let store = self.storage.read().await;
1255            let has_keys = match store.has_authorized_keys() {
1256                Ok(v) => v,
1257                Err(e) => {
1258                    tracing::error!("Failed to check authorized keys: {e}");
1259                    return Err((
1260                        StatusCode::INTERNAL_SERVER_ERROR,
1261                        PushResponse {
1262                            success: false,
1263                            error: Some("database error".to_owned()),
1264                            existing_patches: vec![],
1265                        },
1266                    ));
1267                }
1268            };
1269            if has_keys {
1270                let v1_req = PushRequest {
1271                    repo_id: req.repo_id.clone(),
1272                    patches: req.patches.clone(),
1273                    branches: req.branches.clone(),
1274                    blobs: req.blobs.clone(),
1275                    signature: req.signature.clone(),
1276                    known_branches: req.known_branches.clone(),
1277                    force: req.force,
1278                };
1279                if let Err(e) = verify_push_signature(&store, &v1_req, sig_bytes) {
1280                    return Err((
1281                        StatusCode::FORBIDDEN,
1282                        PushResponse {
1283                            success: false,
1284                            error: Some(format!("authentication failed: {e}")),
1285                            existing_patches: vec![],
1286                        },
1287                    ));
1288                }
1289            }
1290        } else if !self.no_auth {
1291            let store = self.storage.read().await;
1292            let has_keys = store.has_authorized_keys().unwrap_or_else(|e| {
1293                tracing::error!("Failed to check authorized keys: {e}");
1294                true
1295            });
1296            let has_tokens = store.has_tokens().unwrap_or_else(|e| {
1297                tracing::error!("Failed to check tokens: {e}");
1298                true
1299            });
1300            if has_keys || has_tokens {
1301                return Err((
1302                    StatusCode::FORBIDDEN,
1303                    PushResponse {
1304                        success: false,
1305                        error: Some("authentication required: no signature provided".to_owned()),
1306                        existing_patches: vec![],
1307                    },
1308                ));
1309            }
1310        }
1311
1312        let store = self.storage.write().await;
1313        if let Err(e) = store.ensure_repo(&req.repo_id) {
1314            return Err((
1315                StatusCode::INTERNAL_SERVER_ERROR,
1316                PushResponse {
1317                    success: false,
1318                    error: Some(format!("storage error: {e}")),
1319                    existing_patches: vec![],
1320                },
1321            ));
1322        }
1323
1324        let mut existing_patches = Vec::new();
1325
1326        for delta in &req.deltas {
1327            if matches!(delta.encoding, DeltaEncoding::BinaryPatch) {
1328                let base_hex = hash_to_hex(&delta.base_hash);
1329                let target_hex = hash_to_hex(&delta.target_hash);
1330                let Ok(Some(base_data)) = self.blob_get(&store, &req.repo_id, &base_hex) else { continue };
1331                let Ok(delta_bytes) = base64_decode(&delta.delta_data) else { continue };
1332                let reconstructed = suture_protocol::apply_delta(&base_data, &delta_bytes);
1333                if let Err(e) = self.blob_store(&store, &req.repo_id, &target_hex, &reconstructed) {
1334                    return Err((
1335                        StatusCode::INTERNAL_SERVER_ERROR,
1336                        PushResponse {
1337                            success: false,
1338                            error: Some(format!("storage error reconstructing delta blob: {e}")),
1339                            existing_patches: vec![],
1340                        },
1341                    ));
1342                }
1343            } else if matches!(delta.encoding, DeltaEncoding::FullBlob) {
1344                let target_hex = hash_to_hex(&delta.target_hash);
1345                let data = match base64_decode(&delta.delta_data) {
1346                    Ok(d) => d,
1347                    Err(e) => {
1348                        return Err((
1349                            StatusCode::BAD_REQUEST,
1350                            PushResponse {
1351                                success: false,
1352                                error: Some(format!("invalid base64 in delta blob: {e}")),
1353                                existing_patches: vec![],
1354                            },
1355                        ));
1356                    }
1357                };
1358                if let Err(e) = self.blob_store(&store, &req.repo_id, &target_hex, &data) {
1359                    return Err((
1360                        StatusCode::INTERNAL_SERVER_ERROR,
1361                        PushResponse {
1362                            success: false,
1363                            error: Some(format!("storage error: {e}")),
1364                            existing_patches: vec![],
1365                        },
1366                    ));
1367                }
1368            }
1369        }
1370
1371        for blob in &req.blobs {
1372            let hex = hash_to_hex(&blob.hash);
1373            let data = match base64_decode(&blob.data) {
1374                Ok(d) => d,
1375                Err(e) => {
1376                    return Err((
1377                        StatusCode::BAD_REQUEST,
1378                        PushResponse {
1379                            success: false,
1380                            error: Some(format!("invalid base64 in blob: {e}")),
1381                            existing_patches: vec![],
1382                        },
1383                    ));
1384                }
1385            };
1386            if let Err(e) = self.blob_store(&store, &req.repo_id, &hex, &data) {
1387                return Err((
1388                    StatusCode::INTERNAL_SERVER_ERROR,
1389                    PushResponse {
1390                        success: false,
1391                        error: Some(format!("storage error: {e}")),
1392                        existing_patches: vec![],
1393                    },
1394                ));
1395            }
1396        }
1397
1398        for patch in &req.patches {
1399            let inserted = match store.insert_patch(&req.repo_id, patch) {
1400                Ok(i) => i,
1401                Err(e) => {
1402                    return Err((
1403                        StatusCode::INTERNAL_SERVER_ERROR,
1404                        PushResponse {
1405                            success: false,
1406                            error: Some(format!("storage error: {e}")),
1407                            existing_patches: vec![],
1408                        },
1409                    ));
1410                }
1411            };
1412            if !inserted {
1413                existing_patches.push(patch.id.clone());
1414            }
1415        }
1416
1417        for patch in &req.patches {
1418            if let Err(e) = store.log_operation("insert", "patches", &hash_to_hex(&patch.id), None) {
1419                tracing::warn!("Failed to log operation: {}", e);
1420            }
1421        }
1422
1423        for branch in &req.branches {
1424            let target_hex = hash_to_hex(&branch.target_id);
1425
1426            if !req.force
1427                && let Some(ref known) = req.known_branches
1428                && let Some(known_branch) = known.iter().find(|kb| kb.name == branch.name)
1429            {
1430                let known_target = hash_to_hex(&known_branch.target_id);
1431                if known_target != target_hex
1432                    && let Ok(Some(current_target)) =
1433                        store.get_branch_target(&req.repo_id, &branch.name)
1434                    && !store
1435                        .is_ancestor(&req.repo_id, &current_target, &target_hex)
1436                        .unwrap_or_else(|e| {
1437                            tracing::warn!("Failed to check ancestry: {e}");
1438                            true
1439                        })
1440                {
1441                    return Err((
1442                        StatusCode::CONFLICT,
1443                        PushResponse {
1444                            success: false,
1445                            error: Some(format!(
1446                                "branch '{}' rejected: non-fast-forward push (use --force to override)",
1447                                branch.name
1448                            )),
1449                            existing_patches: vec![],
1450                        },
1451                    ));
1452                }
1453            }
1454
1455            if store
1456                .is_branch_protected(&req.repo_id, &branch.name)
1457                .unwrap_or_else(|e| {
1458                    tracing::warn!("Failed to check branch protection: {e}");
1459                    false
1460                })
1461            {
1462                let push_authors: std::collections::HashSet<&str> =
1463                    req.patches.iter().map(|p| p.author.as_str()).collect();
1464                let is_owner =
1465                    push_authors.len() == 1 && push_authors.contains(branch.name.as_str());
1466                if !is_owner {
1467                    return Err((
1468                        StatusCode::FORBIDDEN,
1469                        PushResponse {
1470                            success: false,
1471                            error: Some(format!(
1472                                "branch '{}' is protected and can only be updated by its owner",
1473                                branch.name
1474                            )),
1475                            existing_patches: vec![],
1476                        },
1477                    ));
1478                }
1479            }
1480
1481            if let Err(e) = store.set_branch(&req.repo_id, &branch.name, &target_hex) {
1482                return Err((
1483                    StatusCode::INTERNAL_SERVER_ERROR,
1484                    PushResponse {
1485                        success: false,
1486                        error: Some(format!("storage error: {e}")),
1487                        existing_patches: vec![],
1488                    },
1489                ));
1490            }
1491        }
1492
1493        for branch in &req.branches {
1494            let target_hex = hash_to_hex(&branch.target_id);
1495            if let Err(e) = store.log_operation(
1496                "set",
1497                "branches",
1498                &format!("{}:{}", req.repo_id, branch.name),
1499                Some(&target_hex),
1500            ) {
1501                tracing::warn!("Failed to log operation: {}", e);
1502            }
1503        }
1504
1505        let repo_id = req.repo_id.clone();
1506        let patch_data = serde_json::json!({
1507            "patch_count": req.patches.len(),
1508            "branch_count": req.branches.len(),
1509            "existing_patches": existing_patches.clone(),
1510        });
1511        let manager = Arc::clone(&self.webhook_manager);
1512        let storage = Arc::clone(&self.storage);
1513        tokio::spawn(async move {
1514            let hooks = {
1515                let store = storage.read().await;
1516                store.list_webhooks(&repo_id).unwrap_or_else(|e| {
1517                    tracing::warn!("store list_webhooks failed: {e}");
1518                    Default::default()
1519                })
1520            };
1521            if !hooks.is_empty() {
1522                let result = manager.trigger(&hooks, "push", &repo_id, patch_data).await;
1523                if result.failed > 0 {
1524                    tracing::warn!("Hook trigger failed: {} of {} webhooks failed", result.failed, result.triggered);
1525                }
1526            }
1527        });
1528
1529        Ok(PushResponse {
1530            success: true,
1531            error: None,
1532            existing_patches,
1533        })
1534    }
1535
1536    pub async fn handle_batch_push(
1537        &self,
1538        req: BatchPatchRequest,
1539    ) -> Result<PushResponse, (StatusCode, PushResponse)> {
1540        let mut existing_patches = Vec::new();
1541
1542        let store = self.storage.write().await;
1543        if let Err(e) = store.ensure_repo(&req.repo_id) {
1544            let msg = format!("storage error: {e}");
1545            return Err((
1546                StatusCode::INTERNAL_SERVER_ERROR,
1547                PushResponse {
1548                    success: false,
1549                    error: Some(msg),
1550                    existing_patches: vec![],
1551                },
1552            ));
1553        }
1554
1555        for blob in &req.blobs {
1556            let hex = hash_to_hex(&blob.hash);
1557            let data = match base64_decode(&blob.data) {
1558                Ok(d) => d,
1559                Err(e) => {
1560                    let msg = format!("invalid base64 in blob: {e}");
1561                    return Err((
1562                        StatusCode::BAD_REQUEST,
1563                        PushResponse {
1564                            success: false,
1565                            error: Some(msg),
1566                            existing_patches: vec![],
1567                        },
1568                    ));
1569                }
1570            };
1571            if let Err(e) = self.blob_store(&store, &req.repo_id, &hex, &data) {
1572                let msg = format!("storage error: {e}");
1573                return Err((
1574                    StatusCode::INTERNAL_SERVER_ERROR,
1575                    PushResponse {
1576                        success: false,
1577                        error: Some(msg),
1578                        existing_patches: vec![],
1579                    },
1580                ));
1581            }
1582        }
1583
1584        for patch in &req.patches {
1585            let inserted = match store.insert_patch(&req.repo_id, patch) {
1586                Ok(i) => i,
1587                Err(e) => {
1588                    let msg = format!("storage error: {e}");
1589                    return Err((
1590                        StatusCode::INTERNAL_SERVER_ERROR,
1591                        PushResponse {
1592                            success: false,
1593                            error: Some(msg),
1594                            existing_patches: vec![],
1595                        },
1596                    ));
1597                }
1598            };
1599            if !inserted {
1600                existing_patches.push(patch.id.clone());
1601            }
1602        }
1603
1604        for patch in &req.patches {
1605            if let Err(e) = store.log_operation("insert", "patches", &hash_to_hex(&patch.id), None) {
1606                tracing::warn!("Failed to log operation: {}", e);
1607            }
1608        }
1609
1610        for branch in &req.branches {
1611            let target_hex = hash_to_hex(&branch.target_id);
1612
1613            if store
1614                .is_branch_protected(&req.repo_id, &branch.name)
1615                .unwrap_or_else(|e| {
1616                    tracing::warn!("Failed to check branch protection: {e}");
1617                    false
1618                })
1619            {
1620                let push_authors: std::collections::HashSet<&str> =
1621                    req.patches.iter().map(|p| p.author.as_str()).collect();
1622                let is_owner =
1623                    push_authors.len() == 1 && push_authors.contains(branch.name.as_str());
1624                if !is_owner && !req.force {
1625                    let msg = format!(
1626                        "branch '{}' is protected and can only be updated by its owner",
1627                        branch.name
1628                    );
1629                    return Err((
1630                        StatusCode::FORBIDDEN,
1631                        PushResponse {
1632                            success: false,
1633                            error: Some(msg),
1634                            existing_patches: vec![],
1635                        },
1636                    ));
1637                }
1638            }
1639
1640            if let Err(e) = store.set_branch(&req.repo_id, &branch.name, &target_hex) {
1641                let msg = format!("storage error: {e}");
1642                return Err((
1643                    StatusCode::INTERNAL_SERVER_ERROR,
1644                    PushResponse {
1645                        success: false,
1646                        error: Some(msg),
1647                        existing_patches: vec![],
1648                    },
1649                ));
1650            }
1651        }
1652
1653        for branch in &req.branches {
1654            let target_hex = hash_to_hex(&branch.target_id);
1655            if let Err(e) = store.log_operation(
1656                "set",
1657                "branches",
1658                &format!("{}:{}", req.repo_id, branch.name),
1659                Some(&target_hex),
1660            ) {
1661                tracing::warn!("Failed to log operation: {}", e);
1662            }
1663        }
1664
1665        let repo_id = req.repo_id.clone();
1666        let patch_data = serde_json::json!({
1667            "patch_count": req.patches.len(),
1668            "branch_count": req.branches.len(),
1669            "existing_patches": existing_patches.clone(),
1670        });
1671        let manager = Arc::clone(&self.webhook_manager);
1672        let storage = Arc::clone(&self.storage);
1673        tokio::spawn(async move {
1674            let hooks = {
1675                let store = storage.read().await;
1676                store.list_webhooks(&repo_id).unwrap_or_else(|e| {
1677                    tracing::warn!("store list_webhooks failed: {e}");
1678                    Default::default()
1679                })
1680            };
1681            if !hooks.is_empty() {
1682                let result = manager.trigger(&hooks, "push", &repo_id, patch_data).await;
1683                if result.failed > 0 {
1684                    tracing::warn!("Hook trigger failed: {} of {} webhooks failed", result.failed, result.triggered);
1685                }
1686            }
1687        });
1688
1689        Ok(PushResponse {
1690            success: true,
1691            error: None,
1692            existing_patches,
1693        })
1694    }
1695
1696    #[cfg(feature = "raft-cluster")]
1697    pub async fn apply_raft_command(&self, cmd: crate::raft::HubCommand) -> Result<(), String> {
1698        use crate::raft::HubCommand;
1699
1700        let store = self.storage.write().await;
1701
1702        match cmd {
1703            HubCommand::CreateRepo { repo_id } => {
1704                store.ensure_repo(&repo_id).map_err(|e| e.to_string())?;
1705                Ok(())
1706            }
1707            HubCommand::DeleteRepo { repo_id } => {
1708                store.delete_repo(&repo_id).map_err(|e| e.to_string())
1709            }
1710            HubCommand::StoreBlob { hash, data } => store
1711                .store_blob("_raft_default", &hash, &data)
1712                .map_err(|e| e.to_string()),
1713            HubCommand::DeleteBlob { hash } => {
1714                if let Err(e) = store.delete_blob("_raft_default", &hash) {
1715                    tracing::warn!("Failed to delete blob: {}", e);
1716                }
1717                Ok(())
1718            }
1719            HubCommand::CreateBranch {
1720                repo_id,
1721                branch,
1722                target,
1723            }
1724            | HubCommand::UpdateBranch {
1725                repo_id,
1726                branch,
1727                target,
1728            } => store
1729                .set_branch(&repo_id, &branch, &target)
1730                .map_err(|e| e.to_string()),
1731            HubCommand::DeleteBranch { repo_id, branch } => store
1732                .delete_branch(&repo_id, &branch)
1733                .map_err(|e| e.to_string()),
1734            HubCommand::StorePatch {
1735                repo_id,
1736                patch_id,
1737                patch_data,
1738            } => {
1739                let patch: crate::types::PatchProto = match serde_json::from_slice(&patch_data) {
1740                    Ok(p) => p,
1741                    Err(e) => return Err(format!("failed to deserialize patch: {e}")),
1742                };
1743                let expected_hex = patch_id;
1744                let actual_hex = hash_to_hex(&patch.id);
1745                if actual_hex != expected_hex {
1746                    return Err(format!(
1747                        "patch_id mismatch: expected {expected_hex}, got {actual_hex}"
1748                    ));
1749                }
1750                store
1751                    .insert_patch(&repo_id, &patch)
1752                    .map_err(|e| e.to_string())?;
1753                Ok(())
1754            }
1755        }
1756    }
1757}
1758
1759fn validate_mirror_url(url: &str) -> Result<(), &'static str> {
1760    let parsed = url::Url::parse(url).map_err(|_| "invalid URL syntax")?;
1761
1762    match parsed.scheme() {
1763        "http" | "https" => {}
1764        _ => return Err("only http and https URLs are allowed"),
1765    }
1766
1767    let host = parsed.host_str().ok_or("URL must have a host")?;
1768
1769    if let Ok(addr) = host.parse::<std::net::IpAddr>() {
1770        match addr {
1771            std::net::IpAddr::V4(v4) => {
1772                if v4.is_loopback() || v4.is_private() || v4.is_link_local() {
1773                    return Err("private/internal IP addresses are not allowed");
1774                }
1775            }
1776            std::net::IpAddr::V6(v6) => {
1777                if v6.is_loopback() || v6.is_unicast_link_local() {
1778                    return Err("private/internal IP addresses are not allowed");
1779                }
1780            }
1781        }
1782    }
1783
1784    if host == "169.254.169.254" || host == "metadata.google.internal" {
1785        return Err("metadata endpoints are not allowed");
1786    }
1787
1788    Ok(())
1789}
1790
1791fn verify_push_signature(
1792    store: &HubStorage,
1793    req: &PushRequest,
1794    sig_bytes: &[u8],
1795) -> Result<(), String> {
1796    if sig_bytes.len() != 64 {
1797        return Err("signature must be 64 bytes".to_owned());
1798    }
1799    let signature = Signature::from_bytes(
1800        sig_bytes
1801            .try_into()
1802            .map_err(|_| "invalid signature length")?,
1803    );
1804
1805    let canonical = canonical_push_bytes(req);
1806
1807    let mut authors: HashSet<&str> = HashSet::new();
1808    for patch in &req.patches {
1809        authors.insert(&patch.author);
1810    }
1811
1812    for author in &authors {
1813        let pub_key_bytes = match store.get_authorized_key(author) {
1814            Ok(Some(bytes)) => bytes,
1815            Ok(None) => continue,
1816            Err(e) => {
1817                tracing::warn!("Failed to get authorized key for '{}': {e}", author);
1818                continue;
1819            }
1820        };
1821        if pub_key_bytes.len() != 32 {
1822            continue;
1823        }
1824        let pub_key_array: [u8; 32] = pub_key_bytes
1825            .try_into()
1826            .map_err(|_| "invalid public key length")?;
1827        let verifying_key = VerifyingKey::from_bytes(&pub_key_array)
1828            .map_err(|e| format!("invalid public key: {e}"))?;
1829        if verifying_key.verify(&canonical, &signature).is_ok() {
1830            return Ok(());
1831        }
1832    }
1833
1834    Err("no matching authorized key found for signature".to_owned())
1835}
1836
1837fn collect_ancestors(
1838    all_patches: &[PatchProto],
1839    known_branches: &[BranchProto],
1840) -> HashSet<String> {
1841    let patch_map: std::collections::HashMap<String, &PatchProto> = all_patches
1842        .iter()
1843        .map(|p| (hash_to_hex(&p.id), p))
1844        .collect();
1845
1846    let mut ancestors = HashSet::new();
1847    let mut stack: Vec<String> = known_branches
1848        .iter()
1849        .filter_map(|b| {
1850            let hex = hash_to_hex(&b.target_id);
1851            if patch_map.contains_key(&hex) {
1852                Some(hex)
1853            } else {
1854                None
1855            }
1856        })
1857        .collect();
1858
1859    while let Some(id_hex) = stack.pop() {
1860        if ancestors.insert(id_hex.clone())
1861            && let Some(patch) = patch_map.get(&id_hex)
1862        {
1863            for parent in &patch.parent_ids {
1864                let parent_hex = hash_to_hex(parent);
1865                if !ancestors.contains(&parent_hex) {
1866                    stack.push(parent_hex);
1867                }
1868            }
1869        }
1870    }
1871
1872    ancestors
1873}
1874
1875fn collect_new_patches(
1876    all_patches: &[PatchProto],
1877    client_ancestors: &HashSet<String>,
1878) -> Vec<PatchProto> {
1879    let patch_map: std::collections::HashMap<String, &PatchProto> = all_patches
1880        .iter()
1881        .map(|p| (hash_to_hex(&p.id), p))
1882        .collect();
1883
1884    let mut reachable: HashSet<String> = HashSet::new();
1885    let mut stack: Vec<String> = all_patches.iter().map(|p| hash_to_hex(&p.id)).collect();
1886
1887    while let Some(id_hex) = stack.pop() {
1888        if reachable.insert(id_hex.clone())
1889            && let Some(patch) = patch_map.get(&id_hex)
1890        {
1891            for parent in &patch.parent_ids {
1892                let parent_hex = hash_to_hex(parent);
1893                if !reachable.contains(&parent_hex) {
1894                    stack.push(parent_hex);
1895                }
1896            }
1897        }
1898    }
1899
1900    let mut new_ids: Vec<String> = reachable
1901        .into_iter()
1902        .filter(|id| !client_ancestors.contains(id))
1903        .collect();
1904    // Sort to ensure deterministic ordering for topological sort input.
1905    new_ids.sort();
1906
1907    let mut seen: HashSet<String> = HashSet::new();
1908    let mut stack: Vec<String> = new_ids;
1909
1910    while let Some(id_hex) = stack.pop() {
1911        if seen.insert(id_hex.clone())
1912            && let Some(patch) = patch_map.get(&id_hex)
1913        {
1914            for parent in &patch.parent_ids {
1915                let parent_hex = hash_to_hex(parent);
1916                if !client_ancestors.contains(&parent_hex) && !seen.contains(&parent_hex) {
1917                    stack.push(parent_hex);
1918                }
1919            }
1920        }
1921    }
1922
1923    let mut result: Vec<PatchProto> = seen
1924        .into_iter()
1925        .filter_map(|id| patch_map.get(&id).map(|p| (*p).clone()))
1926        .collect();
1927
1928    topological_sort(&mut result);
1929    result
1930}
1931
1932fn topological_sort(patches: &mut Vec<PatchProto>) {
1933    let index_map: std::collections::HashMap<String, usize> = patches
1934        .iter()
1935        .enumerate()
1936        .map(|(i, p)| (hash_to_hex(&p.id), i))
1937        .collect();
1938
1939    let n = patches.len();
1940    let mut visited = vec![false; n];
1941    let mut order = Vec::with_capacity(n);
1942
1943    for i in 0..n {
1944        if !visited[i] {
1945            dfs(i, patches, &index_map, &mut visited, &mut order);
1946        }
1947    }
1948
1949    let sorted: Vec<PatchProto> = order.into_iter().map(|i| patches[i].clone()).collect();
1950    *patches = sorted;
1951}
1952
1953fn dfs(
1954    idx: usize,
1955    patches: &[PatchProto],
1956    index_map: &std::collections::HashMap<String, usize>,
1957    visited: &mut [bool],
1958    order: &mut Vec<usize>,
1959) {
1960    visited[idx] = true;
1961    let patch = &patches[idx];
1962    for parent in &patch.parent_ids {
1963        let parent_hex = hash_to_hex(parent);
1964        if let Some(&parent_idx) = index_map.get(&parent_hex)
1965            && !visited[parent_idx]
1966        {
1967            dfs(parent_idx, patches, index_map, visited, order);
1968        }
1969    }
1970    order.push(idx);
1971}
1972
1973async fn check_auth(hub: &SutureHubServer, headers: &HeaderMap) -> Result<(), StatusCode> {
1974    if hub.no_auth {
1975        return Ok(());
1976    }
1977
1978    let store = hub.storage.read().await;
1979    let auth_keys_configured = store.has_authorized_keys().unwrap_or_else(|e| {
1980        tracing::error!("Failed to check authorized keys: {e}");
1981        true
1982    });
1983    let tokens_exist = store.has_tokens().unwrap_or_else(|e| {
1984        tracing::error!("Failed to check tokens: {e}");
1985        true
1986    });
1987    drop(store);
1988
1989    if !auth_keys_configured && !tokens_exist {
1990        return Ok(());
1991    }
1992
1993    if let Some(auth_header) = headers.get("authorization")
1994        && let Ok(auth_str) = auth_header.to_str()
1995        && let Some(token) = auth_str.strip_prefix("Bearer ")
1996    {
1997        let store = hub.storage.read().await;
1998        if store.verify_token(token).unwrap_or_else(|e| {
1999            tracing::error!("Failed to verify token: {e}");
2000            false
2001        }) {
2002            return Ok(());
2003        }
2004    }
2005
2006    Err(StatusCode::UNAUTHORIZED)
2007}
2008
2009async fn resolve_user(hub: &SutureHubServer, headers: &HeaderMap) -> Option<UserInfo> {
2010    if let Some(auth_header) = headers.get("authorization")
2011        && let Ok(auth_str) = auth_header.to_str()
2012        && let Some(token) = auth_str.strip_prefix("Bearer ")
2013    {
2014        let store = hub.storage.read().await;
2015        return match store.get_user_by_token(token) {
2016            Ok(Some(user)) => Some(user),
2017            Ok(None) => None,
2018            Err(e) => {
2019                tracing::warn!("Failed to get user by token: {e}");
2020                None
2021            }
2022        };
2023    }
2024    None
2025}
2026
2027async fn require_role(
2028    hub: &SutureHubServer,
2029    headers: &HeaderMap,
2030    required_role: &Role,
2031) -> Result<UserInfo, StatusCode> {
2032    if hub.no_auth {
2033        return Err(StatusCode::UNAUTHORIZED);
2034    }
2035
2036    let user = resolve_user(hub, headers)
2037        .await
2038        .ok_or(StatusCode::UNAUTHORIZED)?;
2039    let user_role = Role::parse(&user.role);
2040
2041    if user_role >= *required_role {
2042        Ok(user)
2043    } else {
2044        Err(StatusCode::FORBIDDEN)
2045    }
2046}
2047
2048fn generate_api_token() -> String {
2049    let mut bytes = [0u8; 32];
2050    rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut bytes);
2051    hex::encode(bytes)
2052}
2053
2054fn generate_random_token() -> String {
2055    let mut bytes = [0u8; 32];
2056    rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut bytes);
2057    hex::encode(bytes)
2058}
2059
2060pub async fn push_compressed_handler(
2061    State(hub): State<Arc<SutureHubServer>>,
2062    headers: HeaderMap,
2063    Json(req): Json<PushRequest>,
2064) -> (StatusCode, Json<PushResponse>) {
2065    if let Err(status) = check_auth(&hub, &headers).await {
2066        return (
2067            status,
2068            Json(PushResponse {
2069                success: false,
2070                error: Some("authentication failed".to_owned()),
2071                existing_patches: vec![],
2072            }),
2073        );
2074    }
2075    let mut req = req;
2076    for blob in &mut req.blobs {
2077        let compressed_data = match base64_decode(&blob.data) {
2078            Ok(d) => d,
2079            Err(e) => {
2080                return (
2081                    StatusCode::BAD_REQUEST,
2082                    Json(PushResponse {
2083                        success: false,
2084                        error: Some(format!("invalid base64 in compressed blob: {e}")),
2085                        existing_patches: vec![],
2086                    }),
2087                );
2088            }
2089        };
2090        let decompressed = match suture_protocol::decompress(&compressed_data) {
2091            Ok(d) => d,
2092            Err(e) => {
2093                return (
2094                    StatusCode::BAD_REQUEST,
2095                    Json(PushResponse {
2096                        success: false,
2097                        error: Some(e),
2098                        existing_patches: vec![],
2099                    }),
2100                );
2101            }
2102        };
2103        blob.data = base64_encode(&decompressed);
2104    }
2105    match hub.handle_push(req).await {
2106        Ok(resp) => (StatusCode::OK, Json(resp)),
2107        Err((status, resp)) => (status, Json(resp)),
2108    }
2109}
2110
2111pub async fn pull_compressed_handler(
2112    State(hub): State<Arc<SutureHubServer>>,
2113    headers: HeaderMap,
2114    Json(req): Json<PullRequest>,
2115) -> (StatusCode, Json<PullResponse>) {
2116    if let Err(status) = check_auth(&hub, &headers).await {
2117        return (
2118            status,
2119            Json(PullResponse {
2120                success: false,
2121                error: Some("authentication failed".to_owned()),
2122                patches: vec![],
2123                branches: vec![],
2124                blobs: vec![],
2125            }),
2126        );
2127    }
2128    let mut resp = hub.handle_pull(req).await;
2129    if resp.success {
2130        for blob in &mut resp.blobs {
2131            let Ok(raw) = base64_decode(&blob.data) else { continue };
2132            let Ok(compressed) = suture_protocol::compress(&raw) else { continue };
2133            blob.data = base64_encode(&compressed);
2134        }
2135    }
2136    let status = if resp.success {
2137        StatusCode::OK
2138    } else {
2139        StatusCode::NOT_FOUND
2140    };
2141    (status, Json(resp))
2142}
2143
2144pub async fn push_handler(
2145    State(hub): State<Arc<SutureHubServer>>,
2146    headers: HeaderMap,
2147    ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
2148    Json(req): Json<PushRequest>,
2149) -> (StatusCode, HeaderMap, Json<PushResponse>) {
2150    let ip = addr.ip().to_string();
2151    if let Err(retry_after) = hub.check_rate_limit(&ip, "push") {
2152        let mut hdrs = HeaderMap::new();
2153        if let Ok(val) = retry_after.to_string().parse() {
2154            hdrs.insert(axum::http::header::RETRY_AFTER, val);
2155        }
2156        return (
2157            StatusCode::TOO_MANY_REQUESTS,
2158            hdrs,
2159            Json(PushResponse {
2160                success: false,
2161                error: Some("rate limit exceeded".to_owned()),
2162                existing_patches: vec![],
2163            }),
2164        );
2165    }
2166
2167    if let Err(status) = check_auth(&hub, &headers).await {
2168        return (
2169            status,
2170            HeaderMap::new(),
2171            Json(PushResponse {
2172                success: false,
2173                error: Some("authentication failed".to_owned()),
2174                existing_patches: vec![],
2175            }),
2176        );
2177    }
2178
2179    if !hub.no_auth
2180        && let Some(user) = resolve_user(&hub, &headers).await
2181        && Role::parse(&user.role) < Role::Member
2182    {
2183        return (
2184            StatusCode::FORBIDDEN,
2185            HeaderMap::new(),
2186            Json(PushResponse {
2187                success: false,
2188                error: Some("insufficient permissions: readers cannot push".to_owned()),
2189                existing_patches: vec![],
2190            }),
2191        );
2192    }
2193
2194    match hub.handle_push(req).await {
2195        Ok(resp) => (StatusCode::OK, HeaderMap::new(), Json(resp)),
2196        Err((status, resp)) => (status, HeaderMap::new(), Json(resp)),
2197    }
2198}
2199
2200pub async fn pull_handler(
2201    State(hub): State<Arc<SutureHubServer>>,
2202    headers: HeaderMap,
2203    ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
2204    Json(req): Json<PullRequest>,
2205) -> (StatusCode, HeaderMap, Json<PullResponse>) {
2206    let ip = addr.ip().to_string();
2207    if let Err(retry_after) = hub.check_rate_limit(&ip, "pull") {
2208        let mut hdrs = HeaderMap::new();
2209        if let Ok(val) = retry_after.to_string().parse() {
2210            hdrs.insert(axum::http::header::RETRY_AFTER, val);
2211        }
2212        return (
2213            StatusCode::TOO_MANY_REQUESTS,
2214            hdrs,
2215            Json(PullResponse {
2216                success: false,
2217                error: Some("rate limit exceeded".to_owned()),
2218                patches: vec![],
2219                branches: vec![],
2220                blobs: vec![],
2221            }),
2222        );
2223    }
2224
2225    if let Err(status) = check_auth(&hub, &headers).await {
2226        return (
2227            status,
2228            HeaderMap::new(),
2229            Json(PullResponse {
2230                success: false,
2231                error: Some("authentication failed".to_owned()),
2232                patches: vec![],
2233                branches: vec![],
2234                blobs: vec![],
2235            }),
2236        );
2237    }
2238    let resp = hub.handle_pull(req).await;
2239    let status = if resp.success {
2240        StatusCode::OK
2241    } else {
2242        StatusCode::NOT_FOUND
2243    };
2244    (status, HeaderMap::new(), Json(resp))
2245}
2246
2247pub async fn list_repos_handler(
2248    State(hub): State<Arc<SutureHubServer>>,
2249) -> Json<ListReposResponse> {
2250    Json(hub.handle_list_repos().await)
2251}
2252
2253pub async fn repo_info_handler(
2254    State(hub): State<Arc<SutureHubServer>>,
2255    Path(repo_id): Path<String>,
2256) -> (StatusCode, Json<RepoInfoResponse>) {
2257    let resp = hub.handle_repo_info(&repo_id).await;
2258    let status = if resp.success {
2259        StatusCode::OK
2260    } else {
2261        StatusCode::NOT_FOUND
2262    };
2263    (status, Json(resp))
2264}
2265
2266pub async fn handshake_handler(
2267    Json(req): Json<crate::types::HandshakeRequest>,
2268) -> Json<crate::types::HandshakeResponse> {
2269    let compatible = req.client_version == crate::types::PROTOCOL_VERSION;
2270    Json(crate::types::HandshakeResponse {
2271        server_version: crate::types::PROTOCOL_VERSION,
2272        server_name: "suture-hub".to_owned(),
2273        compatible,
2274    })
2275}
2276
2277/// GET /handshake — returns version info without requiring a request body.
2278/// Used by `suture push`/`suture pull` which send a bare GET for compatibility checking.
2279pub async fn handshake_get_handler() -> Json<crate::types::HandshakeResponse> {
2280    Json(crate::types::HandshakeResponse {
2281        server_version: crate::types::PROTOCOL_VERSION,
2282        server_name: "suture-hub".to_owned(),
2283        compatible: true,
2284    })
2285}
2286
2287#[derive(Debug, serde::Serialize)]
2288pub struct TokenResponse {
2289    pub token: String,
2290    pub created_at: u64,
2291}
2292
2293pub async fn create_token_handler(
2294    State(hub): State<Arc<SutureHubServer>>,
2295    headers: HeaderMap,
2296    ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
2297) -> (StatusCode, HeaderMap, Json<TokenResponse>) {
2298    let ip = addr.ip().to_string();
2299    if let Err(retry_after) = hub.check_rate_limit(&ip, "token_create") {
2300        let mut hdrs = HeaderMap::new();
2301        if let Ok(val) = retry_after.to_string().parse() {
2302            hdrs.insert(axum::http::header::RETRY_AFTER, val);
2303        }
2304        return (
2305            StatusCode::TOO_MANY_REQUESTS,
2306            hdrs,
2307            Json(TokenResponse {
2308                token: String::new(),
2309                created_at: 0,
2310            }),
2311        );
2312    }
2313
2314    if hub.no_auth {
2315        let token = generate_random_token();
2316        let created_at = std::time::SystemTime::now()
2317            .duration_since(std::time::UNIX_EPOCH)
2318            .unwrap_or_default()
2319            .as_secs();
2320        let expires_at = (created_at + (30 * 24 * 60 * 60)) as i64;
2321        let store = hub.storage.write().await;
2322        if let Err(e) = store.store_token(&token, created_at, "cli-generated", expires_at) {
2323            tracing::error!("Failed to store auth token: {}", e);
2324            return (
2325                StatusCode::INTERNAL_SERVER_ERROR,
2326                HeaderMap::new(),
2327                Json(TokenResponse { token: String::new(), created_at: 0 }),
2328            );
2329        }
2330        return (
2331            StatusCode::OK,
2332            HeaderMap::new(),
2333            Json(TokenResponse { token, created_at }),
2334        );
2335    }
2336
2337    let store = hub.storage.read().await;
2338    let tokens_exist = store.has_tokens().unwrap_or_else(|e| {
2339        tracing::error!("Failed to check tokens: {e}");
2340        true
2341    });
2342    let users_exist = store.has_users().unwrap_or_else(|e| {
2343        tracing::error!("Failed to check users: {e}");
2344        true
2345    });
2346    let auth_keys_configured = store.has_authorized_keys().unwrap_or_else(|e| {
2347        tracing::error!("Failed to check authorized keys: {e}");
2348        true
2349    });
2350    drop(store);
2351
2352    if !tokens_exist && !users_exist && !auth_keys_configured {
2353        let token = generate_random_token();
2354        let created_at = std::time::SystemTime::now()
2355            .duration_since(std::time::UNIX_EPOCH)
2356            .unwrap_or_default()
2357            .as_secs();
2358        let expires_at = (created_at + (30 * 24 * 60 * 60)) as i64;
2359        let store = hub.storage.write().await;
2360        if let Err(e) = store.store_token(&token, created_at, "cli-generated", expires_at) {
2361            tracing::error!("Failed to store auth token: {}", e);
2362            return (
2363                StatusCode::INTERNAL_SERVER_ERROR,
2364                HeaderMap::new(),
2365                Json(TokenResponse { token: String::new(), created_at: 0 }),
2366            );
2367        }
2368        return (
2369            StatusCode::OK,
2370            HeaderMap::new(),
2371            Json(TokenResponse { token, created_at }),
2372        );
2373    }
2374
2375    let user = resolve_user(&hub, &headers).await;
2376    if let Some(u) = user {
2377        let role = Role::parse(&u.role);
2378        if role < Role::Admin {
2379            return (
2380                StatusCode::FORBIDDEN,
2381                HeaderMap::new(),
2382                Json(TokenResponse {
2383                    token: String::new(),
2384                    created_at: 0,
2385                }),
2386            );
2387        }
2388    } else {
2389        let store = hub.storage.read().await;
2390        let valid_token = if let Some(auth_header) = headers.get("authorization")
2391            && let Ok(auth_str) = auth_header.to_str()
2392            && let Some(token) = auth_str.strip_prefix("Bearer ")
2393        {
2394            store.verify_token(token).unwrap_or_else(|e| {
2395                tracing::error!("Failed to verify token: {e}");
2396                false
2397            })
2398        } else {
2399            false
2400        };
2401        drop(store);
2402        if !valid_token {
2403            return (
2404                StatusCode::UNAUTHORIZED,
2405                HeaderMap::new(),
2406                Json(TokenResponse {
2407                    token: String::new(),
2408                    created_at: 0,
2409                }),
2410            );
2411        }
2412    }
2413
2414    let token = generate_random_token();
2415    let created_at = std::time::SystemTime::now()
2416        .duration_since(std::time::UNIX_EPOCH)
2417        .unwrap_or_default()
2418        .as_secs();
2419    let expires_at = (created_at + (30 * 24 * 60 * 60)) as i64;
2420
2421    let store = hub.storage.write().await;
2422    if let Err(e) = store.store_token(&token, created_at, "cli-generated", expires_at) {
2423        tracing::error!("Failed to store auth token: {e}");
2424        return (
2425            StatusCode::INTERNAL_SERVER_ERROR,
2426            HeaderMap::new(),
2427            Json(TokenResponse {
2428                token: String::new(),
2429                created_at: 0,
2430            }),
2431        );
2432    }
2433
2434    (
2435        StatusCode::OK,
2436        HeaderMap::new(),
2437        Json(TokenResponse { token, created_at }),
2438    )
2439}
2440
2441#[derive(Debug, serde::Serialize)]
2442pub struct VerifyResponse {
2443    pub valid: bool,
2444}
2445
2446pub async fn verify_token_handler(
2447    State(hub): State<Arc<SutureHubServer>>,
2448    Json(auth_req): Json<crate::types::AuthRequest>,
2449) -> Json<VerifyResponse> {
2450    let valid = match &auth_req.method {
2451        crate::types::AuthMethod::Token(token) => {
2452            let store = hub.storage.read().await;
2453            store.verify_token(token).unwrap_or_else(|e| {
2454                tracing::warn!("Failed to verify token: {e}");
2455                false
2456            })
2457        }
2458        _ => false,
2459    };
2460    Json(VerifyResponse { valid })
2461}
2462
2463pub async fn mirror_setup_handler(
2464    State(hub): State<Arc<SutureHubServer>>,
2465    headers: HeaderMap,
2466    Json(req): Json<crate::types::MirrorSetupRequest>,
2467) -> (StatusCode, Json<crate::types::MirrorSetupResponse>) {
2468    if let Err(status) = check_auth(&hub, &headers).await {
2469        let resp = crate::types::MirrorSetupResponse { success: false, mirror_id: None, error: Some("unauthorized".to_owned()) };
2470        return (status, Json(resp));
2471    }
2472    let resp = hub.handle_mirror_setup(req).await;
2473    let status = if resp.success {
2474        StatusCode::OK
2475    } else {
2476        StatusCode::BAD_REQUEST
2477    };
2478    (status, Json(resp))
2479}
2480
2481pub async fn mirror_sync_handler(
2482    State(hub): State<Arc<SutureHubServer>>,
2483    headers: HeaderMap,
2484    Json(req): Json<crate::types::MirrorSyncRequest>,
2485) -> (StatusCode, Json<crate::types::MirrorSyncResponse>) {
2486    if let Err(status) = check_auth(&hub, &headers).await {
2487        let resp = crate::types::MirrorSyncResponse { success: false, error: Some("unauthorized".to_owned()), patches_synced: 0, branches_synced: 0 };
2488        return (status, Json(resp));
2489    }
2490    let store = hub.storage.read().await;
2491    let actual_mirror_id = if req.mirror_id == 0 {
2492        let repo_name = req.local_repo.clone().unwrap_or_default();
2493        match store.get_mirror_by_repo(&repo_name) {
2494            Ok(Some(id)) => id,
2495            _ => {
2496                return (
2497                    StatusCode::BAD_REQUEST,
2498                    Json(crate::types::MirrorSyncResponse {
2499                        success: false,
2500                        error: Some("mirror not found by local_repo".to_owned()),
2501                        patches_synced: 0,
2502                        branches_synced: 0,
2503                    }),
2504                );
2505            }
2506        }
2507    } else {
2508        req.mirror_id
2509    };
2510    drop(store);
2511    let actual_req = crate::types::MirrorSyncRequest {
2512        mirror_id: actual_mirror_id,
2513        local_repo: req.local_repo,
2514        remote_url: req.remote_url,
2515    };
2516    let resp = hub.handle_mirror_sync(actual_req).await;
2517    let status = if resp.success {
2518        StatusCode::OK
2519    } else {
2520        StatusCode::INTERNAL_SERVER_ERROR
2521    };
2522    (status, Json(resp))
2523}
2524
2525pub async fn mirror_status_handler(
2526    State(hub): State<Arc<SutureHubServer>>,
2527    Json(req): Json<crate::types::MirrorStatusRequest>,
2528) -> (StatusCode, Json<crate::types::MirrorStatusResponse>) {
2529    let resp = hub.handle_mirror_status(req).await;
2530    (StatusCode::OK, Json(resp))
2531}
2532
2533pub async fn mirror_status_get_handler(
2534    State(hub): State<Arc<SutureHubServer>>,
2535) -> (StatusCode, Json<crate::types::MirrorStatusResponse>) {
2536    let resp = hub
2537        .handle_mirror_status(crate::types::MirrorStatusRequest {
2538            mirror_id: None,
2539            repo_name: None,
2540        })
2541        .await;
2542    (StatusCode::OK, Json(resp))
2543}
2544
2545fn base64_encode(data: &[u8]) -> String {
2546    use base64::Engine;
2547    base64::engine::general_purpose::STANDARD.encode(data)
2548}
2549
2550fn base64_decode(s: &str) -> Result<Vec<u8>, String> {
2551    use base64::Engine;
2552    base64::engine::general_purpose::STANDARD
2553        .decode(s)
2554        .map_err(|e| e.to_string())
2555}
2556
2557pub async fn repo_branches_handler(
2558    State(hub): State<Arc<SutureHubServer>>,
2559    Path(repo_id): Path<String>,
2560) -> (StatusCode, Json<Vec<BranchProto>>) {
2561    let store = hub.storage.read().await;
2562        let branches = store.get_branches(&repo_id).unwrap_or_else(|e| {
2563            tracing::warn!("store get_branches failed: {e}");
2564            Default::default()
2565        });
2566    (StatusCode::OK, Json(branches))
2567}
2568
2569pub async fn repo_patches_handler(
2570    State(hub): State<Arc<SutureHubServer>>,
2571    Path(repo_id): Path<String>,
2572    Query(params): Query<PaginationParams>,
2573) -> (StatusCode, Json<serde_json::Value>) {
2574    let offset = params
2575        .cursor
2576        .as_deref()
2577        .and_then(decode_cursor)
2578        .unwrap_or_else(|| u64::from(params.offset.unwrap_or(0)));
2579    let limit = params.limit.unwrap_or(50);
2580    let (patches, next_cursor) = hub
2581        .handle_repo_patches_cursor(&repo_id, offset, limit)
2582        .await;
2583    (
2584        StatusCode::OK,
2585        Json(serde_json::json!({
2586            "patches": patches,
2587            "next_cursor": next_cursor.unwrap_or_default(),
2588        })),
2589    )
2590}
2591
2592pub async fn repo_tree_handler(
2593    State(hub): State<Arc<SutureHubServer>>,
2594    Path((repo_id, branch)): Path<(String, String)>,
2595) -> (StatusCode, Json<serde_json::Value>) {
2596    let store = hub.storage.read().await;
2597    match store.get_tree_at_branch(&repo_id, &branch) {
2598        Ok(entries) => {
2599            let files: Vec<serde_json::Value> = entries
2600                .into_iter()
2601                .map(|e| {
2602                    serde_json::json!({
2603                        "path": e.path,
2604                        "content_hash": e.content_hash,
2605                    })
2606                })
2607                .collect();
2608            (
2609                StatusCode::OK,
2610                Json(serde_json::json!({"success": true, "files": files})),
2611            )
2612        }
2613        Err(e) => (
2614            StatusCode::INTERNAL_SERVER_ERROR,
2615            Json(serde_json::json!({"success": false, "error": e.to_string()})),
2616        ),
2617    }
2618}
2619
2620pub async fn protect_branch_handler(
2621    State(hub): State<Arc<SutureHubServer>>,
2622    headers: HeaderMap,
2623    Path((repo_id, branch)): Path<(String, String)>,
2624) -> (StatusCode, Json<serde_json::Value>) {
2625    if let Err(status) = check_auth(&hub, &headers).await {
2626        return (status, Json(serde_json::json!({"success": false, "error": "unauthorized"})));
2627    }
2628    let store = hub.storage.write().await;
2629    match store.protect_branch(&repo_id, &branch) {
2630        Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
2631        Err(e) => (
2632            StatusCode::INTERNAL_SERVER_ERROR,
2633            Json(serde_json::json!({"success": false, "error": e.to_string()})),
2634        ),
2635    }
2636}
2637
2638pub async fn unprotect_branch_handler(
2639    State(hub): State<Arc<SutureHubServer>>,
2640    headers: HeaderMap,
2641    Path((repo_id, branch)): Path<(String, String)>,
2642) -> (StatusCode, Json<serde_json::Value>) {
2643    if let Err(status) = check_auth(&hub, &headers).await {
2644        return (status, Json(serde_json::json!({"success": false, "error": "unauthorized"})));
2645    }
2646    let store = hub.storage.write().await;
2647    match store.unprotect_branch(&repo_id, &branch) {
2648        Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
2649        Err(e) => (
2650            StatusCode::INTERNAL_SERVER_ERROR,
2651            Json(serde_json::json!({"success": false, "error": e.to_string()})),
2652        ),
2653    }
2654}
2655
2656pub async fn create_repo_handler(
2657    State(hub): State<Arc<SutureHubServer>>,
2658    headers: HeaderMap,
2659    Json(req): Json<crate::types::CreateRepoRequest>,
2660) -> (StatusCode, Json<serde_json::Value>) {
2661    if let Err(status) = check_auth(&hub, &headers).await {
2662        return (
2663            status,
2664            Json(serde_json::json!({"success": false, "error": "unauthorized"})),
2665        );
2666    }
2667    let store = hub.storage.write().await;
2668    match store.ensure_repo(&req.repo_id) {
2669        Ok(_) => (
2670            StatusCode::CREATED,
2671            Json(serde_json::json!({"success": true, "repo_id": req.repo_id})),
2672        ),
2673        Err(e) => (
2674            StatusCode::INTERNAL_SERVER_ERROR,
2675            Json(serde_json::json!({"success": false, "error": e.to_string()})),
2676        ),
2677    }
2678}
2679
2680pub async fn delete_repo_handler(
2681    State(hub): State<Arc<SutureHubServer>>,
2682    headers: HeaderMap,
2683    Path(repo_id): Path<String>,
2684) -> (StatusCode, Json<serde_json::Value>) {
2685    if let Err(status) = check_auth(&hub, &headers).await {
2686        return (
2687            status,
2688            Json(serde_json::json!({"success": false, "error": "unauthorized"})),
2689        );
2690    }
2691    let store = hub.storage.write().await;
2692    match store.delete_repo(&repo_id) {
2693        Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
2694        Err(e) => (
2695            StatusCode::INTERNAL_SERVER_ERROR,
2696            Json(serde_json::json!({"success": false, "error": e.to_string()})),
2697        ),
2698    }
2699}
2700
2701pub async fn create_branch_handler(
2702    State(hub): State<Arc<SutureHubServer>>,
2703    headers: HeaderMap,
2704    Path(repo_id): Path<String>,
2705    Json(req): Json<crate::types::CreateBranchRequest>,
2706) -> (StatusCode, Json<serde_json::Value>) {
2707    if let Err(status) = check_auth(&hub, &headers).await {
2708        return (
2709            status,
2710            Json(serde_json::json!({"success": false, "error": "unauthorized"})),
2711        );
2712    }
2713    let store = hub.storage.write().await;
2714    match store.set_branch(&repo_id, &req.name, &req.target) {
2715        Ok(()) => {
2716            let branch_data = serde_json::json!({"name": req.name, "target": req.target});
2717            let manager = Arc::clone(&hub.webhook_manager);
2718            let storage = Arc::clone(&hub.storage);
2719            let rid = repo_id.clone();
2720            drop(store);
2721            tokio::spawn(async move {
2722                let hooks = {
2723                    let store = storage.read().await;
2724                    store.list_webhooks(&rid).unwrap_or_else(|e| {
2725                        tracing::warn!("store list_webhooks failed: {e}");
2726                        Default::default()
2727                    })
2728                };
2729                if !hooks.is_empty() {
2730                    let result = manager
2731                        .trigger(&hooks, "branch.create", &rid, branch_data)
2732                        .await;
2733                    if result.failed > 0 {
2734                        tracing::warn!("Hook trigger failed: {} of {} webhooks failed", result.failed, result.triggered);
2735                    }
2736                }
2737            });
2738            (
2739                StatusCode::CREATED,
2740                Json(serde_json::json!({"success": true})),
2741            )
2742        }
2743        Err(e) => (
2744            StatusCode::INTERNAL_SERVER_ERROR,
2745            Json(serde_json::json!({"success": false, "error": e.to_string()})),
2746        ),
2747    }
2748}
2749
2750pub async fn delete_branch_handler(
2751    State(hub): State<Arc<SutureHubServer>>,
2752    headers: HeaderMap,
2753    Path((repo_id, branch_name)): Path<(String, String)>,
2754) -> (StatusCode, Json<serde_json::Value>) {
2755    if let Err(status) = check_auth(&hub, &headers).await {
2756        return (
2757            status,
2758            Json(serde_json::json!({"success": false, "error": "unauthorized"})),
2759        );
2760    }
2761    let store = hub.storage.write().await;
2762    match store.delete_branch(&repo_id, &branch_name) {
2763        Ok(()) => {
2764            let branch_data = serde_json::json!({"name": branch_name});
2765            let manager = Arc::clone(&hub.webhook_manager);
2766            let storage = Arc::clone(&hub.storage);
2767            let rid = repo_id.clone();
2768            drop(store);
2769            tokio::spawn(async move {
2770                let hooks = {
2771                    let store = storage.read().await;
2772                    store.list_webhooks(&rid).unwrap_or_else(|e| {
2773                        tracing::warn!("store list_webhooks failed: {e}");
2774                        Default::default()
2775                    })
2776                };
2777                if !hooks.is_empty() {
2778                    let result = manager
2779                        .trigger(&hooks, "branch.delete", &rid, branch_data)
2780                        .await;
2781                    if result.failed > 0 {
2782                        tracing::warn!("Hook trigger failed: {} of {} webhooks failed", result.failed, result.triggered);
2783                    }
2784                }
2785            });
2786            (StatusCode::OK, Json(serde_json::json!({"success": true})))
2787        }
2788        Err(e) => (
2789            StatusCode::INTERNAL_SERVER_ERROR,
2790            Json(serde_json::json!({"success": false, "error": e.to_string()})),
2791        ),
2792    }
2793}
2794
2795pub async fn get_blob_handler(
2796    State(hub): State<Arc<SutureHubServer>>,
2797    Path((repo_id, hash)): Path<(String, String)>,
2798) -> (StatusCode, Json<serde_json::Value>) {
2799    let store = hub.storage.read().await;
2800    match hub.blob_get(&store, &repo_id, &hash) {
2801        Ok(Some(data)) => {
2802            use base64::Engine;
2803            let encoded = base64::engine::general_purpose::STANDARD.encode(&data);
2804            (
2805                StatusCode::OK,
2806                Json(serde_json::json!({"success": true, "data": encoded})),
2807            )
2808        }
2809        Ok(None) => (
2810            StatusCode::NOT_FOUND,
2811            Json(serde_json::json!({"success": false, "error": "blob not found"})),
2812        ),
2813        Err(e) => (
2814            StatusCode::INTERNAL_SERVER_ERROR,
2815            Json(serde_json::json!({"success": false, "error": e})),
2816        ),
2817    }
2818}
2819
2820pub async fn lfs_batch_handler(
2821    State(hub): State<Arc<SutureHubServer>>,
2822    Json(req): Json<suture_protocol::LfsBatchRequest>,
2823) -> (StatusCode, Json<serde_json::Value>) {
2824    // Validate repo_id to prevent path traversal
2825    if !req.repo_id.chars().all(|c| c.is_alphanumeric() || c == '-' || c == '_' || c == '.') {
2826        return (
2827            StatusCode::BAD_REQUEST,
2828            Json(serde_json::json!({
2829                "message": "invalid repo_id: must contain only alphanumeric characters, hyphens, underscores, and dots"
2830            })),
2831        );
2832    }
2833
2834    let lfs_dir = match &hub.lfs_data_dir {
2835        Some(d) => d.clone(),
2836        None => {
2837            return (
2838                StatusCode::SERVICE_UNAVAILABLE,
2839                Json(serde_json::json!({
2840                    "message": "LFS storage not configured on this hub"
2841                })),
2842            );
2843        }
2844    };
2845
2846    let repo_dir = lfs_dir.join(&req.repo_id);
2847    let obj_dir = repo_dir.join("objects");
2848    let obj_dir_clone = obj_dir.clone();
2849    if let Err(e) = tokio::task::spawn_blocking(move || std::fs::create_dir_all(obj_dir_clone))
2850        .await.unwrap_or_else(|e| Err(std::io::Error::other(e.to_string())))
2851    {
2852        tracing::warn!("Failed to create directory {}: {}", obj_dir.display(), e);
2853    }
2854
2855    let mut actions = Vec::with_capacity(req.objects.len());
2856    for obj in &req.objects {
2857        let oid = &obj.oid;
2858        if !oid.chars().all(|c| c.is_ascii_hexdigit()) || oid.len() > 128 {
2859            continue;
2860        }
2861        let prefix = &oid[..2.min(oid.len())];
2862        let obj_path = obj_dir.join(prefix).join(oid);
2863
2864        let action = match req.operation {
2865            suture_protocol::LfsOperation::Upload => {
2866                let op = obj_path.clone();
2867                let exists = tokio::task::spawn_blocking(move || op.exists())
2868                    .await
2869                    .unwrap_or(false);
2870                if exists {
2871                    suture_protocol::LfsAction::None
2872                } else {
2873                    suture_protocol::LfsAction::Upload
2874                }
2875            }
2876            suture_protocol::LfsOperation::Download => {
2877                let op = obj_path.clone();
2878                let exists = tokio::task::spawn_blocking(move || op.exists())
2879                    .await
2880                    .unwrap_or(false);
2881                if exists {
2882                    suture_protocol::LfsAction::Download
2883                } else {
2884                    suture_protocol::LfsAction::None
2885                }
2886            }
2887        };
2888
2889        actions.push(suture_protocol::LfsObjectAction {
2890            oid: obj.oid.clone(),
2891            size: obj.size,
2892            action,
2893            href: None,
2894            header: None,
2895        });
2896    }
2897
2898    for action in &mut actions {
2899        if matches!(
2900            action.action,
2901            suture_protocol::LfsAction::Upload | suture_protocol::LfsAction::Download
2902        ) {
2903            action.href = Some(format!("/lfs/objects/{}/{}", req.repo_id, action.oid));
2904        }
2905    }
2906
2907    (
2908        StatusCode::OK,
2909        Json(serde_json::json!({
2910            "objects": actions,
2911            "transfer": "basic",
2912        })),
2913    )
2914}
2915
2916pub async fn lfs_upload_handler(
2917    State(hub): State<Arc<SutureHubServer>>,
2918    headers: HeaderMap,
2919    Path((repo_id, oid)): Path<(String, String)>,
2920    body: bytes::Bytes,
2921) -> (StatusCode, Json<serde_json::Value>) {
2922    if let Err(status) = check_auth(&hub, &headers).await {
2923        return (status, Json(serde_json::json!({ "message": "unauthorized" })));
2924    }
2925    // Validate repo_id and oid to prevent path traversal
2926    let is_safe = |s: &str| s.chars().all(|c| c.is_alphanumeric() || c == '-' || c == '_' || c == '.');
2927    if !is_safe(&repo_id) || !is_safe(&oid) {
2928        return (
2929            StatusCode::BAD_REQUEST,
2930            Json(serde_json::json!({ "message": "invalid repo_id or oid" })),
2931        );
2932    }
2933
2934    let lfs_dir = match &hub.lfs_data_dir {
2935        Some(d) => d.clone(),
2936        None => {
2937            return (
2938                StatusCode::SERVICE_UNAVAILABLE,
2939                Json(serde_json::json!({
2940                    "message": "LFS storage not configured"
2941                })),
2942            );
2943        }
2944    };
2945
2946    if body.is_empty() {
2947        return (
2948            StatusCode::BAD_REQUEST,
2949            Json(serde_json::json!({
2950                "message": "empty body"
2951            })),
2952        );
2953    }
2954
2955    use sha2::{Digest, Sha256};
2956    let hash = Sha256::digest(&body);
2957    let hash_hex = hex::encode(hash);
2958    if hash_hex != oid {
2959        return (
2960            StatusCode::BAD_REQUEST,
2961            Json(serde_json::json!({
2962                "message": format!("hash mismatch: expected {}, got {}", oid, hash_hex)
2963            })),
2964        );
2965    }
2966
2967    let prefix = &oid[..2.min(oid.len())];
2968    let obj_path = lfs_dir
2969        .join(&repo_id)
2970        .join("objects")
2971        .join(prefix)
2972        .join(&oid);
2973    if let Some(parent) = obj_path.parent() {
2974        let parent_owned = parent.to_owned();
2975        if let Err(e) = tokio::task::spawn_blocking(move || std::fs::create_dir_all(parent_owned))
2976            .await.unwrap_or_else(|e| Err(std::io::Error::other(e.to_string())))
2977        {
2978            tracing::warn!("Failed to create directory {}: {}", parent.display(), e);
2979        }
2980    }
2981    match tokio::task::spawn_blocking({
2982        let obj_path = obj_path.clone();
2983        let body = body.clone();
2984        move || std::fs::write(obj_path, body)
2985    }).await.unwrap_or_else(|e| Err(std::io::Error::other(e.to_string()))) {
2986        Ok(()) => (
2987            StatusCode::OK,
2988            Json(serde_json::json!({
2989                "message": "uploaded",
2990                "oid": oid,
2991                "size": body.len(),
2992            })),
2993        ),
2994        Err(e) => (
2995            StatusCode::INTERNAL_SERVER_ERROR,
2996            Json(serde_json::json!({
2997                "message": e.to_string()
2998            })),
2999        ),
3000    }
3001}
3002
3003pub async fn lfs_download_handler(
3004    State(hub): State<Arc<SutureHubServer>>,
3005    Path((repo_id, oid)): Path<(String, String)>,
3006) -> (StatusCode, axum::response::Response) {
3007    // Validate repo_id and oid to prevent path traversal
3008    let is_safe = |s: &str| s.chars().all(|c| c.is_alphanumeric() || c == '-' || c == '_' || c == '.');
3009    if !is_safe(&repo_id) || !is_safe(&oid) {
3010        let body = axum::body::Body::from("{\"message\":\"invalid repo_id or oid\"}");
3011        let response = axum::response::Response::builder().body(body).unwrap_or_else(|_| {
3012            axum::response::Response::new(axum::body::Body::from("{\"message\":\"invalid repo_id or oid\"}"))
3013        });
3014        return (StatusCode::BAD_REQUEST, response.into_response());
3015    }
3016
3017    let lfs_dir = match &hub.lfs_data_dir {
3018        Some(d) => d.clone(),
3019        None => {
3020            return (
3021                StatusCode::SERVICE_UNAVAILABLE,
3022                axum::response::Response::new(axum::body::Body::from(
3023                    serde_json::json!({"message": "LFS storage not configured"}).to_string(),
3024                )),
3025            );
3026        }
3027    };
3028
3029    let prefix = &oid[..2.min(oid.len())];
3030    let obj_path = lfs_dir
3031        .join(&repo_id)
3032        .join("objects")
3033        .join(prefix)
3034        .join(&oid);
3035
3036    match tokio::task::spawn_blocking({
3037        let obj_path = obj_path.clone();
3038        move || std::fs::read(obj_path)
3039    }).await.unwrap_or_else(|e| Err(std::io::Error::other(e.to_string()))) {
3040        Err(_) => (
3041            StatusCode::NOT_FOUND,
3042            axum::response::Response::new(axum::body::Body::from(
3043                serde_json::json!({"message": "object not found"}).to_string(),
3044            )),
3045        ),
3046        Ok(data) => {
3047            let len = data.len();
3048            let body = axum::body::Body::from(data);
3049            let response = axum::response::Response::builder()
3050                .status(StatusCode::OK)
3051                .header("Content-Type", "application/octet-stream")
3052                .header("Content-Length", len.to_string())
3053                .body(body)
3054                .unwrap_or_else(|e| {
3055                    tracing::error!("failed to build response: {}", e);
3056                    axum::response::Response::new(axum::body::Body::from("internal error"))
3057                });
3058            (StatusCode::OK, response)
3059        },
3060    }
3061}
3062
3063pub async fn login_handler(
3064    State(hub): State<Arc<SutureHubServer>>,
3065    Json(req): Json<crate::types::LoginRequest>,
3066) -> (StatusCode, Json<serde_json::Value>) {
3067    let store = hub.storage.read().await;
3068    let valid = store.verify_token(&req.token).unwrap_or_else(|e| {
3069        tracing::warn!("Failed to verify token: {e}");
3070        false
3071    });
3072    if !valid {
3073        return (
3074            StatusCode::UNAUTHORIZED,
3075            Json(serde_json::json!({"success": false, "error": "invalid token"})),
3076        );
3077    }
3078    let user = match store.get_user_by_token(&req.token) {
3079        Ok(Some(u)) => Some(u),
3080        Ok(None) => None,
3081        Err(e) => {
3082            tracing::error!("Failed to get user by token: {e}");
3083            None
3084        }
3085    };
3086    match user {
3087        Some(u) => (
3088            StatusCode::OK,
3089            Json(serde_json::json!({"success": true, "user": u, "token": req.token})),
3090        ),
3091        None => (
3092            StatusCode::UNAUTHORIZED,
3093            Json(serde_json::json!({"success": false, "error": "user not found"})),
3094        ),
3095    }
3096}
3097
3098pub async fn search_handler(
3099    State(hub): State<Arc<SutureHubServer>>,
3100    Query(params): Query<crate::types::SearchParams>,
3101) -> (StatusCode, Json<serde_json::Value>) {
3102    let store = hub.storage.read().await;
3103    let repos = match store.search_repos(&params.q) {
3104        Ok(r) => r,
3105        Err(e) => {
3106            tracing::error!("Failed to search repos: {e}");
3107            return (
3108                StatusCode::INTERNAL_SERVER_ERROR,
3109                Json(serde_json::json!({"error": "database error"})),
3110            );
3111        }
3112    };
3113    let mut patches = Vec::new();
3114    for repo_id in &repos {
3115        if let Ok(p) = store.search_patches(repo_id, &params.q) {
3116            patches.extend(p);
3117        }
3118    }
3119    (
3120        StatusCode::OK,
3121        Json(serde_json::json!({"repos": repos, "patches": patches})),
3122    )
3123}
3124
3125#[derive(serde::Deserialize)]
3126pub struct ActivityPaginationParams {
3127    pub limit: Option<u32>,
3128    pub cursor: Option<String>,
3129}
3130
3131pub async fn activity_handler(
3132    State(hub): State<Arc<SutureHubServer>>,
3133    Query(params): Query<ActivityPaginationParams>,
3134) -> (StatusCode, Json<serde_json::Value>) {
3135    let offset = params
3136        .cursor
3137        .as_deref()
3138        .and_then(decode_cursor)
3139        .unwrap_or(0);
3140    let limit = params.limit.unwrap_or(50).min(200) as usize;
3141    let store = hub.storage.read().await;
3142    let entries = store.get_replication_log(0).unwrap_or_else(|e| {
3143        tracing::warn!("store get_replication_log failed: {e}");
3144        Default::default()
3145    });
3146    let mut collected: Vec<_> = entries
3147        .into_iter()
3148        .skip(offset as usize)
3149        .take(limit + 1)
3150        .collect();
3151    let has_more = collected.len() > limit;
3152    if has_more {
3153        collected.truncate(limit);
3154    }
3155    let next_cursor = if has_more {
3156        Some(encode_cursor(offset + limit as u64))
3157    } else {
3158        None
3159    };
3160    (
3161        StatusCode::OK,
3162        Json(serde_json::json!({
3163            "entries": collected,
3164            "next_cursor": next_cursor.unwrap_or_default(),
3165        })),
3166    )
3167}
3168
3169pub async fn delete_mirror_handler(
3170    State(hub): State<Arc<SutureHubServer>>,
3171    headers: HeaderMap,
3172    Path(mirror_id): Path<i64>,
3173) -> (StatusCode, Json<serde_json::Value>) {
3174    if let Err(status) = check_auth(&hub, &headers).await {
3175        return (
3176            status,
3177            Json(serde_json::json!({"success": false, "error": "unauthorized"})),
3178        );
3179    }
3180    let store = hub.storage.write().await;
3181    match store.delete_mirror(mirror_id) {
3182        Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
3183        Err(e) => (
3184            StatusCode::INTERNAL_SERVER_ERROR,
3185            Json(serde_json::json!({"success": false, "error": e.to_string()})),
3186        ),
3187    }
3188}
3189
3190async fn serve_index() -> Html<&'static str> {
3191    Html(include_str!("../static/index.html"))
3192}
3193
3194async fn serve_static_file(
3195    axum::extract::Path(path): axum::extract::Path<String>,
3196) -> impl IntoResponse {
3197    let content_type = if path.ends_with(".css") {
3198        "text/css"
3199    } else if path.ends_with(".js") {
3200        "application/javascript"
3201    } else if path.ends_with(".html") {
3202        "text/html"
3203    } else if path.ends_with(".json") {
3204        "application/json"
3205    } else {
3206        "application/octet-stream"
3207    };
3208
3209    let static_dir = std::path::Path::new("static");
3210    let file_path = static_dir.join(&path);
3211
3212    let Ok(canonical_static) = tokio::fs::canonicalize(static_dir).await else { return StatusCode::NOT_FOUND.into_response() };
3213    let Ok(canonical_file) = tokio::fs::canonicalize(&file_path).await else { return StatusCode::NOT_FOUND.into_response() };
3214
3215    if !canonical_file.starts_with(&canonical_static) {
3216        return StatusCode::FORBIDDEN.into_response();
3217    }
3218
3219    tokio::fs::read_to_string(&canonical_file).await.map_or_else(
3220        |_| StatusCode::NOT_FOUND.into_response(),
3221        |contents| {
3222            let headers = [(axum::http::header::CONTENT_TYPE, content_type)];
3223            (StatusCode::OK, headers, contents).into_response()
3224        },
3225    )
3226}
3227
3228pub async fn register_handler(
3229    State(hub): State<Arc<SutureHubServer>>,
3230    headers: HeaderMap,
3231    Json(req): Json<crate::types::RegisterRequest>,
3232) -> (StatusCode, Json<crate::types::RegisterResponse>) {
3233    match require_role(&hub, &headers, &Role::Admin).await {
3234        Ok(_) => {}
3235        Err(status) => {
3236            return (
3237                status,
3238                Json(crate::types::RegisterResponse {
3239                    success: false,
3240                    error: Some("admin access required".to_owned()),
3241                    user: None,
3242                }),
3243            );
3244        }
3245    }
3246
3247    let role = req.role.as_deref().unwrap_or("member");
3248    if !matches!(role, "admin" | "member" | "reader") {
3249        return (
3250            StatusCode::BAD_REQUEST,
3251            Json(crate::types::RegisterResponse {
3252                success: false,
3253                error: Some("role must be admin, member, or reader".to_owned()),
3254                user: None,
3255            }),
3256        );
3257    }
3258
3259    let api_token = generate_api_token();
3260
3261    let store = hub.storage.write().await;
3262    match store.create_user(&req.username, &req.display_name, role, &api_token) {
3263        Ok(()) => {
3264            let mut user = match store.get_user(&req.username) {
3265                Ok(Some(u)) => Some(u),
3266                Ok(None) => None,
3267                Err(e) => {
3268                    tracing::error!("Failed to get user after creation: {e}");
3269                    None
3270                }
3271            };
3272            if let Some(ref mut u) = user {
3273                u.api_token = Some(api_token);
3274            }
3275            (
3276                StatusCode::CREATED,
3277                Json(crate::types::RegisterResponse {
3278                    success: true,
3279                    error: None,
3280                    user,
3281                }),
3282            )
3283        }
3284        Err(e) => (
3285            StatusCode::CONFLICT,
3286            Json(crate::types::RegisterResponse {
3287                success: false,
3288                error: Some(format!("failed to create user: {e}")),
3289                user: None,
3290            }),
3291        ),
3292    }
3293}
3294
3295pub async fn list_users_handler(
3296    State(hub): State<Arc<SutureHubServer>>,
3297    headers: HeaderMap,
3298) -> (StatusCode, Json<crate::types::ListUsersResponse>) {
3299    match require_role(&hub, &headers, &Role::Admin).await {
3300        Ok(_) => {}
3301        Err(status) => {
3302            return (
3303                status,
3304                Json(crate::types::ListUsersResponse {
3305                    success: false,
3306                    error: Some("admin access required".to_owned()),
3307                    users: vec![],
3308                }),
3309            );
3310        }
3311    }
3312
3313    let store = hub.storage.read().await;
3314    match store.list_users() {
3315        Ok(users) => {
3316            // Redact API tokens before returning — they are secrets that
3317            // must never be exposed in list responses.
3318            let users: Vec<_> = users
3319                .into_iter()
3320                .map(|mut u| {
3321                    u.api_token = None;
3322                    u
3323                })
3324                .collect();
3325            (
3326                StatusCode::OK,
3327                Json(crate::types::ListUsersResponse {
3328                    success: true,
3329                    error: None,
3330                    users,
3331                }),
3332            )
3333        }
3334        Err(e) => (
3335            StatusCode::INTERNAL_SERVER_ERROR,
3336            Json(crate::types::ListUsersResponse {
3337                success: false,
3338                error: Some(format!("database error: {e}")),
3339                users: vec![],
3340            }),
3341        ),
3342    }
3343}
3344
3345pub async fn get_user_handler(
3346    State(hub): State<Arc<SutureHubServer>>,
3347    headers: HeaderMap,
3348    Path(username): Path<String>,
3349) -> (StatusCode, Json<crate::types::GetUserResponse>) {
3350    let requesting_user = resolve_user(&hub, &headers).await;
3351    let is_admin = requesting_user
3352        .as_ref()
3353        .is_some_and(|u| u.role == "admin");
3354    let is_self = requesting_user
3355        .as_ref()
3356        .is_some_and(|u| u.username == username);
3357
3358    if !is_admin && !is_self {
3359        return (
3360            StatusCode::FORBIDDEN,
3361            Json(crate::types::GetUserResponse {
3362                success: false,
3363                error: Some("access denied".to_owned()),
3364                user: None,
3365            }),
3366        );
3367    }
3368
3369    let store = hub.storage.read().await;
3370    match store.get_user(&username) {
3371        Ok(Some(user)) => {
3372            let mut resp_user = user;
3373            if is_self && !is_admin {
3374                resp_user.api_token = None;
3375            }
3376            (
3377                StatusCode::OK,
3378                Json(crate::types::GetUserResponse {
3379                    success: true,
3380                    error: None,
3381                    user: Some(resp_user),
3382                }),
3383            )
3384        }
3385        Ok(None) => (
3386            StatusCode::NOT_FOUND,
3387            Json(crate::types::GetUserResponse {
3388                success: false,
3389                error: Some("user not found".to_owned()),
3390                user: None,
3391            }),
3392        ),
3393        Err(e) => (
3394            StatusCode::INTERNAL_SERVER_ERROR,
3395            Json(crate::types::GetUserResponse {
3396                success: false,
3397                error: Some(format!("database error: {e}")),
3398                user: None,
3399            }),
3400        ),
3401    }
3402}
3403
3404pub async fn update_role_handler(
3405    State(hub): State<Arc<SutureHubServer>>,
3406    headers: HeaderMap,
3407    Path(username): Path<String>,
3408    Json(req): Json<crate::types::UpdateRoleRequest>,
3409) -> (StatusCode, Json<crate::types::UpdateRoleResponse>) {
3410    match require_role(&hub, &headers, &Role::Admin).await {
3411        Ok(_) => {}
3412        Err(status) => {
3413            return (
3414                status,
3415                Json(crate::types::UpdateRoleResponse {
3416                    success: false,
3417                    error: Some("admin access required".to_owned()),
3418                }),
3419            );
3420        }
3421    }
3422
3423    if !matches!(req.role.as_str(), "admin" | "member" | "reader") {
3424        return (
3425            StatusCode::BAD_REQUEST,
3426            Json(crate::types::UpdateRoleResponse {
3427                success: false,
3428                error: Some("role must be admin, member, or reader".to_owned()),
3429            }),
3430        );
3431    }
3432
3433    let store = hub.storage.write().await;
3434    match store.update_user_role(&username, &req.role) {
3435        Ok(()) => (
3436            StatusCode::OK,
3437            Json(crate::types::UpdateRoleResponse {
3438                success: true,
3439                error: None,
3440            }),
3441        ),
3442        Err(e) => (
3443            StatusCode::INTERNAL_SERVER_ERROR,
3444            Json(crate::types::UpdateRoleResponse {
3445                success: false,
3446                error: Some(format!("database error: {e}")),
3447            }),
3448        ),
3449    }
3450}
3451
3452pub async fn delete_user_handler(
3453    State(hub): State<Arc<SutureHubServer>>,
3454    headers: HeaderMap,
3455    Path(username): Path<String>,
3456) -> (StatusCode, Json<crate::types::DeleteUserResponse>) {
3457    match require_role(&hub, &headers, &Role::Admin).await {
3458        Ok(_) => {}
3459        Err(status) => {
3460            return (
3461                status,
3462                Json(crate::types::DeleteUserResponse {
3463                    success: false,
3464                    error: Some("admin access required".to_owned()),
3465                }),
3466            );
3467        }
3468    }
3469
3470    let store = hub.storage.write().await;
3471    match store.delete_user(&username) {
3472        Ok(()) => (
3473            StatusCode::OK,
3474            Json(crate::types::DeleteUserResponse {
3475                success: true,
3476                error: None,
3477            }),
3478        ),
3479        Err(e) => (
3480            StatusCode::INTERNAL_SERVER_ERROR,
3481            Json(crate::types::DeleteUserResponse {
3482                success: false,
3483                error: Some(format!("database error: {e}")),
3484            }),
3485        ),
3486    }
3487}
3488
3489pub async fn handshake_v2_handler(
3490    Json(req): Json<crate::types::HandshakeRequestV2>,
3491) -> Json<crate::types::HandshakeResponseV2> {
3492    let compatible = req.client_version == crate::types::PROTOCOL_VERSION_V2;
3493    Json(crate::types::HandshakeResponseV2 {
3494        server_version: crate::types::PROTOCOL_VERSION_V2,
3495        server_name: "suture-hub".to_owned(),
3496        compatible,
3497        server_capabilities: crate::types::ServerCapabilities {
3498            supports_delta: true,
3499            supports_compression: true,
3500            max_blob_size: 50 * 1024 * 1024,
3501            protocol_versions: vec![
3502                crate::types::PROTOCOL_VERSION,
3503                crate::types::PROTOCOL_VERSION_V2,
3504            ],
3505        },
3506    })
3507}
3508
3509pub async fn v2_pull_handler(
3510    State(hub): State<Arc<SutureHubServer>>,
3511    headers: HeaderMap,
3512    ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
3513    Json(req): Json<crate::types::PullRequestV2>,
3514) -> (StatusCode, HeaderMap, Json<crate::types::PullResponseV2>) {
3515    let ip = addr.ip().to_string();
3516    if let Err(retry_after) = hub.check_rate_limit(&ip, "pull") {
3517        let mut hdrs = HeaderMap::new();
3518        if let Ok(val) = retry_after.to_string().parse() {
3519            hdrs.insert(axum::http::header::RETRY_AFTER, val);
3520        }
3521        return (
3522            StatusCode::TOO_MANY_REQUESTS,
3523            hdrs,
3524            Json(crate::types::PullResponseV2 {
3525                success: false,
3526                error: Some("rate limit exceeded".to_owned()),
3527                patches: vec![],
3528                branches: vec![],
3529                blobs: vec![],
3530                deltas: vec![],
3531                protocol_version: crate::types::PROTOCOL_VERSION_V2,
3532            }),
3533        );
3534    }
3535
3536    if let Err(status) = check_auth(&hub, &headers).await {
3537        return (
3538            status,
3539            HeaderMap::new(),
3540            Json(crate::types::PullResponseV2 {
3541                success: false,
3542                error: Some("authentication failed".to_owned()),
3543                patches: vec![],
3544                branches: vec![],
3545                blobs: vec![],
3546                deltas: vec![],
3547                protocol_version: crate::types::PROTOCOL_VERSION_V2,
3548            }),
3549        );
3550    }
3551
3552    let resp = hub.handle_pull_v2(req).await;
3553    let status = if resp.success {
3554        StatusCode::OK
3555    } else {
3556        StatusCode::NOT_FOUND
3557    };
3558    (status, HeaderMap::new(), Json(resp))
3559}
3560
3561pub async fn v2_push_handler(
3562    State(hub): State<Arc<SutureHubServer>>,
3563    headers: HeaderMap,
3564    ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
3565    Json(req): Json<crate::types::PushRequestV2>,
3566) -> (StatusCode, HeaderMap, Json<PushResponse>) {
3567    let ip = addr.ip().to_string();
3568    if let Err(retry_after) = hub.check_rate_limit(&ip, "push") {
3569        let mut hdrs = HeaderMap::new();
3570        if let Ok(val) = retry_after.to_string().parse() {
3571            hdrs.insert(axum::http::header::RETRY_AFTER, val);
3572        }
3573        return (
3574            StatusCode::TOO_MANY_REQUESTS,
3575            hdrs,
3576            Json(PushResponse {
3577                success: false,
3578                error: Some("rate limit exceeded".to_owned()),
3579                existing_patches: vec![],
3580            }),
3581        );
3582    }
3583
3584    if let Err(status) = check_auth(&hub, &headers).await {
3585        return (
3586            status,
3587            HeaderMap::new(),
3588            Json(PushResponse {
3589                success: false,
3590                error: Some("authentication failed".to_owned()),
3591                existing_patches: vec![],
3592            }),
3593        );
3594    }
3595
3596    if !hub.no_auth
3597        && let Some(user) = resolve_user(&hub, &headers).await
3598        && Role::parse(&user.role) < Role::Member
3599    {
3600        return (
3601            StatusCode::FORBIDDEN,
3602            HeaderMap::new(),
3603            Json(PushResponse {
3604                success: false,
3605                error: Some("insufficient permissions: readers cannot push".to_owned()),
3606                existing_patches: vec![],
3607            }),
3608        );
3609    }
3610
3611    match hub.handle_push_v2(req).await {
3612        Ok(resp) => (StatusCode::OK, HeaderMap::new(), Json(resp)),
3613        Err((status, resp)) => (status, HeaderMap::new(), Json(resp)),
3614    }
3615}
3616
3617pub async fn add_peer_handler(
3618    State(hub): State<Arc<SutureHubServer>>,
3619    headers: HeaderMap,
3620    Json(req): Json<AddPeerRequest>,
3621) -> (StatusCode, Json<AddPeerResponse>) {
3622    if let Err(status) = check_auth(&hub, &headers).await {
3623        return (status, Json(AddPeerResponse { success: false, peer_id: None, error: Some("unauthorized".to_owned()) }));
3624    }
3625    let role = hub.get_replication_role();
3626    if role != "leader" {
3627        return (
3628            StatusCode::FORBIDDEN,
3629            Json(AddPeerResponse {
3630                success: false,
3631                peer_id: None,
3632                error: Some("only the leader can manage peers".to_owned()),
3633            }),
3634        );
3635    }
3636    let resp = hub.handle_add_peer(req).await;
3637    let status = if resp.success {
3638        StatusCode::OK
3639    } else {
3640        StatusCode::BAD_REQUEST
3641    };
3642    (status, Json(resp))
3643}
3644
3645pub async fn remove_peer_handler(
3646    State(hub): State<Arc<SutureHubServer>>,
3647    headers: HeaderMap,
3648    Path(id): Path<i64>,
3649) -> (StatusCode, Json<RemovePeerResponse>) {
3650    if let Err(status) = check_auth(&hub, &headers).await {
3651        return (status, Json(RemovePeerResponse { success: false, error: Some("unauthorized".to_owned()) }));
3652    }
3653    let role = hub.get_replication_role();
3654    if role != "leader" {
3655        return (
3656            StatusCode::FORBIDDEN,
3657            Json(RemovePeerResponse {
3658                success: false,
3659                error: Some("only the leader can manage peers".to_owned()),
3660            }),
3661        );
3662    }
3663    let resp = hub.handle_remove_peer(id).await;
3664    let status = if resp.success {
3665        StatusCode::OK
3666    } else {
3667        StatusCode::BAD_REQUEST
3668    };
3669    (status, Json(resp))
3670}
3671
3672pub async fn list_peers_handler(
3673    State(hub): State<Arc<SutureHubServer>>,
3674) -> (StatusCode, Json<ListPeersResponse>) {
3675    let resp = hub.handle_list_peers().await;
3676    (StatusCode::OK, Json(resp))
3677}
3678
3679pub async fn replication_status_handler(
3680    State(hub): State<Arc<SutureHubServer>>,
3681) -> (StatusCode, Json<ReplicationStatusResponse>) {
3682    let resp = hub.handle_replication_status().await;
3683    (StatusCode::OK, Json(resp))
3684}
3685
3686pub async fn replication_sync_handler(
3687    State(hub): State<Arc<SutureHubServer>>,
3688    Json(entries): Json<Vec<ReplicationEntry>>,
3689) -> (StatusCode, Json<SyncResponse>) {
3690    let role = hub.get_replication_role();
3691    if role != "follower" && role != "standalone" {
3692        return (
3693            StatusCode::FORBIDDEN,
3694            Json(SyncResponse {
3695                success: false,
3696                applied: 0,
3697                error: Some("sync endpoint is for followers only".to_owned()),
3698            }),
3699        );
3700    }
3701    let resp = hub.handle_replication_sync(entries).await;
3702    let status = if resp.success {
3703        StatusCode::OK
3704    } else {
3705        StatusCode::INTERNAL_SERVER_ERROR
3706    };
3707    (status, Json(resp))
3708}
3709
3710pub async fn batch_push_handler(
3711    State(hub): State<Arc<SutureHubServer>>,
3712    headers: HeaderMap,
3713    ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
3714    Json(req): Json<BatchPatchRequest>,
3715) -> (StatusCode, HeaderMap, Json<PushResponse>) {
3716    let ip = addr.ip().to_string();
3717    if let Err(retry_after) = hub.check_rate_limit(&ip, "push") {
3718        let mut hdrs = HeaderMap::new();
3719        if let Ok(val) = retry_after.to_string().parse() {
3720            hdrs.insert(axum::http::header::RETRY_AFTER, val);
3721        }
3722        return (
3723            StatusCode::TOO_MANY_REQUESTS,
3724            hdrs,
3725            Json(PushResponse {
3726                success: false,
3727                error: Some("rate limit exceeded".to_owned()),
3728                existing_patches: vec![],
3729            }),
3730        );
3731    }
3732
3733    if let Err(status) = check_auth(&hub, &headers).await {
3734        return (
3735            status,
3736            HeaderMap::new(),
3737            Json(PushResponse {
3738                success: false,
3739                error: Some("authentication failed".to_owned()),
3740                existing_patches: vec![],
3741            }),
3742        );
3743    }
3744
3745    match hub.handle_batch_push(req).await {
3746        Ok(resp) => (StatusCode::OK, HeaderMap::new(), Json(resp)),
3747        Err((status, resp)) => (status, HeaderMap::new(), Json(resp)),
3748    }
3749}
3750
3751async fn replication_background_task(hub: Arc<SutureHubServer>) {
3752    let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
3753    loop {
3754        interval.tick().await;
3755
3756        let role = hub.get_replication_role();
3757        if role != "leader" {
3758            continue;
3759        }
3760
3761        let peers = {
3762            let store = hub.storage.read().await;
3763            match store.list_replication_peers() {
3764                Ok(p) => p,
3765                Err(e) => {
3766                    tracing::warn!("replication: failed to list peers: {e}");
3767                    continue;
3768                }
3769            }
3770        };
3771
3772        for peer in &peers {
3773            if peer.status != "active" {
3774                continue;
3775            }
3776
3777            let entries = {
3778                let store = hub.storage.read().await;
3779                match store.get_replication_log(peer.last_sync_seq) {
3780                    Ok(e) => e,
3781                    Err(e) => {
3782                        tracing::warn!("replication: failed to get log for peer {}: {e}", peer.id);
3783                        continue;
3784                    }
3785                }
3786            };
3787
3788            if entries.is_empty() {
3789                continue;
3790            }
3791
3792            let last_seq = entries.last().map_or(peer.last_sync_seq, |e| e.seq);
3793
3794            let client = match reqwest::Client::builder()
3795                .timeout(std::time::Duration::from_secs(10))
3796                .build()
3797            {
3798                Ok(c) => c,
3799                Err(e) => {
3800                    tracing::warn!(
3801                        "replication: failed to build client for peer {}: {e}",
3802                        peer.id
3803                    );
3804                    continue;
3805                }
3806            };
3807
3808            let sync_url = format!("{}/replication/sync", peer.peer_url.trim_end_matches('/'));
3809            match client.post(&sync_url).json(&entries).send().await {
3810                Ok(resp) if resp.status().is_success() => {
3811                    tracing::info!(
3812                        "replication: synced {} entries to peer {} (seq {}-{})",
3813                        entries.len(),
3814                        peer.id,
3815                        peer.last_sync_seq,
3816                        last_seq
3817                    );
3818                    let store = hub.storage.write().await;
3819                    if let Err(e) = store.update_peer_sync_seq(peer.id, last_seq) {
3820                        tracing::warn!("Failed to update peer sync sequence: {}", e);
3821                    }
3822                }
3823                Ok(resp) => {
3824                    tracing::warn!(
3825                        "replication: sync to peer {} returned {}",
3826                        peer.id,
3827                        resp.status()
3828                    );
3829                }
3830                Err(e) => {
3831                    tracing::warn!("replication: failed to sync to peer {}: {e}", peer.id);
3832                }
3833            }
3834        }
3835    }
3836}
3837
3838#[derive(serde::Deserialize)]
3839pub struct CreateWebhookRequest {
3840    pub url: String,
3841    pub events: Vec<String>,
3842    pub secret: Option<String>,
3843}
3844
3845pub async fn create_webhook_handler(
3846    State(hub): State<Arc<SutureHubServer>>,
3847    headers: HeaderMap,
3848    Path(repo_id): Path<String>,
3849    Json(req): Json<CreateWebhookRequest>,
3850) -> (StatusCode, Json<serde_json::Value>) {
3851    if let Err(status) = check_auth(&hub, &headers).await {
3852        return (
3853            status,
3854            Json(serde_json::json!({"success": false, "error": "unauthorized"})),
3855        );
3856    }
3857    if req.events.is_empty() {
3858        return (
3859            StatusCode::BAD_REQUEST,
3860            Json(serde_json::json!({"success": false, "error": "events must not be empty"})),
3861        );
3862    }
3863    let random_bytes: [u8; 16] = rand::random();
3864    let id = format!("wh_{}", hex::encode(random_bytes));
3865    let created_at = std::time::SystemTime::now()
3866        .duration_since(std::time::UNIX_EPOCH)
3867        .unwrap_or_default()
3868        .as_secs();
3869    let webhook = Webhook {
3870        id: id.clone(),
3871        repo_id: repo_id.clone(),
3872        url: req.url,
3873        events: req.events,
3874        secret: req.secret,
3875        created_at,
3876        active: true,
3877    };
3878    let store = hub.storage.write().await;
3879    match store.create_webhook(&webhook) {
3880        Ok(()) => (
3881            StatusCode::CREATED,
3882            Json(serde_json::json!({"success": true, "id": id})),
3883        ),
3884        Err(e) => (
3885            StatusCode::INTERNAL_SERVER_ERROR,
3886            Json(serde_json::json!({"success": false, "error": e.to_string()})),
3887        ),
3888    }
3889}
3890
3891pub async fn list_webhooks_handler(
3892    State(hub): State<Arc<SutureHubServer>>,
3893    headers: HeaderMap,
3894    Path(repo_id): Path<String>,
3895) -> (StatusCode, Json<serde_json::Value>) {
3896    if let Err(status) = check_auth(&hub, &headers).await {
3897        return (
3898            status,
3899            Json(serde_json::json!({"success": false, "error": "unauthorized"})),
3900        );
3901    }
3902    let store = hub.storage.read().await;
3903    match store.list_webhooks(&repo_id) {
3904        Ok(hooks) => (
3905            StatusCode::OK,
3906            Json(serde_json::json!({"success": true, "webhooks": hooks})),
3907        ),
3908        Err(e) => (
3909            StatusCode::INTERNAL_SERVER_ERROR,
3910            Json(serde_json::json!({"success": false, "error": e.to_string()})),
3911        ),
3912    }
3913}
3914
3915pub async fn delete_webhook_handler(
3916    State(hub): State<Arc<SutureHubServer>>,
3917    headers: HeaderMap,
3918    Path((_repo_id, id)): Path<(String, String)>,
3919) -> (StatusCode, Json<serde_json::Value>) {
3920    if let Err(status) = check_auth(&hub, &headers).await {
3921        return (
3922            status,
3923            Json(serde_json::json!({"success": false, "error": "unauthorized"})),
3924        );
3925    }
3926    let store = hub.storage.write().await;
3927    match store.delete_webhook(&id) {
3928        Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
3929        Err(e) => (
3930            StatusCode::NOT_FOUND,
3931            Json(serde_json::json!({"success": false, "error": e.to_string()})),
3932        ),
3933    }
3934}
3935
3936pub async fn health_check() -> Json<serde_json::Value> {
3937    Json(serde_json::json!({"status": "ok"}))
3938}
3939
3940// === SSO / OIDC Handlers ===
3941
3942/// List all configured OIDC providers.
3943pub async fn sso_list_providers_handler(
3944    State(hub): State<Arc<SutureHubServer>>,
3945    headers: HeaderMap,
3946) -> (StatusCode, Json<serde_json::Value>) {
3947    if let Err(status) = check_auth(&hub, &headers).await {
3948        return (status, Json(serde_json::json!({"error": "unauthorized"})));
3949    }
3950    let store = hub.storage.read().await;
3951    match store.list_oidc_configs() {
3952        Ok(configs) => {
3953            let names: Vec<&str> = configs.iter().map(|c| c.provider_name.as_str()).collect();
3954            (StatusCode::OK, Json(serde_json::json!({"providers": names})))
3955        }
3956        Err(e) => (
3957            StatusCode::INTERNAL_SERVER_ERROR,
3958            Json(serde_json::json!({"error": e.to_string()})),
3959        ),
3960    }
3961}
3962
3963/// Configure (create or update) an OIDC provider.
3964pub async fn sso_configure_provider_handler(
3965    State(hub): State<Arc<SutureHubServer>>,
3966    headers: HeaderMap,
3967    Json(config): Json<crate::sso::OidcConfig>,
3968) -> (StatusCode, Json<serde_json::Value>) {
3969    if let Err(status) = check_auth(&hub, &headers).await {
3970        return (status, Json(serde_json::json!({"error": "unauthorized"})));
3971    }
3972    // Validate required fields
3973    if config.provider_name.is_empty() {
3974        return (
3975            StatusCode::BAD_REQUEST,
3976            Json(serde_json::json!({"error": "provider_name is required"})),
3977        );
3978    }
3979    if config.issuer_url.is_empty() {
3980        return (
3981            StatusCode::BAD_REQUEST,
3982            Json(serde_json::json!({"error": "issuer_url is required"})),
3983        );
3984    }
3985    if config.client_id.is_empty() {
3986        return (
3987            StatusCode::BAD_REQUEST,
3988            Json(serde_json::json!({"error": "client_id is required"})),
3989        );
3990    }
3991    let store = hub.storage.read().await;
3992    match store.set_oidc_config(&config) {
3993        Ok(()) => (
3994            StatusCode::OK,
3995            Json(serde_json::json!({"success": true, "provider": config.provider_name})),
3996        ),
3997        Err(e) => (
3998            StatusCode::INTERNAL_SERVER_ERROR,
3999            Json(serde_json::json!({"error": e.to_string()})),
4000        ),
4001    }
4002}
4003
4004/// Delete an OIDC provider configuration.
4005pub async fn sso_delete_provider_handler(
4006    State(hub): State<Arc<SutureHubServer>>,
4007    headers: HeaderMap,
4008    Path(provider_name): Path<String>,
4009) -> (StatusCode, Json<serde_json::Value>) {
4010    if let Err(status) = check_auth(&hub, &headers).await {
4011        return (status, Json(serde_json::json!({"error": "unauthorized"})));
4012    }
4013    let store = hub.storage.read().await;
4014    match store.delete_oidc_config(&provider_name) {
4015        Ok(true) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
4016        Ok(false) => (
4017            StatusCode::NOT_FOUND,
4018            Json(serde_json::json!({"error": format!("provider '{provider_name}' not found")})),
4019        ),
4020        Err(e) => (
4021            StatusCode::INTERNAL_SERVER_ERROR,
4022            Json(serde_json::json!({"error": e.to_string()})),
4023        ),
4024    }
4025}
4026
4027/// Initiate an OIDC authorization flow — returns the redirect URL.
4028pub async fn sso_authorize_handler(
4029    State(hub): State<Arc<SutureHubServer>>,
4030    headers: HeaderMap,
4031    Json(body): Json<serde_json::Value>,
4032) -> (StatusCode, Json<serde_json::Value>) {
4033    if let Err(status) = check_auth(&hub, &headers).await {
4034        return (status, Json(serde_json::json!({"error": "unauthorized"})));
4035    }
4036    let provider_name = match body.get("provider") {
4037        Some(v) => match v.as_str() {
4038            Some(s) => s.to_owned(),
4039            None => {
4040                return (
4041                    StatusCode::BAD_REQUEST,
4042                    Json(serde_json::json!({"error": "provider must be a string"})),
4043                );
4044            }
4045        },
4046        None => {
4047            return (
4048                StatusCode::BAD_REQUEST,
4049                Json(serde_json::json!({"error": "provider is required"})),
4050            );
4051        }
4052    };
4053
4054    let store = hub.storage.read().await;
4055    let config = match store.get_oidc_config(&provider_name) {
4056        Ok(Some(c)) => c,
4057        Ok(None) => {
4058            return (
4059                StatusCode::NOT_FOUND,
4060                Json(serde_json::json!({"error": format!("provider '{provider_name}' not configured")})),
4061            );
4062        }
4063        Err(e) => {
4064            return (
4065                StatusCode::INTERNAL_SERVER_ERROR,
4066                Json(serde_json::json!({"error": e.to_string()})),
4067            );
4068        }
4069    };
4070    drop(store);
4071
4072    let state = crate::sso::generate_state();
4073    let nonce = crate::sso::generate_nonce();
4074
4075    // Store state and nonce for CSRF/replay validation during callback.
4076    let store = hub.storage.read().await;
4077    if let Err(e) = store.store_sso_state(&state, &provider_name, &nonce) {
4078        return (
4079            StatusCode::INTERNAL_SERVER_ERROR,
4080            Json(serde_json::json!({"error": format!("failed to store SSO state: {e}")})),
4081        );
4082    }
4083    drop(store);
4084
4085    let url = crate::sso::authorization_url(&config, &state, &nonce);
4086
4087    (StatusCode::OK, Json(serde_json::json!({
4088        "authorization_url": url,
4089        "state": state,
4090    })))
4091}
4092
4093/// Handle an OIDC callback — exchange the authorization code for tokens,
4094/// validate the ID token, and create/update a local user session.
4095pub async fn sso_callback_handler(
4096    State(hub): State<Arc<SutureHubServer>>,
4097    headers: HeaderMap,
4098    Json(body): Json<serde_json::Value>,
4099) -> (StatusCode, Json<serde_json::Value>) {
4100    if let Err(status) = check_auth(&hub, &headers).await {
4101        return (status, Json(serde_json::json!({"error": "unauthorized"})));
4102    }
4103    let provider_name = body.get("provider").and_then(|v| v.as_str()).unwrap_or("").to_owned();
4104    let code = body.get("code").and_then(|v| v.as_str()).unwrap_or("").to_owned();
4105    let state = body.get("state").and_then(|v| v.as_str()).unwrap_or("").to_owned();
4106
4107    if provider_name.is_empty() {
4108        return (
4109            StatusCode::BAD_REQUEST,
4110            Json(serde_json::json!({"error": "provider is required"})),
4111        );
4112    }
4113    if code.is_empty() {
4114        return (
4115            StatusCode::BAD_REQUEST,
4116            Json(serde_json::json!({"error": "code is required"})),
4117        );
4118    }
4119    if state.is_empty() {
4120        return (
4121            StatusCode::BAD_REQUEST,
4122            Json(serde_json::json!({"error": "state is required"})),
4123        );
4124    }
4125
4126    // Validate state (CSRF protection) and get the nonce.
4127    let store = hub.storage.read().await;
4128    let (stored_provider, nonce) = match store.consume_sso_state(&state) {
4129        Ok(Some((p, n))) => (p, n),
4130        Ok(None) => {
4131            return (
4132                StatusCode::BAD_REQUEST,
4133                Json(serde_json::json!({"error": "invalid or expired state parameter"})),
4134            );
4135        }
4136        Err(e) => {
4137            return (
4138                StatusCode::INTERNAL_SERVER_ERROR,
4139                Json(serde_json::json!({"error": e.to_string()})),
4140            );
4141        }
4142    };
4143
4144    if stored_provider != provider_name {
4145        return (
4146            StatusCode::BAD_REQUEST,
4147            Json(serde_json::json!({"error": "state/provider mismatch"})),
4148        );
4149    }
4150    drop(store);
4151
4152    // Get the OIDC provider config.
4153    let store = hub.storage.read().await;
4154    let config = match store.get_oidc_config(&provider_name) {
4155        Ok(Some(c)) => c,
4156        Ok(None) => {
4157            return (
4158                StatusCode::NOT_FOUND,
4159                Json(serde_json::json!({"error": format!("provider '{provider_name}' not configured")})),
4160            );
4161        }
4162        Err(e) => {
4163            return (
4164                StatusCode::INTERNAL_SERVER_ERROR,
4165                Json(serde_json::json!({"error": e.to_string()})),
4166            );
4167        }
4168    };
4169    drop(store);
4170
4171    // Complete the OIDC callback flow: discover, exchange code, validate token.
4172    let result = match crate::sso::complete_callback(&config, &code, &state, &nonce).await {
4173        Ok(r) => r,
4174        Err(e) => {
4175            tracing::warn!("SSO callback failed for provider '{provider_name}': {e}");
4176            return (
4177                StatusCode::BAD_GATEWAY,
4178                Json(serde_json::json!({"error": format!("SSO authentication failed: {e}")})),
4179            );
4180        }
4181    };
4182
4183    // Create or update the local user from SSO identity.
4184    let user_sub = result.user.sub.clone();
4185    let user_email = result.user.email.clone();
4186    let user_name = result.user.name.clone();
4187    let username = user_email
4188        .as_deref()
4189        .unwrap_or("")
4190        .to_owned();
4191    let username = if username.is_empty() {
4192        format!("{}:{}", provider_name, user_sub)
4193    } else {
4194        username
4195    };
4196    let display_name = user_name
4197        .as_deref()
4198        .unwrap_or(&username)
4199        .to_owned();
4200
4201    let store = hub.storage.read().await;
4202    match store.upsert_sso_user(
4203        &provider_name,
4204        &user_sub,
4205        &username,
4206        &display_name,
4207    ) {
4208        Ok(created_username) => {
4209            // Set the session token as the user's API token.
4210            let token_hash =
4211                format!("{:x}", sha2::Sha256::digest(result.session_token.as_bytes()));
4212            if let Err(e) = store.update_user_token(&created_username, &token_hash) {
4213                tracing::warn!("failed to set SSO session token: {e}");
4214            }
4215            drop(store);
4216
4217            (StatusCode::OK, Json(serde_json::json!({
4218                "status": "authenticated",
4219                "username": created_username,
4220                "display_name": display_name,
4221                "provider": provider_name,
4222                "session_token": result.session_token,
4223                "email": user_email,
4224            })))
4225        }
4226        Err(e) => {
4227            tracing::warn!("failed to upsert SSO user: {e}");
4228            (
4229                StatusCode::INTERNAL_SERVER_ERROR,
4230                Json(serde_json::json!({"error": format!("failed to create user: {e}")})),
4231            )
4232        }
4233    }
4234}
4235
4236pub async fn audit_log_handler(
4237    State(hub): State<Arc<SutureHubServer>>,
4238    headers: HeaderMap,
4239    Query(params): Query<AuditQueryParams>,
4240) -> (StatusCode, Json<serde_json::Value>) {
4241    if let Err(status) = check_auth(&hub, &headers).await {
4242        return (status, Json(serde_json::json!({"error": "unauthorized"})));
4243    }
4244    let store = hub.storage.read().await;
4245    let entries = store
4246        .query_audit_log(
4247            params.actor.as_deref(),
4248            params.action.as_deref(),
4249            params.limit.unwrap_or(100),
4250            params.offset.unwrap_or(0),
4251        )
4252        .unwrap_or_default();
4253    (
4254        StatusCode::OK,
4255        Json(serde_json::json!({
4256            "entries": entries,
4257            "count": entries.len(),
4258        })),
4259    )
4260}
4261
4262/// Raft cluster status endpoint.
4263pub async fn raft_status_handler(
4264    State(hub): State<Arc<SutureHubServer>>,
4265    headers: HeaderMap,
4266) -> (StatusCode, Json<serde_json::Value>) {
4267    if let Err(status) = check_auth(&hub, &headers).await {
4268        return (status, Json(serde_json::json!({"error": "unauthorized"})));
4269    }
4270
4271    #[cfg(feature = "raft-cluster")]
4272    {
4273        let state = hub.raft_state().await;
4274        let leader = hub.raft_leader().await;
4275        let term = {
4276            let raft = hub.raft_node.lock().await;
4277            raft.term()
4278        };
4279        let node_id = hub.raft_node_id;
4280        let is_leader = hub.is_leader().await;
4281
4282        let response = serde_json::json!({
4283            "raft_enabled": true,
4284            "node_id": node_id,
4285            "state": state,
4286            "term": term,
4287            "leader": leader,
4288            "is_leader": is_leader,
4289        });
4290        (StatusCode::OK, Json(response))
4291    }
4292
4293    #[cfg(not(feature = "raft-cluster"))]
4294    {
4295        let response = serde_json::json!({
4296            "raft_enabled": false,
4297            "state": "standalone",
4298            "is_leader": true,
4299        });
4300        (StatusCode::OK, Json(response))
4301    }
4302}
4303
4304pub async fn run_server(
4305    hub: SutureHubServer,
4306    addr: &str,
4307) -> Result<(), Box<dyn std::error::Error>> {
4308    let hub = Arc::new(hub);
4309
4310    {
4311        let hub_clone = Arc::clone(&hub);
4312        tokio::spawn(async move {
4313            replication_background_task(hub_clone).await;
4314        });
4315    }
4316
4317    let (set_request_id, propagate_request_id) = request_id_layer();
4318    let app = axum::Router::new()
4319        .route("/healthz", get(health_check))
4320        .route("/", axum::routing::get(serve_index))
4321        .route("/push", axum::routing::post(push_handler))
4322        .route(
4323            "/push/compressed",
4324            axum::routing::post(push_compressed_handler),
4325        )
4326        .route("/pull", axum::routing::post(pull_handler))
4327        .route(
4328            "/pull/compressed",
4329            axum::routing::post(pull_compressed_handler),
4330        )
4331        .route("/repos", axum::routing::get(list_repos_handler))
4332        .route("/repo/{repo_id}", axum::routing::get(repo_info_handler))
4333        .route(
4334            "/repos/{repo_id}/branches",
4335            axum::routing::get(repo_branches_handler),
4336        )
4337        .route(
4338            "/repos/{repo_id}/patches",
4339            axum::routing::get(repo_patches_handler),
4340        )
4341        .route("/handshake", axum::routing::get(handshake_get_handler))
4342        .route("/handshake", axum::routing::post(handshake_handler))
4343        .route("/v2/handshake", axum::routing::get(handshake_v2_handler))
4344        .route("/v2/handshake", axum::routing::post(handshake_v2_handler))
4345        .route("/v2/pull", axum::routing::post(v2_pull_handler))
4346        .route("/v2/push", axum::routing::post(v2_push_handler))
4347        .route("/auth/token", axum::routing::post(create_token_handler))
4348        .route("/auth/verify", axum::routing::post(verify_token_handler))
4349        .route("/mirror/setup", axum::routing::post(mirror_setup_handler))
4350        .route("/mirror/sync", axum::routing::post(mirror_sync_handler))
4351        .route(
4352            "/mirror/status",
4353            axum::routing::get(mirror_status_get_handler),
4354        )
4355        .route("/mirror/status", axum::routing::post(mirror_status_handler))
4356        .route(
4357            "/repos/{repo_id}/protect/{branch}",
4358            axum::routing::post(protect_branch_handler),
4359        )
4360        .route(
4361            "/repos/{repo_id}/unprotect/{branch}",
4362            axum::routing::post(unprotect_branch_handler),
4363        )
4364        .route("/auth/register", axum::routing::post(register_handler))
4365        .route("/users", axum::routing::get(list_users_handler))
4366        .route("/users/{username}", axum::routing::get(get_user_handler))
4367        .route(
4368            "/users/{username}/role",
4369            axum::routing::patch(update_role_handler),
4370        )
4371        .route(
4372            "/users/{username}",
4373            axum::routing::delete(delete_user_handler),
4374        )
4375        .route("/static/{*path}", axum::routing::get(serve_static_file))
4376        .route("/replication/peers", axum::routing::post(add_peer_handler))
4377        .route("/replication/peers", axum::routing::get(list_peers_handler))
4378        .route(
4379            "/replication/peers/{id}",
4380            axum::routing::delete(remove_peer_handler),
4381        )
4382        .route(
4383            "/replication/status",
4384            axum::routing::get(replication_status_handler),
4385        )
4386        .route(
4387            "/replication/sync",
4388            axum::routing::post(replication_sync_handler),
4389        )
4390        .route("/repos", axum::routing::post(create_repo_handler))
4391        .route(
4392            "/repos/{repo_id}",
4393            axum::routing::delete(delete_repo_handler),
4394        )
4395        .route(
4396            "/repos/{repo_id}/branches",
4397            axum::routing::post(create_branch_handler),
4398        )
4399        .route(
4400            "/repos/{repo_id}/branches/{branch}",
4401            axum::routing::delete(delete_branch_handler),
4402        )
4403        .route(
4404            "/repos/{repo_id}/blobs/{hash}",
4405            axum::routing::get(get_blob_handler),
4406        )
4407        .route(
4408            "/repos/{repo_id}/tree/{branch}",
4409            axum::routing::get(repo_tree_handler),
4410        )
4411        .route("/auth/login", axum::routing::post(login_handler))
4412        .route("/search", axum::routing::get(search_handler))
4413        .route("/activity", axum::routing::get(activity_handler))
4414        .route(
4415            "/mirrors/{id}",
4416            axum::routing::delete(delete_mirror_handler),
4417        )
4418        .route(
4419            "/webhooks/{repo_id}",
4420            axum::routing::post(create_webhook_handler),
4421        )
4422        .route(
4423            "/webhooks/{repo_id}",
4424            axum::routing::get(list_webhooks_handler),
4425        )
4426        .route(
4427            "/webhooks/{repo_id}/{id}",
4428            axum::routing::delete(delete_webhook_handler),
4429        )
4430        .route(
4431            "/repos/{repo_id}/patches/batch",
4432            axum::routing::post(batch_push_handler),
4433        )
4434        .route("/lfs/batch", axum::routing::post(lfs_batch_handler))
4435        .route(
4436            "/lfs/objects/{repo_id}/{oid}",
4437            axum::routing::put(lfs_upload_handler),
4438        )
4439        .route(
4440            "/lfs/objects/{repo_id}/{oid}",
4441            axum::routing::get(lfs_download_handler),
4442        )
4443        // SSO / OIDC routes
4444        .route("/sso/providers", axum::routing::get(sso_list_providers_handler))
4445        .route(
4446            "/sso/providers",
4447            axum::routing::post(sso_configure_provider_handler),
4448        )
4449        .route(
4450            "/sso/providers/{provider_name}",
4451            axum::routing::delete(sso_delete_provider_handler),
4452        )
4453        .route("/sso/authorize", axum::routing::post(sso_authorize_handler))
4454        .route("/sso/callback", axum::routing::post(sso_callback_handler))
4455        .route("/audit/log", axum::routing::get(audit_log_handler))
4456        // Raft cluster endpoints (only available with raft-cluster feature)
4457        .route("/raft/status", axum::routing::get(raft_status_handler))
4458        .layer(axum::middleware::from_fn_with_state(
4459            Arc::clone(&hub),
4460            crate::audit::audit_middleware,
4461        ))
4462        .with_state(Arc::clone(&hub))
4463        .layer(set_request_id)
4464        .layer(propagate_request_id);
4465
4466    let listener = tokio::net::TcpListener::bind(addr).await?;
4467    tracing::info!("Suture Hub listening on {addr}");
4468
4469    let shutdown_tx = tokio::sync::broadcast::channel::<()>(1).0;
4470    let shutdown_tx_ctrlc = shutdown_tx.clone();
4471
4472    tokio::spawn(async move {
4473        tokio::signal::ctrl_c().await.ok();
4474        tracing::info!("received ctrl-c, initiating graceful shutdown");
4475        let _ = shutdown_tx_ctrlc.send(());
4476    });
4477
4478    // Spawn Raft tick loop when raft-cluster feature is enabled
4479    #[cfg(feature = "raft-cluster")]
4480    {
4481        let raft_node = Arc::clone(&hub.raft_node);
4482        let hub_for_raft = Arc::clone(&hub);
4483        tokio::spawn(async move {
4484            let mut interval = tokio::time::interval(
4485                std::time::Duration::from_millis(10), // tick every 10ms
4486            );
4487            loop {
4488                interval.tick().await;
4489                let messages = {
4490                    let mut raft = raft_node.lock().await;
4491                    raft.tick()
4492                };
4493                // In a real multi-node deployment, these messages would be sent
4494                // to peers via HTTP. For now, we log state transitions.
4495                if !messages.is_empty() {
4496                    tracing::trace!(
4497                        "[raft] tick produced {} messages",
4498                        messages.len()
4499                    );
4500                }
4501                // Apply committed entries to replication log
4502                let entries = hub_for_raft.raft_committed_entries().await;
4503                if !entries.is_empty() {
4504                    let count = entries.len();
4505                    let store = hub_for_raft.storage.read().await;
4506                    for entry in &entries {
4507                        // Deserialize the command: expected format is JSON
4508                        // {"operation": "insert|update|delete", "table": "...", "data": "..."}
4509                        if let Ok(cmd) = serde_json::from_slice::<serde_json::Value>(&entry.command) {
4510                            let op = cmd.get("operation").and_then(|v| v.as_str()).unwrap_or("unknown");
4511                            let table = cmd.get("table").and_then(|v| v.as_str()).unwrap_or("");
4512                            let data = cmd.get("data").and_then(|v| v.as_str()).unwrap_or("");
4513                            let _ = store.log_operation(op, table, &entry.index.to_string(), Some(data));
4514                        }
4515                    }
4516                    drop(store);
4517                    hub_for_raft.raft_advance_applied(count).await;
4518                }
4519            }
4520        });
4521    }
4522
4523    let server = axum::serve(
4524        listener,
4525        app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
4526    )
4527    .with_graceful_shutdown(async move {
4528        let mut rx = shutdown_tx.subscribe();
4529        let _ = rx.recv().await;
4530        hub.shutdown();
4531    });
4532
4533    server.await?;
4534    Ok(())
4535}
4536
4537#[cfg(test)]
4538mod tests {
4539    use super::*;
4540    use sha2::Digest;
4541    async fn create_test_user_hub(
4542        hub: &SutureHubServer,
4543        username: &str,
4544        display_name: &str,
4545        role: &str,
4546    ) -> String {
4547        let api_token = generate_api_token();
4548        let store = hub.storage.write().await;
4549        store
4550            .create_user(username, display_name, role, &api_token)
4551            .unwrap();
4552        api_token
4553    }
4554
4555    fn make_auth_header_val(token: &str) -> String {
4556        format!("Bearer {}", token)
4557    }
4558
4559    fn make_hash_proto(hex: &str) -> HashProto {
4560        HashProto {
4561            value: hex.to_string(),
4562        }
4563    }
4564
4565    fn make_patch(hex: &str, op: &str, parents: &[String], author: &str) -> PatchProto {
4566        PatchProto {
4567            id: make_hash_proto(hex),
4568            operation_type: op.to_string(),
4569            touch_set: vec!["f".to_string()],
4570            target_path: Some("f".to_string()),
4571            payload: String::new(),
4572            parent_ids: parents.iter().map(|p| make_hash_proto(p)).collect(),
4573            author: author.to_string(),
4574            message: format!("patch {}", hex),
4575            timestamp: 0,
4576        }
4577    }
4578
4579    fn make_branch(name: &str, target: &str) -> BranchProto {
4580        BranchProto {
4581            name: name.to_string(),
4582            target_id: make_hash_proto(target),
4583        }
4584    }
4585
4586    async fn start_test_hub() -> (Arc<SutureHubServer>, u16, String) {
4587        let hub = Arc::new(SutureHubServer::new_in_memory());
4588
4589        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4590        let port = listener.local_addr().unwrap().port();
4591        let base = format!("http://127.0.0.1:{}", port);
4592
4593        let app = axum::Router::new()
4594            .route("/", axum::routing::get(serve_index))
4595            .route("/push", axum::routing::post(push_handler))
4596            .route(
4597                "/push/compressed",
4598                axum::routing::post(push_compressed_handler),
4599            )
4600            .route("/pull", axum::routing::post(pull_handler))
4601            .route(
4602                "/pull/compressed",
4603                axum::routing::post(pull_compressed_handler),
4604            )
4605            .route("/repos", axum::routing::get(list_repos_handler))
4606            .route("/repo/{repo_id}", axum::routing::get(repo_info_handler))
4607            .route(
4608                "/repos/{repo_id}/branches",
4609                axum::routing::get(repo_branches_handler),
4610            )
4611            .route(
4612                "/repos/{repo_id}/patches",
4613                axum::routing::get(repo_patches_handler),
4614            )
4615            .route("/handshake", axum::routing::get(handshake_get_handler))
4616            .route("/handshake", axum::routing::post(handshake_handler))
4617            .route("/v2/handshake", axum::routing::get(handshake_v2_handler))
4618            .route("/v2/handshake", axum::routing::post(handshake_v2_handler))
4619            .route("/v2/pull", axum::routing::post(v2_pull_handler))
4620            .route("/v2/push", axum::routing::post(v2_push_handler))
4621            .route("/auth/token", axum::routing::post(create_token_handler))
4622            .route("/auth/verify", axum::routing::post(verify_token_handler))
4623            .route("/mirror/setup", axum::routing::post(mirror_setup_handler))
4624            .route("/mirror/sync", axum::routing::post(mirror_sync_handler))
4625            .route(
4626                "/mirror/status",
4627                axum::routing::get(mirror_status_get_handler),
4628            )
4629            .route("/mirror/status", axum::routing::post(mirror_status_handler))
4630            .route(
4631                "/repos/{repo_id}/protect/{branch}",
4632                axum::routing::post(protect_branch_handler),
4633            )
4634            .route(
4635                "/repos/{repo_id}/unprotect/{branch}",
4636                axum::routing::post(unprotect_branch_handler),
4637            )
4638            .route("/auth/register", axum::routing::post(register_handler))
4639            .route("/users", axum::routing::get(list_users_handler))
4640            .route("/users/{username}", axum::routing::get(get_user_handler))
4641            .route(
4642                "/users/{username}/role",
4643                axum::routing::patch(update_role_handler),
4644            )
4645            .route(
4646                "/users/{username}",
4647                axum::routing::delete(delete_user_handler),
4648            )
4649            .route("/static/{*path}", axum::routing::get(serve_static_file))
4650            .route("/replication/peers", axum::routing::post(add_peer_handler))
4651            .route("/replication/peers", axum::routing::get(list_peers_handler))
4652            .route(
4653                "/replication/peers/{id}",
4654                axum::routing::delete(remove_peer_handler),
4655            )
4656            .route(
4657                "/replication/status",
4658                axum::routing::get(replication_status_handler),
4659            )
4660            .route(
4661                "/replication/sync",
4662                axum::routing::post(replication_sync_handler),
4663            )
4664            .route("/repos", axum::routing::post(create_repo_handler))
4665            .route(
4666                "/repos/{repo_id}",
4667                axum::routing::delete(delete_repo_handler),
4668            )
4669            .route(
4670                "/repos/{repo_id}/branches",
4671                axum::routing::post(create_branch_handler),
4672            )
4673            .route(
4674                "/repos/{repo_id}/branches/{branch}",
4675                axum::routing::delete(delete_branch_handler),
4676            )
4677            .route(
4678                "/repos/{repo_id}/blobs/{hash}",
4679                axum::routing::get(get_blob_handler),
4680            )
4681            .route(
4682                "/repos/{repo_id}/tree/{branch}",
4683                axum::routing::get(repo_tree_handler),
4684            )
4685            .route("/auth/login", axum::routing::post(login_handler))
4686            .route("/search", axum::routing::get(search_handler))
4687            .route("/activity", axum::routing::get(activity_handler))
4688            .route(
4689                "/mirrors/{id}",
4690                axum::routing::delete(delete_mirror_handler),
4691            )
4692            .route(
4693                "/webhooks/{repo_id}",
4694                axum::routing::post(create_webhook_handler),
4695            )
4696            .route(
4697                "/webhooks/{repo_id}",
4698                axum::routing::get(list_webhooks_handler),
4699            )
4700            .route(
4701                "/webhooks/{repo_id}/{id}",
4702                axum::routing::delete(delete_webhook_handler),
4703            )
4704            .route(
4705                "/repos/{repo_id}/patches/batch",
4706                axum::routing::post(batch_push_handler),
4707            )
4708            .with_state(Arc::clone(&hub));
4709
4710        tokio::spawn(async move {
4711            axum::serve(
4712                listener,
4713                app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
4714            )
4715            .await
4716            .unwrap();
4717        });
4718
4719        for _ in 0..50 {
4720            if reqwest::Client::new()
4721                .get(format!("{}/repos", &base))
4722                .send()
4723                .await
4724                .is_ok()
4725            {
4726                return (hub, port, base);
4727            }
4728            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
4729        }
4730        panic!("test server did not start in time");
4731    }
4732
4733    async fn start_test_hub_with_lfs(
4734        hub: Arc<SutureHubServer>,
4735    ) -> (Arc<SutureHubServer>, u16, String) {
4736        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4737        let port = listener.local_addr().unwrap().port();
4738        let base = format!("http://127.0.0.1:{}", port);
4739
4740        let app = axum::Router::new()
4741            .route("/", axum::routing::get(serve_index))
4742            .route("/push", axum::routing::post(push_handler))
4743            .route(
4744                "/push/compressed",
4745                axum::routing::post(push_compressed_handler),
4746            )
4747            .route("/pull", axum::routing::post(pull_handler))
4748            .route(
4749                "/pull/compressed",
4750                axum::routing::post(pull_compressed_handler),
4751            )
4752            .route("/repos", axum::routing::get(list_repos_handler))
4753            .route("/repo/{repo_id}", axum::routing::get(repo_info_handler))
4754            .route(
4755                "/repos/{repo_id}/branches",
4756                axum::routing::get(repo_branches_handler),
4757            )
4758            .route(
4759                "/repos/{repo_id}/patches",
4760                axum::routing::get(repo_patches_handler),
4761            )
4762            .route("/handshake", axum::routing::get(handshake_get_handler))
4763            .route("/handshake", axum::routing::post(handshake_handler))
4764            .route("/v2/handshake", axum::routing::get(handshake_v2_handler))
4765            .route("/v2/handshake", axum::routing::post(handshake_v2_handler))
4766            .route("/v2/pull", axum::routing::post(v2_pull_handler))
4767            .route("/v2/push", axum::routing::post(v2_push_handler))
4768            .route("/auth/token", axum::routing::post(create_token_handler))
4769            .route("/auth/verify", axum::routing::post(verify_token_handler))
4770            .route("/mirror/setup", axum::routing::post(mirror_setup_handler))
4771            .route("/mirror/sync", axum::routing::post(mirror_sync_handler))
4772            .route(
4773                "/mirror/status",
4774                axum::routing::get(mirror_status_get_handler),
4775            )
4776            .route("/mirror/status", axum::routing::post(mirror_status_handler))
4777            .route(
4778                "/repos/{repo_id}/protect/{branch}",
4779                axum::routing::post(protect_branch_handler),
4780            )
4781            .route(
4782                "/repos/{repo_id}/unprotect/{branch}",
4783                axum::routing::post(unprotect_branch_handler),
4784            )
4785            .route("/auth/register", axum::routing::post(register_handler))
4786            .route("/users", axum::routing::get(list_users_handler))
4787            .route("/users/{username}", axum::routing::get(get_user_handler))
4788            .route(
4789                "/users/{username}/role",
4790                axum::routing::patch(update_role_handler),
4791            )
4792            .route(
4793                "/users/{username}",
4794                axum::routing::delete(delete_user_handler),
4795            )
4796            .route("/static/{*path}", axum::routing::get(serve_static_file))
4797            .route("/replication/peers", axum::routing::post(add_peer_handler))
4798            .route("/replication/peers", axum::routing::get(list_peers_handler))
4799            .route(
4800                "/replication/peers/{id}",
4801                axum::routing::delete(remove_peer_handler),
4802            )
4803            .route(
4804                "/replication/status",
4805                axum::routing::get(replication_status_handler),
4806            )
4807            .route(
4808                "/replication/sync",
4809                axum::routing::post(replication_sync_handler),
4810            )
4811            .route("/repos", axum::routing::post(create_repo_handler))
4812            .route(
4813                "/repos/{repo_id}",
4814                axum::routing::delete(delete_repo_handler),
4815            )
4816            .route(
4817                "/repos/{repo_id}/branches",
4818                axum::routing::post(create_branch_handler),
4819            )
4820            .route(
4821                "/repos/{repo_id}/branches/{branch}",
4822                axum::routing::delete(delete_branch_handler),
4823            )
4824            .route(
4825                "/repos/{repo_id}/blobs/{hash}",
4826                axum::routing::get(get_blob_handler),
4827            )
4828            .route(
4829                "/repos/{repo_id}/tree/{branch}",
4830                axum::routing::get(repo_tree_handler),
4831            )
4832            .route("/auth/login", axum::routing::post(login_handler))
4833            .route("/search", axum::routing::get(search_handler))
4834            .route("/activity", axum::routing::get(activity_handler))
4835            .route(
4836                "/mirrors/{id}",
4837                axum::routing::delete(delete_mirror_handler),
4838            )
4839            .route(
4840                "/webhooks/{repo_id}",
4841                axum::routing::post(create_webhook_handler),
4842            )
4843            .route(
4844                "/webhooks/{repo_id}",
4845                axum::routing::get(list_webhooks_handler),
4846            )
4847            .route(
4848                "/webhooks/{repo_id}/{id}",
4849                axum::routing::delete(delete_webhook_handler),
4850            )
4851            .route(
4852                "/repos/{repo_id}/patches/batch",
4853                axum::routing::post(batch_push_handler),
4854            )
4855            .route("/lfs/batch", axum::routing::post(lfs_batch_handler))
4856            .route(
4857                "/lfs/objects/{repo_id}/{oid}",
4858                axum::routing::put(lfs_upload_handler),
4859            )
4860            .route(
4861                "/lfs/objects/{repo_id}/{oid}",
4862                axum::routing::get(lfs_download_handler),
4863            )
4864            .with_state(Arc::clone(&hub));
4865
4866        tokio::spawn(async move {
4867            axum::serve(
4868                listener,
4869                app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
4870            )
4871            .await
4872            .unwrap();
4873        });
4874
4875        for _ in 0..50 {
4876            if reqwest::Client::new()
4877                .get(format!("{}/repos", &base))
4878                .send()
4879                .await
4880                .is_ok()
4881            {
4882                return (hub, port, base);
4883            }
4884            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
4885        }
4886        panic!("test server did not start in time");
4887    }
4888
4889    async fn start_test_hub_auth() -> (Arc<SutureHubServer>, u16, String) {
4890        let (hub, port, base) = start_test_hub().await;
4891        // Pre-create an admin user for auth tests
4892        create_test_user_hub(&hub, "test-admin", "Test Admin", "admin").await;
4893        (hub, port, base)
4894    }
4895
4896    fn post_json(
4897        client: &reqwest::Client,
4898        url: &str,
4899        body: &serde_json::Value,
4900    ) -> reqwest::RequestBuilder {
4901        client
4902            .post(url)
4903            .header("Content-Type", "application/json")
4904            .body(body.to_string())
4905    }
4906
4907    fn patch_json(
4908        client: &reqwest::Client,
4909        url: &str,
4910        body: &serde_json::Value,
4911    ) -> reqwest::RequestBuilder {
4912        client
4913            .patch(url)
4914            .header("Content-Type", "application/json")
4915            .body(body.to_string())
4916    }
4917
4918    #[tokio::test]
4919    async fn test_http_index() {
4920        let (_hub, _port, base) = start_test_hub().await;
4921        let client = reqwest::Client::new();
4922        let resp = client.get(format!("{}/", &base)).send().await.unwrap();
4923        assert_eq!(resp.status(), 200);
4924        let body = resp.text().await.unwrap();
4925        assert!(body.contains("Suture Hub"));
4926    }
4927
4928    #[tokio::test]
4929    async fn test_http_handshake() {
4930        let (_hub, _port, base) = start_test_hub().await;
4931        let client = reqwest::Client::new();
4932        let resp = client
4933            .post(format!("{}/handshake", &base))
4934            .json(&serde_json::json!({"client_version": 1, "client_name": "test"}))
4935            .send()
4936            .await
4937            .unwrap();
4938        assert_eq!(resp.status(), 200);
4939        let data: serde_json::Value = resp.json().await.unwrap();
4940        assert_eq!(data["server_version"], 1);
4941        assert_eq!(data["compatible"], true);
4942    }
4943
4944    #[tokio::test]
4945    async fn test_http_repos_empty_and_populated() {
4946        let (hub, _port, base) = start_test_hub().await;
4947        let client = reqwest::Client::new();
4948
4949        let resp = client.get(format!("{}/repos", &base)).send().await.unwrap();
4950        assert_eq!(resp.status(), 200);
4951        let data: serde_json::Value = resp.json().await.unwrap();
4952        assert_eq!(data["repo_ids"].as_array().unwrap().len(), 0);
4953
4954        let a_hex = "a".repeat(64);
4955        let push_body = serde_json::json!({
4956            "repo_id": "http-repo",
4957            "patches": [{
4958                "id": {"value": &a_hex},
4959                "operation_type": "Create",
4960                "touch_set": ["f"],
4961                "target_path": "f",
4962                "payload": "",
4963                "parent_ids": [],
4964                "author": "alice",
4965                "message": "p",
4966                "timestamp": 0
4967            }],
4968            "branches": [],
4969            "blobs": []
4970        });
4971        let push_resp = client
4972            .post(format!("{}/push", &base))
4973            .json(&push_body)
4974            .send()
4975            .await
4976            .unwrap();
4977        assert_eq!(push_resp.status(), 200);
4978
4979        let resp2 = client.get(format!("{}/repos", &base)).send().await.unwrap();
4980        let data2: serde_json::Value = resp2.json().await.unwrap();
4981        assert_eq!(data2["repo_ids"].as_array().unwrap().len(), 1);
4982
4983        drop(hub);
4984    }
4985
4986    #[tokio::test]
4987    async fn test_http_repo_info() {
4988        let (hub, _port, base) = start_test_hub().await;
4989        let client = reqwest::Client::new();
4990
4991        let resp = client
4992            .get(format!("{}/repo/nonexistent", &base))
4993            .send()
4994            .await
4995            .unwrap();
4996        assert_eq!(resp.status(), 404);
4997
4998        let a_hex = "a".repeat(64);
4999        hub.handle_push(PushRequest {
5000            repo_id: "info-repo".to_string(),
5001            patches: vec![make_patch(&a_hex, "Create", &[], "alice")],
5002            branches: vec![make_branch("main", &a_hex)],
5003            blobs: vec![],
5004            signature: None,
5005            known_branches: None,
5006            force: false,
5007        })
5008        .await
5009        .unwrap();
5010
5011        let resp2 = client
5012            .get(format!("{}/repo/info-repo", &base))
5013            .send()
5014            .await
5015            .unwrap();
5016        assert_eq!(resp2.status(), 200);
5017        let data: serde_json::Value = resp2.json().await.unwrap();
5018        assert_eq!(data["patch_count"], 1);
5019        assert_eq!(data["branches"].as_array().unwrap().len(), 1);
5020    }
5021
5022    #[tokio::test]
5023    async fn test_http_repo_branches() {
5024        let (hub, _port, base) = start_test_hub().await;
5025        let client = reqwest::Client::new();
5026        let a_hex = "a".repeat(64);
5027
5028        hub.handle_push(PushRequest {
5029            repo_id: "branch-repo".to_string(),
5030            patches: vec![make_patch(&a_hex, "Create", &[], "alice")],
5031            branches: vec![make_branch("main", &a_hex), make_branch("dev", &a_hex)],
5032            blobs: vec![],
5033            signature: None,
5034            known_branches: None,
5035            force: false,
5036        })
5037        .await
5038        .unwrap();
5039
5040        let resp = client
5041            .get(format!("{}/repos/branch-repo/branches", &base))
5042            .send()
5043            .await
5044            .unwrap();
5045        assert_eq!(resp.status(), 200);
5046        let data: serde_json::Value = resp.json().await.unwrap();
5047        assert_eq!(data.as_array().unwrap().len(), 2);
5048    }
5049
5050    #[tokio::test]
5051    async fn test_http_repo_patches() {
5052        let (hub, _port, base) = start_test_hub().await;
5053        let client = reqwest::Client::new();
5054
5055        for i in 0..3u32 {
5056            let hex = format!("{:064x}", i);
5057            let parents: Vec<String> = if i > 0 {
5058                vec![format!("{:064x}", i - 1)]
5059            } else {
5060                vec![]
5061            };
5062            hub.handle_push(PushRequest {
5063                repo_id: "patch-repo".to_string(),
5064                patches: vec![PatchProto {
5065                    id: make_hash_proto(&hex),
5066                    operation_type: "Create".to_string(),
5067                    touch_set: vec![format!("f{i}")],
5068                    target_path: Some(format!("f{i}")),
5069                    payload: String::new(),
5070                    parent_ids: parents.iter().map(|p| make_hash_proto(p)).collect(),
5071                    author: "alice".to_string(),
5072                    message: format!("p{i}"),
5073                    timestamp: 0,
5074                }],
5075                branches: vec![],
5076                blobs: vec![],
5077                signature: None,
5078                known_branches: None,
5079                force: false,
5080            })
5081            .await
5082            .unwrap();
5083        }
5084
5085        let resp = client
5086            .get(format!(
5087                "{}/repos/patch-repo/patches?offset=1&limit=1",
5088                &base
5089            ))
5090            .send()
5091            .await
5092            .unwrap();
5093        assert_eq!(resp.status(), 200);
5094        let data: serde_json::Value = resp.json().await.unwrap();
5095        assert_eq!(data["patches"].as_array().unwrap().len(), 1);
5096        assert!(!data["next_cursor"].as_str().unwrap().is_empty());
5097
5098        let resp2 = client
5099            .get(format!(
5100                "{}/repos/patch-repo/patches?limit=1&cursor={}",
5101                &base,
5102                data["next_cursor"].as_str().unwrap()
5103            ))
5104            .send()
5105            .await
5106            .unwrap();
5107        assert_eq!(resp2.status(), 200);
5108        let data2: serde_json::Value = resp2.json().await.unwrap();
5109        assert_eq!(data2["patches"].as_array().unwrap().len(), 1);
5110
5111        let resp3 = client
5112            .get(format!("{}/repos/patch-repo/patches?limit=50", &base,))
5113            .send()
5114            .await
5115            .unwrap();
5116        assert_eq!(resp3.status(), 200);
5117        let data3: serde_json::Value = resp3.json().await.unwrap();
5118        assert_eq!(data3["patches"].as_array().unwrap().len(), 3);
5119        assert!(data3["next_cursor"].as_str().unwrap().is_empty());
5120    }
5121
5122    #[tokio::test]
5123    async fn test_http_push_pull_roundtrip() {
5124        let (_hub, _port, base) = start_test_hub().await;
5125        let client = reqwest::Client::new();
5126        let a_hex = "a".repeat(64);
5127        let b_hex = "b".repeat(64);
5128
5129        let push_body = serde_json::json!({
5130            "repo_id": "roundtrip-repo",
5131            "patches": [
5132                {
5133                    "id": {"value": &a_hex},
5134                    "operation_type": "Create",
5135                    "touch_set": ["file_a"],
5136                    "target_path": "file_a",
5137                    "payload": "",
5138                    "parent_ids": [],
5139                    "author": "alice",
5140                    "message": "first patch",
5141                    "timestamp": 100
5142                },
5143                {
5144                    "id": {"value": &b_hex},
5145                    "operation_type": "Modify",
5146                    "touch_set": ["file_a"],
5147                    "target_path": "file_a",
5148                    "payload": "",
5149                    "parent_ids": [{"value": &a_hex}],
5150                    "author": "bob",
5151                    "message": "second patch",
5152                    "timestamp": 200
5153                }
5154            ],
5155            "branches": [{"name": "main", "target_id": {"value": &b_hex}}],
5156            "blobs": []
5157        });
5158
5159        let push_resp = client
5160            .post(format!("{}/push", &base))
5161            .json(&push_body)
5162            .send()
5163            .await
5164            .unwrap();
5165        assert_eq!(push_resp.status(), 200);
5166        let push_data: serde_json::Value = push_resp.json().await.unwrap();
5167        assert_eq!(push_data["success"], true);
5168
5169        let pull_resp = client
5170            .post(format!("{}/pull", &base))
5171            .json(&serde_json::json!({
5172                "repo_id": "roundtrip-repo",
5173                "known_branches": [],
5174                "max_depth": null
5175            }))
5176            .send()
5177            .await
5178            .unwrap();
5179        assert_eq!(pull_resp.status(), 200);
5180        let pull_data: serde_json::Value = pull_resp.json().await.unwrap();
5181        assert_eq!(pull_data["success"], true);
5182        assert_eq!(pull_data["patches"].as_array().unwrap().len(), 2);
5183        assert_eq!(pull_data["branches"].as_array().unwrap().len(), 1);
5184    }
5185
5186    #[tokio::test]
5187    async fn test_http_push_compressed() {
5188        let (_hub, _port, base) = start_test_hub().await;
5189        let client = reqwest::Client::new();
5190        let a_hex = "a".repeat(64);
5191        let blob_data = b"compressed test data";
5192        let blob_hash = "cafebabe".repeat(8);
5193        let compressed = suture_protocol::compress(blob_data).unwrap();
5194
5195        let push_body = serde_json::json!({
5196            "repo_id": "comp-repo",
5197            "patches": [{
5198                "id": {"value": &a_hex},
5199                "operation_type": "Create",
5200                "touch_set": ["f"],
5201                "target_path": "f",
5202                "payload": &blob_hash,
5203                "parent_ids": [],
5204                "author": "alice",
5205                "message": "p",
5206                "timestamp": 0
5207            }],
5208            "branches": [{"name": "main", "target_id": {"value": &a_hex}}],
5209            "blobs": [{"hash": {"value": &blob_hash}, "data": base64_encode(&compressed)}]
5210        });
5211
5212        let resp = client
5213            .post(format!("{}/push/compressed", &base))
5214            .json(&push_body)
5215            .send()
5216            .await
5217            .unwrap();
5218        assert_eq!(resp.status(), 200);
5219        let data: serde_json::Value = resp.json().await.unwrap();
5220        assert_eq!(data["success"], true);
5221    }
5222
5223    #[tokio::test]
5224    async fn test_http_v2_handshake() {
5225        let (_hub, _port, base) = start_test_hub().await;
5226        let client = reqwest::Client::new();
5227
5228        let resp = client
5229            .post(format!("{}/v2/handshake", &base))
5230            .json(&serde_json::json!({
5231                "client_version": 2,
5232                "client_name": "test-v2",
5233                "capabilities": {
5234                    "supports_delta": true,
5235                    "supports_compression": false,
5236                    "max_blob_size": 0
5237                }
5238            }))
5239            .send()
5240            .await
5241            .unwrap();
5242        assert_eq!(resp.status(), 200);
5243        let data: serde_json::Value = resp.json().await.unwrap();
5244        assert_eq!(data["server_version"], 2);
5245        assert_eq!(data["compatible"], true);
5246        assert_eq!(data["server_capabilities"]["supports_delta"], true);
5247    }
5248
5249    #[tokio::test]
5250    async fn test_http_v2_push_pull() {
5251        let (_hub, _port, base) = start_test_hub().await;
5252        let client = reqwest::Client::new();
5253        let a_hex = "a".repeat(64);
5254        let f_hash = blake3::hash(b"f").to_hex().to_string();
5255
5256        let push_body = serde_json::json!({
5257            "repo_id": "v2-repo",
5258            "patches": [{
5259                "id": {"value": &a_hex},
5260                "operation_type": "Create",
5261                "touch_set": ["f"],
5262                "target_path": "f",
5263                "payload": "hello world",
5264                "parent_ids": [],
5265                "author": "alice",
5266                "message": "v2 patch",
5267                "timestamp": 0
5268            }],
5269            "branches": [{"name": "main", "target_id": {"value": &a_hex}}],
5270            "blobs": [{"hash": {"value": &f_hash}, "data": "aGVsbG8gd29ybGQ="}],
5271            "deltas": [],
5272            "signature": null,
5273            "known_branches": null,
5274            "force": false
5275        });
5276
5277        let push_resp = post_json(&client, &format!("{}/v2/push", &base), &push_body)
5278            .send()
5279            .await
5280            .unwrap();
5281        assert_eq!(
5282            push_resp.status(),
5283            200,
5284            "V2 push failed: {}",
5285            push_resp.status()
5286        );
5287
5288        let pull_body = serde_json::json!({
5289            "repo_id": "v2-repo",
5290            "known_branches": [],
5291            "max_depth": null,
5292            "known_blob_hashes": [],
5293            "capabilities": {
5294                "supports_delta": false,
5295                "supports_compression": false,
5296                "max_blob_size": 0
5297            }
5298        });
5299
5300        let pull_resp = post_json(&client, &format!("{}/v2/pull", &base), &pull_body)
5301            .send()
5302            .await
5303            .unwrap();
5304        assert_eq!(pull_resp.status(), 200);
5305        let pull_data: serde_json::Value = pull_resp.json().await.unwrap();
5306        assert_eq!(pull_data["success"], true);
5307        assert_eq!(pull_data["patches"].as_array().unwrap().len(), 1);
5308        assert_eq!(pull_data["protocol_version"], 2);
5309    }
5310
5311    #[tokio::test]
5312    async fn test_http_auth_token_bootstrap() {
5313        let (_hub, _port, base) = start_test_hub().await;
5314        let client = reqwest::Client::new();
5315
5316        let resp = client
5317            .post(format!("{}/auth/token", &base))
5318            .send()
5319            .await
5320            .unwrap();
5321        assert_eq!(resp.status(), 200);
5322        let data: serde_json::Value = resp.json().await.unwrap();
5323        assert!(!data["token"].as_str().unwrap().is_empty());
5324        assert!(data["created_at"].as_u64().unwrap() > 0);
5325    }
5326
5327    #[tokio::test]
5328    async fn test_http_auth_verify() {
5329        let (_hub, _port, base) = start_test_hub().await;
5330        let client = reqwest::Client::new();
5331
5332        let token_resp = client
5333            .post(format!("{}/auth/token", &base))
5334            .send()
5335            .await
5336            .unwrap();
5337        let token_data: serde_json::Value = token_resp.json().await.unwrap();
5338        let token = token_data["token"].as_str().unwrap().to_string();
5339
5340        let verify_resp = client
5341            .post(format!("{}/auth/verify", &base))
5342            .json(&serde_json::json!({
5343                "method": {"Token": &token},
5344                "timestamp": 0
5345            }))
5346            .send()
5347            .await
5348            .unwrap();
5349        assert_eq!(verify_resp.status(), 200);
5350        let verify_data: serde_json::Value = verify_resp.json().await.unwrap();
5351        assert_eq!(verify_data["valid"], true);
5352
5353        let bad_resp = client
5354            .post(format!("{}/auth/verify", &base))
5355            .json(&serde_json::json!({
5356                "method": {"Token": "invalid-token-xyz"},
5357                "timestamp": 0
5358            }))
5359            .send()
5360            .await
5361            .unwrap();
5362        let bad_data: serde_json::Value = bad_resp.json().await.unwrap();
5363        assert_eq!(bad_data["valid"], false);
5364    }
5365
5366    #[tokio::test]
5367    async fn test_http_auth_register() {
5368        let (hub, _port, base) = start_test_hub_auth().await;
5369        let client = reqwest::Client::new();
5370        let admin_token = create_test_user_hub(&hub, "http-admin", "HTTP Admin", "admin").await;
5371
5372        let resp = post_json(
5373            &client,
5374            &format!("{}/auth/register", &base),
5375            &serde_json::json!({
5376                "username": "new-http-user",
5377                "display_name": "New HTTP User",
5378                "role": "member"
5379            }),
5380        )
5381        .header("Authorization", make_auth_header_val(&admin_token))
5382        .send()
5383        .await
5384        .unwrap();
5385        assert_eq!(resp.status(), 201);
5386        let data: serde_json::Value = resp.json().await.unwrap();
5387        assert_eq!(data["success"], true);
5388        assert_eq!(data["user"]["username"], "new-http-user");
5389    }
5390
5391    #[tokio::test]
5392    async fn test_http_users_list() {
5393        let (hub, _port, base) = start_test_hub_auth().await;
5394        let client = reqwest::Client::new();
5395        let admin_token = create_test_user_hub(&hub, "ul-admin", "UL Admin", "admin").await;
5396
5397        let resp = client
5398            .get(format!("{}/users", &base))
5399            .header("Authorization", make_auth_header_val(&admin_token))
5400            .send()
5401            .await
5402            .unwrap();
5403        assert_eq!(resp.status(), 200);
5404        let data: serde_json::Value = resp.json().await.unwrap();
5405        assert_eq!(data["success"], true);
5406        assert!(!data["users"].as_array().unwrap().is_empty());
5407    }
5408
5409    #[tokio::test]
5410    async fn test_http_user_crud() {
5411        let (hub, _port, base) = start_test_hub_auth().await;
5412        let client = reqwest::Client::new();
5413        let admin_token = create_test_user_hub(&hub, "crud-admin", "CRUD Admin", "admin").await;
5414        create_test_user_hub(&hub, "crud-target", "CRUD Target", "reader").await;
5415        let auth = make_auth_header_val(&admin_token);
5416
5417        let get_resp = client
5418            .get(format!("{}/users/crud-target", &base))
5419            .header("Authorization", &auth)
5420            .send()
5421            .await
5422            .unwrap();
5423        assert_eq!(get_resp.status(), 200);
5424        let get_data: serde_json::Value = get_resp.json().await.unwrap();
5425        assert_eq!(get_data["user"]["username"], "crud-target");
5426        assert_eq!(get_data["user"]["role"], "reader");
5427
5428        let patch_resp = patch_json(
5429            &client,
5430            &format!("{}/users/crud-target/role", &base),
5431            &serde_json::json!({"role": "admin"}),
5432        )
5433        .header("Authorization", &auth)
5434        .send()
5435        .await
5436        .unwrap();
5437        assert_eq!(patch_resp.status(), 200);
5438        let patch_data: serde_json::Value = patch_resp.json().await.unwrap();
5439        assert_eq!(patch_data["success"], true);
5440
5441        let del_resp = client
5442            .delete(format!("{}/users/crud-target", &base))
5443            .header("Authorization", &auth)
5444            .send()
5445            .await
5446            .unwrap();
5447        assert_eq!(del_resp.status(), 200);
5448        let del_data: serde_json::Value = del_resp.json().await.unwrap();
5449        assert_eq!(del_data["success"], true);
5450    }
5451
5452    #[tokio::test]
5453    async fn test_http_mirror_setup_and_status() {
5454        let (_hub, _port, base) = start_test_hub().await;
5455        let client = reqwest::Client::new();
5456
5457        let setup_resp = post_json(
5458            &client,
5459            &format!("{}/mirror/setup", &base),
5460            &serde_json::json!({
5461                "repo_name": "mirrored",
5462                "upstream_url": "http://example.com",
5463                "upstream_repo": "upstream/repo"
5464            }),
5465        )
5466        .send()
5467        .await
5468        .unwrap();
5469        assert_eq!(setup_resp.status(), 200);
5470        let setup_data: serde_json::Value = setup_resp.json().await.unwrap();
5471        assert_eq!(setup_data["success"], true);
5472
5473        let status_resp = post_json(
5474            &client,
5475            &format!("{}/mirror/status", &base),
5476            &serde_json::json!({}),
5477        )
5478        .send()
5479        .await
5480        .unwrap();
5481        assert_eq!(status_resp.status(), 200);
5482        let status_data: serde_json::Value = status_resp.json().await.unwrap();
5483        assert_eq!(status_data["mirrors"].as_array().unwrap().len(), 1);
5484    }
5485
5486    #[tokio::test]
5487    async fn test_http_replication() {
5488        let (hub, _port, base) = start_test_hub().await;
5489        let client = reqwest::Client::new();
5490        hub.set_replication_role("leader");
5491
5492        let add_resp = client
5493            .post(format!("{}/replication/peers", &base))
5494            .json(&serde_json::json!({
5495                "peer_url": "http://follower1:8080",
5496                "role": "follower"
5497            }))
5498            .send()
5499            .await
5500            .unwrap();
5501        assert_eq!(add_resp.status(), 200);
5502        let add_data: serde_json::Value = add_resp.json().await.unwrap();
5503        assert_eq!(add_data["success"], true);
5504
5505        let list_resp = client
5506            .get(format!("{}/replication/peers", &base))
5507            .send()
5508            .await
5509            .unwrap();
5510        assert_eq!(list_resp.status(), 200);
5511        let list_data: serde_json::Value = list_resp.json().await.unwrap();
5512        assert_eq!(list_data["peers"].as_array().unwrap().len(), 1);
5513
5514        let status_resp = client
5515            .get(format!("{}/replication/status", &base))
5516            .send()
5517            .await
5518            .unwrap();
5519        assert_eq!(status_resp.status(), 200);
5520        let status_data: serde_json::Value = status_resp.json().await.unwrap();
5521        assert_eq!(status_data["status"]["peer_count"], 1);
5522    }
5523
5524    #[tokio::test]
5525    async fn test_http_branch_protection() {
5526        let (_hub, _port, base) = start_test_hub().await;
5527        let client = reqwest::Client::new();
5528
5529        let protect_resp = client
5530            .post(format!("{}/repos/prot-repo/protect/main", &base))
5531            .send()
5532            .await
5533            .unwrap();
5534        assert_eq!(protect_resp.status(), 200);
5535        let protect_data: serde_json::Value = protect_resp.json().await.unwrap();
5536        assert_eq!(protect_data["success"], true);
5537
5538        let unprotect_resp = client
5539            .post(format!("{}/repos/prot-repo/unprotect/main", &base))
5540            .send()
5541            .await
5542            .unwrap();
5543        assert_eq!(unprotect_resp.status(), 200);
5544        let unprotect_data: serde_json::Value = unprotect_resp.json().await.unwrap();
5545        assert_eq!(unprotect_data["success"], true);
5546    }
5547
5548    #[tokio::test]
5549    async fn test_http_404_unknown_route() {
5550        let (_hub, _port, base) = start_test_hub().await;
5551        let client = reqwest::Client::new();
5552
5553        let resp = client
5554            .get(format!("{}/nonexistent", &base))
5555            .send()
5556            .await
5557            .unwrap();
5558        assert_eq!(resp.status(), 404);
5559    }
5560
5561    // === v1.3 new route tests ===
5562
5563    #[tokio::test]
5564    async fn test_http_create_repo() {
5565        let (hub, _port, base) = start_test_hub_auth().await;
5566        let client = reqwest::Client::new();
5567        let admin_token = create_test_user_hub(&hub, "repo-admin", "Repo Admin", "admin").await;
5568
5569        // Store a token in the tokens table to activate auth enforcement
5570        {
5571            let store = hub.storage.write().await;
5572            store
5573                .store_token(&admin_token, 1000, "test token", i64::MAX)
5574                .unwrap();
5575        }
5576
5577        // Create a repo via POST (authenticated)
5578        let resp = post_json(
5579            &client,
5580            &format!("{}/repos", &base),
5581            &serde_json::json!({
5582                "repo_id": "new-repo"
5583            }),
5584        )
5585        .header("Authorization", make_auth_header_val(&admin_token))
5586        .send()
5587        .await
5588        .unwrap();
5589        assert_eq!(resp.status(), 201);
5590        let data: serde_json::Value = resp.json().await.unwrap();
5591        assert_eq!(data["success"], true);
5592        assert_eq!(data["repo_id"], "new-repo");
5593
5594        // Verify it shows up in list
5595        let list_resp = client.get(format!("{}/repos", &base)).send().await.unwrap();
5596        let list_data: serde_json::Value = list_resp.json().await.unwrap();
5597        assert!(
5598            list_data["repo_ids"]
5599                .as_array()
5600                .unwrap()
5601                .contains(&serde_json::json!("new-repo"))
5602        );
5603
5604        // Creating duplicate should still succeed (idempotent)
5605        let resp2 = post_json(
5606            &client,
5607            &format!("{}/repos", &base),
5608            &serde_json::json!({
5609                "repo_id": "new-repo"
5610            }),
5611        )
5612        .header("Authorization", make_auth_header_val(&admin_token))
5613        .send()
5614        .await
5615        .unwrap();
5616        assert_eq!(resp2.status(), 201);
5617
5618        // Unauthenticated should fail (now that tokens table is populated)
5619        let resp3 = post_json(
5620            &client,
5621            &format!("{}/repos", &base),
5622            &serde_json::json!({
5623                "repo_id": "noauth-repo"
5624            }),
5625        )
5626        .send()
5627        .await
5628        .unwrap();
5629        assert_eq!(resp3.status(), 401);
5630    }
5631
5632    #[tokio::test]
5633    async fn test_http_delete_repo() {
5634        let (hub, _port, base) = start_test_hub_auth().await;
5635        let client = reqwest::Client::new();
5636        let admin_token = create_test_user_hub(&hub, "del-admin", "Del Admin", "admin").await;
5637        let a_hex = "a".repeat(64);
5638
5639        // Create a repo with data
5640        hub.handle_push(PushRequest {
5641            repo_id: "delete-me".to_string(),
5642            patches: vec![make_patch(&a_hex, "Create", &[], "alice")],
5643            branches: vec![make_branch("main", &a_hex)],
5644            blobs: vec![],
5645            signature: None,
5646            known_branches: None,
5647            force: false,
5648        })
5649        .await
5650        .unwrap();
5651
5652        // Verify it exists
5653        let list_resp = client.get(format!("{}/repos", &base)).send().await.unwrap();
5654        let list_data: serde_json::Value = list_resp.json().await.unwrap();
5655        assert!(
5656            list_data["repo_ids"]
5657                .as_array()
5658                .unwrap()
5659                .contains(&serde_json::json!("delete-me"))
5660        );
5661
5662        // Delete it
5663        let del_resp = client
5664            .delete(format!("{}/repos/delete-me", &base))
5665            .header("Authorization", make_auth_header_val(&admin_token))
5666            .send()
5667            .await
5668            .unwrap();
5669        assert_eq!(del_resp.status(), 200);
5670        let del_data: serde_json::Value = del_resp.json().await.unwrap();
5671        assert_eq!(del_data["success"], true);
5672
5673        // Verify it's gone
5674        let list_resp2 = client.get(format!("{}/repos", &base)).send().await.unwrap();
5675        let list_data2: serde_json::Value = list_resp2.json().await.unwrap();
5676        assert!(
5677            !list_data2["repo_ids"]
5678                .as_array()
5679                .unwrap()
5680                .contains(&serde_json::json!("delete-me"))
5681        );
5682    }
5683
5684    #[tokio::test]
5685    async fn test_http_create_branch() {
5686        let (hub, _port, base) = start_test_hub_auth().await;
5687        let client = reqwest::Client::new();
5688        let admin_token = create_test_user_hub(&hub, "branch-admin", "Branch Admin", "admin").await;
5689        let a_hex = "a".repeat(64);
5690
5691        // Create repo with initial data
5692        hub.handle_push(PushRequest {
5693            repo_id: "branch-repo-2".to_string(),
5694            patches: vec![make_patch(&a_hex, "Create", &[], "alice")],
5695            branches: vec![make_branch("main", &a_hex)],
5696            blobs: vec![],
5697            signature: None,
5698            known_branches: None,
5699            force: false,
5700        })
5701        .await
5702        .unwrap();
5703
5704        // Create a new branch
5705        let resp = post_json(
5706            &client,
5707            &format!("{}/repos/branch-repo-2/branches", &base),
5708            &serde_json::json!({
5709                "name": "feature",
5710                "target": &a_hex
5711            }),
5712        )
5713        .header("Authorization", make_auth_header_val(&admin_token))
5714        .send()
5715        .await
5716        .unwrap();
5717        assert_eq!(resp.status(), 201);
5718        let data: serde_json::Value = resp.json().await.unwrap();
5719        assert_eq!(data["success"], true);
5720
5721        // Verify both branches exist
5722        let br_resp = client
5723            .get(format!("{}/repos/branch-repo-2/branches", &base))
5724            .send()
5725            .await
5726            .unwrap();
5727        let br_data: serde_json::Value = br_resp.json().await.unwrap();
5728        assert_eq!(br_data.as_array().unwrap().len(), 2);
5729    }
5730
5731    #[tokio::test]
5732    async fn test_http_delete_branch() {
5733        let (hub, _port, base) = start_test_hub_auth().await;
5734        let client = reqwest::Client::new();
5735        let admin_token = create_test_user_hub(&hub, "delbr-admin", "DelBr Admin", "admin").await;
5736        let a_hex = "a".repeat(64);
5737
5738        // Create repo with two branches
5739        hub.handle_push(PushRequest {
5740            repo_id: "delbr-repo".to_string(),
5741            patches: vec![make_patch(&a_hex, "Create", &[], "alice")],
5742            branches: vec![make_branch("main", &a_hex), make_branch("dev", &a_hex)],
5743            blobs: vec![],
5744            signature: None,
5745            known_branches: None,
5746            force: false,
5747        })
5748        .await
5749        .unwrap();
5750
5751        // Delete dev branch
5752        let resp = client
5753            .delete(format!("{}/repos/delbr-repo/branches/dev", &base))
5754            .header("Authorization", make_auth_header_val(&admin_token))
5755            .send()
5756            .await
5757            .unwrap();
5758        assert_eq!(resp.status(), 200);
5759        let data: serde_json::Value = resp.json().await.unwrap();
5760        assert_eq!(data["success"], true);
5761
5762        // Verify only main remains
5763        let br_resp = client
5764            .get(format!("{}/repos/delbr-repo/branches", &base))
5765            .send()
5766            .await
5767            .unwrap();
5768        let br_data: serde_json::Value = br_resp.json().await.unwrap();
5769        assert_eq!(br_data.as_array().unwrap().len(), 1);
5770        assert_eq!(br_data[0]["name"], "main");
5771    }
5772
5773    #[tokio::test]
5774    async fn test_http_get_blob() {
5775        let (_hub, _port, base) = start_test_hub().await;
5776        let client = reqwest::Client::new();
5777        let a_hex = "a".repeat(64);
5778        let f_hash = blake3::hash(b"hello blob").to_hex().to_string();
5779        let blob_bytes = b"hello blob";
5780        let compressed = suture_protocol::compress(blob_bytes).unwrap();
5781
5782        // Push with compressed blob — server decompresses before storing
5783        let push_body = serde_json::json!({
5784            "repo_id": "blob-repo",
5785            "patches": [{
5786                "id": {"value": &a_hex},
5787                "operation_type": "Create",
5788                "touch_set": ["f"],
5789                "target_path": "f",
5790                "payload": &f_hash,
5791                "parent_ids": [],
5792                "author": "alice",
5793                "message": "p",
5794                "timestamp": 0
5795            }],
5796            "branches": [{"name": "main", "target_id": {"value": &a_hex}}],
5797            "blobs": [{"hash": {"value": &f_hash}, "data": base64_encode(&compressed)}]
5798        });
5799        let push_resp = client
5800            .post(format!("{}/push/compressed", &base))
5801            .json(&push_body)
5802            .send()
5803            .await
5804            .unwrap();
5805        assert_eq!(push_resp.status(), 200);
5806
5807        // Get blob — returns base64-encoded raw bytes (already decompressed by push handler)
5808        let blob_resp = client
5809            .get(format!("{}/repos/blob-repo/blobs/{}", &base, &f_hash))
5810            .send()
5811            .await
5812            .unwrap();
5813        assert_eq!(blob_resp.status(), 200);
5814        let blob_data: serde_json::Value = blob_resp.json().await.unwrap();
5815        assert_eq!(blob_data["success"], true);
5816        let decoded = base64_decode(blob_data["data"].as_str().unwrap()).unwrap();
5817        assert_eq!(decoded, blob_bytes);
5818
5819        // Nonexistent blob
5820        let miss_resp = client
5821            .get(format!(
5822                "{}/repos/blob-repo/blobs/{}",
5823                &base,
5824                "0".repeat(64)
5825            ))
5826            .send()
5827            .await
5828            .unwrap();
5829        assert_eq!(miss_resp.status(), 404);
5830    }
5831
5832    #[tokio::test]
5833    async fn test_http_login() {
5834        let (hub, _port, base) = start_test_hub().await;
5835        let client = reqwest::Client::new();
5836
5837        // Create a user and store token in BOTH users and tokens tables
5838        let token = create_test_user_hub(&hub, "login-user", "Login User", "member").await;
5839        {
5840            let store = hub.storage.write().await;
5841            store
5842                .store_token(&token, 1000, "login test token", i64::MAX)
5843                .unwrap();
5844        }
5845
5846        // Login with the token
5847        let resp = post_json(
5848            &client,
5849            &format!("{}/auth/login", &base),
5850            &serde_json::json!({
5851                "username": "login-user",
5852                "token": &token
5853            }),
5854        )
5855        .send()
5856        .await
5857        .unwrap();
5858        assert_eq!(resp.status(), 200);
5859        let data: serde_json::Value = resp.json().await.unwrap();
5860        assert_eq!(data["success"], true);
5861        assert_eq!(data["user"]["username"], "login-user");
5862
5863        // Login with invalid token
5864        let bad_resp = post_json(
5865            &client,
5866            &format!("{}/auth/login", &base),
5867            &serde_json::json!({
5868                "username": "login-user",
5869                "token": "invalid-token"
5870            }),
5871        )
5872        .send()
5873        .await
5874        .unwrap();
5875        assert_eq!(bad_resp.status(), 401);
5876    }
5877
5878    #[tokio::test]
5879    async fn test_http_search() {
5880        let (_hub, _port, base) = start_test_hub().await;
5881        let client = reqwest::Client::new();
5882        let a_hex = "a".repeat(64);
5883
5884        // Push to create searchable data
5885        let push_body = serde_json::json!({
5886            "repo_id": "search-test-repo",
5887            "patches": [{
5888                "id": {"value": &a_hex},
5889                "operation_type": "Create",
5890                "touch_set": ["README.md"],
5891                "target_path": "README.md",
5892                "payload": "",
5893                "parent_ids": [],
5894                "author": "searcher",
5895                "message": "initial commit for search",
5896                "timestamp": 0
5897            }],
5898            "branches": [],
5899            "blobs": []
5900        });
5901        let push_resp = client
5902            .post(format!("{}/push", &base))
5903            .json(&push_body)
5904            .send()
5905            .await
5906            .unwrap();
5907        assert_eq!(push_resp.status(), 200);
5908
5909        // Search for repo by name
5910        let resp = client
5911            .get(format!("{}/search?q=search-test", &base))
5912            .send()
5913            .await
5914            .unwrap();
5915        assert_eq!(resp.status(), 200);
5916        let data: serde_json::Value = resp.json().await.unwrap();
5917        assert_eq!(data["repos"].as_array().unwrap().len(), 1);
5918
5919        // Search for patches — same query must match repo name (search only
5920        // searches patches within repos that match the query).
5921        // "search" matches repo "search-test-repo", author "searcher", and message "initial commit for search"
5922        let resp2 = client
5923            .get(format!("{}/search?q=search", &base))
5924            .send()
5925            .await
5926            .unwrap();
5927        assert_eq!(resp2.status(), 200);
5928        let data2: serde_json::Value = resp2.json().await.unwrap();
5929        assert!(!data2["patches"].as_array().unwrap().is_empty());
5930
5931        // Empty search
5932        let resp3 = client
5933            .get(format!("{}/search?q=nonexistent_xyz", &base))
5934            .send()
5935            .await
5936            .unwrap();
5937        assert_eq!(resp3.status(), 200);
5938        let data3: serde_json::Value = resp3.json().await.unwrap();
5939        assert_eq!(data3["repos"].as_array().unwrap().len(), 0);
5940    }
5941
5942    #[tokio::test]
5943    async fn test_http_activity() {
5944        let (_hub, _port, base) = start_test_hub().await;
5945        let client = reqwest::Client::new();
5946
5947        // Activity should work even with no data
5948        let resp = client
5949            .get(format!("{}/activity", &base))
5950            .send()
5951            .await
5952            .unwrap();
5953        assert_eq!(resp.status(), 200);
5954        let data: serde_json::Value = resp.json().await.unwrap();
5955        // entries should be an array (may be empty)
5956        assert!(data["entries"].is_array());
5957    }
5958
5959    #[tokio::test]
5960    async fn test_http_delete_mirror() {
5961        let (hub, _port, base) = start_test_hub_auth().await;
5962        let client = reqwest::Client::new();
5963        let admin_token = create_test_user_hub(&hub, "mirror-admin", "Mirror Admin", "admin").await;
5964
5965        // Setup a mirror
5966        let setup_resp = post_json(
5967            &client,
5968            &format!("{}/mirror/setup", &base),
5969            &serde_json::json!({
5970                "repo_name": "mirrored-del",
5971                "upstream_url": "http://example.com/del",
5972                "upstream_repo": "upstream/del"
5973            }),
5974        )
5975        .send()
5976        .await
5977        .unwrap();
5978        assert_eq!(setup_resp.status(), 200);
5979        let setup_data: serde_json::Value = setup_resp.json().await.unwrap();
5980        let mirror_id = setup_data["mirror_id"].as_i64().unwrap();
5981
5982        // Delete the mirror
5983        let del_resp = client
5984            .delete(format!("{}/mirrors/{}", &base, mirror_id))
5985            .header("Authorization", make_auth_header_val(&admin_token))
5986            .send()
5987            .await
5988            .unwrap();
5989        assert_eq!(del_resp.status(), 200);
5990        let del_data: serde_json::Value = del_resp.json().await.unwrap();
5991        assert_eq!(del_data["success"], true);
5992
5993        // Verify mirror is gone
5994        let status_resp = post_json(
5995            &client,
5996            &format!("{}/mirror/status", &base),
5997            &serde_json::json!({}),
5998        )
5999        .send()
6000        .await
6001        .unwrap();
6002        let status_data: serde_json::Value = status_resp.json().await.unwrap();
6003        assert_eq!(status_data["mirrors"].as_array().unwrap().len(), 0);
6004    }
6005
6006    #[tokio::test]
6007    async fn test_http_mirror_status_get() {
6008        let (_hub, _port, base) = start_test_hub().await;
6009        let client = reqwest::Client::new();
6010
6011        // GET /mirror/status should work (no body needed)
6012        let resp = client
6013            .get(format!("{}/mirror/status", &base))
6014            .send()
6015            .await
6016            .unwrap();
6017        assert_eq!(resp.status(), 200);
6018        let data: serde_json::Value = resp.json().await.unwrap();
6019        assert!(data["mirrors"].is_array());
6020    }
6021
6022    #[tokio::test]
6023    async fn test_http_repo_tree() {
6024        let (hub, _port, base) = start_test_hub().await;
6025        let client = reqwest::Client::new();
6026        let a_hex = "a".repeat(64);
6027        let b_hex = "b".repeat(64);
6028        let c_hex = "c".repeat(64);
6029        let d_hex = "d".repeat(64);
6030
6031        hub.handle_push(PushRequest {
6032            repo_id: "tree-repo".to_string(),
6033            patches: vec![
6034                PatchProto {
6035                    id: make_hash_proto(&a_hex),
6036                    operation_type: "Create".to_string(),
6037                    touch_set: vec!["src/main.rs".to_string()],
6038                    target_path: Some("src/main.rs".to_string()),
6039                    payload: "blob_aaa".to_string(),
6040                    parent_ids: vec![],
6041                    author: "alice".to_string(),
6042                    message: "create main".to_string(),
6043                    timestamp: 100,
6044                },
6045                PatchProto {
6046                    id: make_hash_proto(&b_hex),
6047                    operation_type: "Create".to_string(),
6048                    touch_set: vec!["src/lib.rs".to_string()],
6049                    target_path: Some("src/lib.rs".to_string()),
6050                    payload: "blob_bbb".to_string(),
6051                    parent_ids: vec![make_hash_proto(&a_hex)],
6052                    author: "alice".to_string(),
6053                    message: "create lib".to_string(),
6054                    timestamp: 200,
6055                },
6056                PatchProto {
6057                    id: make_hash_proto(&c_hex),
6058                    operation_type: "Delete".to_string(),
6059                    touch_set: vec!["src/main.rs".to_string()],
6060                    target_path: Some("src/main.rs".to_string()),
6061                    payload: String::new(),
6062                    parent_ids: vec![make_hash_proto(&b_hex)],
6063                    author: "alice".to_string(),
6064                    message: "delete main".to_string(),
6065                    timestamp: 300,
6066                },
6067                PatchProto {
6068                    id: make_hash_proto(&d_hex),
6069                    operation_type: "Modify".to_string(),
6070                    touch_set: vec!["src/lib.rs".to_string()],
6071                    target_path: Some("src/lib.rs".to_string()),
6072                    payload: "blob_ddd".to_string(),
6073                    parent_ids: vec![make_hash_proto(&c_hex)],
6074                    author: "bob".to_string(),
6075                    message: "modify lib".to_string(),
6076                    timestamp: 400,
6077                },
6078            ],
6079            branches: vec![make_branch("main", &d_hex)],
6080            blobs: vec![],
6081            signature: None,
6082            known_branches: None,
6083            force: false,
6084        })
6085        .await
6086        .unwrap();
6087
6088        let resp = client
6089            .get(format!("{}/repos/tree-repo/tree/main", &base))
6090            .send()
6091            .await
6092            .unwrap();
6093        assert_eq!(resp.status(), 200);
6094        let data: serde_json::Value = resp.json().await.unwrap();
6095        assert_eq!(data["success"], true);
6096        let files = data["files"].as_array().unwrap();
6097        assert_eq!(files.len(), 1);
6098        assert_eq!(files[0]["path"], "src/lib.rs");
6099        assert_eq!(files[0]["content_hash"], "blob_ddd");
6100    }
6101
6102    #[tokio::test]
6103    async fn test_http_repo_tree_empty() {
6104        let (_hub, _port, base) = start_test_hub().await;
6105        let client = reqwest::Client::new();
6106
6107        let resp = client
6108            .get(format!("{}/repos/nonexistent/tree/main", &base))
6109            .send()
6110            .await
6111            .unwrap();
6112        assert_eq!(resp.status(), 200);
6113        let data: serde_json::Value = resp.json().await.unwrap();
6114        assert_eq!(data["success"], true);
6115        let files = data["files"].as_array().unwrap();
6116        assert_eq!(files.len(), 0);
6117    }
6118
6119    #[tokio::test]
6120    async fn test_lfs_batch_upload_none() {
6121        let tmp = tempfile::tempdir().unwrap();
6122        let repo_id = "test-repo";
6123        let oid = "aabbccdd";
6124        let prefix = &oid[..2];
6125        let obj_path = tmp
6126            .path()
6127            .join(repo_id)
6128            .join("objects")
6129            .join(prefix)
6130            .join(oid);
6131        std::fs::create_dir_all(obj_path.parent().unwrap()).unwrap();
6132        std::fs::write(&obj_path, b"existing data").unwrap();
6133
6134        let hub = SutureHubServer::new_in_memory().with_lfs_dir(tmp.path().to_path_buf());
6135        let (_hub, _port, base) = start_test_hub_with_lfs(Arc::new(hub)).await;
6136        let client = reqwest::Client::new();
6137
6138        let resp = post_json(
6139            &client,
6140            &format!("{}/lfs/batch", &base),
6141            &serde_json::json!({
6142                "repo_id": repo_id,
6143                "operation": "upload",
6144                "objects": [{"oid": oid, "size": 12}],
6145            }),
6146        )
6147        .send()
6148        .await
6149        .unwrap();
6150
6151        assert_eq!(resp.status(), 200);
6152        let data: serde_json::Value = resp.json().await.unwrap();
6153        assert_eq!(data["objects"][0]["action"], "none");
6154    }
6155
6156    #[tokio::test]
6157    async fn test_lfs_upload_download_roundtrip() {
6158        let tmp = tempfile::tempdir().unwrap();
6159        let repo_id = "test-repo";
6160
6161        let hub = SutureHubServer::new_in_memory().with_lfs_dir(tmp.path().to_path_buf());
6162        let (_hub, _port, base) = start_test_hub_with_lfs(Arc::new(hub)).await;
6163        let client = reqwest::Client::new();
6164
6165        let payload = b"hello lfs world".repeat(1000);
6166        let hash = sha2::Sha256::digest(&payload);
6167        let oid = hex::encode(hash);
6168
6169        let resp = post_json(
6170            &client,
6171            &format!("{}/lfs/batch", &base),
6172            &serde_json::json!({
6173                "repo_id": repo_id,
6174                "operation": "upload",
6175                "objects": [{"oid": &oid, "size": payload.len()}],
6176            }),
6177        )
6178        .send()
6179        .await
6180        .unwrap();
6181        assert_eq!(resp.status(), 200);
6182        let data: serde_json::Value = resp.json().await.unwrap();
6183        assert_eq!(data["objects"][0]["action"], "upload");
6184
6185        let resp = client
6186            .put(format!("{}/lfs/objects/{}/{}", &base, repo_id, &oid))
6187            .body(payload.clone())
6188            .send()
6189            .await
6190            .unwrap();
6191        assert_eq!(resp.status(), 200);
6192
6193        let resp = post_json(
6194            &client,
6195            &format!("{}/lfs/batch", &base),
6196            &serde_json::json!({
6197                "repo_id": repo_id,
6198                "operation": "download",
6199                "objects": [{"oid": &oid, "size": payload.len()}],
6200            }),
6201        )
6202        .send()
6203        .await
6204        .unwrap();
6205        assert_eq!(resp.status(), 200);
6206        let data: serde_json::Value = resp.json().await.unwrap();
6207        assert_eq!(data["objects"][0]["action"], "download");
6208
6209        let resp = client
6210            .get(format!("{}/lfs/objects/{}/{}", &base, repo_id, &oid))
6211            .send()
6212            .await
6213            .unwrap();
6214        assert_eq!(resp.status(), 200);
6215        let downloaded = resp.bytes().await.unwrap();
6216        assert_eq!(downloaded.as_ref(), payload.as_slice());
6217
6218        let resp = client
6219            .put(format!(
6220                "{}/lfs/objects/{}/{}",
6221                &base, repo_id, "badbadbadbadbadbadbadbadbadbadbadbadbadbadbad"
6222            ))
6223            .body(payload.clone())
6224            .send()
6225            .await
6226            .unwrap();
6227        assert_eq!(resp.status(), 400);
6228    }
6229
6230    #[tokio::test]
6231    async fn test_lfs_batch_download_missing() {
6232        let tmp = tempfile::tempdir().unwrap();
6233        let hub = SutureHubServer::new_in_memory().with_lfs_dir(tmp.path().to_path_buf());
6234        let (_hub, _port, base) = start_test_hub_with_lfs(Arc::new(hub)).await;
6235        let client = reqwest::Client::new();
6236
6237        let resp = post_json(&client, &format!("{}/lfs/batch", &base), &serde_json::json!({
6238            "repo_id": "test-repo",
6239            "operation": "download",
6240            "objects": [{"oid": "ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00", "size": 100}],
6241        }))
6242        .send().await.unwrap();
6243
6244        assert_eq!(resp.status(), 200);
6245        let data: serde_json::Value = resp.json().await.unwrap();
6246        assert_eq!(data["objects"][0]["action"], "none");
6247    }
6248
6249    #[tokio::test]
6250    async fn test_lfs_no_storage_configured() {
6251        let hub = SutureHubServer::new_in_memory();
6252        let (_hub, _port, base) = start_test_hub_with_lfs(Arc::new(hub)).await;
6253        let client = reqwest::Client::new();
6254
6255        let resp = post_json(&client, &format!("{}/lfs/batch", &base), &serde_json::json!({
6256            "repo_id": "test-repo",
6257            "operation": "upload",
6258            "objects": [{"oid": "aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00", "size": 10}],
6259        }))
6260        .send().await.unwrap();
6261        assert_eq!(resp.status(), 503);
6262
6263        let resp = client
6264            .put(format!("{base}/lfs/objects/test-repo/aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00"))
6265            .body(b"some data".to_vec())
6266            .send()
6267            .await
6268            .unwrap();
6269        assert_eq!(resp.status(), 503);
6270    }
6271
6272    #[cfg(feature = "s3-backend")]
6273    #[test]
6274    fn test_blob_backend_used_when_set() {
6275        use crate::blob_backend::BlobBackend;
6276
6277        struct MockBackend {
6278            store_called: std::sync::atomic::AtomicBool,
6279            get_called: std::sync::atomic::AtomicBool,
6280        }
6281
6282        impl MockBackend {
6283            fn new() -> Self {
6284                Self {
6285                    store_called: std::sync::atomic::AtomicBool::new(false),
6286                    get_called: std::sync::atomic::AtomicBool::new(false),
6287                }
6288            }
6289        }
6290
6291        impl BlobBackend for MockBackend {
6292            fn store_blob(
6293                &self,
6294                _repo_id: &str,
6295                _hash_hex: &str,
6296                _data: &[u8],
6297            ) -> Result<(), String> {
6298                self.store_called
6299                    .store(true, std::sync::atomic::Ordering::Relaxed);
6300                Ok(())
6301            }
6302            fn get_blob(&self, _repo_id: &str, _hash_hex: &str) -> Result<Option<Vec<u8>>, String> {
6303                self.get_called
6304                    .store(true, std::sync::atomic::Ordering::Relaxed);
6305                Ok(None)
6306            }
6307            fn has_blob(&self, _repo_id: &str, _hash_hex: &str) -> Result<bool, String> {
6308                Ok(false)
6309            }
6310            fn delete_blob(&self, _repo_id: &str, _hash_hex: &str) -> Result<(), String> {
6311                Ok(())
6312            }
6313            fn list_blobs(&self, _repo_id: &str) -> Result<Vec<String>, String> {
6314                Ok(vec![])
6315            }
6316        }
6317
6318        let mock = Arc::new(MockBackend::new());
6319        let mut hub = SutureHubServer::new();
6320        hub.set_blob_backend(mock.clone());
6321
6322        let rt = tokio::runtime::Runtime::new().unwrap();
6323        rt.block_on(async {
6324            let store = hub.storage.read().await;
6325            hub.blob_store(&store, "test-repo", &"a".repeat(64), b"data")
6326                .unwrap();
6327            assert!(mock.store_called.load(std::sync::atomic::Ordering::Relaxed));
6328
6329            hub.blob_get(&store, "test-repo", &"a".repeat(64)).unwrap();
6330            assert!(mock.get_called.load(std::sync::atomic::Ordering::Relaxed));
6331        });
6332    }
6333
6334    #[cfg(feature = "raft-cluster")]
6335    #[tokio::test]
6336    async fn test_apply_raft_command_create_repo() {
6337        use crate::raft::HubCommand;
6338
6339        let hub = SutureHubServer::new_in_memory();
6340        hub.apply_raft_command(HubCommand::CreateRepo {
6341            repo_id: "raft-repo".to_string(),
6342        })
6343        .await
6344        .unwrap();
6345
6346        let store = hub.storage.read().await;
6347        assert!(store.repo_exists("raft-repo").unwrap_or(false));
6348    }
6349
6350    #[cfg(feature = "raft-cluster")]
6351    #[tokio::test]
6352    async fn test_apply_raft_command_delete_repo() {
6353        use crate::raft::HubCommand;
6354
6355        let hub = SutureHubServer::new_in_memory();
6356        {
6357            let store = hub.storage.write().await;
6358            store.ensure_repo("del-repo").unwrap();
6359        }
6360        hub.apply_raft_command(HubCommand::DeleteRepo {
6361            repo_id: "del-repo".to_string(),
6362        })
6363        .await
6364        .unwrap();
6365
6366        let store = hub.storage.read().await;
6367        assert!(!store.repo_exists("del-repo").unwrap_or(false));
6368    }
6369
6370    #[cfg(feature = "raft-cluster")]
6371    #[tokio::test]
6372    async fn test_apply_raft_command_branch() {
6373        use crate::raft::HubCommand;
6374
6375        let hub = SutureHubServer::new_in_memory();
6376        {
6377            let store = hub.storage.write().await;
6378            store.ensure_repo("br-repo").unwrap();
6379        }
6380
6381        hub.apply_raft_command(HubCommand::CreateBranch {
6382            repo_id: "br-repo".to_string(),
6383            branch: "main".to_string(),
6384            target: "a".repeat(64),
6385        })
6386        .await
6387        .unwrap();
6388
6389        hub.apply_raft_command(HubCommand::UpdateBranch {
6390            repo_id: "br-repo".to_string(),
6391            branch: "main".to_string(),
6392            target: "b".repeat(64),
6393        })
6394        .await
6395        .unwrap();
6396
6397        hub.apply_raft_command(HubCommand::DeleteBranch {
6398            repo_id: "br-repo".to_string(),
6399            branch: "main".to_string(),
6400        })
6401        .await
6402        .unwrap();
6403
6404        let store = hub.storage.read().await;
6405        let branches = store.get_branches("br-repo").unwrap_or_default();
6406        assert!(branches.is_empty());
6407    }
6408}