1use sha2::Digest;
2use axum::{
3 Json,
4 extract::{ConnectInfo, Path, Query, State},
5 http::{HeaderMap, StatusCode},
6 response::{Html, IntoResponse},
7 routing::get,
8};
9use ed25519_dalek::{Signature, Verifier, VerifyingKey};
10use std::collections::HashSet;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
14use crate::blob_backend::BlobBackend;
15use crate::middleware::request_id_layer;
16use crate::storage::HubStorage;
17use crate::storage::{ReplicationEntry, ReplicationStatus};
18pub use crate::types::*;
19use crate::webhooks::{Webhook, WebhookManager};
20
/// Named role with three levels: admin, member and reader.
///
/// The textual forms are produced and consumed by `Role::as_str` and `Role::parse`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Role {
    /// Written as `"admin"`.
    Admin,
    /// Written as `"member"`.
    Member,
    /// Written as `"reader"`; also the fallback for unrecognised role names.
    Reader,
}
27
28impl Role {
29 #[must_use]
30 pub fn parse(s: &str) -> Self {
31 match s {
32 "admin" => Self::Admin,
33 "member" => Self::Member,
34 _ => Self::Reader,
35 }
36 }
37
38 #[must_use]
39 pub fn as_str(&self) -> &'static str {
40 match self {
41 Self::Admin => "admin",
42 Self::Member => "member",
43 Self::Reader => "reader",
44 }
45 }
46}
47
48impl PartialOrd for Role {
49 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
50 fn rank(r: &Role) -> u8 {
51 match r {
52 Role::Admin => 3,
53 Role::Member => 2,
54 Role::Reader => 1,
55 }
56 }
57 rank(self).partial_cmp(&rank(other))
58 }
59}
60
61#[derive(serde::Deserialize)]
62pub struct PaginationParams {
63 pub offset: Option<u32>,
64 pub limit: Option<u32>,
65 pub cursor: Option<String>,
66}
67
68#[derive(serde::Deserialize)]
69pub struct AuditQueryParams {
70 pub actor: Option<String>,
71 pub action: Option<String>,
72 pub limit: Option<usize>,
73 pub offset: Option<usize>,
74}
75
76#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
77struct CursorData {
78 offset: u64,
79}
80
81fn decode_cursor(cursor: &str) -> Option<u64> {
82 let bytes = base64_decode(cursor).ok()?;
83 let data: CursorData = serde_json::from_slice(&bytes).ok()?;
84 Some(data.offset)
85}
86
87fn encode_cursor(offset: u64) -> String {
88 let data = CursorData { offset };
89 let json = serde_json::to_vec(&data).unwrap_or_default();
90 base64_encode(&json)
91}
92
93pub struct SutureHubServer {
94 pub(crate) storage: Arc<RwLock<HubStorage>>,
95 blob_backend: Option<Arc<dyn BlobBackend>>,
96 no_auth: bool,
97 rate_limits:
98 Arc<std::sync::RwLock<std::collections::HashMap<String, (u32, std::time::Instant)>>>,
99 max_pushes_per_hour: u32,
100 max_pulls_per_hour: u32,
101 max_token_creates_per_minute: u32,
102 rate_limit_window: std::time::Duration,
103 replication_role: Arc<std::sync::RwLock<String>>,
104 webhook_manager: Arc<WebhookManager>,
105 #[allow(dead_code)]
106 rate_limit_db: Option<Arc<tokio::sync::Mutex<rusqlite::Connection>>>,
107 lfs_data_dir: Option<std::path::PathBuf>,
108 #[cfg(feature = "raft-cluster")]
109 raft_node: Arc<tokio::sync::Mutex<suture_raft::RaftNode>>,
110 #[cfg(feature = "raft-cluster")]
111 raft_node_id: u64,
112}
113
114impl Default for SutureHubServer {
115 fn default() -> Self {
116 Self::new_in_memory()
117 }
118}
119
120impl SutureHubServer {
121 #[must_use]
122 pub fn new() -> Self {
123 Self::new_in_memory()
124 }
125
126 #[must_use]
127 pub fn new_in_memory() -> Self {
128 Self {
129 storage: Arc::new(RwLock::new(
130 HubStorage::open_in_memory().expect("in-memory storage must open"),
131 )),
132 blob_backend: None,
133 no_auth: false,
134 rate_limits: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
135 max_pushes_per_hour: 100,
136 max_pulls_per_hour: 1000,
137 max_token_creates_per_minute: 5,
138 rate_limit_window: std::time::Duration::from_secs(60),
139 replication_role: Arc::new(std::sync::RwLock::new("standalone".to_owned())),
140 webhook_manager: Arc::new(WebhookManager::new()),
141 rate_limit_db: None,
142 lfs_data_dir: None,
143 #[cfg(feature = "raft-cluster")]
144 raft_node: Arc::new(tokio::sync::Mutex::new(suture_raft::RaftNode::new(1, vec![]))),
145 #[cfg(feature = "raft-cluster")]
146 raft_node_id: 1,
147 }
148 }
149
150 pub fn with_db(path: &std::path::Path) -> Result<Self, crate::storage::StorageError> {
151 let rate_limit_db_path = path.with_extension("rate.db");
152 let rate_limit_conn = rusqlite::Connection::open(&rate_limit_db_path)?;
153 rate_limit_conn.execute_batch("PRAGMA journal_mode=WAL;")?;
154 rate_limit_conn.execute_batch(
155 "CREATE TABLE IF NOT EXISTS rate_limits (
156 key TEXT PRIMARY KEY,
157 count INTEGER NOT NULL DEFAULT 0,
158 window_start INTEGER NOT NULL
159 );",
160 )?;
161 Ok(Self {
162 storage: Arc::new(RwLock::new(HubStorage::open(path)?)),
163 blob_backend: None,
164 no_auth: false,
165 rate_limits: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
166 max_pushes_per_hour: 100,
167 max_pulls_per_hour: 1000,
168 max_token_creates_per_minute: 5,
169 rate_limit_window: std::time::Duration::from_secs(60),
170 replication_role: Arc::new(std::sync::RwLock::new("standalone".to_owned())),
171 webhook_manager: Arc::new(WebhookManager::new()),
172 rate_limit_db: Some(Arc::new(tokio::sync::Mutex::new(rate_limit_conn))),
173 lfs_data_dir: None,
174 #[cfg(feature = "raft-cluster")]
175 raft_node: Arc::new(tokio::sync::Mutex::new(suture_raft::RaftNode::new(1, vec![]))),
176 #[cfg(feature = "raft-cluster")]
177 raft_node_id: 1,
178 })
179 }
180
181 pub fn set_no_auth(&mut self, no_auth: bool) {
182 self.no_auth = no_auth;
183 }
184
185 #[must_use]
186 pub fn is_no_auth(&self) -> bool {
187 self.no_auth
188 }
189
190 #[must_use]
191 pub fn storage(&self) -> &Arc<RwLock<HubStorage>> {
192 &self.storage
193 }
194
195 pub fn shutdown(&self) {
196 tracing::info!("Hub server shutting down");
197 }
198
/// Reports whether the local Raft node is currently in the `Leader` state.
#[cfg(feature = "raft-cluster")]
pub async fn is_leader(&self) -> bool {
    let node = self.raft_node.lock().await;
    let leading = matches!(node.state(), suture_raft::NodeState::Leader);
    leading
}
209
210 #[cfg(not(feature = "raft-cluster"))]
212 pub async fn is_leader(&self) -> bool {
213 true }
215
/// Returns the leader id reported by the local Raft node, if it reports one.
#[cfg(feature = "raft-cluster")]
pub async fn raft_leader(&self) -> Option<u64> {
    let node = self.raft_node.lock().await;
    node.leader()
}
221
/// Returns the local Raft node's state as one of `"leader"`, `"follower"`, `"candidate"` or
/// `"pre-candidate"`.
#[cfg(feature = "raft-cluster")]
pub async fn raft_state(&self) -> String {
    let node = self.raft_node.lock().await;
    let label = match node.state() {
        suture_raft::NodeState::Leader => "leader",
        suture_raft::NodeState::Follower => "follower",
        suture_raft::NodeState::Candidate => "candidate",
        suture_raft::NodeState::PreCandidate => "pre-candidate",
    };
    label.to_owned()
}
233
/// Proposes `command` to the local Raft node and returns the node's result.
///
/// # Errors
///
/// Returns whatever error the node's `propose` call reports.
#[cfg(feature = "raft-cluster")]
pub async fn raft_propose(&self, command: Vec<u8>) -> Result<(), suture_raft::RaftError> {
    let mut node = self.raft_node.lock().await;
    let outcome = node.propose(command);
    outcome
}
240
/// Returns an owned copy of the entries the local Raft node reports as committed.
#[cfg(feature = "raft-cluster")]
pub async fn raft_committed_entries(&self) -> Vec<suture_raft::LogEntry> {
    let node = self.raft_node.lock().await;
    let committed = node.committed_entries().to_vec();
    committed
}
247
/// Forwards `count` to the local Raft node's `advance_applied`.
#[cfg(feature = "raft-cluster")]
pub async fn raft_advance_applied(&self, count: usize) {
    self.raft_node.lock().await.advance_applied(count);
}
254
255 pub fn set_rate_limit_config(&mut self, pushes: u32, pulls: u32, window: std::time::Duration) {
256 self.max_pushes_per_hour = pushes;
257 self.max_pulls_per_hour = pulls;
258 self.rate_limit_window = window;
259 }
260
261 #[must_use]
262 pub fn with_lfs_dir(mut self, path: std::path::PathBuf) -> Self {
263 if let Err(e) = std::fs::create_dir_all(&path) {
264 tracing::warn!("Failed to create directory {}: {}", path.display(), e);
265 }
266 self.lfs_data_dir = Some(path);
267 self
268 }
269
270 pub fn set_replication_role(&self, role: &str) {
271 *self.replication_role.write().unwrap_or_else(std::sync::PoisonError::into_inner) = role.to_owned();
272 }
273
274 #[must_use]
275 pub fn get_replication_role(&self) -> String {
276 self.replication_role.read().unwrap_or_else(std::sync::PoisonError::into_inner).clone()
277 }
278
279 pub fn set_blob_backend(&mut self, backend: Arc<dyn BlobBackend>) {
280 self.blob_backend = Some(backend);
281 }
282
283 fn blob_store(
284 &self,
285 store: &HubStorage,
286 repo_id: &str,
287 hash_hex: &str,
288 data: &[u8],
289 ) -> Result<(), String> {
290 self.blob_backend.as_ref().map_or_else(
291 || store
292 .store_blob(repo_id, hash_hex, data)
293 .map_err(|e| e.to_string()),
294 |backend| backend.store_blob(repo_id, hash_hex, data),
295 )
296 }
297
298 fn blob_get(
299 &self,
300 store: &HubStorage,
301 repo_id: &str,
302 hash_hex: &str,
303 ) -> Result<Option<Vec<u8>>, String> {
304 self.blob_backend.as_ref().map_or_else(
305 || store.get_blob(repo_id, hash_hex).map_err(|e| e.to_string()),
306 |backend| backend.get_blob(repo_id, hash_hex),
307 )
308 }
309
310 pub async fn log_write(
311 &self,
312 operation: &str,
313 table_name: &str,
314 row_id: &str,
315 data: Option<&str>,
316 ) -> Result<i64, crate::storage::StorageError> {
317 let store = self.storage.write().await;
318 store.log_operation(operation, table_name, row_id, data)
319 }
320
321 pub async fn handle_add_peer(&self, req: AddPeerRequest) -> AddPeerResponse {
322 let store = self.storage.write().await;
323 match store.add_replication_peer(&req.peer_url, &req.role) {
324 Ok(peer_id) => AddPeerResponse {
325 success: true,
326 peer_id: Some(peer_id),
327 error: None,
328 },
329 Err(e) => AddPeerResponse {
330 success: false,
331 peer_id: None,
332 error: Some(format!("{e}")),
333 },
334 }
335 }
336
337 pub async fn handle_remove_peer(&self, id: i64) -> RemovePeerResponse {
338 let store = self.storage.write().await;
339 match store.remove_replication_peer(id) {
340 Ok(()) => RemovePeerResponse {
341 success: true,
342 error: None,
343 },
344 Err(e) => RemovePeerResponse {
345 success: false,
346 error: Some(format!("{e}")),
347 },
348 }
349 }
350
351 pub async fn handle_list_peers(&self) -> ListPeersResponse {
352 let store = self.storage.read().await;
353 ListPeersResponse {
354 peers: store.list_replication_peers().unwrap_or_else(|e| {
355 tracing::warn!("store list_replication_peers failed: {e}");
356 Default::default()
357 }),
358 }
359 }
360
361 pub async fn handle_replication_status(&self) -> ReplicationStatusResponse {
362 let store = self.storage.read().await;
363 let status = match store.get_replication_status() {
364 Ok(s) => s,
365 Err(e) => {
366 tracing::error!("Failed to get replication status: {e}");
367 return ReplicationStatusResponse {
368 status: ReplicationStatus {
369 current_seq: 0,
370 peer_count: 0,
371 peers: vec![],
372 },
373 };
374 }
375 };
376 ReplicationStatusResponse { status }
377 }
378
379 pub async fn handle_replication_sync(&self, entries: Vec<ReplicationEntry>) -> SyncResponse {
380 let store = self.storage.write().await;
381 match store.apply_replication_entries(&entries) {
382 Ok(()) => SyncResponse {
383 success: true,
384 applied: entries.len(),
385 error: None,
386 },
387 Err(e) => SyncResponse {
388 success: false,
389 applied: 0,
390 error: Some(format!("{e}")),
391 },
392 }
393 }
394
395 pub fn check_rate_limit(&self, ip: &str, key: &str) -> Result<(), u64> {
396 let window = self.rate_limit_window;
397 if window.is_zero() {
398 return Ok(());
399 }
400
401 let full_key = format!("{key}:{ip}");
402 let now = std::time::Instant::now();
403 let mut limits = self.rate_limits.write().unwrap_or_else(std::sync::PoisonError::into_inner);
404
405 limits.retain(|_, (_, start)| now.duration_since(*start) < window);
406
407 let limit = match key {
408 "push" => self.max_pushes_per_hour,
409 "pull" => self.max_pulls_per_hour,
410 "token_create" => self.max_token_creates_per_minute,
411 _ => return Ok(()),
412 };
413
414 if let Some(&(count, window_start)) = limits.get(&full_key) {
415 if count >= limit {
416 let elapsed = now.duration_since(window_start);
417 let remaining = window.saturating_sub(elapsed);
418 let retry_after = remaining.as_secs().max(1);
419 return Err(retry_after);
420 }
421 limits.insert(full_key, (count + 1, window_start));
422 } else {
423 limits.insert(full_key, (1, now));
424 }
425
426 Ok(())
427 }
428
429 pub async fn handle_repo_patches_cursor(
430 &self,
431 repo_id: &str,
432 offset: u64,
433 limit: u32,
434 ) -> (Vec<PatchProto>, Option<String>) {
435 let store = self.storage.read().await;
436 let effective_limit = limit.min(200) as usize;
437 let offset = offset as usize;
438 let patches = store.get_all_patches(repo_id, offset, effective_limit + 1).unwrap_or_else(|e| {
439 tracing::warn!("store get_all_patches failed: {e}");
440 Default::default()
441 });
442 let has_more = patches.len() > effective_limit;
443 let mut collected = patches;
444 if has_more {
445 collected.truncate(effective_limit);
446 }
447 let next_cursor = if has_more {
448 Some(encode_cursor(offset as u64 + limit as u64))
449 } else {
450 None
451 };
452 (collected, next_cursor)
453 }
454
455 pub async fn add_authorized_key(
456 &self,
457 author: &str,
458 public_key_bytes: &[u8],
459 ) -> Result<(), crate::storage::StorageError> {
460 let store = self.storage.write().await;
461 store.add_authorized_key(author, public_key_bytes)
462 }
463
464 pub async fn handle_push(
465 &self,
466 req: PushRequest,
467 ) -> Result<PushResponse, (StatusCode, PushResponse)> {
468 if let Some(ref sig_bytes) = req.signature {
469 let store = self.storage.read().await;
470 let has_keys = match store.has_authorized_keys() {
471 Ok(v) => v,
472 Err(e) => {
473 tracing::error!("Failed to check authorized keys: {e}");
474 return Err((
475 StatusCode::INTERNAL_SERVER_ERROR,
476 PushResponse {
477 success: false,
478 error: Some("database error".to_owned()),
479 existing_patches: vec![],
480 },
481 ));
482 }
483 };
484 if has_keys
485 && let Err(e) = verify_push_signature(&store, &req, sig_bytes)
486 {
487 return Err((
488 StatusCode::FORBIDDEN,
489 PushResponse {
490 success: false,
491 error: Some(format!("authentication failed: {e}")),
492 existing_patches: vec![],
493 },
494 ));
495 }
496 } else if !self.no_auth {
497 let store = self.storage.read().await;
498 let has_keys = store.has_authorized_keys().unwrap_or_else(|e| {
499 tracing::error!("Failed to check authorized keys: {e}");
500 true
501 });
502 let has_tokens = store.has_tokens().unwrap_or_else(|e| {
503 tracing::error!("Failed to check tokens: {e}");
504 true
505 });
506 if has_keys || has_tokens {
507 return Err((
508 StatusCode::FORBIDDEN,
509 PushResponse {
510 success: false,
511 error: Some("authentication required: no signature provided".to_owned()),
512 existing_patches: vec![],
513 },
514 ));
515 }
516 }
517
518 let mut existing_patches = Vec::new();
519
520 let store = self.storage.write().await;
521 if let Err(e) = store.ensure_repo(&req.repo_id) {
522 return Err((
523 StatusCode::INTERNAL_SERVER_ERROR,
524 PushResponse {
525 success: false,
526 error: Some(format!("storage error: {e}")),
527 existing_patches: vec![],
528 },
529 ));
530 }
531
532 for blob in &req.blobs {
533 let hex = hash_to_hex(&blob.hash);
534 let data = match base64_decode(&blob.data) {
535 Ok(d) => d,
536 Err(e) => {
537 return Err((
538 StatusCode::BAD_REQUEST,
539 PushResponse {
540 success: false,
541 error: Some(format!("invalid base64 in blob: {e}")),
542 existing_patches: vec![],
543 },
544 ));
545 }
546 };
547 if let Err(e) = self.blob_store(&store, &req.repo_id, &hex, &data) {
548 return Err((
549 StatusCode::INTERNAL_SERVER_ERROR,
550 PushResponse {
551 success: false,
552 error: Some(format!("storage error: {e}")),
553 existing_patches: vec![],
554 },
555 ));
556 }
557 }
558
559 for patch in &req.patches {
560 let inserted = match store.insert_patch(&req.repo_id, patch) {
561 Ok(i) => i,
562 Err(e) => {
563 return Err((
564 StatusCode::INTERNAL_SERVER_ERROR,
565 PushResponse {
566 success: false,
567 error: Some(format!("storage error: {e}")),
568 existing_patches: vec![],
569 },
570 ));
571 }
572 };
573 if !inserted {
574 existing_patches.push(patch.id.clone());
575 }
576 }
577
578 for patch in &req.patches {
579 if let Err(e) = store.log_operation("insert", "patches", &hash_to_hex(&patch.id), None) {
580 tracing::warn!("Failed to log operation: {}", e);
581 }
582 }
583
584 for branch in &req.branches {
585 let target_hex = hash_to_hex(&branch.target_id);
586
587 if !req.force
588 && let Some(ref known) = req.known_branches
589 && let Some(known_branch) = known.iter().find(|kb| kb.name == branch.name)
590 {
591 let known_target = hash_to_hex(&known_branch.target_id);
592 if known_target != target_hex
593 && let Ok(Some(current_target)) =
594 store.get_branch_target(&req.repo_id, &branch.name)
595 && !store
596 .is_ancestor(&req.repo_id, ¤t_target, &target_hex)
597 .unwrap_or_else(|e| {
598 tracing::warn!("Failed to check ancestry: {e}");
599 true
600 })
601 {
602 return Err((
603 StatusCode::CONFLICT,
604 PushResponse {
605 success: false,
606 error: Some(format!(
607 "branch '{}' rejected: non-fast-forward push (use --force to override)",
608 branch.name
609 )),
610 existing_patches: vec![],
611 },
612 ));
613 }
614 }
615
616 if store
617 .is_branch_protected(&req.repo_id, &branch.name)
618 .unwrap_or_else(|e| {
619 tracing::warn!("Failed to check branch protection: {e}");
620 false
621 })
622 {
623 let push_authors: std::collections::HashSet<&str> =
624 req.patches.iter().map(|p| p.author.as_str()).collect();
625 let is_owner =
626 push_authors.len() == 1 && push_authors.contains(branch.name.as_str());
627 if !is_owner {
628 return Err((
629 StatusCode::FORBIDDEN,
630 PushResponse {
631 success: false,
632 error: Some(format!(
633 "branch '{}' is protected and can only be updated by its owner",
634 branch.name
635 )),
636 existing_patches: vec![],
637 },
638 ));
639 }
640 }
641
642 if let Err(e) = store.set_branch(&req.repo_id, &branch.name, &target_hex) {
643 return Err((
644 StatusCode::INTERNAL_SERVER_ERROR,
645 PushResponse {
646 success: false,
647 error: Some(format!("storage error: {e}")),
648 existing_patches: vec![],
649 },
650 ));
651 }
652 }
653
654 for branch in &req.branches {
655 let target_hex = hash_to_hex(&branch.target_id);
656 if let Err(e) = store.log_operation(
657 "set",
658 "branches",
659 &format!("{}:{}", req.repo_id, branch.name),
660 Some(&target_hex),
661 ) {
662 tracing::warn!("Failed to log operation: {}", e);
663 }
664 }
665
666 let repo_id = req.repo_id.clone();
667 let patch_data = serde_json::json!({
668 "patch_count": req.patches.len(),
669 "branch_count": req.branches.len(),
670 "existing_patches": existing_patches.clone(),
671 });
672 let manager = Arc::clone(&self.webhook_manager);
673 let storage = Arc::clone(&self.storage);
674 tokio::spawn(async move {
675 let hooks = {
676 let store = storage.read().await;
677 store.list_webhooks(&repo_id).unwrap_or_else(|e| {
678 tracing::warn!("store list_webhooks failed: {e}");
679 Default::default()
680 })
681 };
682 if !hooks.is_empty() {
683 let result = manager.trigger(&hooks, "push", &repo_id, patch_data).await;
684 if result.failed > 0 {
685 tracing::warn!("Hook trigger failed: {} of {} webhooks failed", result.failed, result.triggered);
686 }
687 }
688 });
689
690 Ok(PushResponse {
691 success: true,
692 error: None,
693 existing_patches,
694 })
695 }
696
697 pub async fn handle_pull(&self, req: PullRequest) -> PullResponse {
698 let store = self.storage.read().await;
699
700 let exists = match store.repo_exists(&req.repo_id) {
701 Ok(e) => e,
702 Err(e) => {
703 tracing::error!("Failed to check repo existence: {e}");
704 return PullResponse {
705 success: false,
706 error: Some(format!("database error: {e}")),
707 patches: vec![],
708 branches: vec![],
709 blobs: vec![],
710 };
711 }
712 };
713 if !exists {
714 return PullResponse {
715 success: false,
716 error: Some(format!("repo not found: {}", req.repo_id)),
717 patches: vec![],
718 branches: vec![],
719 blobs: vec![],
720 };
721 }
722
723 let all_patches = store.get_all_patches_unbounded(&req.repo_id).unwrap_or_else(|e| {
724 tracing::warn!("store get_all_patches failed: {e}");
725 Default::default()
726 });
727 let client_ancestors = collect_ancestors(&all_patches, &req.known_branches);
728 let mut new_patches = collect_new_patches(&all_patches, &client_ancestors);
729
730 if let Some(depth) = req.max_depth {
731 new_patches.truncate(depth as usize);
732 }
733
734 let branches = store.get_branches(&req.repo_id).unwrap_or_else(|e| {
735 tracing::warn!("store get_branches failed: {e}");
736 Default::default()
737 });
738
739 let mut needed_hashes: std::collections::HashSet<String> = std::collections::HashSet::new();
740 for patch in &new_patches {
741 if patch.operation_type == "batch" {
742 if let Ok(decoded) = base64_decode(&patch.payload)
743 && let Ok(changes) = serde_json::from_str::<Vec<serde_json::Value>>(
744 &String::from_utf8_lossy(&decoded),
745 )
746 {
747 for change in &changes {
748 if let Some(payload_val) = change.get("payload").and_then(|v| v.as_array())
749 {
750 let hex_bytes: Vec<u8> = payload_val
751 .iter()
752 .filter_map(|v| v.as_u64().map(|n| n as u8))
753 .collect();
754 if let Ok(hex) = String::from_utf8(hex_bytes) {
755 needed_hashes.insert(hex);
756 }
757 }
758 }
759 }
760 } else if !patch.payload.is_empty() {
761 let hex = if patch.payload.chars().all(|c| c.is_ascii_hexdigit()) {
765 patch.payload.clone()
766 } else if let Ok(decoded) = base64_decode(&patch.payload) {
767 String::from_utf8_lossy(&decoded).to_string()
768 } else {
769 patch.payload.clone()
770 };
771 needed_hashes.insert(hex);
772 }
773 }
774 let blobs = store
775 .get_blobs(&req.repo_id, &needed_hashes)
776 .unwrap_or_else(|e| {
777 tracing::warn!("store get_blobs failed: {e}");
778 Default::default()
779 });
780
781 PullResponse {
782 success: true,
783 error: None,
784 patches: new_patches,
785 branches,
786 blobs,
787 }
788 }
789
790 pub async fn handle_list_repos(&self) -> ListReposResponse {
791 let store = self.storage.read().await;
792 let repo_ids = match store.list_repos() {
793 Ok(r) => r,
794 Err(e) => {
795 tracing::error!("Failed to list repos: {e}");
796 return ListReposResponse { repo_ids: vec![] };
797 }
798 };
799 ListReposResponse { repo_ids }
800 }
801
802 pub async fn handle_repo_info(&self, repo_id: &str) -> RepoInfoResponse {
803 let store = self.storage.read().await;
804
805 let exists = match store.repo_exists(repo_id) {
806 Ok(e) => e,
807 Err(e) => {
808 tracing::error!("Failed to check repo existence: {e}");
809 return RepoInfoResponse {
810 repo_id: repo_id.to_owned(),
811 patch_count: 0,
812 branches: vec![],
813 success: false,
814 error: Some(format!("database error: {e}")),
815 };
816 }
817 };
818 if !exists {
819 return RepoInfoResponse {
820 repo_id: repo_id.to_owned(),
821 patch_count: 0,
822 branches: vec![],
823 success: false,
824 error: Some(format!("repo not found: {repo_id}")),
825 };
826 }
827
828 let patch_count = store.patch_count(repo_id).unwrap_or_else(|e| {
829 tracing::error!("Failed to get patch count: {e}");
830 0
831 });
832 let branches = store.get_branches(repo_id).unwrap_or_else(|e| {
833 tracing::warn!("store get_branches failed: {e}");
834 Default::default()
835 });
836
837 RepoInfoResponse {
838 repo_id: repo_id.to_owned(),
839 patch_count,
840 branches,
841 success: true,
842 error: None,
843 }
844 }
845
846 pub async fn handle_mirror_setup(
847 &self,
848 req: crate::types::MirrorSetupRequest,
849 ) -> crate::types::MirrorSetupResponse {
850 if let Err(e) = validate_mirror_url(&req.upstream_url) {
851 return crate::types::MirrorSetupResponse {
852 success: false,
853 error: Some(format!("invalid upstream URL: {e}")),
854 mirror_id: None,
855 };
856 }
857
858 let store = self.storage.write().await;
859
860 match store.add_mirror(&req.repo_name, &req.upstream_url, &req.upstream_repo) {
861 Ok(mirror_id) => {
862 if let Err(e) = store.ensure_repo(&req.repo_name) {
863 return crate::types::MirrorSetupResponse {
864 success: false,
865 error: Some(format!("failed to create repo: {e}")),
866 mirror_id: None,
867 };
868 }
869 crate::types::MirrorSetupResponse {
870 success: true,
871 error: None,
872 mirror_id: Some(mirror_id),
873 }
874 }
875 Err(e) => crate::types::MirrorSetupResponse {
876 success: false,
877 error: Some(format!("failed to register mirror: {e}")),
878 mirror_id: None,
879 },
880 }
881 }
882
883 pub async fn handle_mirror_sync(
884 &self,
885 req: crate::types::MirrorSyncRequest,
886 ) -> crate::types::MirrorSyncResponse {
887 let store = self.storage.write().await;
888
889 let mirror_info = match store.get_mirror(req.mirror_id) {
890 Ok(Some(info)) => info,
891 Ok(None) => {
892 return crate::types::MirrorSyncResponse {
893 success: false,
894 error: Some(format!("mirror {} not found", req.mirror_id)),
895 patches_synced: 0,
896 branches_synced: 0,
897 };
898 }
899 Err(e) => {
900 return crate::types::MirrorSyncResponse {
901 success: false,
902 error: Some(format!("database error: {e}")),
903 patches_synced: 0,
904 branches_synced: 0,
905 };
906 }
907 };
908
909 let (local_repo, upstream_url, upstream_repo, _, _) = mirror_info;
910
911 if let Err(e) = validate_mirror_url(&upstream_url) {
912 return crate::types::MirrorSyncResponse {
913 success: false,
914 error: Some(format!("invalid upstream URL: {e}")),
915 patches_synced: 0,
916 branches_synced: 0,
917 };
918 }
919
920 if let Err(e) = store.update_mirror_status(req.mirror_id, "syncing", None) {
921 return crate::types::MirrorSyncResponse {
922 success: false,
923 error: Some(format!("failed to update status: {e}")),
924 patches_synced: 0,
925 branches_synced: 0,
926 };
927 }
928
929 drop(store);
930
931 let upstream_pull = crate::types::PullRequest {
932 repo_id: upstream_repo,
933 known_branches: vec![],
934 max_depth: None,
935 };
936
937 let client = reqwest::Client::new();
938 let pull_resp = match client
939 .post(format!("{upstream_url}/pull"))
940 .json(&upstream_pull)
941 .send()
942 .await
943 {
944 Ok(resp) => resp,
945 Err(e) => {
946 let store = self.storage.write().await;
947 if let Err(e) = store.update_mirror_status(req.mirror_id, "error", None) {
948 tracing::warn!("Failed to update mirror status: {}", e);
949 }
950 return crate::types::MirrorSyncResponse {
951 success: false,
952 error: Some(format!("failed to reach upstream: {e}")),
953 patches_synced: 0,
954 branches_synced: 0,
955 };
956 }
957 };
958
959 let pull_result: crate::types::PullResponse = match pull_resp.json().await {
960 Ok(r) => r,
961 Err(e) => {
962 let store = self.storage.write().await;
963 if let Err(e) = store.update_mirror_status(req.mirror_id, "error", None) {
964 tracing::warn!("Failed to update mirror status: {}", e);
965 }
966 return crate::types::MirrorSyncResponse {
967 success: false,
968 error: Some(format!("failed to parse upstream response: {e}")),
969 patches_synced: 0,
970 branches_synced: 0,
971 };
972 }
973 };
974
975 if !pull_result.success {
976 let store = self.storage.write().await;
977 if let Err(e) = store.update_mirror_status(req.mirror_id, "error", None) {
978 tracing::warn!("Failed to update mirror status: {}", e);
979 }
980 return crate::types::MirrorSyncResponse {
981 success: false,
982 error: pull_result.error,
983 patches_synced: 0,
984 branches_synced: 0,
985 };
986 }
987
988 let store = self.storage.write().await;
989 let mut patches_synced = 0u64;
990
991 for blob in &pull_result.blobs {
992 let hex = hash_to_hex(&blob.hash);
993 let Ok(data) = base64_decode(&blob.data) else { continue };
994 if let Err(e) = self.blob_store(&store, &local_repo, &hex, &data) {
995 tracing::warn!("Failed to store blob during mirror sync: {}", e);
996 }
997 }
998
999 for patch in &pull_result.patches {
1000 let inserted = store.insert_patch(&local_repo, patch).unwrap_or_else(|e| {
1001 tracing::warn!("Failed to insert patch during mirror sync: {e}");
1002 false
1003 });
1004 if inserted {
1005 patches_synced += 1;
1006 }
1007 }
1008
1009 let branches_synced = pull_result.branches.len() as u64;
1010 for branch in &pull_result.branches {
1011 let target_hex = hash_to_hex(&branch.target_id);
1012 if let Err(e) = store.set_branch(&local_repo, &branch.name, &target_hex) {
1013 tracing::warn!("Failed to update branch during mirror sync: {}", e);
1014 }
1015 }
1016
1017 let now = std::time::SystemTime::now()
1018 .duration_since(std::time::UNIX_EPOCH)
1019 .unwrap_or_default()
1020 .as_secs() as i64;
1021 if let Err(e) = store.update_mirror_status(req.mirror_id, "idle", Some(now)) {
1022 tracing::warn!("Failed to update mirror status: {}", e);
1023 }
1024
1025 crate::types::MirrorSyncResponse {
1026 success: true,
1027 error: None,
1028 patches_synced,
1029 branches_synced,
1030 }
1031 }
1032
1033 pub async fn handle_mirror_status(
1034 &self,
1035 req: crate::types::MirrorStatusRequest,
1036 ) -> crate::types::MirrorStatusResponse {
1037 let store = self.storage.read().await;
1038
1039 let mirrors = store.list_mirrors().unwrap_or_else(|e| {
1040 tracing::warn!("store list_mirrors failed: {e}");
1041 Default::default()
1042 });
1043
1044 let entries: Vec<crate::types::MirrorStatusEntry> = mirrors
1045 .into_iter()
1046 .filter(|m| {
1047 if let Some(mid) = req.mirror_id
1048 && m.0 != mid
1049 {
1050 return false;
1051 }
1052 if let Some(ref name) = req.repo_name
1053 && &m.1 != name
1054 {
1055 return false;
1056 }
1057 true
1058 })
1059 .map(|m| crate::types::MirrorStatusEntry {
1060 mirror_id: m.0,
1061 repo_name: m.1,
1062 upstream_url: m.2,
1063 upstream_repo: m.3,
1064 last_sync: m.4.map(|v| v as u64),
1065 status: m.5,
1066 })
1067 .collect();
1068
1069 crate::types::MirrorStatusResponse {
1070 success: true,
1071 error: None,
1072 mirrors: entries,
1073 }
1074 }
1075
1076 pub async fn handle_pull_v2(
1077 &self,
1078 req: crate::types::PullRequestV2,
1079 ) -> crate::types::PullResponseV2 {
1080 let store = self.storage.read().await;
1081
1082 let exists = match store.repo_exists(&req.repo_id) {
1083 Ok(e) => e,
1084 Err(e) => {
1085 tracing::error!("Failed to check repo existence: {e}");
1086 return crate::types::PullResponseV2 {
1087 success: false,
1088 error: Some(format!("database error: {e}")),
1089 patches: vec![],
1090 branches: vec![],
1091 blobs: vec![],
1092 deltas: vec![],
1093 protocol_version: crate::types::PROTOCOL_VERSION_V2,
1094 };
1095 }
1096 };
1097 if !exists {
1098 return crate::types::PullResponseV2 {
1099 success: false,
1100 error: Some(format!("repo not found: {}", req.repo_id)),
1101 patches: vec![],
1102 branches: vec![],
1103 blobs: vec![],
1104 deltas: vec![],
1105 protocol_version: crate::types::PROTOCOL_VERSION_V2,
1106 };
1107 }
1108
1109 let all_patches = store.get_all_patches_unbounded(&req.repo_id).unwrap_or_else(|e| {
1110 tracing::warn!("store get_all_patches failed: {e}");
1111 Default::default()
1112 });
1113 let client_ancestors = collect_ancestors(&all_patches, &req.known_branches);
1114 let mut new_patches = collect_new_patches(&all_patches, &client_ancestors);
1115
1116 if let Some(depth) = req.max_depth {
1117 new_patches.truncate(depth as usize);
1118 }
1119
1120 let branches = store.get_branches(&req.repo_id).unwrap_or_else(|e| {
1121 tracing::warn!("store get_branches failed: {e}");
1122 Default::default()
1123 });
1124
1125 let mut needed_hashes: std::collections::HashSet<String> = std::collections::HashSet::new();
1126 for patch in &new_patches {
1127 if patch.operation_type == "batch" {
1128 if let Ok(decoded) = base64_decode(&patch.payload)
1129 && let Ok(changes) = serde_json::from_str::<Vec<serde_json::Value>>(
1130 &String::from_utf8_lossy(&decoded),
1131 )
1132 {
1133 for change in &changes {
1134 if let Some(payload_val) = change.get("payload").and_then(|v| v.as_array())
1135 {
1136 let hex_bytes: Vec<u8> = payload_val
1137 .iter()
1138 .filter_map(|v| v.as_u64().map(|n| n as u8))
1139 .collect();
1140 if let Ok(hex) = String::from_utf8(hex_bytes) {
1141 needed_hashes.insert(hex);
1142 }
1143 }
1144 }
1145 }
1146 } else if !patch.payload.is_empty() {
1147 let hex = if patch.payload.chars().all(|c| c.is_ascii_hexdigit()) {
1148 patch.payload.clone()
1149 } else if let Ok(decoded) = base64_decode(&patch.payload) {
1150 String::from_utf8_lossy(&decoded).to_string()
1151 } else {
1152 patch.payload.clone()
1153 };
1154 needed_hashes.insert(hex);
1155 }
1156 }
1157
1158 let known_hash_set: std::collections::HashSet<String> = req
1159 .known_blob_hashes
1160 .iter()
1161 .map(|h| h.value.clone())
1162 .collect();
1163
1164 let mut blobs = Vec::new();
1165 let mut deltas = Vec::new();
1166
1167 if req.capabilities.supports_delta {
1168 for needed_hash in &needed_hashes {
1169 let Ok(Some(target_data)) = self.blob_get(&store, &req.repo_id, needed_hash) else {
1170 if let Ok(b) = store.get_blobs(
1171 &req.repo_id,
1172 &std::collections::HashSet::from([needed_hash.clone()]),
1173 ) && let Some(blob) = b.into_iter().next()
1174 {
1175 blobs.push(blob);
1176 }
1177 continue;
1178 };
1179
1180 if known_hash_set.contains(needed_hash) {
1181 let Ok(Some(base_data)) = self.blob_get(&store, &req.repo_id, needed_hash) else {
1182 blobs.push(BlobRef {
1183 hash: HashProto {
1184 value: needed_hash.clone(),
1185 },
1186 data: base64_encode(&target_data),
1187 truncated: false,
1188 });
1189 continue;
1190 };
1191
1192 if base_data == target_data {
1193 continue;
1194 }
1195
1196 let (_base_copy, delta_bytes) =
1197 suture_protocol::compute_delta(&base_data, &target_data);
1198
1199 if delta_bytes.len() < target_data.len() {
1200 deltas.push(BlobDelta {
1201 base_hash: HashProto {
1202 value: needed_hash.clone(),
1203 },
1204 target_hash: HashProto {
1205 value: needed_hash.clone(),
1206 },
1207 encoding: DeltaEncoding::BinaryPatch,
1208 delta_data: base64_encode(&delta_bytes),
1209 });
1210 } else {
1211 blobs.push(BlobRef {
1212 hash: HashProto {
1213 value: needed_hash.clone(),
1214 },
1215 data: base64_encode(&target_data),
1216 truncated: false,
1217 });
1218 }
1219 } else {
1220 blobs.push(BlobRef {
1221 hash: HashProto {
1222 value: needed_hash.clone(),
1223 },
1224 data: base64_encode(&target_data),
1225 truncated: false,
1226 });
1227 }
1228 }
1229 } else {
1230 blobs = store
1231 .get_blobs(&req.repo_id, &needed_hashes)
1232 .unwrap_or_else(|e| {
1233 tracing::error!("Failed to get blobs for repo {}: {e}", req.repo_id);
1234 Default::default()
1235 });
1236 }
1237
1238 crate::types::PullResponseV2 {
1239 success: true,
1240 error: None,
1241 patches: new_patches,
1242 branches,
1243 blobs,
1244 deltas,
1245 protocol_version: crate::types::PROTOCOL_VERSION_V2,
1246 }
1247 }
1248
1249 pub async fn handle_push_v2(
1250 &self,
1251 req: crate::types::PushRequestV2,
1252 ) -> Result<PushResponse, (StatusCode, PushResponse)> {
1253 if let Some(ref sig_bytes) = req.signature {
1254 let store = self.storage.read().await;
1255 let has_keys = match store.has_authorized_keys() {
1256 Ok(v) => v,
1257 Err(e) => {
1258 tracing::error!("Failed to check authorized keys: {e}");
1259 return Err((
1260 StatusCode::INTERNAL_SERVER_ERROR,
1261 PushResponse {
1262 success: false,
1263 error: Some("database error".to_owned()),
1264 existing_patches: vec![],
1265 },
1266 ));
1267 }
1268 };
1269 if has_keys {
1270 let v1_req = PushRequest {
1271 repo_id: req.repo_id.clone(),
1272 patches: req.patches.clone(),
1273 branches: req.branches.clone(),
1274 blobs: req.blobs.clone(),
1275 signature: req.signature.clone(),
1276 known_branches: req.known_branches.clone(),
1277 force: req.force,
1278 };
1279 if let Err(e) = verify_push_signature(&store, &v1_req, sig_bytes) {
1280 return Err((
1281 StatusCode::FORBIDDEN,
1282 PushResponse {
1283 success: false,
1284 error: Some(format!("authentication failed: {e}")),
1285 existing_patches: vec![],
1286 },
1287 ));
1288 }
1289 }
1290 } else if !self.no_auth {
1291 let store = self.storage.read().await;
1292 let has_keys = store.has_authorized_keys().unwrap_or_else(|e| {
1293 tracing::error!("Failed to check authorized keys: {e}");
1294 true
1295 });
1296 let has_tokens = store.has_tokens().unwrap_or_else(|e| {
1297 tracing::error!("Failed to check tokens: {e}");
1298 true
1299 });
1300 if has_keys || has_tokens {
1301 return Err((
1302 StatusCode::FORBIDDEN,
1303 PushResponse {
1304 success: false,
1305 error: Some("authentication required: no signature provided".to_owned()),
1306 existing_patches: vec![],
1307 },
1308 ));
1309 }
1310 }
1311
1312 let store = self.storage.write().await;
1313 if let Err(e) = store.ensure_repo(&req.repo_id) {
1314 return Err((
1315 StatusCode::INTERNAL_SERVER_ERROR,
1316 PushResponse {
1317 success: false,
1318 error: Some(format!("storage error: {e}")),
1319 existing_patches: vec![],
1320 },
1321 ));
1322 }
1323
1324 let mut existing_patches = Vec::new();
1325
1326 for delta in &req.deltas {
1327 if matches!(delta.encoding, DeltaEncoding::BinaryPatch) {
1328 let base_hex = hash_to_hex(&delta.base_hash);
1329 let target_hex = hash_to_hex(&delta.target_hash);
1330 let Ok(Some(base_data)) = self.blob_get(&store, &req.repo_id, &base_hex) else { continue };
1331 let Ok(delta_bytes) = base64_decode(&delta.delta_data) else { continue };
1332 let reconstructed = suture_protocol::apply_delta(&base_data, &delta_bytes);
1333 if let Err(e) = self.blob_store(&store, &req.repo_id, &target_hex, &reconstructed) {
1334 return Err((
1335 StatusCode::INTERNAL_SERVER_ERROR,
1336 PushResponse {
1337 success: false,
1338 error: Some(format!("storage error reconstructing delta blob: {e}")),
1339 existing_patches: vec![],
1340 },
1341 ));
1342 }
1343 } else if matches!(delta.encoding, DeltaEncoding::FullBlob) {
1344 let target_hex = hash_to_hex(&delta.target_hash);
1345 let data = match base64_decode(&delta.delta_data) {
1346 Ok(d) => d,
1347 Err(e) => {
1348 return Err((
1349 StatusCode::BAD_REQUEST,
1350 PushResponse {
1351 success: false,
1352 error: Some(format!("invalid base64 in delta blob: {e}")),
1353 existing_patches: vec![],
1354 },
1355 ));
1356 }
1357 };
1358 if let Err(e) = self.blob_store(&store, &req.repo_id, &target_hex, &data) {
1359 return Err((
1360 StatusCode::INTERNAL_SERVER_ERROR,
1361 PushResponse {
1362 success: false,
1363 error: Some(format!("storage error: {e}")),
1364 existing_patches: vec![],
1365 },
1366 ));
1367 }
1368 }
1369 }
1370
1371 for blob in &req.blobs {
1372 let hex = hash_to_hex(&blob.hash);
1373 let data = match base64_decode(&blob.data) {
1374 Ok(d) => d,
1375 Err(e) => {
1376 return Err((
1377 StatusCode::BAD_REQUEST,
1378 PushResponse {
1379 success: false,
1380 error: Some(format!("invalid base64 in blob: {e}")),
1381 existing_patches: vec![],
1382 },
1383 ));
1384 }
1385 };
1386 if let Err(e) = self.blob_store(&store, &req.repo_id, &hex, &data) {
1387 return Err((
1388 StatusCode::INTERNAL_SERVER_ERROR,
1389 PushResponse {
1390 success: false,
1391 error: Some(format!("storage error: {e}")),
1392 existing_patches: vec![],
1393 },
1394 ));
1395 }
1396 }
1397
1398 for patch in &req.patches {
1399 let inserted = match store.insert_patch(&req.repo_id, patch) {
1400 Ok(i) => i,
1401 Err(e) => {
1402 return Err((
1403 StatusCode::INTERNAL_SERVER_ERROR,
1404 PushResponse {
1405 success: false,
1406 error: Some(format!("storage error: {e}")),
1407 existing_patches: vec![],
1408 },
1409 ));
1410 }
1411 };
1412 if !inserted {
1413 existing_patches.push(patch.id.clone());
1414 }
1415 }
1416
1417 for patch in &req.patches {
1418 if let Err(e) = store.log_operation("insert", "patches", &hash_to_hex(&patch.id), None) {
1419 tracing::warn!("Failed to log operation: {}", e);
1420 }
1421 }
1422
1423 for branch in &req.branches {
1424 let target_hex = hash_to_hex(&branch.target_id);
1425
1426 if !req.force
1427 && let Some(ref known) = req.known_branches
1428 && let Some(known_branch) = known.iter().find(|kb| kb.name == branch.name)
1429 {
1430 let known_target = hash_to_hex(&known_branch.target_id);
1431 if known_target != target_hex
1432 && let Ok(Some(current_target)) =
1433 store.get_branch_target(&req.repo_id, &branch.name)
1434 && !store
1435 .is_ancestor(&req.repo_id, ¤t_target, &target_hex)
1436 .unwrap_or_else(|e| {
1437 tracing::warn!("Failed to check ancestry: {e}");
1438 true
1439 })
1440 {
1441 return Err((
1442 StatusCode::CONFLICT,
1443 PushResponse {
1444 success: false,
1445 error: Some(format!(
1446 "branch '{}' rejected: non-fast-forward push (use --force to override)",
1447 branch.name
1448 )),
1449 existing_patches: vec![],
1450 },
1451 ));
1452 }
1453 }
1454
1455 if store
1456 .is_branch_protected(&req.repo_id, &branch.name)
1457 .unwrap_or_else(|e| {
1458 tracing::warn!("Failed to check branch protection: {e}");
1459 false
1460 })
1461 {
1462 let push_authors: std::collections::HashSet<&str> =
1463 req.patches.iter().map(|p| p.author.as_str()).collect();
1464 let is_owner =
1465 push_authors.len() == 1 && push_authors.contains(branch.name.as_str());
1466 if !is_owner {
1467 return Err((
1468 StatusCode::FORBIDDEN,
1469 PushResponse {
1470 success: false,
1471 error: Some(format!(
1472 "branch '{}' is protected and can only be updated by its owner",
1473 branch.name
1474 )),
1475 existing_patches: vec![],
1476 },
1477 ));
1478 }
1479 }
1480
1481 if let Err(e) = store.set_branch(&req.repo_id, &branch.name, &target_hex) {
1482 return Err((
1483 StatusCode::INTERNAL_SERVER_ERROR,
1484 PushResponse {
1485 success: false,
1486 error: Some(format!("storage error: {e}")),
1487 existing_patches: vec![],
1488 },
1489 ));
1490 }
1491 }
1492
1493 for branch in &req.branches {
1494 let target_hex = hash_to_hex(&branch.target_id);
1495 if let Err(e) = store.log_operation(
1496 "set",
1497 "branches",
1498 &format!("{}:{}", req.repo_id, branch.name),
1499 Some(&target_hex),
1500 ) {
1501 tracing::warn!("Failed to log operation: {}", e);
1502 }
1503 }
1504
1505 let repo_id = req.repo_id.clone();
1506 let patch_data = serde_json::json!({
1507 "patch_count": req.patches.len(),
1508 "branch_count": req.branches.len(),
1509 "existing_patches": existing_patches.clone(),
1510 });
1511 let manager = Arc::clone(&self.webhook_manager);
1512 let storage = Arc::clone(&self.storage);
1513 tokio::spawn(async move {
1514 let hooks = {
1515 let store = storage.read().await;
1516 store.list_webhooks(&repo_id).unwrap_or_else(|e| {
1517 tracing::warn!("store list_webhooks failed: {e}");
1518 Default::default()
1519 })
1520 };
1521 if !hooks.is_empty() {
1522 let result = manager.trigger(&hooks, "push", &repo_id, patch_data).await;
1523 if result.failed > 0 {
1524 tracing::warn!("Hook trigger failed: {} of {} webhooks failed", result.failed, result.triggered);
1525 }
1526 }
1527 });
1528
1529 Ok(PushResponse {
1530 success: true,
1531 error: None,
1532 existing_patches,
1533 })
1534 }
1535
1536 pub async fn handle_batch_push(
1537 &self,
1538 req: BatchPatchRequest,
1539 ) -> Result<PushResponse, (StatusCode, PushResponse)> {
1540 let mut existing_patches = Vec::new();
1541
1542 let store = self.storage.write().await;
1543 if let Err(e) = store.ensure_repo(&req.repo_id) {
1544 let msg = format!("storage error: {e}");
1545 return Err((
1546 StatusCode::INTERNAL_SERVER_ERROR,
1547 PushResponse {
1548 success: false,
1549 error: Some(msg),
1550 existing_patches: vec![],
1551 },
1552 ));
1553 }
1554
1555 for blob in &req.blobs {
1556 let hex = hash_to_hex(&blob.hash);
1557 let data = match base64_decode(&blob.data) {
1558 Ok(d) => d,
1559 Err(e) => {
1560 let msg = format!("invalid base64 in blob: {e}");
1561 return Err((
1562 StatusCode::BAD_REQUEST,
1563 PushResponse {
1564 success: false,
1565 error: Some(msg),
1566 existing_patches: vec![],
1567 },
1568 ));
1569 }
1570 };
1571 if let Err(e) = self.blob_store(&store, &req.repo_id, &hex, &data) {
1572 let msg = format!("storage error: {e}");
1573 return Err((
1574 StatusCode::INTERNAL_SERVER_ERROR,
1575 PushResponse {
1576 success: false,
1577 error: Some(msg),
1578 existing_patches: vec![],
1579 },
1580 ));
1581 }
1582 }
1583
1584 for patch in &req.patches {
1585 let inserted = match store.insert_patch(&req.repo_id, patch) {
1586 Ok(i) => i,
1587 Err(e) => {
1588 let msg = format!("storage error: {e}");
1589 return Err((
1590 StatusCode::INTERNAL_SERVER_ERROR,
1591 PushResponse {
1592 success: false,
1593 error: Some(msg),
1594 existing_patches: vec![],
1595 },
1596 ));
1597 }
1598 };
1599 if !inserted {
1600 existing_patches.push(patch.id.clone());
1601 }
1602 }
1603
1604 for patch in &req.patches {
1605 if let Err(e) = store.log_operation("insert", "patches", &hash_to_hex(&patch.id), None) {
1606 tracing::warn!("Failed to log operation: {}", e);
1607 }
1608 }
1609
1610 for branch in &req.branches {
1611 let target_hex = hash_to_hex(&branch.target_id);
1612
1613 if store
1614 .is_branch_protected(&req.repo_id, &branch.name)
1615 .unwrap_or_else(|e| {
1616 tracing::warn!("Failed to check branch protection: {e}");
1617 false
1618 })
1619 {
1620 let push_authors: std::collections::HashSet<&str> =
1621 req.patches.iter().map(|p| p.author.as_str()).collect();
1622 let is_owner =
1623 push_authors.len() == 1 && push_authors.contains(branch.name.as_str());
1624 if !is_owner && !req.force {
1625 let msg = format!(
1626 "branch '{}' is protected and can only be updated by its owner",
1627 branch.name
1628 );
1629 return Err((
1630 StatusCode::FORBIDDEN,
1631 PushResponse {
1632 success: false,
1633 error: Some(msg),
1634 existing_patches: vec![],
1635 },
1636 ));
1637 }
1638 }
1639
1640 if let Err(e) = store.set_branch(&req.repo_id, &branch.name, &target_hex) {
1641 let msg = format!("storage error: {e}");
1642 return Err((
1643 StatusCode::INTERNAL_SERVER_ERROR,
1644 PushResponse {
1645 success: false,
1646 error: Some(msg),
1647 existing_patches: vec![],
1648 },
1649 ));
1650 }
1651 }
1652
1653 for branch in &req.branches {
1654 let target_hex = hash_to_hex(&branch.target_id);
1655 if let Err(e) = store.log_operation(
1656 "set",
1657 "branches",
1658 &format!("{}:{}", req.repo_id, branch.name),
1659 Some(&target_hex),
1660 ) {
1661 tracing::warn!("Failed to log operation: {}", e);
1662 }
1663 }
1664
1665 let repo_id = req.repo_id.clone();
1666 let patch_data = serde_json::json!({
1667 "patch_count": req.patches.len(),
1668 "branch_count": req.branches.len(),
1669 "existing_patches": existing_patches.clone(),
1670 });
1671 let manager = Arc::clone(&self.webhook_manager);
1672 let storage = Arc::clone(&self.storage);
1673 tokio::spawn(async move {
1674 let hooks = {
1675 let store = storage.read().await;
1676 store.list_webhooks(&repo_id).unwrap_or_else(|e| {
1677 tracing::warn!("store list_webhooks failed: {e}");
1678 Default::default()
1679 })
1680 };
1681 if !hooks.is_empty() {
1682 let result = manager.trigger(&hooks, "push", &repo_id, patch_data).await;
1683 if result.failed > 0 {
1684 tracing::warn!("Hook trigger failed: {} of {} webhooks failed", result.failed, result.triggered);
1685 }
1686 }
1687 });
1688
1689 Ok(PushResponse {
1690 success: true,
1691 error: None,
1692 existing_patches,
1693 })
1694 }
1695
/// Applies one replicated [`crate::raft::HubCommand`] to local storage while
/// holding the storage write lock.
///
/// Blob commands operate on the fixed `"_raft_default"` repository. A failed
/// blob deletion is only logged and still reported as success; every other
/// storage failure is returned as its string form.
///
/// # Errors
///
/// Returns the stringified storage error, a deserialization error for a
/// malformed `StorePatch` payload, or a `patch_id mismatch` error when the
/// payload's own id differs from the id carried by the command.
#[cfg(feature = "raft-cluster")]
pub async fn apply_raft_command(&self, cmd: crate::raft::HubCommand) -> Result<(), String> {
    use crate::raft::HubCommand;

    /// Repository under which raft-replicated blobs are kept.
    const RAFT_BLOB_REPO: &str = "_raft_default";

    /// Converts any storage error into the `String` error of this method.
    fn stringify<E: ToString>(err: E) -> String {
        err.to_string()
    }

    let store = self.storage.write().await;

    match cmd {
        HubCommand::CreateRepo { repo_id } => {
            store.ensure_repo(&repo_id).map(|_| ()).map_err(stringify)
        }
        HubCommand::DeleteRepo { repo_id } => store.delete_repo(&repo_id).map_err(stringify),
        HubCommand::StoreBlob { hash, data } => store
            .store_blob(RAFT_BLOB_REPO, &hash, &data)
            .map_err(stringify),
        HubCommand::DeleteBlob { hash } => {
            // Deletion is best-effort: a failure is logged, not propagated.
            if let Err(e) = store.delete_blob(RAFT_BLOB_REPO, &hash) {
                tracing::warn!("Failed to delete blob: {}", e);
            }
            Ok(())
        }
        // Creating and updating a branch are the same storage operation.
        HubCommand::CreateBranch {
            repo_id,
            branch,
            target,
        }
        | HubCommand::UpdateBranch {
            repo_id,
            branch,
            target,
        } => store
            .set_branch(&repo_id, &branch, &target)
            .map_err(stringify),
        HubCommand::DeleteBranch { repo_id, branch } => {
            store.delete_branch(&repo_id, &branch).map_err(stringify)
        }
        HubCommand::StorePatch {
            repo_id,
            patch_id,
            patch_data,
        } => {
            let patch: crate::types::PatchProto = serde_json::from_slice(&patch_data)
                .map_err(|e| format!("failed to deserialize patch: {e}"))?;
            // The id inside the payload must match the id the command claims.
            let actual_hex = hash_to_hex(&patch.id);
            if actual_hex != patch_id {
                return Err(format!(
                    "patch_id mismatch: expected {patch_id}, got {actual_hex}"
                ));
            }
            store
                .insert_patch(&repo_id, &patch)
                .map(|_| ())
                .map_err(stringify)
        }
    }
}
1757}
1758
1759fn validate_mirror_url(url: &str) -> Result<(), &'static str> {
1760 let parsed = url::Url::parse(url).map_err(|_| "invalid URL syntax")?;
1761
1762 match parsed.scheme() {
1763 "http" | "https" => {}
1764 _ => return Err("only http and https URLs are allowed"),
1765 }
1766
1767 let host = parsed.host_str().ok_or("URL must have a host")?;
1768
1769 if let Ok(addr) = host.parse::<std::net::IpAddr>() {
1770 match addr {
1771 std::net::IpAddr::V4(v4) => {
1772 if v4.is_loopback() || v4.is_private() || v4.is_link_local() {
1773 return Err("private/internal IP addresses are not allowed");
1774 }
1775 }
1776 std::net::IpAddr::V6(v6) => {
1777 if v6.is_loopback() || v6.is_unicast_link_local() {
1778 return Err("private/internal IP addresses are not allowed");
1779 }
1780 }
1781 }
1782 }
1783
1784 if host == "169.254.169.254" || host == "metadata.google.internal" {
1785 return Err("metadata endpoints are not allowed");
1786 }
1787
1788 Ok(())
1789}
1790
1791fn verify_push_signature(
1792 store: &HubStorage,
1793 req: &PushRequest,
1794 sig_bytes: &[u8],
1795) -> Result<(), String> {
1796 if sig_bytes.len() != 64 {
1797 return Err("signature must be 64 bytes".to_owned());
1798 }
1799 let signature = Signature::from_bytes(
1800 sig_bytes
1801 .try_into()
1802 .map_err(|_| "invalid signature length")?,
1803 );
1804
1805 let canonical = canonical_push_bytes(req);
1806
1807 let mut authors: HashSet<&str> = HashSet::new();
1808 for patch in &req.patches {
1809 authors.insert(&patch.author);
1810 }
1811
1812 for author in &authors {
1813 let pub_key_bytes = match store.get_authorized_key(author) {
1814 Ok(Some(bytes)) => bytes,
1815 Ok(None) => continue,
1816 Err(e) => {
1817 tracing::warn!("Failed to get authorized key for '{}': {e}", author);
1818 continue;
1819 }
1820 };
1821 if pub_key_bytes.len() != 32 {
1822 continue;
1823 }
1824 let pub_key_array: [u8; 32] = pub_key_bytes
1825 .try_into()
1826 .map_err(|_| "invalid public key length")?;
1827 let verifying_key = VerifyingKey::from_bytes(&pub_key_array)
1828 .map_err(|e| format!("invalid public key: {e}"))?;
1829 if verifying_key.verify(&canonical, &signature).is_ok() {
1830 return Ok(());
1831 }
1832 }
1833
1834 Err("no matching authorized key found for signature".to_owned())
1835}
1836
1837fn collect_ancestors(
1838 all_patches: &[PatchProto],
1839 known_branches: &[BranchProto],
1840) -> HashSet<String> {
1841 let patch_map: std::collections::HashMap<String, &PatchProto> = all_patches
1842 .iter()
1843 .map(|p| (hash_to_hex(&p.id), p))
1844 .collect();
1845
1846 let mut ancestors = HashSet::new();
1847 let mut stack: Vec<String> = known_branches
1848 .iter()
1849 .filter_map(|b| {
1850 let hex = hash_to_hex(&b.target_id);
1851 if patch_map.contains_key(&hex) {
1852 Some(hex)
1853 } else {
1854 None
1855 }
1856 })
1857 .collect();
1858
1859 while let Some(id_hex) = stack.pop() {
1860 if ancestors.insert(id_hex.clone())
1861 && let Some(patch) = patch_map.get(&id_hex)
1862 {
1863 for parent in &patch.parent_ids {
1864 let parent_hex = hash_to_hex(parent);
1865 if !ancestors.contains(&parent_hex) {
1866 stack.push(parent_hex);
1867 }
1868 }
1869 }
1870 }
1871
1872 ancestors
1873}
1874
1875fn collect_new_patches(
1876 all_patches: &[PatchProto],
1877 client_ancestors: &HashSet<String>,
1878) -> Vec<PatchProto> {
1879 let patch_map: std::collections::HashMap<String, &PatchProto> = all_patches
1880 .iter()
1881 .map(|p| (hash_to_hex(&p.id), p))
1882 .collect();
1883
1884 let mut reachable: HashSet<String> = HashSet::new();
1885 let mut stack: Vec<String> = all_patches.iter().map(|p| hash_to_hex(&p.id)).collect();
1886
1887 while let Some(id_hex) = stack.pop() {
1888 if reachable.insert(id_hex.clone())
1889 && let Some(patch) = patch_map.get(&id_hex)
1890 {
1891 for parent in &patch.parent_ids {
1892 let parent_hex = hash_to_hex(parent);
1893 if !reachable.contains(&parent_hex) {
1894 stack.push(parent_hex);
1895 }
1896 }
1897 }
1898 }
1899
1900 let mut new_ids: Vec<String> = reachable
1901 .into_iter()
1902 .filter(|id| !client_ancestors.contains(id))
1903 .collect();
1904 new_ids.sort();
1906
1907 let mut seen: HashSet<String> = HashSet::new();
1908 let mut stack: Vec<String> = new_ids;
1909
1910 while let Some(id_hex) = stack.pop() {
1911 if seen.insert(id_hex.clone())
1912 && let Some(patch) = patch_map.get(&id_hex)
1913 {
1914 for parent in &patch.parent_ids {
1915 let parent_hex = hash_to_hex(parent);
1916 if !client_ancestors.contains(&parent_hex) && !seen.contains(&parent_hex) {
1917 stack.push(parent_hex);
1918 }
1919 }
1920 }
1921 }
1922
1923 let mut result: Vec<PatchProto> = seen
1924 .into_iter()
1925 .filter_map(|id| patch_map.get(&id).map(|p| (*p).clone()))
1926 .collect();
1927
1928 topological_sort(&mut result);
1929 result
1930}
1931
1932fn topological_sort(patches: &mut Vec<PatchProto>) {
1933 let index_map: std::collections::HashMap<String, usize> = patches
1934 .iter()
1935 .enumerate()
1936 .map(|(i, p)| (hash_to_hex(&p.id), i))
1937 .collect();
1938
1939 let n = patches.len();
1940 let mut visited = vec![false; n];
1941 let mut order = Vec::with_capacity(n);
1942
1943 for i in 0..n {
1944 if !visited[i] {
1945 dfs(i, patches, &index_map, &mut visited, &mut order);
1946 }
1947 }
1948
1949 let sorted: Vec<PatchProto> = order.into_iter().map(|i| patches[i].clone()).collect();
1950 *patches = sorted;
1951}
1952
1953fn dfs(
1954 idx: usize,
1955 patches: &[PatchProto],
1956 index_map: &std::collections::HashMap<String, usize>,
1957 visited: &mut [bool],
1958 order: &mut Vec<usize>,
1959) {
1960 visited[idx] = true;
1961 let patch = &patches[idx];
1962 for parent in &patch.parent_ids {
1963 let parent_hex = hash_to_hex(parent);
1964 if let Some(&parent_idx) = index_map.get(&parent_hex)
1965 && !visited[parent_idx]
1966 {
1967 dfs(parent_idx, patches, index_map, visited, order);
1968 }
1969 }
1970 order.push(idx);
1971}
1972
1973async fn check_auth(hub: &SutureHubServer, headers: &HeaderMap) -> Result<(), StatusCode> {
1974 if hub.no_auth {
1975 return Ok(());
1976 }
1977
1978 let store = hub.storage.read().await;
1979 let auth_keys_configured = store.has_authorized_keys().unwrap_or_else(|e| {
1980 tracing::error!("Failed to check authorized keys: {e}");
1981 true
1982 });
1983 let tokens_exist = store.has_tokens().unwrap_or_else(|e| {
1984 tracing::error!("Failed to check tokens: {e}");
1985 true
1986 });
1987 drop(store);
1988
1989 if !auth_keys_configured && !tokens_exist {
1990 return Ok(());
1991 }
1992
1993 if let Some(auth_header) = headers.get("authorization")
1994 && let Ok(auth_str) = auth_header.to_str()
1995 && let Some(token) = auth_str.strip_prefix("Bearer ")
1996 {
1997 let store = hub.storage.read().await;
1998 if store.verify_token(token).unwrap_or_else(|e| {
1999 tracing::error!("Failed to verify token: {e}");
2000 false
2001 }) {
2002 return Ok(());
2003 }
2004 }
2005
2006 Err(StatusCode::UNAUTHORIZED)
2007}
2008
2009async fn resolve_user(hub: &SutureHubServer, headers: &HeaderMap) -> Option<UserInfo> {
2010 if let Some(auth_header) = headers.get("authorization")
2011 && let Ok(auth_str) = auth_header.to_str()
2012 && let Some(token) = auth_str.strip_prefix("Bearer ")
2013 {
2014 let store = hub.storage.read().await;
2015 return match store.get_user_by_token(token) {
2016 Ok(Some(user)) => Some(user),
2017 Ok(None) => None,
2018 Err(e) => {
2019 tracing::warn!("Failed to get user by token: {e}");
2020 None
2021 }
2022 };
2023 }
2024 None
2025}
2026
2027async fn require_role(
2028 hub: &SutureHubServer,
2029 headers: &HeaderMap,
2030 required_role: &Role,
2031) -> Result<UserInfo, StatusCode> {
2032 if hub.no_auth {
2033 return Err(StatusCode::UNAUTHORIZED);
2034 }
2035
2036 let user = resolve_user(hub, headers)
2037 .await
2038 .ok_or(StatusCode::UNAUTHORIZED)?;
2039 let user_role = Role::parse(&user.role);
2040
2041 if user_role >= *required_role {
2042 Ok(user)
2043 } else {
2044 Err(StatusCode::FORBIDDEN)
2045 }
2046}
2047
2048fn generate_api_token() -> String {
2049 let mut bytes = [0u8; 32];
2050 rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut bytes);
2051 hex::encode(bytes)
2052}
2053
2054fn generate_random_token() -> String {
2055 let mut bytes = [0u8; 32];
2056 rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut bytes);
2057 hex::encode(bytes)
2058}
2059
2060pub async fn push_compressed_handler(
2061 State(hub): State<Arc<SutureHubServer>>,
2062 headers: HeaderMap,
2063 Json(req): Json<PushRequest>,
2064) -> (StatusCode, Json<PushResponse>) {
2065 if let Err(status) = check_auth(&hub, &headers).await {
2066 return (
2067 status,
2068 Json(PushResponse {
2069 success: false,
2070 error: Some("authentication failed".to_owned()),
2071 existing_patches: vec![],
2072 }),
2073 );
2074 }
2075 let mut req = req;
2076 for blob in &mut req.blobs {
2077 let compressed_data = match base64_decode(&blob.data) {
2078 Ok(d) => d,
2079 Err(e) => {
2080 return (
2081 StatusCode::BAD_REQUEST,
2082 Json(PushResponse {
2083 success: false,
2084 error: Some(format!("invalid base64 in compressed blob: {e}")),
2085 existing_patches: vec![],
2086 }),
2087 );
2088 }
2089 };
2090 let decompressed = match suture_protocol::decompress(&compressed_data) {
2091 Ok(d) => d,
2092 Err(e) => {
2093 return (
2094 StatusCode::BAD_REQUEST,
2095 Json(PushResponse {
2096 success: false,
2097 error: Some(e),
2098 existing_patches: vec![],
2099 }),
2100 );
2101 }
2102 };
2103 blob.data = base64_encode(&decompressed);
2104 }
2105 match hub.handle_push(req).await {
2106 Ok(resp) => (StatusCode::OK, Json(resp)),
2107 Err((status, resp)) => (status, Json(resp)),
2108 }
2109}
2110
2111pub async fn pull_compressed_handler(
2112 State(hub): State<Arc<SutureHubServer>>,
2113 headers: HeaderMap,
2114 Json(req): Json<PullRequest>,
2115) -> (StatusCode, Json<PullResponse>) {
2116 if let Err(status) = check_auth(&hub, &headers).await {
2117 return (
2118 status,
2119 Json(PullResponse {
2120 success: false,
2121 error: Some("authentication failed".to_owned()),
2122 patches: vec![],
2123 branches: vec![],
2124 blobs: vec![],
2125 }),
2126 );
2127 }
2128 let mut resp = hub.handle_pull(req).await;
2129 if resp.success {
2130 for blob in &mut resp.blobs {
2131 let Ok(raw) = base64_decode(&blob.data) else { continue };
2132 let Ok(compressed) = suture_protocol::compress(&raw) else { continue };
2133 blob.data = base64_encode(&compressed);
2134 }
2135 }
2136 let status = if resp.success {
2137 StatusCode::OK
2138 } else {
2139 StatusCode::NOT_FOUND
2140 };
2141 (status, Json(resp))
2142}
2143
2144pub async fn push_handler(
2145 State(hub): State<Arc<SutureHubServer>>,
2146 headers: HeaderMap,
2147 ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
2148 Json(req): Json<PushRequest>,
2149) -> (StatusCode, HeaderMap, Json<PushResponse>) {
2150 let ip = addr.ip().to_string();
2151 if let Err(retry_after) = hub.check_rate_limit(&ip, "push") {
2152 let mut hdrs = HeaderMap::new();
2153 if let Ok(val) = retry_after.to_string().parse() {
2154 hdrs.insert(axum::http::header::RETRY_AFTER, val);
2155 }
2156 return (
2157 StatusCode::TOO_MANY_REQUESTS,
2158 hdrs,
2159 Json(PushResponse {
2160 success: false,
2161 error: Some("rate limit exceeded".to_owned()),
2162 existing_patches: vec![],
2163 }),
2164 );
2165 }
2166
2167 if let Err(status) = check_auth(&hub, &headers).await {
2168 return (
2169 status,
2170 HeaderMap::new(),
2171 Json(PushResponse {
2172 success: false,
2173 error: Some("authentication failed".to_owned()),
2174 existing_patches: vec![],
2175 }),
2176 );
2177 }
2178
2179 if !hub.no_auth
2180 && let Some(user) = resolve_user(&hub, &headers).await
2181 && Role::parse(&user.role) < Role::Member
2182 {
2183 return (
2184 StatusCode::FORBIDDEN,
2185 HeaderMap::new(),
2186 Json(PushResponse {
2187 success: false,
2188 error: Some("insufficient permissions: readers cannot push".to_owned()),
2189 existing_patches: vec![],
2190 }),
2191 );
2192 }
2193
2194 match hub.handle_push(req).await {
2195 Ok(resp) => (StatusCode::OK, HeaderMap::new(), Json(resp)),
2196 Err((status, resp)) => (status, HeaderMap::new(), Json(resp)),
2197 }
2198}
2199
2200pub async fn pull_handler(
2201 State(hub): State<Arc<SutureHubServer>>,
2202 headers: HeaderMap,
2203 ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
2204 Json(req): Json<PullRequest>,
2205) -> (StatusCode, HeaderMap, Json<PullResponse>) {
2206 let ip = addr.ip().to_string();
2207 if let Err(retry_after) = hub.check_rate_limit(&ip, "pull") {
2208 let mut hdrs = HeaderMap::new();
2209 if let Ok(val) = retry_after.to_string().parse() {
2210 hdrs.insert(axum::http::header::RETRY_AFTER, val);
2211 }
2212 return (
2213 StatusCode::TOO_MANY_REQUESTS,
2214 hdrs,
2215 Json(PullResponse {
2216 success: false,
2217 error: Some("rate limit exceeded".to_owned()),
2218 patches: vec![],
2219 branches: vec![],
2220 blobs: vec![],
2221 }),
2222 );
2223 }
2224
2225 if let Err(status) = check_auth(&hub, &headers).await {
2226 return (
2227 status,
2228 HeaderMap::new(),
2229 Json(PullResponse {
2230 success: false,
2231 error: Some("authentication failed".to_owned()),
2232 patches: vec![],
2233 branches: vec![],
2234 blobs: vec![],
2235 }),
2236 );
2237 }
2238 let resp = hub.handle_pull(req).await;
2239 let status = if resp.success {
2240 StatusCode::OK
2241 } else {
2242 StatusCode::NOT_FOUND
2243 };
2244 (status, HeaderMap::new(), Json(resp))
2245}
2246
2247pub async fn list_repos_handler(
2248 State(hub): State<Arc<SutureHubServer>>,
2249) -> Json<ListReposResponse> {
2250 Json(hub.handle_list_repos().await)
2251}
2252
2253pub async fn repo_info_handler(
2254 State(hub): State<Arc<SutureHubServer>>,
2255 Path(repo_id): Path<String>,
2256) -> (StatusCode, Json<RepoInfoResponse>) {
2257 let resp = hub.handle_repo_info(&repo_id).await;
2258 let status = if resp.success {
2259 StatusCode::OK
2260 } else {
2261 StatusCode::NOT_FOUND
2262 };
2263 (status, Json(resp))
2264}
2265
2266pub async fn handshake_handler(
2267 Json(req): Json<crate::types::HandshakeRequest>,
2268) -> Json<crate::types::HandshakeResponse> {
2269 let compatible = req.client_version == crate::types::PROTOCOL_VERSION;
2270 Json(crate::types::HandshakeResponse {
2271 server_version: crate::types::PROTOCOL_VERSION,
2272 server_name: "suture-hub".to_owned(),
2273 compatible,
2274 })
2275}
2276
2277pub async fn handshake_get_handler() -> Json<crate::types::HandshakeResponse> {
2280 Json(crate::types::HandshakeResponse {
2281 server_version: crate::types::PROTOCOL_VERSION,
2282 server_name: "suture-hub".to_owned(),
2283 compatible: true,
2284 })
2285}
2286
2287#[derive(Debug, serde::Serialize)]
2288pub struct TokenResponse {
2289 pub token: String,
2290 pub created_at: u64,
2291}
2292
2293pub async fn create_token_handler(
2294 State(hub): State<Arc<SutureHubServer>>,
2295 headers: HeaderMap,
2296 ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
2297) -> (StatusCode, HeaderMap, Json<TokenResponse>) {
2298 let ip = addr.ip().to_string();
2299 if let Err(retry_after) = hub.check_rate_limit(&ip, "token_create") {
2300 let mut hdrs = HeaderMap::new();
2301 if let Ok(val) = retry_after.to_string().parse() {
2302 hdrs.insert(axum::http::header::RETRY_AFTER, val);
2303 }
2304 return (
2305 StatusCode::TOO_MANY_REQUESTS,
2306 hdrs,
2307 Json(TokenResponse {
2308 token: String::new(),
2309 created_at: 0,
2310 }),
2311 );
2312 }
2313
2314 if hub.no_auth {
2315 let token = generate_random_token();
2316 let created_at = std::time::SystemTime::now()
2317 .duration_since(std::time::UNIX_EPOCH)
2318 .unwrap_or_default()
2319 .as_secs();
2320 let expires_at = (created_at + (30 * 24 * 60 * 60)) as i64;
2321 let store = hub.storage.write().await;
2322 if let Err(e) = store.store_token(&token, created_at, "cli-generated", expires_at) {
2323 tracing::error!("Failed to store auth token: {}", e);
2324 return (
2325 StatusCode::INTERNAL_SERVER_ERROR,
2326 HeaderMap::new(),
2327 Json(TokenResponse { token: String::new(), created_at: 0 }),
2328 );
2329 }
2330 return (
2331 StatusCode::OK,
2332 HeaderMap::new(),
2333 Json(TokenResponse { token, created_at }),
2334 );
2335 }
2336
2337 let store = hub.storage.read().await;
2338 let tokens_exist = store.has_tokens().unwrap_or_else(|e| {
2339 tracing::error!("Failed to check tokens: {e}");
2340 true
2341 });
2342 let users_exist = store.has_users().unwrap_or_else(|e| {
2343 tracing::error!("Failed to check users: {e}");
2344 true
2345 });
2346 let auth_keys_configured = store.has_authorized_keys().unwrap_or_else(|e| {
2347 tracing::error!("Failed to check authorized keys: {e}");
2348 true
2349 });
2350 drop(store);
2351
2352 if !tokens_exist && !users_exist && !auth_keys_configured {
2353 let token = generate_random_token();
2354 let created_at = std::time::SystemTime::now()
2355 .duration_since(std::time::UNIX_EPOCH)
2356 .unwrap_or_default()
2357 .as_secs();
2358 let expires_at = (created_at + (30 * 24 * 60 * 60)) as i64;
2359 let store = hub.storage.write().await;
2360 if let Err(e) = store.store_token(&token, created_at, "cli-generated", expires_at) {
2361 tracing::error!("Failed to store auth token: {}", e);
2362 return (
2363 StatusCode::INTERNAL_SERVER_ERROR,
2364 HeaderMap::new(),
2365 Json(TokenResponse { token: String::new(), created_at: 0 }),
2366 );
2367 }
2368 return (
2369 StatusCode::OK,
2370 HeaderMap::new(),
2371 Json(TokenResponse { token, created_at }),
2372 );
2373 }
2374
2375 let user = resolve_user(&hub, &headers).await;
2376 if let Some(u) = user {
2377 let role = Role::parse(&u.role);
2378 if role < Role::Admin {
2379 return (
2380 StatusCode::FORBIDDEN,
2381 HeaderMap::new(),
2382 Json(TokenResponse {
2383 token: String::new(),
2384 created_at: 0,
2385 }),
2386 );
2387 }
2388 } else {
2389 let store = hub.storage.read().await;
2390 let valid_token = if let Some(auth_header) = headers.get("authorization")
2391 && let Ok(auth_str) = auth_header.to_str()
2392 && let Some(token) = auth_str.strip_prefix("Bearer ")
2393 {
2394 store.verify_token(token).unwrap_or_else(|e| {
2395 tracing::error!("Failed to verify token: {e}");
2396 false
2397 })
2398 } else {
2399 false
2400 };
2401 drop(store);
2402 if !valid_token {
2403 return (
2404 StatusCode::UNAUTHORIZED,
2405 HeaderMap::new(),
2406 Json(TokenResponse {
2407 token: String::new(),
2408 created_at: 0,
2409 }),
2410 );
2411 }
2412 }
2413
2414 let token = generate_random_token();
2415 let created_at = std::time::SystemTime::now()
2416 .duration_since(std::time::UNIX_EPOCH)
2417 .unwrap_or_default()
2418 .as_secs();
2419 let expires_at = (created_at + (30 * 24 * 60 * 60)) as i64;
2420
2421 let store = hub.storage.write().await;
2422 if let Err(e) = store.store_token(&token, created_at, "cli-generated", expires_at) {
2423 tracing::error!("Failed to store auth token: {e}");
2424 return (
2425 StatusCode::INTERNAL_SERVER_ERROR,
2426 HeaderMap::new(),
2427 Json(TokenResponse {
2428 token: String::new(),
2429 created_at: 0,
2430 }),
2431 );
2432 }
2433
2434 (
2435 StatusCode::OK,
2436 HeaderMap::new(),
2437 Json(TokenResponse { token, created_at }),
2438 )
2439}
2440
2441#[derive(Debug, serde::Serialize)]
2442pub struct VerifyResponse {
2443 pub valid: bool,
2444}
2445
2446pub async fn verify_token_handler(
2447 State(hub): State<Arc<SutureHubServer>>,
2448 Json(auth_req): Json<crate::types::AuthRequest>,
2449) -> Json<VerifyResponse> {
2450 let valid = match &auth_req.method {
2451 crate::types::AuthMethod::Token(token) => {
2452 let store = hub.storage.read().await;
2453 store.verify_token(token).unwrap_or_else(|e| {
2454 tracing::warn!("Failed to verify token: {e}");
2455 false
2456 })
2457 }
2458 _ => false,
2459 };
2460 Json(VerifyResponse { valid })
2461}
2462
2463pub async fn mirror_setup_handler(
2464 State(hub): State<Arc<SutureHubServer>>,
2465 headers: HeaderMap,
2466 Json(req): Json<crate::types::MirrorSetupRequest>,
2467) -> (StatusCode, Json<crate::types::MirrorSetupResponse>) {
2468 if let Err(status) = check_auth(&hub, &headers).await {
2469 let resp = crate::types::MirrorSetupResponse { success: false, mirror_id: None, error: Some("unauthorized".to_owned()) };
2470 return (status, Json(resp));
2471 }
2472 let resp = hub.handle_mirror_setup(req).await;
2473 let status = if resp.success {
2474 StatusCode::OK
2475 } else {
2476 StatusCode::BAD_REQUEST
2477 };
2478 (status, Json(resp))
2479}
2480
2481pub async fn mirror_sync_handler(
2482 State(hub): State<Arc<SutureHubServer>>,
2483 headers: HeaderMap,
2484 Json(req): Json<crate::types::MirrorSyncRequest>,
2485) -> (StatusCode, Json<crate::types::MirrorSyncResponse>) {
2486 if let Err(status) = check_auth(&hub, &headers).await {
2487 let resp = crate::types::MirrorSyncResponse { success: false, error: Some("unauthorized".to_owned()), patches_synced: 0, branches_synced: 0 };
2488 return (status, Json(resp));
2489 }
2490 let store = hub.storage.read().await;
2491 let actual_mirror_id = if req.mirror_id == 0 {
2492 let repo_name = req.local_repo.clone().unwrap_or_default();
2493 match store.get_mirror_by_repo(&repo_name) {
2494 Ok(Some(id)) => id,
2495 _ => {
2496 return (
2497 StatusCode::BAD_REQUEST,
2498 Json(crate::types::MirrorSyncResponse {
2499 success: false,
2500 error: Some("mirror not found by local_repo".to_owned()),
2501 patches_synced: 0,
2502 branches_synced: 0,
2503 }),
2504 );
2505 }
2506 }
2507 } else {
2508 req.mirror_id
2509 };
2510 drop(store);
2511 let actual_req = crate::types::MirrorSyncRequest {
2512 mirror_id: actual_mirror_id,
2513 local_repo: req.local_repo,
2514 remote_url: req.remote_url,
2515 };
2516 let resp = hub.handle_mirror_sync(actual_req).await;
2517 let status = if resp.success {
2518 StatusCode::OK
2519 } else {
2520 StatusCode::INTERNAL_SERVER_ERROR
2521 };
2522 (status, Json(resp))
2523}
2524
2525pub async fn mirror_status_handler(
2526 State(hub): State<Arc<SutureHubServer>>,
2527 Json(req): Json<crate::types::MirrorStatusRequest>,
2528) -> (StatusCode, Json<crate::types::MirrorStatusResponse>) {
2529 let resp = hub.handle_mirror_status(req).await;
2530 (StatusCode::OK, Json(resp))
2531}
2532
2533pub async fn mirror_status_get_handler(
2534 State(hub): State<Arc<SutureHubServer>>,
2535) -> (StatusCode, Json<crate::types::MirrorStatusResponse>) {
2536 let resp = hub
2537 .handle_mirror_status(crate::types::MirrorStatusRequest {
2538 mirror_id: None,
2539 repo_name: None,
2540 })
2541 .await;
2542 (StatusCode::OK, Json(resp))
2543}
2544
2545fn base64_encode(data: &[u8]) -> String {
2546 use base64::Engine;
2547 base64::engine::general_purpose::STANDARD.encode(data)
2548}
2549
2550fn base64_decode(s: &str) -> Result<Vec<u8>, String> {
2551 use base64::Engine;
2552 base64::engine::general_purpose::STANDARD
2553 .decode(s)
2554 .map_err(|e| e.to_string())
2555}
2556
2557pub async fn repo_branches_handler(
2558 State(hub): State<Arc<SutureHubServer>>,
2559 Path(repo_id): Path<String>,
2560) -> (StatusCode, Json<Vec<BranchProto>>) {
2561 let store = hub.storage.read().await;
2562 let branches = store.get_branches(&repo_id).unwrap_or_else(|e| {
2563 tracing::warn!("store get_branches failed: {e}");
2564 Default::default()
2565 });
2566 (StatusCode::OK, Json(branches))
2567}
2568
2569pub async fn repo_patches_handler(
2570 State(hub): State<Arc<SutureHubServer>>,
2571 Path(repo_id): Path<String>,
2572 Query(params): Query<PaginationParams>,
2573) -> (StatusCode, Json<serde_json::Value>) {
2574 let offset = params
2575 .cursor
2576 .as_deref()
2577 .and_then(decode_cursor)
2578 .unwrap_or_else(|| u64::from(params.offset.unwrap_or(0)));
2579 let limit = params.limit.unwrap_or(50);
2580 let (patches, next_cursor) = hub
2581 .handle_repo_patches_cursor(&repo_id, offset, limit)
2582 .await;
2583 (
2584 StatusCode::OK,
2585 Json(serde_json::json!({
2586 "patches": patches,
2587 "next_cursor": next_cursor.unwrap_or_default(),
2588 })),
2589 )
2590}
2591
2592pub async fn repo_tree_handler(
2593 State(hub): State<Arc<SutureHubServer>>,
2594 Path((repo_id, branch)): Path<(String, String)>,
2595) -> (StatusCode, Json<serde_json::Value>) {
2596 let store = hub.storage.read().await;
2597 match store.get_tree_at_branch(&repo_id, &branch) {
2598 Ok(entries) => {
2599 let files: Vec<serde_json::Value> = entries
2600 .into_iter()
2601 .map(|e| {
2602 serde_json::json!({
2603 "path": e.path,
2604 "content_hash": e.content_hash,
2605 })
2606 })
2607 .collect();
2608 (
2609 StatusCode::OK,
2610 Json(serde_json::json!({"success": true, "files": files})),
2611 )
2612 }
2613 Err(e) => (
2614 StatusCode::INTERNAL_SERVER_ERROR,
2615 Json(serde_json::json!({"success": false, "error": e.to_string()})),
2616 ),
2617 }
2618}
2619
2620pub async fn protect_branch_handler(
2621 State(hub): State<Arc<SutureHubServer>>,
2622 headers: HeaderMap,
2623 Path((repo_id, branch)): Path<(String, String)>,
2624) -> (StatusCode, Json<serde_json::Value>) {
2625 if let Err(status) = check_auth(&hub, &headers).await {
2626 return (status, Json(serde_json::json!({"success": false, "error": "unauthorized"})));
2627 }
2628 let store = hub.storage.write().await;
2629 match store.protect_branch(&repo_id, &branch) {
2630 Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
2631 Err(e) => (
2632 StatusCode::INTERNAL_SERVER_ERROR,
2633 Json(serde_json::json!({"success": false, "error": e.to_string()})),
2634 ),
2635 }
2636}
2637
2638pub async fn unprotect_branch_handler(
2639 State(hub): State<Arc<SutureHubServer>>,
2640 headers: HeaderMap,
2641 Path((repo_id, branch)): Path<(String, String)>,
2642) -> (StatusCode, Json<serde_json::Value>) {
2643 if let Err(status) = check_auth(&hub, &headers).await {
2644 return (status, Json(serde_json::json!({"success": false, "error": "unauthorized"})));
2645 }
2646 let store = hub.storage.write().await;
2647 match store.unprotect_branch(&repo_id, &branch) {
2648 Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
2649 Err(e) => (
2650 StatusCode::INTERNAL_SERVER_ERROR,
2651 Json(serde_json::json!({"success": false, "error": e.to_string()})),
2652 ),
2653 }
2654}
2655
2656pub async fn create_repo_handler(
2657 State(hub): State<Arc<SutureHubServer>>,
2658 headers: HeaderMap,
2659 Json(req): Json<crate::types::CreateRepoRequest>,
2660) -> (StatusCode, Json<serde_json::Value>) {
2661 if let Err(status) = check_auth(&hub, &headers).await {
2662 return (
2663 status,
2664 Json(serde_json::json!({"success": false, "error": "unauthorized"})),
2665 );
2666 }
2667 let store = hub.storage.write().await;
2668 match store.ensure_repo(&req.repo_id) {
2669 Ok(_) => (
2670 StatusCode::CREATED,
2671 Json(serde_json::json!({"success": true, "repo_id": req.repo_id})),
2672 ),
2673 Err(e) => (
2674 StatusCode::INTERNAL_SERVER_ERROR,
2675 Json(serde_json::json!({"success": false, "error": e.to_string()})),
2676 ),
2677 }
2678}
2679
2680pub async fn delete_repo_handler(
2681 State(hub): State<Arc<SutureHubServer>>,
2682 headers: HeaderMap,
2683 Path(repo_id): Path<String>,
2684) -> (StatusCode, Json<serde_json::Value>) {
2685 if let Err(status) = check_auth(&hub, &headers).await {
2686 return (
2687 status,
2688 Json(serde_json::json!({"success": false, "error": "unauthorized"})),
2689 );
2690 }
2691 let store = hub.storage.write().await;
2692 match store.delete_repo(&repo_id) {
2693 Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
2694 Err(e) => (
2695 StatusCode::INTERNAL_SERVER_ERROR,
2696 Json(serde_json::json!({"success": false, "error": e.to_string()})),
2697 ),
2698 }
2699}
2700
2701pub async fn create_branch_handler(
2702 State(hub): State<Arc<SutureHubServer>>,
2703 headers: HeaderMap,
2704 Path(repo_id): Path<String>,
2705 Json(req): Json<crate::types::CreateBranchRequest>,
2706) -> (StatusCode, Json<serde_json::Value>) {
2707 if let Err(status) = check_auth(&hub, &headers).await {
2708 return (
2709 status,
2710 Json(serde_json::json!({"success": false, "error": "unauthorized"})),
2711 );
2712 }
2713 let store = hub.storage.write().await;
2714 match store.set_branch(&repo_id, &req.name, &req.target) {
2715 Ok(()) => {
2716 let branch_data = serde_json::json!({"name": req.name, "target": req.target});
2717 let manager = Arc::clone(&hub.webhook_manager);
2718 let storage = Arc::clone(&hub.storage);
2719 let rid = repo_id.clone();
2720 drop(store);
2721 tokio::spawn(async move {
2722 let hooks = {
2723 let store = storage.read().await;
2724 store.list_webhooks(&rid).unwrap_or_else(|e| {
2725 tracing::warn!("store list_webhooks failed: {e}");
2726 Default::default()
2727 })
2728 };
2729 if !hooks.is_empty() {
2730 let result = manager
2731 .trigger(&hooks, "branch.create", &rid, branch_data)
2732 .await;
2733 if result.failed > 0 {
2734 tracing::warn!("Hook trigger failed: {} of {} webhooks failed", result.failed, result.triggered);
2735 }
2736 }
2737 });
2738 (
2739 StatusCode::CREATED,
2740 Json(serde_json::json!({"success": true})),
2741 )
2742 }
2743 Err(e) => (
2744 StatusCode::INTERNAL_SERVER_ERROR,
2745 Json(serde_json::json!({"success": false, "error": e.to_string()})),
2746 ),
2747 }
2748}
2749
2750pub async fn delete_branch_handler(
2751 State(hub): State<Arc<SutureHubServer>>,
2752 headers: HeaderMap,
2753 Path((repo_id, branch_name)): Path<(String, String)>,
2754) -> (StatusCode, Json<serde_json::Value>) {
2755 if let Err(status) = check_auth(&hub, &headers).await {
2756 return (
2757 status,
2758 Json(serde_json::json!({"success": false, "error": "unauthorized"})),
2759 );
2760 }
2761 let store = hub.storage.write().await;
2762 match store.delete_branch(&repo_id, &branch_name) {
2763 Ok(()) => {
2764 let branch_data = serde_json::json!({"name": branch_name});
2765 let manager = Arc::clone(&hub.webhook_manager);
2766 let storage = Arc::clone(&hub.storage);
2767 let rid = repo_id.clone();
2768 drop(store);
2769 tokio::spawn(async move {
2770 let hooks = {
2771 let store = storage.read().await;
2772 store.list_webhooks(&rid).unwrap_or_else(|e| {
2773 tracing::warn!("store list_webhooks failed: {e}");
2774 Default::default()
2775 })
2776 };
2777 if !hooks.is_empty() {
2778 let result = manager
2779 .trigger(&hooks, "branch.delete", &rid, branch_data)
2780 .await;
2781 if result.failed > 0 {
2782 tracing::warn!("Hook trigger failed: {} of {} webhooks failed", result.failed, result.triggered);
2783 }
2784 }
2785 });
2786 (StatusCode::OK, Json(serde_json::json!({"success": true})))
2787 }
2788 Err(e) => (
2789 StatusCode::INTERNAL_SERVER_ERROR,
2790 Json(serde_json::json!({"success": false, "error": e.to_string()})),
2791 ),
2792 }
2793}
2794
2795pub async fn get_blob_handler(
2796 State(hub): State<Arc<SutureHubServer>>,
2797 Path((repo_id, hash)): Path<(String, String)>,
2798) -> (StatusCode, Json<serde_json::Value>) {
2799 let store = hub.storage.read().await;
2800 match hub.blob_get(&store, &repo_id, &hash) {
2801 Ok(Some(data)) => {
2802 use base64::Engine;
2803 let encoded = base64::engine::general_purpose::STANDARD.encode(&data);
2804 (
2805 StatusCode::OK,
2806 Json(serde_json::json!({"success": true, "data": encoded})),
2807 )
2808 }
2809 Ok(None) => (
2810 StatusCode::NOT_FOUND,
2811 Json(serde_json::json!({"success": false, "error": "blob not found"})),
2812 ),
2813 Err(e) => (
2814 StatusCode::INTERNAL_SERVER_ERROR,
2815 Json(serde_json::json!({"success": false, "error": e})),
2816 ),
2817 }
2818}
2819
2820pub async fn lfs_batch_handler(
2821 State(hub): State<Arc<SutureHubServer>>,
2822 Json(req): Json<suture_protocol::LfsBatchRequest>,
2823) -> (StatusCode, Json<serde_json::Value>) {
2824 if !req.repo_id.chars().all(|c| c.is_alphanumeric() || c == '-' || c == '_' || c == '.') {
2826 return (
2827 StatusCode::BAD_REQUEST,
2828 Json(serde_json::json!({
2829 "message": "invalid repo_id: must contain only alphanumeric characters, hyphens, underscores, and dots"
2830 })),
2831 );
2832 }
2833
2834 let lfs_dir = match &hub.lfs_data_dir {
2835 Some(d) => d.clone(),
2836 None => {
2837 return (
2838 StatusCode::SERVICE_UNAVAILABLE,
2839 Json(serde_json::json!({
2840 "message": "LFS storage not configured on this hub"
2841 })),
2842 );
2843 }
2844 };
2845
2846 let repo_dir = lfs_dir.join(&req.repo_id);
2847 let obj_dir = repo_dir.join("objects");
2848 let obj_dir_clone = obj_dir.clone();
2849 if let Err(e) = tokio::task::spawn_blocking(move || std::fs::create_dir_all(obj_dir_clone))
2850 .await.unwrap_or_else(|e| Err(std::io::Error::other(e.to_string())))
2851 {
2852 tracing::warn!("Failed to create directory {}: {}", obj_dir.display(), e);
2853 }
2854
2855 let mut actions = Vec::with_capacity(req.objects.len());
2856 for obj in &req.objects {
2857 let oid = &obj.oid;
2858 if !oid.chars().all(|c| c.is_ascii_hexdigit()) || oid.len() > 128 {
2859 continue;
2860 }
2861 let prefix = &oid[..2.min(oid.len())];
2862 let obj_path = obj_dir.join(prefix).join(oid);
2863
2864 let action = match req.operation {
2865 suture_protocol::LfsOperation::Upload => {
2866 let op = obj_path.clone();
2867 let exists = tokio::task::spawn_blocking(move || op.exists())
2868 .await
2869 .unwrap_or(false);
2870 if exists {
2871 suture_protocol::LfsAction::None
2872 } else {
2873 suture_protocol::LfsAction::Upload
2874 }
2875 }
2876 suture_protocol::LfsOperation::Download => {
2877 let op = obj_path.clone();
2878 let exists = tokio::task::spawn_blocking(move || op.exists())
2879 .await
2880 .unwrap_or(false);
2881 if exists {
2882 suture_protocol::LfsAction::Download
2883 } else {
2884 suture_protocol::LfsAction::None
2885 }
2886 }
2887 };
2888
2889 actions.push(suture_protocol::LfsObjectAction {
2890 oid: obj.oid.clone(),
2891 size: obj.size,
2892 action,
2893 href: None,
2894 header: None,
2895 });
2896 }
2897
2898 for action in &mut actions {
2899 if matches!(
2900 action.action,
2901 suture_protocol::LfsAction::Upload | suture_protocol::LfsAction::Download
2902 ) {
2903 action.href = Some(format!("/lfs/objects/{}/{}", req.repo_id, action.oid));
2904 }
2905 }
2906
2907 (
2908 StatusCode::OK,
2909 Json(serde_json::json!({
2910 "objects": actions,
2911 "transfer": "basic",
2912 })),
2913 )
2914}
2915
2916pub async fn lfs_upload_handler(
2917 State(hub): State<Arc<SutureHubServer>>,
2918 headers: HeaderMap,
2919 Path((repo_id, oid)): Path<(String, String)>,
2920 body: bytes::Bytes,
2921) -> (StatusCode, Json<serde_json::Value>) {
2922 if let Err(status) = check_auth(&hub, &headers).await {
2923 return (status, Json(serde_json::json!({ "message": "unauthorized" })));
2924 }
2925 let is_safe = |s: &str| s.chars().all(|c| c.is_alphanumeric() || c == '-' || c == '_' || c == '.');
2927 if !is_safe(&repo_id) || !is_safe(&oid) {
2928 return (
2929 StatusCode::BAD_REQUEST,
2930 Json(serde_json::json!({ "message": "invalid repo_id or oid" })),
2931 );
2932 }
2933
2934 let lfs_dir = match &hub.lfs_data_dir {
2935 Some(d) => d.clone(),
2936 None => {
2937 return (
2938 StatusCode::SERVICE_UNAVAILABLE,
2939 Json(serde_json::json!({
2940 "message": "LFS storage not configured"
2941 })),
2942 );
2943 }
2944 };
2945
2946 if body.is_empty() {
2947 return (
2948 StatusCode::BAD_REQUEST,
2949 Json(serde_json::json!({
2950 "message": "empty body"
2951 })),
2952 );
2953 }
2954
2955 use sha2::{Digest, Sha256};
2956 let hash = Sha256::digest(&body);
2957 let hash_hex = hex::encode(hash);
2958 if hash_hex != oid {
2959 return (
2960 StatusCode::BAD_REQUEST,
2961 Json(serde_json::json!({
2962 "message": format!("hash mismatch: expected {}, got {}", oid, hash_hex)
2963 })),
2964 );
2965 }
2966
2967 let prefix = &oid[..2.min(oid.len())];
2968 let obj_path = lfs_dir
2969 .join(&repo_id)
2970 .join("objects")
2971 .join(prefix)
2972 .join(&oid);
2973 if let Some(parent) = obj_path.parent() {
2974 let parent_owned = parent.to_owned();
2975 if let Err(e) = tokio::task::spawn_blocking(move || std::fs::create_dir_all(parent_owned))
2976 .await.unwrap_or_else(|e| Err(std::io::Error::other(e.to_string())))
2977 {
2978 tracing::warn!("Failed to create directory {}: {}", parent.display(), e);
2979 }
2980 }
2981 match tokio::task::spawn_blocking({
2982 let obj_path = obj_path.clone();
2983 let body = body.clone();
2984 move || std::fs::write(obj_path, body)
2985 }).await.unwrap_or_else(|e| Err(std::io::Error::other(e.to_string()))) {
2986 Ok(()) => (
2987 StatusCode::OK,
2988 Json(serde_json::json!({
2989 "message": "uploaded",
2990 "oid": oid,
2991 "size": body.len(),
2992 })),
2993 ),
2994 Err(e) => (
2995 StatusCode::INTERNAL_SERVER_ERROR,
2996 Json(serde_json::json!({
2997 "message": e.to_string()
2998 })),
2999 ),
3000 }
3001}
3002
3003pub async fn lfs_download_handler(
3004 State(hub): State<Arc<SutureHubServer>>,
3005 Path((repo_id, oid)): Path<(String, String)>,
3006) -> (StatusCode, axum::response::Response) {
3007 let is_safe = |s: &str| s.chars().all(|c| c.is_alphanumeric() || c == '-' || c == '_' || c == '.');
3009 if !is_safe(&repo_id) || !is_safe(&oid) {
3010 let body = axum::body::Body::from("{\"message\":\"invalid repo_id or oid\"}");
3011 let response = axum::response::Response::builder().body(body).unwrap_or_else(|_| {
3012 axum::response::Response::new(axum::body::Body::from("{\"message\":\"invalid repo_id or oid\"}"))
3013 });
3014 return (StatusCode::BAD_REQUEST, response.into_response());
3015 }
3016
3017 let lfs_dir = match &hub.lfs_data_dir {
3018 Some(d) => d.clone(),
3019 None => {
3020 return (
3021 StatusCode::SERVICE_UNAVAILABLE,
3022 axum::response::Response::new(axum::body::Body::from(
3023 serde_json::json!({"message": "LFS storage not configured"}).to_string(),
3024 )),
3025 );
3026 }
3027 };
3028
3029 let prefix = &oid[..2.min(oid.len())];
3030 let obj_path = lfs_dir
3031 .join(&repo_id)
3032 .join("objects")
3033 .join(prefix)
3034 .join(&oid);
3035
3036 match tokio::task::spawn_blocking({
3037 let obj_path = obj_path.clone();
3038 move || std::fs::read(obj_path)
3039 }).await.unwrap_or_else(|e| Err(std::io::Error::other(e.to_string()))) {
3040 Err(_) => (
3041 StatusCode::NOT_FOUND,
3042 axum::response::Response::new(axum::body::Body::from(
3043 serde_json::json!({"message": "object not found"}).to_string(),
3044 )),
3045 ),
3046 Ok(data) => {
3047 let len = data.len();
3048 let body = axum::body::Body::from(data);
3049 let response = axum::response::Response::builder()
3050 .status(StatusCode::OK)
3051 .header("Content-Type", "application/octet-stream")
3052 .header("Content-Length", len.to_string())
3053 .body(body)
3054 .unwrap_or_else(|e| {
3055 tracing::error!("failed to build response: {}", e);
3056 axum::response::Response::new(axum::body::Body::from("internal error"))
3057 });
3058 (StatusCode::OK, response)
3059 },
3060 }
3061}
3062
3063pub async fn login_handler(
3064 State(hub): State<Arc<SutureHubServer>>,
3065 Json(req): Json<crate::types::LoginRequest>,
3066) -> (StatusCode, Json<serde_json::Value>) {
3067 let store = hub.storage.read().await;
3068 let valid = store.verify_token(&req.token).unwrap_or_else(|e| {
3069 tracing::warn!("Failed to verify token: {e}");
3070 false
3071 });
3072 if !valid {
3073 return (
3074 StatusCode::UNAUTHORIZED,
3075 Json(serde_json::json!({"success": false, "error": "invalid token"})),
3076 );
3077 }
3078 let user = match store.get_user_by_token(&req.token) {
3079 Ok(Some(u)) => Some(u),
3080 Ok(None) => None,
3081 Err(e) => {
3082 tracing::error!("Failed to get user by token: {e}");
3083 None
3084 }
3085 };
3086 match user {
3087 Some(u) => (
3088 StatusCode::OK,
3089 Json(serde_json::json!({"success": true, "user": u, "token": req.token})),
3090 ),
3091 None => (
3092 StatusCode::UNAUTHORIZED,
3093 Json(serde_json::json!({"success": false, "error": "user not found"})),
3094 ),
3095 }
3096}
3097
3098pub async fn search_handler(
3099 State(hub): State<Arc<SutureHubServer>>,
3100 Query(params): Query<crate::types::SearchParams>,
3101) -> (StatusCode, Json<serde_json::Value>) {
3102 let store = hub.storage.read().await;
3103 let repos = match store.search_repos(¶ms.q) {
3104 Ok(r) => r,
3105 Err(e) => {
3106 tracing::error!("Failed to search repos: {e}");
3107 return (
3108 StatusCode::INTERNAL_SERVER_ERROR,
3109 Json(serde_json::json!({"error": "database error"})),
3110 );
3111 }
3112 };
3113 let mut patches = Vec::new();
3114 for repo_id in &repos {
3115 if let Ok(p) = store.search_patches(repo_id, ¶ms.q) {
3116 patches.extend(p);
3117 }
3118 }
3119 (
3120 StatusCode::OK,
3121 Json(serde_json::json!({"repos": repos, "patches": patches})),
3122 )
3123}
3124
3125#[derive(serde::Deserialize)]
3126pub struct ActivityPaginationParams {
3127 pub limit: Option<u32>,
3128 pub cursor: Option<String>,
3129}
3130
3131pub async fn activity_handler(
3132 State(hub): State<Arc<SutureHubServer>>,
3133 Query(params): Query<ActivityPaginationParams>,
3134) -> (StatusCode, Json<serde_json::Value>) {
3135 let offset = params
3136 .cursor
3137 .as_deref()
3138 .and_then(decode_cursor)
3139 .unwrap_or(0);
3140 let limit = params.limit.unwrap_or(50).min(200) as usize;
3141 let store = hub.storage.read().await;
3142 let entries = store.get_replication_log(0).unwrap_or_else(|e| {
3143 tracing::warn!("store get_replication_log failed: {e}");
3144 Default::default()
3145 });
3146 let mut collected: Vec<_> = entries
3147 .into_iter()
3148 .skip(offset as usize)
3149 .take(limit + 1)
3150 .collect();
3151 let has_more = collected.len() > limit;
3152 if has_more {
3153 collected.truncate(limit);
3154 }
3155 let next_cursor = if has_more {
3156 Some(encode_cursor(offset + limit as u64))
3157 } else {
3158 None
3159 };
3160 (
3161 StatusCode::OK,
3162 Json(serde_json::json!({
3163 "entries": collected,
3164 "next_cursor": next_cursor.unwrap_or_default(),
3165 })),
3166 )
3167}
3168
3169pub async fn delete_mirror_handler(
3170 State(hub): State<Arc<SutureHubServer>>,
3171 headers: HeaderMap,
3172 Path(mirror_id): Path<i64>,
3173) -> (StatusCode, Json<serde_json::Value>) {
3174 if let Err(status) = check_auth(&hub, &headers).await {
3175 return (
3176 status,
3177 Json(serde_json::json!({"success": false, "error": "unauthorized"})),
3178 );
3179 }
3180 let store = hub.storage.write().await;
3181 match store.delete_mirror(mirror_id) {
3182 Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
3183 Err(e) => (
3184 StatusCode::INTERNAL_SERVER_ERROR,
3185 Json(serde_json::json!({"success": false, "error": e.to_string()})),
3186 ),
3187 }
3188}
3189
3190async fn serve_index() -> Html<&'static str> {
3191 Html(include_str!("../static/index.html"))
3192}
3193
3194async fn serve_static_file(
3195 axum::extract::Path(path): axum::extract::Path<String>,
3196) -> impl IntoResponse {
3197 let content_type = if path.ends_with(".css") {
3198 "text/css"
3199 } else if path.ends_with(".js") {
3200 "application/javascript"
3201 } else if path.ends_with(".html") {
3202 "text/html"
3203 } else if path.ends_with(".json") {
3204 "application/json"
3205 } else {
3206 "application/octet-stream"
3207 };
3208
3209 let static_dir = std::path::Path::new("static");
3210 let file_path = static_dir.join(&path);
3211
3212 let Ok(canonical_static) = tokio::fs::canonicalize(static_dir).await else { return StatusCode::NOT_FOUND.into_response() };
3213 let Ok(canonical_file) = tokio::fs::canonicalize(&file_path).await else { return StatusCode::NOT_FOUND.into_response() };
3214
3215 if !canonical_file.starts_with(&canonical_static) {
3216 return StatusCode::FORBIDDEN.into_response();
3217 }
3218
3219 tokio::fs::read_to_string(&canonical_file).await.map_or_else(
3220 |_| StatusCode::NOT_FOUND.into_response(),
3221 |contents| {
3222 let headers = [(axum::http::header::CONTENT_TYPE, content_type)];
3223 (StatusCode::OK, headers, contents).into_response()
3224 },
3225 )
3226}
3227
3228pub async fn register_handler(
3229 State(hub): State<Arc<SutureHubServer>>,
3230 headers: HeaderMap,
3231 Json(req): Json<crate::types::RegisterRequest>,
3232) -> (StatusCode, Json<crate::types::RegisterResponse>) {
3233 match require_role(&hub, &headers, &Role::Admin).await {
3234 Ok(_) => {}
3235 Err(status) => {
3236 return (
3237 status,
3238 Json(crate::types::RegisterResponse {
3239 success: false,
3240 error: Some("admin access required".to_owned()),
3241 user: None,
3242 }),
3243 );
3244 }
3245 }
3246
3247 let role = req.role.as_deref().unwrap_or("member");
3248 if !matches!(role, "admin" | "member" | "reader") {
3249 return (
3250 StatusCode::BAD_REQUEST,
3251 Json(crate::types::RegisterResponse {
3252 success: false,
3253 error: Some("role must be admin, member, or reader".to_owned()),
3254 user: None,
3255 }),
3256 );
3257 }
3258
3259 let api_token = generate_api_token();
3260
3261 let store = hub.storage.write().await;
3262 match store.create_user(&req.username, &req.display_name, role, &api_token) {
3263 Ok(()) => {
3264 let mut user = match store.get_user(&req.username) {
3265 Ok(Some(u)) => Some(u),
3266 Ok(None) => None,
3267 Err(e) => {
3268 tracing::error!("Failed to get user after creation: {e}");
3269 None
3270 }
3271 };
3272 if let Some(ref mut u) = user {
3273 u.api_token = Some(api_token);
3274 }
3275 (
3276 StatusCode::CREATED,
3277 Json(crate::types::RegisterResponse {
3278 success: true,
3279 error: None,
3280 user,
3281 }),
3282 )
3283 }
3284 Err(e) => (
3285 StatusCode::CONFLICT,
3286 Json(crate::types::RegisterResponse {
3287 success: false,
3288 error: Some(format!("failed to create user: {e}")),
3289 user: None,
3290 }),
3291 ),
3292 }
3293}
3294
3295pub async fn list_users_handler(
3296 State(hub): State<Arc<SutureHubServer>>,
3297 headers: HeaderMap,
3298) -> (StatusCode, Json<crate::types::ListUsersResponse>) {
3299 match require_role(&hub, &headers, &Role::Admin).await {
3300 Ok(_) => {}
3301 Err(status) => {
3302 return (
3303 status,
3304 Json(crate::types::ListUsersResponse {
3305 success: false,
3306 error: Some("admin access required".to_owned()),
3307 users: vec![],
3308 }),
3309 );
3310 }
3311 }
3312
3313 let store = hub.storage.read().await;
3314 match store.list_users() {
3315 Ok(users) => {
3316 let users: Vec<_> = users
3319 .into_iter()
3320 .map(|mut u| {
3321 u.api_token = None;
3322 u
3323 })
3324 .collect();
3325 (
3326 StatusCode::OK,
3327 Json(crate::types::ListUsersResponse {
3328 success: true,
3329 error: None,
3330 users,
3331 }),
3332 )
3333 }
3334 Err(e) => (
3335 StatusCode::INTERNAL_SERVER_ERROR,
3336 Json(crate::types::ListUsersResponse {
3337 success: false,
3338 error: Some(format!("database error: {e}")),
3339 users: vec![],
3340 }),
3341 ),
3342 }
3343}
3344
3345pub async fn get_user_handler(
3346 State(hub): State<Arc<SutureHubServer>>,
3347 headers: HeaderMap,
3348 Path(username): Path<String>,
3349) -> (StatusCode, Json<crate::types::GetUserResponse>) {
3350 let requesting_user = resolve_user(&hub, &headers).await;
3351 let is_admin = requesting_user
3352 .as_ref()
3353 .is_some_and(|u| u.role == "admin");
3354 let is_self = requesting_user
3355 .as_ref()
3356 .is_some_and(|u| u.username == username);
3357
3358 if !is_admin && !is_self {
3359 return (
3360 StatusCode::FORBIDDEN,
3361 Json(crate::types::GetUserResponse {
3362 success: false,
3363 error: Some("access denied".to_owned()),
3364 user: None,
3365 }),
3366 );
3367 }
3368
3369 let store = hub.storage.read().await;
3370 match store.get_user(&username) {
3371 Ok(Some(user)) => {
3372 let mut resp_user = user;
3373 if is_self && !is_admin {
3374 resp_user.api_token = None;
3375 }
3376 (
3377 StatusCode::OK,
3378 Json(crate::types::GetUserResponse {
3379 success: true,
3380 error: None,
3381 user: Some(resp_user),
3382 }),
3383 )
3384 }
3385 Ok(None) => (
3386 StatusCode::NOT_FOUND,
3387 Json(crate::types::GetUserResponse {
3388 success: false,
3389 error: Some("user not found".to_owned()),
3390 user: None,
3391 }),
3392 ),
3393 Err(e) => (
3394 StatusCode::INTERNAL_SERVER_ERROR,
3395 Json(crate::types::GetUserResponse {
3396 success: false,
3397 error: Some(format!("database error: {e}")),
3398 user: None,
3399 }),
3400 ),
3401 }
3402}
3403
3404pub async fn update_role_handler(
3405 State(hub): State<Arc<SutureHubServer>>,
3406 headers: HeaderMap,
3407 Path(username): Path<String>,
3408 Json(req): Json<crate::types::UpdateRoleRequest>,
3409) -> (StatusCode, Json<crate::types::UpdateRoleResponse>) {
3410 match require_role(&hub, &headers, &Role::Admin).await {
3411 Ok(_) => {}
3412 Err(status) => {
3413 return (
3414 status,
3415 Json(crate::types::UpdateRoleResponse {
3416 success: false,
3417 error: Some("admin access required".to_owned()),
3418 }),
3419 );
3420 }
3421 }
3422
3423 if !matches!(req.role.as_str(), "admin" | "member" | "reader") {
3424 return (
3425 StatusCode::BAD_REQUEST,
3426 Json(crate::types::UpdateRoleResponse {
3427 success: false,
3428 error: Some("role must be admin, member, or reader".to_owned()),
3429 }),
3430 );
3431 }
3432
3433 let store = hub.storage.write().await;
3434 match store.update_user_role(&username, &req.role) {
3435 Ok(()) => (
3436 StatusCode::OK,
3437 Json(crate::types::UpdateRoleResponse {
3438 success: true,
3439 error: None,
3440 }),
3441 ),
3442 Err(e) => (
3443 StatusCode::INTERNAL_SERVER_ERROR,
3444 Json(crate::types::UpdateRoleResponse {
3445 success: false,
3446 error: Some(format!("database error: {e}")),
3447 }),
3448 ),
3449 }
3450}
3451
3452pub async fn delete_user_handler(
3453 State(hub): State<Arc<SutureHubServer>>,
3454 headers: HeaderMap,
3455 Path(username): Path<String>,
3456) -> (StatusCode, Json<crate::types::DeleteUserResponse>) {
3457 match require_role(&hub, &headers, &Role::Admin).await {
3458 Ok(_) => {}
3459 Err(status) => {
3460 return (
3461 status,
3462 Json(crate::types::DeleteUserResponse {
3463 success: false,
3464 error: Some("admin access required".to_owned()),
3465 }),
3466 );
3467 }
3468 }
3469
3470 let store = hub.storage.write().await;
3471 match store.delete_user(&username) {
3472 Ok(()) => (
3473 StatusCode::OK,
3474 Json(crate::types::DeleteUserResponse {
3475 success: true,
3476 error: None,
3477 }),
3478 ),
3479 Err(e) => (
3480 StatusCode::INTERNAL_SERVER_ERROR,
3481 Json(crate::types::DeleteUserResponse {
3482 success: false,
3483 error: Some(format!("database error: {e}")),
3484 }),
3485 ),
3486 }
3487}
3488
3489pub async fn handshake_v2_handler(
3490 Json(req): Json<crate::types::HandshakeRequestV2>,
3491) -> Json<crate::types::HandshakeResponseV2> {
3492 let compatible = req.client_version == crate::types::PROTOCOL_VERSION_V2;
3493 Json(crate::types::HandshakeResponseV2 {
3494 server_version: crate::types::PROTOCOL_VERSION_V2,
3495 server_name: "suture-hub".to_owned(),
3496 compatible,
3497 server_capabilities: crate::types::ServerCapabilities {
3498 supports_delta: true,
3499 supports_compression: true,
3500 max_blob_size: 50 * 1024 * 1024,
3501 protocol_versions: vec![
3502 crate::types::PROTOCOL_VERSION,
3503 crate::types::PROTOCOL_VERSION_V2,
3504 ],
3505 },
3506 })
3507}
3508
3509pub async fn v2_pull_handler(
3510 State(hub): State<Arc<SutureHubServer>>,
3511 headers: HeaderMap,
3512 ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
3513 Json(req): Json<crate::types::PullRequestV2>,
3514) -> (StatusCode, HeaderMap, Json<crate::types::PullResponseV2>) {
3515 let ip = addr.ip().to_string();
3516 if let Err(retry_after) = hub.check_rate_limit(&ip, "pull") {
3517 let mut hdrs = HeaderMap::new();
3518 if let Ok(val) = retry_after.to_string().parse() {
3519 hdrs.insert(axum::http::header::RETRY_AFTER, val);
3520 }
3521 return (
3522 StatusCode::TOO_MANY_REQUESTS,
3523 hdrs,
3524 Json(crate::types::PullResponseV2 {
3525 success: false,
3526 error: Some("rate limit exceeded".to_owned()),
3527 patches: vec![],
3528 branches: vec![],
3529 blobs: vec![],
3530 deltas: vec![],
3531 protocol_version: crate::types::PROTOCOL_VERSION_V2,
3532 }),
3533 );
3534 }
3535
3536 if let Err(status) = check_auth(&hub, &headers).await {
3537 return (
3538 status,
3539 HeaderMap::new(),
3540 Json(crate::types::PullResponseV2 {
3541 success: false,
3542 error: Some("authentication failed".to_owned()),
3543 patches: vec![],
3544 branches: vec![],
3545 blobs: vec![],
3546 deltas: vec![],
3547 protocol_version: crate::types::PROTOCOL_VERSION_V2,
3548 }),
3549 );
3550 }
3551
3552 let resp = hub.handle_pull_v2(req).await;
3553 let status = if resp.success {
3554 StatusCode::OK
3555 } else {
3556 StatusCode::NOT_FOUND
3557 };
3558 (status, HeaderMap::new(), Json(resp))
3559}
3560
3561pub async fn v2_push_handler(
3562 State(hub): State<Arc<SutureHubServer>>,
3563 headers: HeaderMap,
3564 ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
3565 Json(req): Json<crate::types::PushRequestV2>,
3566) -> (StatusCode, HeaderMap, Json<PushResponse>) {
3567 let ip = addr.ip().to_string();
3568 if let Err(retry_after) = hub.check_rate_limit(&ip, "push") {
3569 let mut hdrs = HeaderMap::new();
3570 if let Ok(val) = retry_after.to_string().parse() {
3571 hdrs.insert(axum::http::header::RETRY_AFTER, val);
3572 }
3573 return (
3574 StatusCode::TOO_MANY_REQUESTS,
3575 hdrs,
3576 Json(PushResponse {
3577 success: false,
3578 error: Some("rate limit exceeded".to_owned()),
3579 existing_patches: vec![],
3580 }),
3581 );
3582 }
3583
3584 if let Err(status) = check_auth(&hub, &headers).await {
3585 return (
3586 status,
3587 HeaderMap::new(),
3588 Json(PushResponse {
3589 success: false,
3590 error: Some("authentication failed".to_owned()),
3591 existing_patches: vec![],
3592 }),
3593 );
3594 }
3595
3596 if !hub.no_auth
3597 && let Some(user) = resolve_user(&hub, &headers).await
3598 && Role::parse(&user.role) < Role::Member
3599 {
3600 return (
3601 StatusCode::FORBIDDEN,
3602 HeaderMap::new(),
3603 Json(PushResponse {
3604 success: false,
3605 error: Some("insufficient permissions: readers cannot push".to_owned()),
3606 existing_patches: vec![],
3607 }),
3608 );
3609 }
3610
3611 match hub.handle_push_v2(req).await {
3612 Ok(resp) => (StatusCode::OK, HeaderMap::new(), Json(resp)),
3613 Err((status, resp)) => (status, HeaderMap::new(), Json(resp)),
3614 }
3615}
3616
3617pub async fn add_peer_handler(
3618 State(hub): State<Arc<SutureHubServer>>,
3619 headers: HeaderMap,
3620 Json(req): Json<AddPeerRequest>,
3621) -> (StatusCode, Json<AddPeerResponse>) {
3622 if let Err(status) = check_auth(&hub, &headers).await {
3623 return (status, Json(AddPeerResponse { success: false, peer_id: None, error: Some("unauthorized".to_owned()) }));
3624 }
3625 let role = hub.get_replication_role();
3626 if role != "leader" {
3627 return (
3628 StatusCode::FORBIDDEN,
3629 Json(AddPeerResponse {
3630 success: false,
3631 peer_id: None,
3632 error: Some("only the leader can manage peers".to_owned()),
3633 }),
3634 );
3635 }
3636 let resp = hub.handle_add_peer(req).await;
3637 let status = if resp.success {
3638 StatusCode::OK
3639 } else {
3640 StatusCode::BAD_REQUEST
3641 };
3642 (status, Json(resp))
3643}
3644
3645pub async fn remove_peer_handler(
3646 State(hub): State<Arc<SutureHubServer>>,
3647 headers: HeaderMap,
3648 Path(id): Path<i64>,
3649) -> (StatusCode, Json<RemovePeerResponse>) {
3650 if let Err(status) = check_auth(&hub, &headers).await {
3651 return (status, Json(RemovePeerResponse { success: false, error: Some("unauthorized".to_owned()) }));
3652 }
3653 let role = hub.get_replication_role();
3654 if role != "leader" {
3655 return (
3656 StatusCode::FORBIDDEN,
3657 Json(RemovePeerResponse {
3658 success: false,
3659 error: Some("only the leader can manage peers".to_owned()),
3660 }),
3661 );
3662 }
3663 let resp = hub.handle_remove_peer(id).await;
3664 let status = if resp.success {
3665 StatusCode::OK
3666 } else {
3667 StatusCode::BAD_REQUEST
3668 };
3669 (status, Json(resp))
3670}
3671
3672pub async fn list_peers_handler(
3673 State(hub): State<Arc<SutureHubServer>>,
3674) -> (StatusCode, Json<ListPeersResponse>) {
3675 let resp = hub.handle_list_peers().await;
3676 (StatusCode::OK, Json(resp))
3677}
3678
3679pub async fn replication_status_handler(
3680 State(hub): State<Arc<SutureHubServer>>,
3681) -> (StatusCode, Json<ReplicationStatusResponse>) {
3682 let resp = hub.handle_replication_status().await;
3683 (StatusCode::OK, Json(resp))
3684}
3685
3686pub async fn replication_sync_handler(
3687 State(hub): State<Arc<SutureHubServer>>,
3688 Json(entries): Json<Vec<ReplicationEntry>>,
3689) -> (StatusCode, Json<SyncResponse>) {
3690 let role = hub.get_replication_role();
3691 if role != "follower" && role != "standalone" {
3692 return (
3693 StatusCode::FORBIDDEN,
3694 Json(SyncResponse {
3695 success: false,
3696 applied: 0,
3697 error: Some("sync endpoint is for followers only".to_owned()),
3698 }),
3699 );
3700 }
3701 let resp = hub.handle_replication_sync(entries).await;
3702 let status = if resp.success {
3703 StatusCode::OK
3704 } else {
3705 StatusCode::INTERNAL_SERVER_ERROR
3706 };
3707 (status, Json(resp))
3708}
3709
3710pub async fn batch_push_handler(
3711 State(hub): State<Arc<SutureHubServer>>,
3712 headers: HeaderMap,
3713 ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
3714 Json(req): Json<BatchPatchRequest>,
3715) -> (StatusCode, HeaderMap, Json<PushResponse>) {
3716 let ip = addr.ip().to_string();
3717 if let Err(retry_after) = hub.check_rate_limit(&ip, "push") {
3718 let mut hdrs = HeaderMap::new();
3719 if let Ok(val) = retry_after.to_string().parse() {
3720 hdrs.insert(axum::http::header::RETRY_AFTER, val);
3721 }
3722 return (
3723 StatusCode::TOO_MANY_REQUESTS,
3724 hdrs,
3725 Json(PushResponse {
3726 success: false,
3727 error: Some("rate limit exceeded".to_owned()),
3728 existing_patches: vec![],
3729 }),
3730 );
3731 }
3732
3733 if let Err(status) = check_auth(&hub, &headers).await {
3734 return (
3735 status,
3736 HeaderMap::new(),
3737 Json(PushResponse {
3738 success: false,
3739 error: Some("authentication failed".to_owned()),
3740 existing_patches: vec![],
3741 }),
3742 );
3743 }
3744
3745 match hub.handle_batch_push(req).await {
3746 Ok(resp) => (StatusCode::OK, HeaderMap::new(), Json(resp)),
3747 Err((status, resp)) => (status, HeaderMap::new(), Json(resp)),
3748 }
3749}
3750
3751async fn replication_background_task(hub: Arc<SutureHubServer>) {
3752 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
3753 loop {
3754 interval.tick().await;
3755
3756 let role = hub.get_replication_role();
3757 if role != "leader" {
3758 continue;
3759 }
3760
3761 let peers = {
3762 let store = hub.storage.read().await;
3763 match store.list_replication_peers() {
3764 Ok(p) => p,
3765 Err(e) => {
3766 tracing::warn!("replication: failed to list peers: {e}");
3767 continue;
3768 }
3769 }
3770 };
3771
3772 for peer in &peers {
3773 if peer.status != "active" {
3774 continue;
3775 }
3776
3777 let entries = {
3778 let store = hub.storage.read().await;
3779 match store.get_replication_log(peer.last_sync_seq) {
3780 Ok(e) => e,
3781 Err(e) => {
3782 tracing::warn!("replication: failed to get log for peer {}: {e}", peer.id);
3783 continue;
3784 }
3785 }
3786 };
3787
3788 if entries.is_empty() {
3789 continue;
3790 }
3791
3792 let last_seq = entries.last().map_or(peer.last_sync_seq, |e| e.seq);
3793
3794 let client = match reqwest::Client::builder()
3795 .timeout(std::time::Duration::from_secs(10))
3796 .build()
3797 {
3798 Ok(c) => c,
3799 Err(e) => {
3800 tracing::warn!(
3801 "replication: failed to build client for peer {}: {e}",
3802 peer.id
3803 );
3804 continue;
3805 }
3806 };
3807
3808 let sync_url = format!("{}/replication/sync", peer.peer_url.trim_end_matches('/'));
3809 match client.post(&sync_url).json(&entries).send().await {
3810 Ok(resp) if resp.status().is_success() => {
3811 tracing::info!(
3812 "replication: synced {} entries to peer {} (seq {}-{})",
3813 entries.len(),
3814 peer.id,
3815 peer.last_sync_seq,
3816 last_seq
3817 );
3818 let store = hub.storage.write().await;
3819 if let Err(e) = store.update_peer_sync_seq(peer.id, last_seq) {
3820 tracing::warn!("Failed to update peer sync sequence: {}", e);
3821 }
3822 }
3823 Ok(resp) => {
3824 tracing::warn!(
3825 "replication: sync to peer {} returned {}",
3826 peer.id,
3827 resp.status()
3828 );
3829 }
3830 Err(e) => {
3831 tracing::warn!("replication: failed to sync to peer {}: {e}", peer.id);
3832 }
3833 }
3834 }
3835 }
3836}
3837
/// JSON body accepted by `create_webhook_handler`.
#[derive(serde::Deserialize)]
pub struct CreateWebhookRequest {
    /// Target URL, copied unchanged into the stored `Webhook`.
    pub url: String,
    /// Event names to subscribe to; the handler rejects an empty list.
    pub events: Vec<String>,
    /// Optional secret, copied unchanged into the stored `Webhook`.
    /// Presumably used to sign deliveries — TODO confirm against `WebhookManager`.
    pub secret: Option<String>,
}
3844
3845pub async fn create_webhook_handler(
3846 State(hub): State<Arc<SutureHubServer>>,
3847 headers: HeaderMap,
3848 Path(repo_id): Path<String>,
3849 Json(req): Json<CreateWebhookRequest>,
3850) -> (StatusCode, Json<serde_json::Value>) {
3851 if let Err(status) = check_auth(&hub, &headers).await {
3852 return (
3853 status,
3854 Json(serde_json::json!({"success": false, "error": "unauthorized"})),
3855 );
3856 }
3857 if req.events.is_empty() {
3858 return (
3859 StatusCode::BAD_REQUEST,
3860 Json(serde_json::json!({"success": false, "error": "events must not be empty"})),
3861 );
3862 }
3863 let random_bytes: [u8; 16] = rand::random();
3864 let id = format!("wh_{}", hex::encode(random_bytes));
3865 let created_at = std::time::SystemTime::now()
3866 .duration_since(std::time::UNIX_EPOCH)
3867 .unwrap_or_default()
3868 .as_secs();
3869 let webhook = Webhook {
3870 id: id.clone(),
3871 repo_id: repo_id.clone(),
3872 url: req.url,
3873 events: req.events,
3874 secret: req.secret,
3875 created_at,
3876 active: true,
3877 };
3878 let store = hub.storage.write().await;
3879 match store.create_webhook(&webhook) {
3880 Ok(()) => (
3881 StatusCode::CREATED,
3882 Json(serde_json::json!({"success": true, "id": id})),
3883 ),
3884 Err(e) => (
3885 StatusCode::INTERNAL_SERVER_ERROR,
3886 Json(serde_json::json!({"success": false, "error": e.to_string()})),
3887 ),
3888 }
3889}
3890
3891pub async fn list_webhooks_handler(
3892 State(hub): State<Arc<SutureHubServer>>,
3893 headers: HeaderMap,
3894 Path(repo_id): Path<String>,
3895) -> (StatusCode, Json<serde_json::Value>) {
3896 if let Err(status) = check_auth(&hub, &headers).await {
3897 return (
3898 status,
3899 Json(serde_json::json!({"success": false, "error": "unauthorized"})),
3900 );
3901 }
3902 let store = hub.storage.read().await;
3903 match store.list_webhooks(&repo_id) {
3904 Ok(hooks) => (
3905 StatusCode::OK,
3906 Json(serde_json::json!({"success": true, "webhooks": hooks})),
3907 ),
3908 Err(e) => (
3909 StatusCode::INTERNAL_SERVER_ERROR,
3910 Json(serde_json::json!({"success": false, "error": e.to_string()})),
3911 ),
3912 }
3913}
3914
3915pub async fn delete_webhook_handler(
3916 State(hub): State<Arc<SutureHubServer>>,
3917 headers: HeaderMap,
3918 Path((_repo_id, id)): Path<(String, String)>,
3919) -> (StatusCode, Json<serde_json::Value>) {
3920 if let Err(status) = check_auth(&hub, &headers).await {
3921 return (
3922 status,
3923 Json(serde_json::json!({"success": false, "error": "unauthorized"})),
3924 );
3925 }
3926 let store = hub.storage.write().await;
3927 match store.delete_webhook(&id) {
3928 Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
3929 Err(e) => (
3930 StatusCode::NOT_FOUND,
3931 Json(serde_json::json!({"success": false, "error": e.to_string()})),
3932 ),
3933 }
3934}
3935
3936pub async fn health_check() -> Json<serde_json::Value> {
3937 Json(serde_json::json!({"status": "ok"}))
3938}
3939
3940pub async fn sso_list_providers_handler(
3944 State(hub): State<Arc<SutureHubServer>>,
3945 headers: HeaderMap,
3946) -> (StatusCode, Json<serde_json::Value>) {
3947 if let Err(status) = check_auth(&hub, &headers).await {
3948 return (status, Json(serde_json::json!({"error": "unauthorized"})));
3949 }
3950 let store = hub.storage.read().await;
3951 match store.list_oidc_configs() {
3952 Ok(configs) => {
3953 let names: Vec<&str> = configs.iter().map(|c| c.provider_name.as_str()).collect();
3954 (StatusCode::OK, Json(serde_json::json!({"providers": names})))
3955 }
3956 Err(e) => (
3957 StatusCode::INTERNAL_SERVER_ERROR,
3958 Json(serde_json::json!({"error": e.to_string()})),
3959 ),
3960 }
3961}
3962
3963pub async fn sso_configure_provider_handler(
3965 State(hub): State<Arc<SutureHubServer>>,
3966 headers: HeaderMap,
3967 Json(config): Json<crate::sso::OidcConfig>,
3968) -> (StatusCode, Json<serde_json::Value>) {
3969 if let Err(status) = check_auth(&hub, &headers).await {
3970 return (status, Json(serde_json::json!({"error": "unauthorized"})));
3971 }
3972 if config.provider_name.is_empty() {
3974 return (
3975 StatusCode::BAD_REQUEST,
3976 Json(serde_json::json!({"error": "provider_name is required"})),
3977 );
3978 }
3979 if config.issuer_url.is_empty() {
3980 return (
3981 StatusCode::BAD_REQUEST,
3982 Json(serde_json::json!({"error": "issuer_url is required"})),
3983 );
3984 }
3985 if config.client_id.is_empty() {
3986 return (
3987 StatusCode::BAD_REQUEST,
3988 Json(serde_json::json!({"error": "client_id is required"})),
3989 );
3990 }
3991 let store = hub.storage.read().await;
3992 match store.set_oidc_config(&config) {
3993 Ok(()) => (
3994 StatusCode::OK,
3995 Json(serde_json::json!({"success": true, "provider": config.provider_name})),
3996 ),
3997 Err(e) => (
3998 StatusCode::INTERNAL_SERVER_ERROR,
3999 Json(serde_json::json!({"error": e.to_string()})),
4000 ),
4001 }
4002}
4003
4004pub async fn sso_delete_provider_handler(
4006 State(hub): State<Arc<SutureHubServer>>,
4007 headers: HeaderMap,
4008 Path(provider_name): Path<String>,
4009) -> (StatusCode, Json<serde_json::Value>) {
4010 if let Err(status) = check_auth(&hub, &headers).await {
4011 return (status, Json(serde_json::json!({"error": "unauthorized"})));
4012 }
4013 let store = hub.storage.read().await;
4014 match store.delete_oidc_config(&provider_name) {
4015 Ok(true) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
4016 Ok(false) => (
4017 StatusCode::NOT_FOUND,
4018 Json(serde_json::json!({"error": format!("provider '{provider_name}' not found")})),
4019 ),
4020 Err(e) => (
4021 StatusCode::INTERNAL_SERVER_ERROR,
4022 Json(serde_json::json!({"error": e.to_string()})),
4023 ),
4024 }
4025}
4026
4027pub async fn sso_authorize_handler(
4029 State(hub): State<Arc<SutureHubServer>>,
4030 headers: HeaderMap,
4031 Json(body): Json<serde_json::Value>,
4032) -> (StatusCode, Json<serde_json::Value>) {
4033 if let Err(status) = check_auth(&hub, &headers).await {
4034 return (status, Json(serde_json::json!({"error": "unauthorized"})));
4035 }
4036 let provider_name = match body.get("provider") {
4037 Some(v) => match v.as_str() {
4038 Some(s) => s.to_owned(),
4039 None => {
4040 return (
4041 StatusCode::BAD_REQUEST,
4042 Json(serde_json::json!({"error": "provider must be a string"})),
4043 );
4044 }
4045 },
4046 None => {
4047 return (
4048 StatusCode::BAD_REQUEST,
4049 Json(serde_json::json!({"error": "provider is required"})),
4050 );
4051 }
4052 };
4053
4054 let store = hub.storage.read().await;
4055 let config = match store.get_oidc_config(&provider_name) {
4056 Ok(Some(c)) => c,
4057 Ok(None) => {
4058 return (
4059 StatusCode::NOT_FOUND,
4060 Json(serde_json::json!({"error": format!("provider '{provider_name}' not configured")})),
4061 );
4062 }
4063 Err(e) => {
4064 return (
4065 StatusCode::INTERNAL_SERVER_ERROR,
4066 Json(serde_json::json!({"error": e.to_string()})),
4067 );
4068 }
4069 };
4070 drop(store);
4071
4072 let state = crate::sso::generate_state();
4073 let nonce = crate::sso::generate_nonce();
4074
4075 let store = hub.storage.read().await;
4077 if let Err(e) = store.store_sso_state(&state, &provider_name, &nonce) {
4078 return (
4079 StatusCode::INTERNAL_SERVER_ERROR,
4080 Json(serde_json::json!({"error": format!("failed to store SSO state: {e}")})),
4081 );
4082 }
4083 drop(store);
4084
4085 let url = crate::sso::authorization_url(&config, &state, &nonce);
4086
4087 (StatusCode::OK, Json(serde_json::json!({
4088 "authorization_url": url,
4089 "state": state,
4090 })))
4091}
4092
4093pub async fn sso_callback_handler(
4096 State(hub): State<Arc<SutureHubServer>>,
4097 headers: HeaderMap,
4098 Json(body): Json<serde_json::Value>,
4099) -> (StatusCode, Json<serde_json::Value>) {
4100 if let Err(status) = check_auth(&hub, &headers).await {
4101 return (status, Json(serde_json::json!({"error": "unauthorized"})));
4102 }
4103 let provider_name = body.get("provider").and_then(|v| v.as_str()).unwrap_or("").to_owned();
4104 let code = body.get("code").and_then(|v| v.as_str()).unwrap_or("").to_owned();
4105 let state = body.get("state").and_then(|v| v.as_str()).unwrap_or("").to_owned();
4106
4107 if provider_name.is_empty() {
4108 return (
4109 StatusCode::BAD_REQUEST,
4110 Json(serde_json::json!({"error": "provider is required"})),
4111 );
4112 }
4113 if code.is_empty() {
4114 return (
4115 StatusCode::BAD_REQUEST,
4116 Json(serde_json::json!({"error": "code is required"})),
4117 );
4118 }
4119 if state.is_empty() {
4120 return (
4121 StatusCode::BAD_REQUEST,
4122 Json(serde_json::json!({"error": "state is required"})),
4123 );
4124 }
4125
4126 let store = hub.storage.read().await;
4128 let (stored_provider, nonce) = match store.consume_sso_state(&state) {
4129 Ok(Some((p, n))) => (p, n),
4130 Ok(None) => {
4131 return (
4132 StatusCode::BAD_REQUEST,
4133 Json(serde_json::json!({"error": "invalid or expired state parameter"})),
4134 );
4135 }
4136 Err(e) => {
4137 return (
4138 StatusCode::INTERNAL_SERVER_ERROR,
4139 Json(serde_json::json!({"error": e.to_string()})),
4140 );
4141 }
4142 };
4143
4144 if stored_provider != provider_name {
4145 return (
4146 StatusCode::BAD_REQUEST,
4147 Json(serde_json::json!({"error": "state/provider mismatch"})),
4148 );
4149 }
4150 drop(store);
4151
4152 let store = hub.storage.read().await;
4154 let config = match store.get_oidc_config(&provider_name) {
4155 Ok(Some(c)) => c,
4156 Ok(None) => {
4157 return (
4158 StatusCode::NOT_FOUND,
4159 Json(serde_json::json!({"error": format!("provider '{provider_name}' not configured")})),
4160 );
4161 }
4162 Err(e) => {
4163 return (
4164 StatusCode::INTERNAL_SERVER_ERROR,
4165 Json(serde_json::json!({"error": e.to_string()})),
4166 );
4167 }
4168 };
4169 drop(store);
4170
4171 let result = match crate::sso::complete_callback(&config, &code, &state, &nonce).await {
4173 Ok(r) => r,
4174 Err(e) => {
4175 tracing::warn!("SSO callback failed for provider '{provider_name}': {e}");
4176 return (
4177 StatusCode::BAD_GATEWAY,
4178 Json(serde_json::json!({"error": format!("SSO authentication failed: {e}")})),
4179 );
4180 }
4181 };
4182
4183 let user_sub = result.user.sub.clone();
4185 let user_email = result.user.email.clone();
4186 let user_name = result.user.name.clone();
4187 let username = user_email
4188 .as_deref()
4189 .unwrap_or("")
4190 .to_owned();
4191 let username = if username.is_empty() {
4192 format!("{}:{}", provider_name, user_sub)
4193 } else {
4194 username
4195 };
4196 let display_name = user_name
4197 .as_deref()
4198 .unwrap_or(&username)
4199 .to_owned();
4200
4201 let store = hub.storage.read().await;
4202 match store.upsert_sso_user(
4203 &provider_name,
4204 &user_sub,
4205 &username,
4206 &display_name,
4207 ) {
4208 Ok(created_username) => {
4209 let token_hash =
4211 format!("{:x}", sha2::Sha256::digest(result.session_token.as_bytes()));
4212 if let Err(e) = store.update_user_token(&created_username, &token_hash) {
4213 tracing::warn!("failed to set SSO session token: {e}");
4214 }
4215 drop(store);
4216
4217 (StatusCode::OK, Json(serde_json::json!({
4218 "status": "authenticated",
4219 "username": created_username,
4220 "display_name": display_name,
4221 "provider": provider_name,
4222 "session_token": result.session_token,
4223 "email": user_email,
4224 })))
4225 }
4226 Err(e) => {
4227 tracing::warn!("failed to upsert SSO user: {e}");
4228 (
4229 StatusCode::INTERNAL_SERVER_ERROR,
4230 Json(serde_json::json!({"error": format!("failed to create user: {e}")})),
4231 )
4232 }
4233 }
4234}
4235
4236pub async fn audit_log_handler(
4237 State(hub): State<Arc<SutureHubServer>>,
4238 headers: HeaderMap,
4239 Query(params): Query<AuditQueryParams>,
4240) -> (StatusCode, Json<serde_json::Value>) {
4241 if let Err(status) = check_auth(&hub, &headers).await {
4242 return (status, Json(serde_json::json!({"error": "unauthorized"})));
4243 }
4244 let store = hub.storage.read().await;
4245 let entries = store
4246 .query_audit_log(
4247 params.actor.as_deref(),
4248 params.action.as_deref(),
4249 params.limit.unwrap_or(100),
4250 params.offset.unwrap_or(0),
4251 )
4252 .unwrap_or_default();
4253 (
4254 StatusCode::OK,
4255 Json(serde_json::json!({
4256 "entries": entries,
4257 "count": entries.len(),
4258 })),
4259 )
4260}
4261
4262pub async fn raft_status_handler(
4264 State(hub): State<Arc<SutureHubServer>>,
4265 headers: HeaderMap,
4266) -> (StatusCode, Json<serde_json::Value>) {
4267 if let Err(status) = check_auth(&hub, &headers).await {
4268 return (status, Json(serde_json::json!({"error": "unauthorized"})));
4269 }
4270
4271 #[cfg(feature = "raft-cluster")]
4272 {
4273 let state = hub.raft_state().await;
4274 let leader = hub.raft_leader().await;
4275 let term = {
4276 let raft = hub.raft_node.lock().await;
4277 raft.term()
4278 };
4279 let node_id = hub.raft_node_id;
4280 let is_leader = hub.is_leader().await;
4281
4282 let response = serde_json::json!({
4283 "raft_enabled": true,
4284 "node_id": node_id,
4285 "state": state,
4286 "term": term,
4287 "leader": leader,
4288 "is_leader": is_leader,
4289 });
4290 (StatusCode::OK, Json(response))
4291 }
4292
4293 #[cfg(not(feature = "raft-cluster"))]
4294 {
4295 let response = serde_json::json!({
4296 "raft_enabled": false,
4297 "state": "standalone",
4298 "is_leader": true,
4299 });
4300 (StatusCode::OK, Json(response))
4301 }
4302}
4303
4304pub async fn run_server(
4305 hub: SutureHubServer,
4306 addr: &str,
4307) -> Result<(), Box<dyn std::error::Error>> {
4308 let hub = Arc::new(hub);
4309
4310 {
4311 let hub_clone = Arc::clone(&hub);
4312 tokio::spawn(async move {
4313 replication_background_task(hub_clone).await;
4314 });
4315 }
4316
4317 let (set_request_id, propagate_request_id) = request_id_layer();
4318 let app = axum::Router::new()
4319 .route("/healthz", get(health_check))
4320 .route("/", axum::routing::get(serve_index))
4321 .route("/push", axum::routing::post(push_handler))
4322 .route(
4323 "/push/compressed",
4324 axum::routing::post(push_compressed_handler),
4325 )
4326 .route("/pull", axum::routing::post(pull_handler))
4327 .route(
4328 "/pull/compressed",
4329 axum::routing::post(pull_compressed_handler),
4330 )
4331 .route("/repos", axum::routing::get(list_repos_handler))
4332 .route("/repo/{repo_id}", axum::routing::get(repo_info_handler))
4333 .route(
4334 "/repos/{repo_id}/branches",
4335 axum::routing::get(repo_branches_handler),
4336 )
4337 .route(
4338 "/repos/{repo_id}/patches",
4339 axum::routing::get(repo_patches_handler),
4340 )
4341 .route("/handshake", axum::routing::get(handshake_get_handler))
4342 .route("/handshake", axum::routing::post(handshake_handler))
4343 .route("/v2/handshake", axum::routing::get(handshake_v2_handler))
4344 .route("/v2/handshake", axum::routing::post(handshake_v2_handler))
4345 .route("/v2/pull", axum::routing::post(v2_pull_handler))
4346 .route("/v2/push", axum::routing::post(v2_push_handler))
4347 .route("/auth/token", axum::routing::post(create_token_handler))
4348 .route("/auth/verify", axum::routing::post(verify_token_handler))
4349 .route("/mirror/setup", axum::routing::post(mirror_setup_handler))
4350 .route("/mirror/sync", axum::routing::post(mirror_sync_handler))
4351 .route(
4352 "/mirror/status",
4353 axum::routing::get(mirror_status_get_handler),
4354 )
4355 .route("/mirror/status", axum::routing::post(mirror_status_handler))
4356 .route(
4357 "/repos/{repo_id}/protect/{branch}",
4358 axum::routing::post(protect_branch_handler),
4359 )
4360 .route(
4361 "/repos/{repo_id}/unprotect/{branch}",
4362 axum::routing::post(unprotect_branch_handler),
4363 )
4364 .route("/auth/register", axum::routing::post(register_handler))
4365 .route("/users", axum::routing::get(list_users_handler))
4366 .route("/users/{username}", axum::routing::get(get_user_handler))
4367 .route(
4368 "/users/{username}/role",
4369 axum::routing::patch(update_role_handler),
4370 )
4371 .route(
4372 "/users/{username}",
4373 axum::routing::delete(delete_user_handler),
4374 )
4375 .route("/static/{*path}", axum::routing::get(serve_static_file))
4376 .route("/replication/peers", axum::routing::post(add_peer_handler))
4377 .route("/replication/peers", axum::routing::get(list_peers_handler))
4378 .route(
4379 "/replication/peers/{id}",
4380 axum::routing::delete(remove_peer_handler),
4381 )
4382 .route(
4383 "/replication/status",
4384 axum::routing::get(replication_status_handler),
4385 )
4386 .route(
4387 "/replication/sync",
4388 axum::routing::post(replication_sync_handler),
4389 )
4390 .route("/repos", axum::routing::post(create_repo_handler))
4391 .route(
4392 "/repos/{repo_id}",
4393 axum::routing::delete(delete_repo_handler),
4394 )
4395 .route(
4396 "/repos/{repo_id}/branches",
4397 axum::routing::post(create_branch_handler),
4398 )
4399 .route(
4400 "/repos/{repo_id}/branches/{branch}",
4401 axum::routing::delete(delete_branch_handler),
4402 )
4403 .route(
4404 "/repos/{repo_id}/blobs/{hash}",
4405 axum::routing::get(get_blob_handler),
4406 )
4407 .route(
4408 "/repos/{repo_id}/tree/{branch}",
4409 axum::routing::get(repo_tree_handler),
4410 )
4411 .route("/auth/login", axum::routing::post(login_handler))
4412 .route("/search", axum::routing::get(search_handler))
4413 .route("/activity", axum::routing::get(activity_handler))
4414 .route(
4415 "/mirrors/{id}",
4416 axum::routing::delete(delete_mirror_handler),
4417 )
4418 .route(
4419 "/webhooks/{repo_id}",
4420 axum::routing::post(create_webhook_handler),
4421 )
4422 .route(
4423 "/webhooks/{repo_id}",
4424 axum::routing::get(list_webhooks_handler),
4425 )
4426 .route(
4427 "/webhooks/{repo_id}/{id}",
4428 axum::routing::delete(delete_webhook_handler),
4429 )
4430 .route(
4431 "/repos/{repo_id}/patches/batch",
4432 axum::routing::post(batch_push_handler),
4433 )
4434 .route("/lfs/batch", axum::routing::post(lfs_batch_handler))
4435 .route(
4436 "/lfs/objects/{repo_id}/{oid}",
4437 axum::routing::put(lfs_upload_handler),
4438 )
4439 .route(
4440 "/lfs/objects/{repo_id}/{oid}",
4441 axum::routing::get(lfs_download_handler),
4442 )
4443 .route("/sso/providers", axum::routing::get(sso_list_providers_handler))
4445 .route(
4446 "/sso/providers",
4447 axum::routing::post(sso_configure_provider_handler),
4448 )
4449 .route(
4450 "/sso/providers/{provider_name}",
4451 axum::routing::delete(sso_delete_provider_handler),
4452 )
4453 .route("/sso/authorize", axum::routing::post(sso_authorize_handler))
4454 .route("/sso/callback", axum::routing::post(sso_callback_handler))
4455 .route("/audit/log", axum::routing::get(audit_log_handler))
4456 .route("/raft/status", axum::routing::get(raft_status_handler))
4458 .layer(axum::middleware::from_fn_with_state(
4459 Arc::clone(&hub),
4460 crate::audit::audit_middleware,
4461 ))
4462 .with_state(Arc::clone(&hub))
4463 .layer(set_request_id)
4464 .layer(propagate_request_id);
4465
4466 let listener = tokio::net::TcpListener::bind(addr).await?;
4467 tracing::info!("Suture Hub listening on {addr}");
4468
4469 let shutdown_tx = tokio::sync::broadcast::channel::<()>(1).0;
4470 let shutdown_tx_ctrlc = shutdown_tx.clone();
4471
4472 tokio::spawn(async move {
4473 tokio::signal::ctrl_c().await.ok();
4474 tracing::info!("received ctrl-c, initiating graceful shutdown");
4475 let _ = shutdown_tx_ctrlc.send(());
4476 });
4477
4478 #[cfg(feature = "raft-cluster")]
4480 {
4481 let raft_node = Arc::clone(&hub.raft_node);
4482 let hub_for_raft = Arc::clone(&hub);
4483 tokio::spawn(async move {
4484 let mut interval = tokio::time::interval(
4485 std::time::Duration::from_millis(10), );
4487 loop {
4488 interval.tick().await;
4489 let messages = {
4490 let mut raft = raft_node.lock().await;
4491 raft.tick()
4492 };
4493 if !messages.is_empty() {
4496 tracing::trace!(
4497 "[raft] tick produced {} messages",
4498 messages.len()
4499 );
4500 }
4501 let entries = hub_for_raft.raft_committed_entries().await;
4503 if !entries.is_empty() {
4504 let count = entries.len();
4505 let store = hub_for_raft.storage.read().await;
4506 for entry in &entries {
4507 if let Ok(cmd) = serde_json::from_slice::<serde_json::Value>(&entry.command) {
4510 let op = cmd.get("operation").and_then(|v| v.as_str()).unwrap_or("unknown");
4511 let table = cmd.get("table").and_then(|v| v.as_str()).unwrap_or("");
4512 let data = cmd.get("data").and_then(|v| v.as_str()).unwrap_or("");
4513 let _ = store.log_operation(op, table, &entry.index.to_string(), Some(data));
4514 }
4515 }
4516 drop(store);
4517 hub_for_raft.raft_advance_applied(count).await;
4518 }
4519 }
4520 });
4521 }
4522
4523 let server = axum::serve(
4524 listener,
4525 app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
4526 )
4527 .with_graceful_shutdown(async move {
4528 let mut rx = shutdown_tx.subscribe();
4529 let _ = rx.recv().await;
4530 hub.shutdown();
4531 });
4532
4533 server.await?;
4534 Ok(())
4535}
4536
4537#[cfg(test)]
4538mod tests {
4539 use super::*;
4540 use sha2::Digest;
4541 async fn create_test_user_hub(
4542 hub: &SutureHubServer,
4543 username: &str,
4544 display_name: &str,
4545 role: &str,
4546 ) -> String {
4547 let api_token = generate_api_token();
4548 let store = hub.storage.write().await;
4549 store
4550 .create_user(username, display_name, role, &api_token)
4551 .unwrap();
4552 api_token
4553 }
4554
/// Builds the value of an `Authorization` header for a bearer token:
/// the literal prefix `Bearer ` followed by `token`.
fn make_auth_header_val(token: &str) -> String {
    ["Bearer ", token].concat()
}
4558
4559 fn make_hash_proto(hex: &str) -> HashProto {
4560 HashProto {
4561 value: hex.to_string(),
4562 }
4563 }
4564
4565 fn make_patch(hex: &str, op: &str, parents: &[String], author: &str) -> PatchProto {
4566 PatchProto {
4567 id: make_hash_proto(hex),
4568 operation_type: op.to_string(),
4569 touch_set: vec!["f".to_string()],
4570 target_path: Some("f".to_string()),
4571 payload: String::new(),
4572 parent_ids: parents.iter().map(|p| make_hash_proto(p)).collect(),
4573 author: author.to_string(),
4574 message: format!("patch {}", hex),
4575 timestamp: 0,
4576 }
4577 }
4578
4579 fn make_branch(name: &str, target: &str) -> BranchProto {
4580 BranchProto {
4581 name: name.to_string(),
4582 target_id: make_hash_proto(target),
4583 }
4584 }
4585
4586 async fn start_test_hub() -> (Arc<SutureHubServer>, u16, String) {
4587 let hub = Arc::new(SutureHubServer::new_in_memory());
4588
4589 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4590 let port = listener.local_addr().unwrap().port();
4591 let base = format!("http://127.0.0.1:{}", port);
4592
4593 let app = axum::Router::new()
4594 .route("/", axum::routing::get(serve_index))
4595 .route("/push", axum::routing::post(push_handler))
4596 .route(
4597 "/push/compressed",
4598 axum::routing::post(push_compressed_handler),
4599 )
4600 .route("/pull", axum::routing::post(pull_handler))
4601 .route(
4602 "/pull/compressed",
4603 axum::routing::post(pull_compressed_handler),
4604 )
4605 .route("/repos", axum::routing::get(list_repos_handler))
4606 .route("/repo/{repo_id}", axum::routing::get(repo_info_handler))
4607 .route(
4608 "/repos/{repo_id}/branches",
4609 axum::routing::get(repo_branches_handler),
4610 )
4611 .route(
4612 "/repos/{repo_id}/patches",
4613 axum::routing::get(repo_patches_handler),
4614 )
4615 .route("/handshake", axum::routing::get(handshake_get_handler))
4616 .route("/handshake", axum::routing::post(handshake_handler))
4617 .route("/v2/handshake", axum::routing::get(handshake_v2_handler))
4618 .route("/v2/handshake", axum::routing::post(handshake_v2_handler))
4619 .route("/v2/pull", axum::routing::post(v2_pull_handler))
4620 .route("/v2/push", axum::routing::post(v2_push_handler))
4621 .route("/auth/token", axum::routing::post(create_token_handler))
4622 .route("/auth/verify", axum::routing::post(verify_token_handler))
4623 .route("/mirror/setup", axum::routing::post(mirror_setup_handler))
4624 .route("/mirror/sync", axum::routing::post(mirror_sync_handler))
4625 .route(
4626 "/mirror/status",
4627 axum::routing::get(mirror_status_get_handler),
4628 )
4629 .route("/mirror/status", axum::routing::post(mirror_status_handler))
4630 .route(
4631 "/repos/{repo_id}/protect/{branch}",
4632 axum::routing::post(protect_branch_handler),
4633 )
4634 .route(
4635 "/repos/{repo_id}/unprotect/{branch}",
4636 axum::routing::post(unprotect_branch_handler),
4637 )
4638 .route("/auth/register", axum::routing::post(register_handler))
4639 .route("/users", axum::routing::get(list_users_handler))
4640 .route("/users/{username}", axum::routing::get(get_user_handler))
4641 .route(
4642 "/users/{username}/role",
4643 axum::routing::patch(update_role_handler),
4644 )
4645 .route(
4646 "/users/{username}",
4647 axum::routing::delete(delete_user_handler),
4648 )
4649 .route("/static/{*path}", axum::routing::get(serve_static_file))
4650 .route("/replication/peers", axum::routing::post(add_peer_handler))
4651 .route("/replication/peers", axum::routing::get(list_peers_handler))
4652 .route(
4653 "/replication/peers/{id}",
4654 axum::routing::delete(remove_peer_handler),
4655 )
4656 .route(
4657 "/replication/status",
4658 axum::routing::get(replication_status_handler),
4659 )
4660 .route(
4661 "/replication/sync",
4662 axum::routing::post(replication_sync_handler),
4663 )
4664 .route("/repos", axum::routing::post(create_repo_handler))
4665 .route(
4666 "/repos/{repo_id}",
4667 axum::routing::delete(delete_repo_handler),
4668 )
4669 .route(
4670 "/repos/{repo_id}/branches",
4671 axum::routing::post(create_branch_handler),
4672 )
4673 .route(
4674 "/repos/{repo_id}/branches/{branch}",
4675 axum::routing::delete(delete_branch_handler),
4676 )
4677 .route(
4678 "/repos/{repo_id}/blobs/{hash}",
4679 axum::routing::get(get_blob_handler),
4680 )
4681 .route(
4682 "/repos/{repo_id}/tree/{branch}",
4683 axum::routing::get(repo_tree_handler),
4684 )
4685 .route("/auth/login", axum::routing::post(login_handler))
4686 .route("/search", axum::routing::get(search_handler))
4687 .route("/activity", axum::routing::get(activity_handler))
4688 .route(
4689 "/mirrors/{id}",
4690 axum::routing::delete(delete_mirror_handler),
4691 )
4692 .route(
4693 "/webhooks/{repo_id}",
4694 axum::routing::post(create_webhook_handler),
4695 )
4696 .route(
4697 "/webhooks/{repo_id}",
4698 axum::routing::get(list_webhooks_handler),
4699 )
4700 .route(
4701 "/webhooks/{repo_id}/{id}",
4702 axum::routing::delete(delete_webhook_handler),
4703 )
4704 .route(
4705 "/repos/{repo_id}/patches/batch",
4706 axum::routing::post(batch_push_handler),
4707 )
4708 .with_state(Arc::clone(&hub));
4709
4710 tokio::spawn(async move {
4711 axum::serve(
4712 listener,
4713 app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
4714 )
4715 .await
4716 .unwrap();
4717 });
4718
4719 for _ in 0..50 {
4720 if reqwest::Client::new()
4721 .get(format!("{}/repos", &base))
4722 .send()
4723 .await
4724 .is_ok()
4725 {
4726 return (hub, port, base);
4727 }
4728 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
4729 }
4730 panic!("test server did not start in time");
4731 }
4732
4733 async fn start_test_hub_with_lfs(
4734 hub: Arc<SutureHubServer>,
4735 ) -> (Arc<SutureHubServer>, u16, String) {
4736 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4737 let port = listener.local_addr().unwrap().port();
4738 let base = format!("http://127.0.0.1:{}", port);
4739
4740 let app = axum::Router::new()
4741 .route("/", axum::routing::get(serve_index))
4742 .route("/push", axum::routing::post(push_handler))
4743 .route(
4744 "/push/compressed",
4745 axum::routing::post(push_compressed_handler),
4746 )
4747 .route("/pull", axum::routing::post(pull_handler))
4748 .route(
4749 "/pull/compressed",
4750 axum::routing::post(pull_compressed_handler),
4751 )
4752 .route("/repos", axum::routing::get(list_repos_handler))
4753 .route("/repo/{repo_id}", axum::routing::get(repo_info_handler))
4754 .route(
4755 "/repos/{repo_id}/branches",
4756 axum::routing::get(repo_branches_handler),
4757 )
4758 .route(
4759 "/repos/{repo_id}/patches",
4760 axum::routing::get(repo_patches_handler),
4761 )
4762 .route("/handshake", axum::routing::get(handshake_get_handler))
4763 .route("/handshake", axum::routing::post(handshake_handler))
4764 .route("/v2/handshake", axum::routing::get(handshake_v2_handler))
4765 .route("/v2/handshake", axum::routing::post(handshake_v2_handler))
4766 .route("/v2/pull", axum::routing::post(v2_pull_handler))
4767 .route("/v2/push", axum::routing::post(v2_push_handler))
4768 .route("/auth/token", axum::routing::post(create_token_handler))
4769 .route("/auth/verify", axum::routing::post(verify_token_handler))
4770 .route("/mirror/setup", axum::routing::post(mirror_setup_handler))
4771 .route("/mirror/sync", axum::routing::post(mirror_sync_handler))
4772 .route(
4773 "/mirror/status",
4774 axum::routing::get(mirror_status_get_handler),
4775 )
4776 .route("/mirror/status", axum::routing::post(mirror_status_handler))
4777 .route(
4778 "/repos/{repo_id}/protect/{branch}",
4779 axum::routing::post(protect_branch_handler),
4780 )
4781 .route(
4782 "/repos/{repo_id}/unprotect/{branch}",
4783 axum::routing::post(unprotect_branch_handler),
4784 )
4785 .route("/auth/register", axum::routing::post(register_handler))
4786 .route("/users", axum::routing::get(list_users_handler))
4787 .route("/users/{username}", axum::routing::get(get_user_handler))
4788 .route(
4789 "/users/{username}/role",
4790 axum::routing::patch(update_role_handler),
4791 )
4792 .route(
4793 "/users/{username}",
4794 axum::routing::delete(delete_user_handler),
4795 )
4796 .route("/static/{*path}", axum::routing::get(serve_static_file))
4797 .route("/replication/peers", axum::routing::post(add_peer_handler))
4798 .route("/replication/peers", axum::routing::get(list_peers_handler))
4799 .route(
4800 "/replication/peers/{id}",
4801 axum::routing::delete(remove_peer_handler),
4802 )
4803 .route(
4804 "/replication/status",
4805 axum::routing::get(replication_status_handler),
4806 )
4807 .route(
4808 "/replication/sync",
4809 axum::routing::post(replication_sync_handler),
4810 )
4811 .route("/repos", axum::routing::post(create_repo_handler))
4812 .route(
4813 "/repos/{repo_id}",
4814 axum::routing::delete(delete_repo_handler),
4815 )
4816 .route(
4817 "/repos/{repo_id}/branches",
4818 axum::routing::post(create_branch_handler),
4819 )
4820 .route(
4821 "/repos/{repo_id}/branches/{branch}",
4822 axum::routing::delete(delete_branch_handler),
4823 )
4824 .route(
4825 "/repos/{repo_id}/blobs/{hash}",
4826 axum::routing::get(get_blob_handler),
4827 )
4828 .route(
4829 "/repos/{repo_id}/tree/{branch}",
4830 axum::routing::get(repo_tree_handler),
4831 )
4832 .route("/auth/login", axum::routing::post(login_handler))
4833 .route("/search", axum::routing::get(search_handler))
4834 .route("/activity", axum::routing::get(activity_handler))
4835 .route(
4836 "/mirrors/{id}",
4837 axum::routing::delete(delete_mirror_handler),
4838 )
4839 .route(
4840 "/webhooks/{repo_id}",
4841 axum::routing::post(create_webhook_handler),
4842 )
4843 .route(
4844 "/webhooks/{repo_id}",
4845 axum::routing::get(list_webhooks_handler),
4846 )
4847 .route(
4848 "/webhooks/{repo_id}/{id}",
4849 axum::routing::delete(delete_webhook_handler),
4850 )
4851 .route(
4852 "/repos/{repo_id}/patches/batch",
4853 axum::routing::post(batch_push_handler),
4854 )
4855 .route("/lfs/batch", axum::routing::post(lfs_batch_handler))
4856 .route(
4857 "/lfs/objects/{repo_id}/{oid}",
4858 axum::routing::put(lfs_upload_handler),
4859 )
4860 .route(
4861 "/lfs/objects/{repo_id}/{oid}",
4862 axum::routing::get(lfs_download_handler),
4863 )
4864 .with_state(Arc::clone(&hub));
4865
4866 tokio::spawn(async move {
4867 axum::serve(
4868 listener,
4869 app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
4870 )
4871 .await
4872 .unwrap();
4873 });
4874
4875 for _ in 0..50 {
4876 if reqwest::Client::new()
4877 .get(format!("{}/repos", &base))
4878 .send()
4879 .await
4880 .is_ok()
4881 {
4882 return (hub, port, base);
4883 }
4884 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
4885 }
4886 panic!("test server did not start in time");
4887 }
4888
4889 async fn start_test_hub_auth() -> (Arc<SutureHubServer>, u16, String) {
4890 let (hub, port, base) = start_test_hub().await;
4891 create_test_user_hub(&hub, "test-admin", "Test Admin", "admin").await;
4893 (hub, port, base)
4894 }
4895
4896 fn post_json(
4897 client: &reqwest::Client,
4898 url: &str,
4899 body: &serde_json::Value,
4900 ) -> reqwest::RequestBuilder {
4901 client
4902 .post(url)
4903 .header("Content-Type", "application/json")
4904 .body(body.to_string())
4905 }
4906
4907 fn patch_json(
4908 client: &reqwest::Client,
4909 url: &str,
4910 body: &serde_json::Value,
4911 ) -> reqwest::RequestBuilder {
4912 client
4913 .patch(url)
4914 .header("Content-Type", "application/json")
4915 .body(body.to_string())
4916 }
4917
4918 #[tokio::test]
4919 async fn test_http_index() {
4920 let (_hub, _port, base) = start_test_hub().await;
4921 let client = reqwest::Client::new();
4922 let resp = client.get(format!("{}/", &base)).send().await.unwrap();
4923 assert_eq!(resp.status(), 200);
4924 let body = resp.text().await.unwrap();
4925 assert!(body.contains("Suture Hub"));
4926 }
4927
4928 #[tokio::test]
4929 async fn test_http_handshake() {
4930 let (_hub, _port, base) = start_test_hub().await;
4931 let client = reqwest::Client::new();
4932 let resp = client
4933 .post(format!("{}/handshake", &base))
4934 .json(&serde_json::json!({"client_version": 1, "client_name": "test"}))
4935 .send()
4936 .await
4937 .unwrap();
4938 assert_eq!(resp.status(), 200);
4939 let data: serde_json::Value = resp.json().await.unwrap();
4940 assert_eq!(data["server_version"], 1);
4941 assert_eq!(data["compatible"], true);
4942 }
4943
4944 #[tokio::test]
4945 async fn test_http_repos_empty_and_populated() {
4946 let (hub, _port, base) = start_test_hub().await;
4947 let client = reqwest::Client::new();
4948
4949 let resp = client.get(format!("{}/repos", &base)).send().await.unwrap();
4950 assert_eq!(resp.status(), 200);
4951 let data: serde_json::Value = resp.json().await.unwrap();
4952 assert_eq!(data["repo_ids"].as_array().unwrap().len(), 0);
4953
4954 let a_hex = "a".repeat(64);
4955 let push_body = serde_json::json!({
4956 "repo_id": "http-repo",
4957 "patches": [{
4958 "id": {"value": &a_hex},
4959 "operation_type": "Create",
4960 "touch_set": ["f"],
4961 "target_path": "f",
4962 "payload": "",
4963 "parent_ids": [],
4964 "author": "alice",
4965 "message": "p",
4966 "timestamp": 0
4967 }],
4968 "branches": [],
4969 "blobs": []
4970 });
4971 let push_resp = client
4972 .post(format!("{}/push", &base))
4973 .json(&push_body)
4974 .send()
4975 .await
4976 .unwrap();
4977 assert_eq!(push_resp.status(), 200);
4978
4979 let resp2 = client.get(format!("{}/repos", &base)).send().await.unwrap();
4980 let data2: serde_json::Value = resp2.json().await.unwrap();
4981 assert_eq!(data2["repo_ids"].as_array().unwrap().len(), 1);
4982
4983 drop(hub);
4984 }
4985
4986 #[tokio::test]
4987 async fn test_http_repo_info() {
4988 let (hub, _port, base) = start_test_hub().await;
4989 let client = reqwest::Client::new();
4990
4991 let resp = client
4992 .get(format!("{}/repo/nonexistent", &base))
4993 .send()
4994 .await
4995 .unwrap();
4996 assert_eq!(resp.status(), 404);
4997
4998 let a_hex = "a".repeat(64);
4999 hub.handle_push(PushRequest {
5000 repo_id: "info-repo".to_string(),
5001 patches: vec![make_patch(&a_hex, "Create", &[], "alice")],
5002 branches: vec![make_branch("main", &a_hex)],
5003 blobs: vec![],
5004 signature: None,
5005 known_branches: None,
5006 force: false,
5007 })
5008 .await
5009 .unwrap();
5010
5011 let resp2 = client
5012 .get(format!("{}/repo/info-repo", &base))
5013 .send()
5014 .await
5015 .unwrap();
5016 assert_eq!(resp2.status(), 200);
5017 let data: serde_json::Value = resp2.json().await.unwrap();
5018 assert_eq!(data["patch_count"], 1);
5019 assert_eq!(data["branches"].as_array().unwrap().len(), 1);
5020 }
5021
5022 #[tokio::test]
5023 async fn test_http_repo_branches() {
5024 let (hub, _port, base) = start_test_hub().await;
5025 let client = reqwest::Client::new();
5026 let a_hex = "a".repeat(64);
5027
5028 hub.handle_push(PushRequest {
5029 repo_id: "branch-repo".to_string(),
5030 patches: vec![make_patch(&a_hex, "Create", &[], "alice")],
5031 branches: vec![make_branch("main", &a_hex), make_branch("dev", &a_hex)],
5032 blobs: vec![],
5033 signature: None,
5034 known_branches: None,
5035 force: false,
5036 })
5037 .await
5038 .unwrap();
5039
5040 let resp = client
5041 .get(format!("{}/repos/branch-repo/branches", &base))
5042 .send()
5043 .await
5044 .unwrap();
5045 assert_eq!(resp.status(), 200);
5046 let data: serde_json::Value = resp.json().await.unwrap();
5047 assert_eq!(data.as_array().unwrap().len(), 2);
5048 }
5049
5050 #[tokio::test]
5051 async fn test_http_repo_patches() {
5052 let (hub, _port, base) = start_test_hub().await;
5053 let client = reqwest::Client::new();
5054
5055 for i in 0..3u32 {
5056 let hex = format!("{:064x}", i);
5057 let parents: Vec<String> = if i > 0 {
5058 vec![format!("{:064x}", i - 1)]
5059 } else {
5060 vec![]
5061 };
5062 hub.handle_push(PushRequest {
5063 repo_id: "patch-repo".to_string(),
5064 patches: vec![PatchProto {
5065 id: make_hash_proto(&hex),
5066 operation_type: "Create".to_string(),
5067 touch_set: vec![format!("f{i}")],
5068 target_path: Some(format!("f{i}")),
5069 payload: String::new(),
5070 parent_ids: parents.iter().map(|p| make_hash_proto(p)).collect(),
5071 author: "alice".to_string(),
5072 message: format!("p{i}"),
5073 timestamp: 0,
5074 }],
5075 branches: vec![],
5076 blobs: vec![],
5077 signature: None,
5078 known_branches: None,
5079 force: false,
5080 })
5081 .await
5082 .unwrap();
5083 }
5084
5085 let resp = client
5086 .get(format!(
5087 "{}/repos/patch-repo/patches?offset=1&limit=1",
5088 &base
5089 ))
5090 .send()
5091 .await
5092 .unwrap();
5093 assert_eq!(resp.status(), 200);
5094 let data: serde_json::Value = resp.json().await.unwrap();
5095 assert_eq!(data["patches"].as_array().unwrap().len(), 1);
5096 assert!(!data["next_cursor"].as_str().unwrap().is_empty());
5097
5098 let resp2 = client
5099 .get(format!(
5100 "{}/repos/patch-repo/patches?limit=1&cursor={}",
5101 &base,
5102 data["next_cursor"].as_str().unwrap()
5103 ))
5104 .send()
5105 .await
5106 .unwrap();
5107 assert_eq!(resp2.status(), 200);
5108 let data2: serde_json::Value = resp2.json().await.unwrap();
5109 assert_eq!(data2["patches"].as_array().unwrap().len(), 1);
5110
5111 let resp3 = client
5112 .get(format!("{}/repos/patch-repo/patches?limit=50", &base,))
5113 .send()
5114 .await
5115 .unwrap();
5116 assert_eq!(resp3.status(), 200);
5117 let data3: serde_json::Value = resp3.json().await.unwrap();
5118 assert_eq!(data3["patches"].as_array().unwrap().len(), 3);
5119 assert!(data3["next_cursor"].as_str().unwrap().is_empty());
5120 }
5121
5122 #[tokio::test]
5123 async fn test_http_push_pull_roundtrip() {
5124 let (_hub, _port, base) = start_test_hub().await;
5125 let client = reqwest::Client::new();
5126 let a_hex = "a".repeat(64);
5127 let b_hex = "b".repeat(64);
5128
5129 let push_body = serde_json::json!({
5130 "repo_id": "roundtrip-repo",
5131 "patches": [
5132 {
5133 "id": {"value": &a_hex},
5134 "operation_type": "Create",
5135 "touch_set": ["file_a"],
5136 "target_path": "file_a",
5137 "payload": "",
5138 "parent_ids": [],
5139 "author": "alice",
5140 "message": "first patch",
5141 "timestamp": 100
5142 },
5143 {
5144 "id": {"value": &b_hex},
5145 "operation_type": "Modify",
5146 "touch_set": ["file_a"],
5147 "target_path": "file_a",
5148 "payload": "",
5149 "parent_ids": [{"value": &a_hex}],
5150 "author": "bob",
5151 "message": "second patch",
5152 "timestamp": 200
5153 }
5154 ],
5155 "branches": [{"name": "main", "target_id": {"value": &b_hex}}],
5156 "blobs": []
5157 });
5158
5159 let push_resp = client
5160 .post(format!("{}/push", &base))
5161 .json(&push_body)
5162 .send()
5163 .await
5164 .unwrap();
5165 assert_eq!(push_resp.status(), 200);
5166 let push_data: serde_json::Value = push_resp.json().await.unwrap();
5167 assert_eq!(push_data["success"], true);
5168
5169 let pull_resp = client
5170 .post(format!("{}/pull", &base))
5171 .json(&serde_json::json!({
5172 "repo_id": "roundtrip-repo",
5173 "known_branches": [],
5174 "max_depth": null
5175 }))
5176 .send()
5177 .await
5178 .unwrap();
5179 assert_eq!(pull_resp.status(), 200);
5180 let pull_data: serde_json::Value = pull_resp.json().await.unwrap();
5181 assert_eq!(pull_data["success"], true);
5182 assert_eq!(pull_data["patches"].as_array().unwrap().len(), 2);
5183 assert_eq!(pull_data["branches"].as_array().unwrap().len(), 1);
5184 }
5185
5186 #[tokio::test]
5187 async fn test_http_push_compressed() {
5188 let (_hub, _port, base) = start_test_hub().await;
5189 let client = reqwest::Client::new();
5190 let a_hex = "a".repeat(64);
5191 let blob_data = b"compressed test data";
5192 let blob_hash = "cafebabe".repeat(8);
5193 let compressed = suture_protocol::compress(blob_data).unwrap();
5194
5195 let push_body = serde_json::json!({
5196 "repo_id": "comp-repo",
5197 "patches": [{
5198 "id": {"value": &a_hex},
5199 "operation_type": "Create",
5200 "touch_set": ["f"],
5201 "target_path": "f",
5202 "payload": &blob_hash,
5203 "parent_ids": [],
5204 "author": "alice",
5205 "message": "p",
5206 "timestamp": 0
5207 }],
5208 "branches": [{"name": "main", "target_id": {"value": &a_hex}}],
5209 "blobs": [{"hash": {"value": &blob_hash}, "data": base64_encode(&compressed)}]
5210 });
5211
5212 let resp = client
5213 .post(format!("{}/push/compressed", &base))
5214 .json(&push_body)
5215 .send()
5216 .await
5217 .unwrap();
5218 assert_eq!(resp.status(), 200);
5219 let data: serde_json::Value = resp.json().await.unwrap();
5220 assert_eq!(data["success"], true);
5221 }
5222
5223 #[tokio::test]
5224 async fn test_http_v2_handshake() {
5225 let (_hub, _port, base) = start_test_hub().await;
5226 let client = reqwest::Client::new();
5227
5228 let resp = client
5229 .post(format!("{}/v2/handshake", &base))
5230 .json(&serde_json::json!({
5231 "client_version": 2,
5232 "client_name": "test-v2",
5233 "capabilities": {
5234 "supports_delta": true,
5235 "supports_compression": false,
5236 "max_blob_size": 0
5237 }
5238 }))
5239 .send()
5240 .await
5241 .unwrap();
5242 assert_eq!(resp.status(), 200);
5243 let data: serde_json::Value = resp.json().await.unwrap();
5244 assert_eq!(data["server_version"], 2);
5245 assert_eq!(data["compatible"], true);
5246 assert_eq!(data["server_capabilities"]["supports_delta"], true);
5247 }
5248
5249 #[tokio::test]
5250 async fn test_http_v2_push_pull() {
5251 let (_hub, _port, base) = start_test_hub().await;
5252 let client = reqwest::Client::new();
5253 let a_hex = "a".repeat(64);
5254 let f_hash = blake3::hash(b"f").to_hex().to_string();
5255
5256 let push_body = serde_json::json!({
5257 "repo_id": "v2-repo",
5258 "patches": [{
5259 "id": {"value": &a_hex},
5260 "operation_type": "Create",
5261 "touch_set": ["f"],
5262 "target_path": "f",
5263 "payload": "hello world",
5264 "parent_ids": [],
5265 "author": "alice",
5266 "message": "v2 patch",
5267 "timestamp": 0
5268 }],
5269 "branches": [{"name": "main", "target_id": {"value": &a_hex}}],
5270 "blobs": [{"hash": {"value": &f_hash}, "data": "aGVsbG8gd29ybGQ="}],
5271 "deltas": [],
5272 "signature": null,
5273 "known_branches": null,
5274 "force": false
5275 });
5276
5277 let push_resp = post_json(&client, &format!("{}/v2/push", &base), &push_body)
5278 .send()
5279 .await
5280 .unwrap();
5281 assert_eq!(
5282 push_resp.status(),
5283 200,
5284 "V2 push failed: {}",
5285 push_resp.status()
5286 );
5287
5288 let pull_body = serde_json::json!({
5289 "repo_id": "v2-repo",
5290 "known_branches": [],
5291 "max_depth": null,
5292 "known_blob_hashes": [],
5293 "capabilities": {
5294 "supports_delta": false,
5295 "supports_compression": false,
5296 "max_blob_size": 0
5297 }
5298 });
5299
5300 let pull_resp = post_json(&client, &format!("{}/v2/pull", &base), &pull_body)
5301 .send()
5302 .await
5303 .unwrap();
5304 assert_eq!(pull_resp.status(), 200);
5305 let pull_data: serde_json::Value = pull_resp.json().await.unwrap();
5306 assert_eq!(pull_data["success"], true);
5307 assert_eq!(pull_data["patches"].as_array().unwrap().len(), 1);
5308 assert_eq!(pull_data["protocol_version"], 2);
5309 }
5310
5311 #[tokio::test]
5312 async fn test_http_auth_token_bootstrap() {
5313 let (_hub, _port, base) = start_test_hub().await;
5314 let client = reqwest::Client::new();
5315
5316 let resp = client
5317 .post(format!("{}/auth/token", &base))
5318 .send()
5319 .await
5320 .unwrap();
5321 assert_eq!(resp.status(), 200);
5322 let data: serde_json::Value = resp.json().await.unwrap();
5323 assert!(!data["token"].as_str().unwrap().is_empty());
5324 assert!(data["created_at"].as_u64().unwrap() > 0);
5325 }
5326
5327 #[tokio::test]
5328 async fn test_http_auth_verify() {
5329 let (_hub, _port, base) = start_test_hub().await;
5330 let client = reqwest::Client::new();
5331
5332 let token_resp = client
5333 .post(format!("{}/auth/token", &base))
5334 .send()
5335 .await
5336 .unwrap();
5337 let token_data: serde_json::Value = token_resp.json().await.unwrap();
5338 let token = token_data["token"].as_str().unwrap().to_string();
5339
5340 let verify_resp = client
5341 .post(format!("{}/auth/verify", &base))
5342 .json(&serde_json::json!({
5343 "method": {"Token": &token},
5344 "timestamp": 0
5345 }))
5346 .send()
5347 .await
5348 .unwrap();
5349 assert_eq!(verify_resp.status(), 200);
5350 let verify_data: serde_json::Value = verify_resp.json().await.unwrap();
5351 assert_eq!(verify_data["valid"], true);
5352
5353 let bad_resp = client
5354 .post(format!("{}/auth/verify", &base))
5355 .json(&serde_json::json!({
5356 "method": {"Token": "invalid-token-xyz"},
5357 "timestamp": 0
5358 }))
5359 .send()
5360 .await
5361 .unwrap();
5362 let bad_data: serde_json::Value = bad_resp.json().await.unwrap();
5363 assert_eq!(bad_data["valid"], false);
5364 }
5365
5366 #[tokio::test]
5367 async fn test_http_auth_register() {
5368 let (hub, _port, base) = start_test_hub_auth().await;
5369 let client = reqwest::Client::new();
5370 let admin_token = create_test_user_hub(&hub, "http-admin", "HTTP Admin", "admin").await;
5371
5372 let resp = post_json(
5373 &client,
5374 &format!("{}/auth/register", &base),
5375 &serde_json::json!({
5376 "username": "new-http-user",
5377 "display_name": "New HTTP User",
5378 "role": "member"
5379 }),
5380 )
5381 .header("Authorization", make_auth_header_val(&admin_token))
5382 .send()
5383 .await
5384 .unwrap();
5385 assert_eq!(resp.status(), 201);
5386 let data: serde_json::Value = resp.json().await.unwrap();
5387 assert_eq!(data["success"], true);
5388 assert_eq!(data["user"]["username"], "new-http-user");
5389 }
5390
5391 #[tokio::test]
5392 async fn test_http_users_list() {
5393 let (hub, _port, base) = start_test_hub_auth().await;
5394 let client = reqwest::Client::new();
5395 let admin_token = create_test_user_hub(&hub, "ul-admin", "UL Admin", "admin").await;
5396
5397 let resp = client
5398 .get(format!("{}/users", &base))
5399 .header("Authorization", make_auth_header_val(&admin_token))
5400 .send()
5401 .await
5402 .unwrap();
5403 assert_eq!(resp.status(), 200);
5404 let data: serde_json::Value = resp.json().await.unwrap();
5405 assert_eq!(data["success"], true);
5406 assert!(!data["users"].as_array().unwrap().is_empty());
5407 }
5408
5409 #[tokio::test]
5410 async fn test_http_user_crud() {
5411 let (hub, _port, base) = start_test_hub_auth().await;
5412 let client = reqwest::Client::new();
5413 let admin_token = create_test_user_hub(&hub, "crud-admin", "CRUD Admin", "admin").await;
5414 create_test_user_hub(&hub, "crud-target", "CRUD Target", "reader").await;
5415 let auth = make_auth_header_val(&admin_token);
5416
5417 let get_resp = client
5418 .get(format!("{}/users/crud-target", &base))
5419 .header("Authorization", &auth)
5420 .send()
5421 .await
5422 .unwrap();
5423 assert_eq!(get_resp.status(), 200);
5424 let get_data: serde_json::Value = get_resp.json().await.unwrap();
5425 assert_eq!(get_data["user"]["username"], "crud-target");
5426 assert_eq!(get_data["user"]["role"], "reader");
5427
5428 let patch_resp = patch_json(
5429 &client,
5430 &format!("{}/users/crud-target/role", &base),
5431 &serde_json::json!({"role": "admin"}),
5432 )
5433 .header("Authorization", &auth)
5434 .send()
5435 .await
5436 .unwrap();
5437 assert_eq!(patch_resp.status(), 200);
5438 let patch_data: serde_json::Value = patch_resp.json().await.unwrap();
5439 assert_eq!(patch_data["success"], true);
5440
5441 let del_resp = client
5442 .delete(format!("{}/users/crud-target", &base))
5443 .header("Authorization", &auth)
5444 .send()
5445 .await
5446 .unwrap();
5447 assert_eq!(del_resp.status(), 200);
5448 let del_data: serde_json::Value = del_resp.json().await.unwrap();
5449 assert_eq!(del_data["success"], true);
5450 }
5451
5452 #[tokio::test]
5453 async fn test_http_mirror_setup_and_status() {
5454 let (_hub, _port, base) = start_test_hub().await;
5455 let client = reqwest::Client::new();
5456
5457 let setup_resp = post_json(
5458 &client,
5459 &format!("{}/mirror/setup", &base),
5460 &serde_json::json!({
5461 "repo_name": "mirrored",
5462 "upstream_url": "http://example.com",
5463 "upstream_repo": "upstream/repo"
5464 }),
5465 )
5466 .send()
5467 .await
5468 .unwrap();
5469 assert_eq!(setup_resp.status(), 200);
5470 let setup_data: serde_json::Value = setup_resp.json().await.unwrap();
5471 assert_eq!(setup_data["success"], true);
5472
5473 let status_resp = post_json(
5474 &client,
5475 &format!("{}/mirror/status", &base),
5476 &serde_json::json!({}),
5477 )
5478 .send()
5479 .await
5480 .unwrap();
5481 assert_eq!(status_resp.status(), 200);
5482 let status_data: serde_json::Value = status_resp.json().await.unwrap();
5483 assert_eq!(status_data["mirrors"].as_array().unwrap().len(), 1);
5484 }
5485
5486 #[tokio::test]
5487 async fn test_http_replication() {
5488 let (hub, _port, base) = start_test_hub().await;
5489 let client = reqwest::Client::new();
5490 hub.set_replication_role("leader");
5491
5492 let add_resp = client
5493 .post(format!("{}/replication/peers", &base))
5494 .json(&serde_json::json!({
5495 "peer_url": "http://follower1:8080",
5496 "role": "follower"
5497 }))
5498 .send()
5499 .await
5500 .unwrap();
5501 assert_eq!(add_resp.status(), 200);
5502 let add_data: serde_json::Value = add_resp.json().await.unwrap();
5503 assert_eq!(add_data["success"], true);
5504
5505 let list_resp = client
5506 .get(format!("{}/replication/peers", &base))
5507 .send()
5508 .await
5509 .unwrap();
5510 assert_eq!(list_resp.status(), 200);
5511 let list_data: serde_json::Value = list_resp.json().await.unwrap();
5512 assert_eq!(list_data["peers"].as_array().unwrap().len(), 1);
5513
5514 let status_resp = client
5515 .get(format!("{}/replication/status", &base))
5516 .send()
5517 .await
5518 .unwrap();
5519 assert_eq!(status_resp.status(), 200);
5520 let status_data: serde_json::Value = status_resp.json().await.unwrap();
5521 assert_eq!(status_data["status"]["peer_count"], 1);
5522 }
5523
5524 #[tokio::test]
5525 async fn test_http_branch_protection() {
5526 let (_hub, _port, base) = start_test_hub().await;
5527 let client = reqwest::Client::new();
5528
5529 let protect_resp = client
5530 .post(format!("{}/repos/prot-repo/protect/main", &base))
5531 .send()
5532 .await
5533 .unwrap();
5534 assert_eq!(protect_resp.status(), 200);
5535 let protect_data: serde_json::Value = protect_resp.json().await.unwrap();
5536 assert_eq!(protect_data["success"], true);
5537
5538 let unprotect_resp = client
5539 .post(format!("{}/repos/prot-repo/unprotect/main", &base))
5540 .send()
5541 .await
5542 .unwrap();
5543 assert_eq!(unprotect_resp.status(), 200);
5544 let unprotect_data: serde_json::Value = unprotect_resp.json().await.unwrap();
5545 assert_eq!(unprotect_data["success"], true);
5546 }
5547
5548 #[tokio::test]
5549 async fn test_http_404_unknown_route() {
5550 let (_hub, _port, base) = start_test_hub().await;
5551 let client = reqwest::Client::new();
5552
5553 let resp = client
5554 .get(format!("{}/nonexistent", &base))
5555 .send()
5556 .await
5557 .unwrap();
5558 assert_eq!(resp.status(), 404);
5559 }
5560
5561 #[tokio::test]
5564 async fn test_http_create_repo() {
5565 let (hub, _port, base) = start_test_hub_auth().await;
5566 let client = reqwest::Client::new();
5567 let admin_token = create_test_user_hub(&hub, "repo-admin", "Repo Admin", "admin").await;
5568
5569 {
5571 let store = hub.storage.write().await;
5572 store
5573 .store_token(&admin_token, 1000, "test token", i64::MAX)
5574 .unwrap();
5575 }
5576
5577 let resp = post_json(
5579 &client,
5580 &format!("{}/repos", &base),
5581 &serde_json::json!({
5582 "repo_id": "new-repo"
5583 }),
5584 )
5585 .header("Authorization", make_auth_header_val(&admin_token))
5586 .send()
5587 .await
5588 .unwrap();
5589 assert_eq!(resp.status(), 201);
5590 let data: serde_json::Value = resp.json().await.unwrap();
5591 assert_eq!(data["success"], true);
5592 assert_eq!(data["repo_id"], "new-repo");
5593
5594 let list_resp = client.get(format!("{}/repos", &base)).send().await.unwrap();
5596 let list_data: serde_json::Value = list_resp.json().await.unwrap();
5597 assert!(
5598 list_data["repo_ids"]
5599 .as_array()
5600 .unwrap()
5601 .contains(&serde_json::json!("new-repo"))
5602 );
5603
5604 let resp2 = post_json(
5606 &client,
5607 &format!("{}/repos", &base),
5608 &serde_json::json!({
5609 "repo_id": "new-repo"
5610 }),
5611 )
5612 .header("Authorization", make_auth_header_val(&admin_token))
5613 .send()
5614 .await
5615 .unwrap();
5616 assert_eq!(resp2.status(), 201);
5617
5618 let resp3 = post_json(
5620 &client,
5621 &format!("{}/repos", &base),
5622 &serde_json::json!({
5623 "repo_id": "noauth-repo"
5624 }),
5625 )
5626 .send()
5627 .await
5628 .unwrap();
5629 assert_eq!(resp3.status(), 401);
5630 }
5631
5632 #[tokio::test]
5633 async fn test_http_delete_repo() {
5634 let (hub, _port, base) = start_test_hub_auth().await;
5635 let client = reqwest::Client::new();
5636 let admin_token = create_test_user_hub(&hub, "del-admin", "Del Admin", "admin").await;
5637 let a_hex = "a".repeat(64);
5638
5639 hub.handle_push(PushRequest {
5641 repo_id: "delete-me".to_string(),
5642 patches: vec![make_patch(&a_hex, "Create", &[], "alice")],
5643 branches: vec![make_branch("main", &a_hex)],
5644 blobs: vec![],
5645 signature: None,
5646 known_branches: None,
5647 force: false,
5648 })
5649 .await
5650 .unwrap();
5651
5652 let list_resp = client.get(format!("{}/repos", &base)).send().await.unwrap();
5654 let list_data: serde_json::Value = list_resp.json().await.unwrap();
5655 assert!(
5656 list_data["repo_ids"]
5657 .as_array()
5658 .unwrap()
5659 .contains(&serde_json::json!("delete-me"))
5660 );
5661
5662 let del_resp = client
5664 .delete(format!("{}/repos/delete-me", &base))
5665 .header("Authorization", make_auth_header_val(&admin_token))
5666 .send()
5667 .await
5668 .unwrap();
5669 assert_eq!(del_resp.status(), 200);
5670 let del_data: serde_json::Value = del_resp.json().await.unwrap();
5671 assert_eq!(del_data["success"], true);
5672
5673 let list_resp2 = client.get(format!("{}/repos", &base)).send().await.unwrap();
5675 let list_data2: serde_json::Value = list_resp2.json().await.unwrap();
5676 assert!(
5677 !list_data2["repo_ids"]
5678 .as_array()
5679 .unwrap()
5680 .contains(&serde_json::json!("delete-me"))
5681 );
5682 }
5683
5684 #[tokio::test]
5685 async fn test_http_create_branch() {
5686 let (hub, _port, base) = start_test_hub_auth().await;
5687 let client = reqwest::Client::new();
5688 let admin_token = create_test_user_hub(&hub, "branch-admin", "Branch Admin", "admin").await;
5689 let a_hex = "a".repeat(64);
5690
5691 hub.handle_push(PushRequest {
5693 repo_id: "branch-repo-2".to_string(),
5694 patches: vec![make_patch(&a_hex, "Create", &[], "alice")],
5695 branches: vec![make_branch("main", &a_hex)],
5696 blobs: vec![],
5697 signature: None,
5698 known_branches: None,
5699 force: false,
5700 })
5701 .await
5702 .unwrap();
5703
5704 let resp = post_json(
5706 &client,
5707 &format!("{}/repos/branch-repo-2/branches", &base),
5708 &serde_json::json!({
5709 "name": "feature",
5710 "target": &a_hex
5711 }),
5712 )
5713 .header("Authorization", make_auth_header_val(&admin_token))
5714 .send()
5715 .await
5716 .unwrap();
5717 assert_eq!(resp.status(), 201);
5718 let data: serde_json::Value = resp.json().await.unwrap();
5719 assert_eq!(data["success"], true);
5720
5721 let br_resp = client
5723 .get(format!("{}/repos/branch-repo-2/branches", &base))
5724 .send()
5725 .await
5726 .unwrap();
5727 let br_data: serde_json::Value = br_resp.json().await.unwrap();
5728 assert_eq!(br_data.as_array().unwrap().len(), 2);
5729 }
5730
5731 #[tokio::test]
5732 async fn test_http_delete_branch() {
5733 let (hub, _port, base) = start_test_hub_auth().await;
5734 let client = reqwest::Client::new();
5735 let admin_token = create_test_user_hub(&hub, "delbr-admin", "DelBr Admin", "admin").await;
5736 let a_hex = "a".repeat(64);
5737
5738 hub.handle_push(PushRequest {
5740 repo_id: "delbr-repo".to_string(),
5741 patches: vec![make_patch(&a_hex, "Create", &[], "alice")],
5742 branches: vec![make_branch("main", &a_hex), make_branch("dev", &a_hex)],
5743 blobs: vec![],
5744 signature: None,
5745 known_branches: None,
5746 force: false,
5747 })
5748 .await
5749 .unwrap();
5750
5751 let resp = client
5753 .delete(format!("{}/repos/delbr-repo/branches/dev", &base))
5754 .header("Authorization", make_auth_header_val(&admin_token))
5755 .send()
5756 .await
5757 .unwrap();
5758 assert_eq!(resp.status(), 200);
5759 let data: serde_json::Value = resp.json().await.unwrap();
5760 assert_eq!(data["success"], true);
5761
5762 let br_resp = client
5764 .get(format!("{}/repos/delbr-repo/branches", &base))
5765 .send()
5766 .await
5767 .unwrap();
5768 let br_data: serde_json::Value = br_resp.json().await.unwrap();
5769 assert_eq!(br_data.as_array().unwrap().len(), 1);
5770 assert_eq!(br_data[0]["name"], "main");
5771 }
5772
5773 #[tokio::test]
5774 async fn test_http_get_blob() {
5775 let (_hub, _port, base) = start_test_hub().await;
5776 let client = reqwest::Client::new();
5777 let a_hex = "a".repeat(64);
5778 let f_hash = blake3::hash(b"hello blob").to_hex().to_string();
5779 let blob_bytes = b"hello blob";
5780 let compressed = suture_protocol::compress(blob_bytes).unwrap();
5781
5782 let push_body = serde_json::json!({
5784 "repo_id": "blob-repo",
5785 "patches": [{
5786 "id": {"value": &a_hex},
5787 "operation_type": "Create",
5788 "touch_set": ["f"],
5789 "target_path": "f",
5790 "payload": &f_hash,
5791 "parent_ids": [],
5792 "author": "alice",
5793 "message": "p",
5794 "timestamp": 0
5795 }],
5796 "branches": [{"name": "main", "target_id": {"value": &a_hex}}],
5797 "blobs": [{"hash": {"value": &f_hash}, "data": base64_encode(&compressed)}]
5798 });
5799 let push_resp = client
5800 .post(format!("{}/push/compressed", &base))
5801 .json(&push_body)
5802 .send()
5803 .await
5804 .unwrap();
5805 assert_eq!(push_resp.status(), 200);
5806
5807 let blob_resp = client
5809 .get(format!("{}/repos/blob-repo/blobs/{}", &base, &f_hash))
5810 .send()
5811 .await
5812 .unwrap();
5813 assert_eq!(blob_resp.status(), 200);
5814 let blob_data: serde_json::Value = blob_resp.json().await.unwrap();
5815 assert_eq!(blob_data["success"], true);
5816 let decoded = base64_decode(blob_data["data"].as_str().unwrap()).unwrap();
5817 assert_eq!(decoded, blob_bytes);
5818
5819 let miss_resp = client
5821 .get(format!(
5822 "{}/repos/blob-repo/blobs/{}",
5823 &base,
5824 "0".repeat(64)
5825 ))
5826 .send()
5827 .await
5828 .unwrap();
5829 assert_eq!(miss_resp.status(), 404);
5830 }
5831
5832 #[tokio::test]
5833 async fn test_http_login() {
5834 let (hub, _port, base) = start_test_hub().await;
5835 let client = reqwest::Client::new();
5836
5837 let token = create_test_user_hub(&hub, "login-user", "Login User", "member").await;
5839 {
5840 let store = hub.storage.write().await;
5841 store
5842 .store_token(&token, 1000, "login test token", i64::MAX)
5843 .unwrap();
5844 }
5845
5846 let resp = post_json(
5848 &client,
5849 &format!("{}/auth/login", &base),
5850 &serde_json::json!({
5851 "username": "login-user",
5852 "token": &token
5853 }),
5854 )
5855 .send()
5856 .await
5857 .unwrap();
5858 assert_eq!(resp.status(), 200);
5859 let data: serde_json::Value = resp.json().await.unwrap();
5860 assert_eq!(data["success"], true);
5861 assert_eq!(data["user"]["username"], "login-user");
5862
5863 let bad_resp = post_json(
5865 &client,
5866 &format!("{}/auth/login", &base),
5867 &serde_json::json!({
5868 "username": "login-user",
5869 "token": "invalid-token"
5870 }),
5871 )
5872 .send()
5873 .await
5874 .unwrap();
5875 assert_eq!(bad_resp.status(), 401);
5876 }
5877
5878 #[tokio::test]
5879 async fn test_http_search() {
5880 let (_hub, _port, base) = start_test_hub().await;
5881 let client = reqwest::Client::new();
5882 let a_hex = "a".repeat(64);
5883
5884 let push_body = serde_json::json!({
5886 "repo_id": "search-test-repo",
5887 "patches": [{
5888 "id": {"value": &a_hex},
5889 "operation_type": "Create",
5890 "touch_set": ["README.md"],
5891 "target_path": "README.md",
5892 "payload": "",
5893 "parent_ids": [],
5894 "author": "searcher",
5895 "message": "initial commit for search",
5896 "timestamp": 0
5897 }],
5898 "branches": [],
5899 "blobs": []
5900 });
5901 let push_resp = client
5902 .post(format!("{}/push", &base))
5903 .json(&push_body)
5904 .send()
5905 .await
5906 .unwrap();
5907 assert_eq!(push_resp.status(), 200);
5908
5909 let resp = client
5911 .get(format!("{}/search?q=search-test", &base))
5912 .send()
5913 .await
5914 .unwrap();
5915 assert_eq!(resp.status(), 200);
5916 let data: serde_json::Value = resp.json().await.unwrap();
5917 assert_eq!(data["repos"].as_array().unwrap().len(), 1);
5918
5919 let resp2 = client
5923 .get(format!("{}/search?q=search", &base))
5924 .send()
5925 .await
5926 .unwrap();
5927 assert_eq!(resp2.status(), 200);
5928 let data2: serde_json::Value = resp2.json().await.unwrap();
5929 assert!(!data2["patches"].as_array().unwrap().is_empty());
5930
5931 let resp3 = client
5933 .get(format!("{}/search?q=nonexistent_xyz", &base))
5934 .send()
5935 .await
5936 .unwrap();
5937 assert_eq!(resp3.status(), 200);
5938 let data3: serde_json::Value = resp3.json().await.unwrap();
5939 assert_eq!(data3["repos"].as_array().unwrap().len(), 0);
5940 }
5941
5942 #[tokio::test]
5943 async fn test_http_activity() {
5944 let (_hub, _port, base) = start_test_hub().await;
5945 let client = reqwest::Client::new();
5946
5947 let resp = client
5949 .get(format!("{}/activity", &base))
5950 .send()
5951 .await
5952 .unwrap();
5953 assert_eq!(resp.status(), 200);
5954 let data: serde_json::Value = resp.json().await.unwrap();
5955 assert!(data["entries"].is_array());
5957 }
5958
5959 #[tokio::test]
5960 async fn test_http_delete_mirror() {
5961 let (hub, _port, base) = start_test_hub_auth().await;
5962 let client = reqwest::Client::new();
5963 let admin_token = create_test_user_hub(&hub, "mirror-admin", "Mirror Admin", "admin").await;
5964
5965 let setup_resp = post_json(
5967 &client,
5968 &format!("{}/mirror/setup", &base),
5969 &serde_json::json!({
5970 "repo_name": "mirrored-del",
5971 "upstream_url": "http://example.com/del",
5972 "upstream_repo": "upstream/del"
5973 }),
5974 )
5975 .send()
5976 .await
5977 .unwrap();
5978 assert_eq!(setup_resp.status(), 200);
5979 let setup_data: serde_json::Value = setup_resp.json().await.unwrap();
5980 let mirror_id = setup_data["mirror_id"].as_i64().unwrap();
5981
5982 let del_resp = client
5984 .delete(format!("{}/mirrors/{}", &base, mirror_id))
5985 .header("Authorization", make_auth_header_val(&admin_token))
5986 .send()
5987 .await
5988 .unwrap();
5989 assert_eq!(del_resp.status(), 200);
5990 let del_data: serde_json::Value = del_resp.json().await.unwrap();
5991 assert_eq!(del_data["success"], true);
5992
5993 let status_resp = post_json(
5995 &client,
5996 &format!("{}/mirror/status", &base),
5997 &serde_json::json!({}),
5998 )
5999 .send()
6000 .await
6001 .unwrap();
6002 let status_data: serde_json::Value = status_resp.json().await.unwrap();
6003 assert_eq!(status_data["mirrors"].as_array().unwrap().len(), 0);
6004 }
6005
6006 #[tokio::test]
6007 async fn test_http_mirror_status_get() {
6008 let (_hub, _port, base) = start_test_hub().await;
6009 let client = reqwest::Client::new();
6010
6011 let resp = client
6013 .get(format!("{}/mirror/status", &base))
6014 .send()
6015 .await
6016 .unwrap();
6017 assert_eq!(resp.status(), 200);
6018 let data: serde_json::Value = resp.json().await.unwrap();
6019 assert!(data["mirrors"].is_array());
6020 }
6021
6022 #[tokio::test]
6023 async fn test_http_repo_tree() {
6024 let (hub, _port, base) = start_test_hub().await;
6025 let client = reqwest::Client::new();
6026 let a_hex = "a".repeat(64);
6027 let b_hex = "b".repeat(64);
6028 let c_hex = "c".repeat(64);
6029 let d_hex = "d".repeat(64);
6030
6031 hub.handle_push(PushRequest {
6032 repo_id: "tree-repo".to_string(),
6033 patches: vec![
6034 PatchProto {
6035 id: make_hash_proto(&a_hex),
6036 operation_type: "Create".to_string(),
6037 touch_set: vec!["src/main.rs".to_string()],
6038 target_path: Some("src/main.rs".to_string()),
6039 payload: "blob_aaa".to_string(),
6040 parent_ids: vec![],
6041 author: "alice".to_string(),
6042 message: "create main".to_string(),
6043 timestamp: 100,
6044 },
6045 PatchProto {
6046 id: make_hash_proto(&b_hex),
6047 operation_type: "Create".to_string(),
6048 touch_set: vec!["src/lib.rs".to_string()],
6049 target_path: Some("src/lib.rs".to_string()),
6050 payload: "blob_bbb".to_string(),
6051 parent_ids: vec![make_hash_proto(&a_hex)],
6052 author: "alice".to_string(),
6053 message: "create lib".to_string(),
6054 timestamp: 200,
6055 },
6056 PatchProto {
6057 id: make_hash_proto(&c_hex),
6058 operation_type: "Delete".to_string(),
6059 touch_set: vec!["src/main.rs".to_string()],
6060 target_path: Some("src/main.rs".to_string()),
6061 payload: String::new(),
6062 parent_ids: vec![make_hash_proto(&b_hex)],
6063 author: "alice".to_string(),
6064 message: "delete main".to_string(),
6065 timestamp: 300,
6066 },
6067 PatchProto {
6068 id: make_hash_proto(&d_hex),
6069 operation_type: "Modify".to_string(),
6070 touch_set: vec!["src/lib.rs".to_string()],
6071 target_path: Some("src/lib.rs".to_string()),
6072 payload: "blob_ddd".to_string(),
6073 parent_ids: vec![make_hash_proto(&c_hex)],
6074 author: "bob".to_string(),
6075 message: "modify lib".to_string(),
6076 timestamp: 400,
6077 },
6078 ],
6079 branches: vec![make_branch("main", &d_hex)],
6080 blobs: vec![],
6081 signature: None,
6082 known_branches: None,
6083 force: false,
6084 })
6085 .await
6086 .unwrap();
6087
6088 let resp = client
6089 .get(format!("{}/repos/tree-repo/tree/main", &base))
6090 .send()
6091 .await
6092 .unwrap();
6093 assert_eq!(resp.status(), 200);
6094 let data: serde_json::Value = resp.json().await.unwrap();
6095 assert_eq!(data["success"], true);
6096 let files = data["files"].as_array().unwrap();
6097 assert_eq!(files.len(), 1);
6098 assert_eq!(files[0]["path"], "src/lib.rs");
6099 assert_eq!(files[0]["content_hash"], "blob_ddd");
6100 }
6101
6102 #[tokio::test]
6103 async fn test_http_repo_tree_empty() {
6104 let (_hub, _port, base) = start_test_hub().await;
6105 let client = reqwest::Client::new();
6106
6107 let resp = client
6108 .get(format!("{}/repos/nonexistent/tree/main", &base))
6109 .send()
6110 .await
6111 .unwrap();
6112 assert_eq!(resp.status(), 200);
6113 let data: serde_json::Value = resp.json().await.unwrap();
6114 assert_eq!(data["success"], true);
6115 let files = data["files"].as_array().unwrap();
6116 assert_eq!(files.len(), 0);
6117 }
6118
6119 #[tokio::test]
6120 async fn test_lfs_batch_upload_none() {
6121 let tmp = tempfile::tempdir().unwrap();
6122 let repo_id = "test-repo";
6123 let oid = "aabbccdd";
6124 let prefix = &oid[..2];
6125 let obj_path = tmp
6126 .path()
6127 .join(repo_id)
6128 .join("objects")
6129 .join(prefix)
6130 .join(oid);
6131 std::fs::create_dir_all(obj_path.parent().unwrap()).unwrap();
6132 std::fs::write(&obj_path, b"existing data").unwrap();
6133
6134 let hub = SutureHubServer::new_in_memory().with_lfs_dir(tmp.path().to_path_buf());
6135 let (_hub, _port, base) = start_test_hub_with_lfs(Arc::new(hub)).await;
6136 let client = reqwest::Client::new();
6137
6138 let resp = post_json(
6139 &client,
6140 &format!("{}/lfs/batch", &base),
6141 &serde_json::json!({
6142 "repo_id": repo_id,
6143 "operation": "upload",
6144 "objects": [{"oid": oid, "size": 12}],
6145 }),
6146 )
6147 .send()
6148 .await
6149 .unwrap();
6150
6151 assert_eq!(resp.status(), 200);
6152 let data: serde_json::Value = resp.json().await.unwrap();
6153 assert_eq!(data["objects"][0]["action"], "none");
6154 }
6155
6156 #[tokio::test]
6157 async fn test_lfs_upload_download_roundtrip() {
6158 let tmp = tempfile::tempdir().unwrap();
6159 let repo_id = "test-repo";
6160
6161 let hub = SutureHubServer::new_in_memory().with_lfs_dir(tmp.path().to_path_buf());
6162 let (_hub, _port, base) = start_test_hub_with_lfs(Arc::new(hub)).await;
6163 let client = reqwest::Client::new();
6164
6165 let payload = b"hello lfs world".repeat(1000);
6166 let hash = sha2::Sha256::digest(&payload);
6167 let oid = hex::encode(hash);
6168
6169 let resp = post_json(
6170 &client,
6171 &format!("{}/lfs/batch", &base),
6172 &serde_json::json!({
6173 "repo_id": repo_id,
6174 "operation": "upload",
6175 "objects": [{"oid": &oid, "size": payload.len()}],
6176 }),
6177 )
6178 .send()
6179 .await
6180 .unwrap();
6181 assert_eq!(resp.status(), 200);
6182 let data: serde_json::Value = resp.json().await.unwrap();
6183 assert_eq!(data["objects"][0]["action"], "upload");
6184
6185 let resp = client
6186 .put(format!("{}/lfs/objects/{}/{}", &base, repo_id, &oid))
6187 .body(payload.clone())
6188 .send()
6189 .await
6190 .unwrap();
6191 assert_eq!(resp.status(), 200);
6192
6193 let resp = post_json(
6194 &client,
6195 &format!("{}/lfs/batch", &base),
6196 &serde_json::json!({
6197 "repo_id": repo_id,
6198 "operation": "download",
6199 "objects": [{"oid": &oid, "size": payload.len()}],
6200 }),
6201 )
6202 .send()
6203 .await
6204 .unwrap();
6205 assert_eq!(resp.status(), 200);
6206 let data: serde_json::Value = resp.json().await.unwrap();
6207 assert_eq!(data["objects"][0]["action"], "download");
6208
6209 let resp = client
6210 .get(format!("{}/lfs/objects/{}/{}", &base, repo_id, &oid))
6211 .send()
6212 .await
6213 .unwrap();
6214 assert_eq!(resp.status(), 200);
6215 let downloaded = resp.bytes().await.unwrap();
6216 assert_eq!(downloaded.as_ref(), payload.as_slice());
6217
6218 let resp = client
6219 .put(format!(
6220 "{}/lfs/objects/{}/{}",
6221 &base, repo_id, "badbadbadbadbadbadbadbadbadbadbadbadbadbadbad"
6222 ))
6223 .body(payload.clone())
6224 .send()
6225 .await
6226 .unwrap();
6227 assert_eq!(resp.status(), 400);
6228 }
6229
6230 #[tokio::test]
6231 async fn test_lfs_batch_download_missing() {
6232 let tmp = tempfile::tempdir().unwrap();
6233 let hub = SutureHubServer::new_in_memory().with_lfs_dir(tmp.path().to_path_buf());
6234 let (_hub, _port, base) = start_test_hub_with_lfs(Arc::new(hub)).await;
6235 let client = reqwest::Client::new();
6236
6237 let resp = post_json(&client, &format!("{}/lfs/batch", &base), &serde_json::json!({
6238 "repo_id": "test-repo",
6239 "operation": "download",
6240 "objects": [{"oid": "ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00ff00", "size": 100}],
6241 }))
6242 .send().await.unwrap();
6243
6244 assert_eq!(resp.status(), 200);
6245 let data: serde_json::Value = resp.json().await.unwrap();
6246 assert_eq!(data["objects"][0]["action"], "none");
6247 }
6248
6249 #[tokio::test]
6250 async fn test_lfs_no_storage_configured() {
6251 let hub = SutureHubServer::new_in_memory();
6252 let (_hub, _port, base) = start_test_hub_with_lfs(Arc::new(hub)).await;
6253 let client = reqwest::Client::new();
6254
6255 let resp = post_json(&client, &format!("{}/lfs/batch", &base), &serde_json::json!({
6256 "repo_id": "test-repo",
6257 "operation": "upload",
6258 "objects": [{"oid": "aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00", "size": 10}],
6259 }))
6260 .send().await.unwrap();
6261 assert_eq!(resp.status(), 503);
6262
6263 let resp = client
6264 .put(format!("{base}/lfs/objects/test-repo/aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00aa00"))
6265 .body(b"some data".to_vec())
6266 .send()
6267 .await
6268 .unwrap();
6269 assert_eq!(resp.status(), 503);
6270 }
6271
/// Installs a recording `BlobBackend` on the hub and checks that
/// `blob_store` and `blob_get` each reach the backend's corresponding method.
#[cfg(feature = "s3-backend")]
#[test]
fn test_blob_backend_used_when_set() {
    use crate::blob_backend::BlobBackend;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Backend double that only records which entry points were called.
    /// Both flags start out `false` (the `AtomicBool` default).
    #[derive(Default)]
    struct RecordingBackend {
        stored: AtomicBool,
        fetched: AtomicBool,
    }

    impl BlobBackend for RecordingBackend {
        fn store_blob(
            &self,
            _repo_id: &str,
            _hash_hex: &str,
            _data: &[u8],
        ) -> Result<(), String> {
            self.stored.store(true, Ordering::Relaxed);
            Ok(())
        }

        fn get_blob(&self, _repo_id: &str, _hash_hex: &str) -> Result<Option<Vec<u8>>, String> {
            self.fetched.store(true, Ordering::Relaxed);
            Ok(None)
        }

        fn has_blob(&self, _repo_id: &str, _hash_hex: &str) -> Result<bool, String> {
            Ok(false)
        }

        fn delete_blob(&self, _repo_id: &str, _hash_hex: &str) -> Result<(), String> {
            Ok(())
        }

        fn list_blobs(&self, _repo_id: &str) -> Result<Vec<String>, String> {
            Ok(vec![])
        }
    }

    let backend = Arc::new(RecordingBackend::default());
    let mut hub = SutureHubServer::new();
    hub.set_blob_backend(backend.clone());

    // This is a plain `#[test]`, so drive the async storage lock with a
    // runtime created just for this test.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        let store = hub.storage.read().await;
        let hash_hex = "a".repeat(64);

        hub.blob_store(&store, "test-repo", &hash_hex, b"data")
            .unwrap();
        assert!(backend.stored.load(Ordering::Relaxed));

        hub.blob_get(&store, "test-repo", &hash_hex).unwrap();
        assert!(backend.fetched.load(Ordering::Relaxed));
    });
}
6333
/// Applying `HubCommand::CreateRepo` to an in-memory hub must make the repo
/// visible through `repo_exists` on the hub's storage.
#[cfg(feature = "raft-cluster")]
#[tokio::test]
async fn test_apply_raft_command_create_repo() {
    use crate::raft::HubCommand;

    let hub = SutureHubServer::new_in_memory();
    let create = HubCommand::CreateRepo {
        repo_id: "raft-repo".to_string(),
    };
    hub.apply_raft_command(create).await.unwrap();

    // The read guard is a temporary that lives only for this statement.
    let exists = hub
        .storage
        .read()
        .await
        .repo_exists("raft-repo")
        .unwrap_or(false);
    assert!(exists);
}
6349
/// Applying `HubCommand::DeleteRepo` to an in-memory hub must remove a repo
/// that was previously created with `ensure_repo`.
///
/// The existence lookup is unwrapped with `expect` rather than defaulted to
/// `false`: with a default, a failing lookup would make the negative
/// assertion below pass without the deletion ever being verified.
#[cfg(feature = "raft-cluster")]
#[tokio::test]
async fn test_apply_raft_command_delete_repo() {
    use crate::raft::HubCommand;

    let hub = SutureHubServer::new_in_memory();
    {
        // Scoped so the write guard is released before the command is applied.
        let store = hub.storage.write().await;
        store.ensure_repo("del-repo").unwrap();
    }
    hub.apply_raft_command(HubCommand::DeleteRepo {
        repo_id: "del-repo".to_string(),
    })
    .await
    .unwrap();

    let store = hub.storage.read().await;
    let still_exists = store
        .repo_exists("del-repo")
        .expect("repo_exists lookup must succeed after DeleteRepo");
    assert!(!still_exists);
}
6369
/// Applies `CreateBranch`, `UpdateBranch` and `DeleteBranch` for `main` on
/// `br-repo` in sequence and checks that no branches remain afterwards.
///
/// The final branch lookup is unwrapped with `expect` rather than
/// `unwrap_or_default`: defaulting would turn a failing lookup into an empty
/// list and let the emptiness assertion pass without checking anything.
#[cfg(feature = "raft-cluster")]
#[tokio::test]
async fn test_apply_raft_command_branch() {
    use crate::raft::HubCommand;

    let hub = SutureHubServer::new_in_memory();
    {
        // Scoped so the write guard is released before commands are applied.
        let store = hub.storage.write().await;
        store.ensure_repo("br-repo").unwrap();
    }

    hub.apply_raft_command(HubCommand::CreateBranch {
        repo_id: "br-repo".to_string(),
        branch: "main".to_string(),
        target: "a".repeat(64),
    })
    .await
    .unwrap();

    hub.apply_raft_command(HubCommand::UpdateBranch {
        repo_id: "br-repo".to_string(),
        branch: "main".to_string(),
        target: "b".repeat(64),
    })
    .await
    .unwrap();

    hub.apply_raft_command(HubCommand::DeleteBranch {
        repo_id: "br-repo".to_string(),
        branch: "main".to_string(),
    })
    .await
    .unwrap();

    let store = hub.storage.read().await;
    let branches = store
        .get_branches("br-repo")
        .expect("get_branches lookup must succeed after DeleteBranch");
    assert!(branches.is_empty());
}
6408}