1use std::sync::Arc;
7
8use axum::extract::Path;
9use axum::extract::State;
10use axum::http::{HeaderMap, StatusCode};
11use axum::response::IntoResponse;
12use axum::routing::{delete, post};
13use axum::{Json, Router};
14use hmac::{Hmac, Mac};
15use sha2::Sha256;
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::operations::AgentOfflineError;
20use crate::reconciler;
21use crate::state::AppState;
22use crate::webhook_invocations::record_invocation;
23
24type HmacSha256 = Hmac<Sha256>;
25
/// A registered webhook: ties pushes on one repository branch to a service.
///
/// Instances are kept in the in-memory [`WebhookStore`] and serialized to the
/// JSON file used by `persist` / `new_store`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WebhookConfig {
    /// Repository identifier; compared against `repository.full_name` of the
    /// incoming push payload.
    pub repo: String,
    /// Service to redeploy when a matching push arrives. Also the key used to
    /// remove a webhook and to look up its invocation history.
    pub service_name: String,
    /// Branch that the pushed ref must match. Falls back to `"main"` when the
    /// field is absent from the input.
    #[serde(default = "default_branch")]
    pub branch: String,
    /// Optional shared secret. When set, a push is only accepted if its
    /// `X-Hub-Signature-256` header carries a valid HMAC-SHA256 of the body.
    /// Left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub secret: Option<String>,
    /// When `true`, a matching push runs the infra deploy path (git pull and
    /// reconcile of all services) instead of redeploying `service_name`.
    /// Defaults to `false`.
    #[serde(default)]
    pub infra: bool,
}
43
/// Serde default for [`WebhookConfig::branch`]: the branch assumed when a
/// registration does not name one.
fn default_branch() -> String {
    String::from("main")
}
47
/// Shared, async-lockable list of registered webhooks.
pub type WebhookStore = Arc<RwLock<Vec<WebhookConfig>>>;
50
/// Resolves the location of the persisted webhook list.
///
/// `ORCA_WEBHOOKS_PATH` wins when it is set; otherwise the file lives at
/// `$HOME/.orca/webhooks.json`, with `.` standing in for a missing `HOME`.
fn webhooks_path() -> std::path::PathBuf {
    use std::path::PathBuf;

    match std::env::var("ORCA_WEBHOOKS_PATH") {
        Ok(explicit) => PathBuf::from(explicit),
        Err(_) => {
            let home_dir = std::env::var("HOME").unwrap_or_else(|_| String::from("."));
            PathBuf::from(home_dir).join(".orca/webhooks.json")
        }
    }
}
61
62pub fn new_store() -> WebhookStore {
64 let configs: Vec<WebhookConfig> = std::fs::read_to_string(webhooks_path())
65 .ok()
66 .and_then(|raw| serde_json::from_str(&raw).ok())
67 .unwrap_or_default();
68 Arc::new(RwLock::new(configs))
69}
70
71async fn persist(store: &WebhookStore) {
74 let snapshot = store.read().await.clone();
75 let path = webhooks_path();
76 if let Some(parent) = path.parent() {
77 let _ = std::fs::create_dir_all(parent);
78 }
79 match serde_json::to_string_pretty(&snapshot) {
80 Ok(json) => {
81 if let Err(e) = std::fs::write(&path, json) {
82 error!("Failed to persist webhooks to {}: {e}", path.display());
83 }
84 }
85 Err(e) => error!("Failed to serialize webhooks: {e}"),
86 }
87}
88
/// The fields of a push event body that the push handler reads.
#[derive(Debug, serde::Deserialize)]
struct PushPayload {
    /// The pushed ref, taken from the JSON key `ref` (a Rust keyword, hence
    /// the rename). Branch pushes look like `refs/heads/<branch>`.
    #[serde(rename = "ref")]
    git_ref: String,
    /// Repository the push belongs to.
    repository: RepoInfo,
    /// Head commit of the push, if the payload carries one.
    head_commit: Option<CommitInfo>,
}
98
/// Repository section of a push payload.
#[derive(Debug, serde::Deserialize)]
struct RepoInfo {
    /// Repository name as matched against [`WebhookConfig::repo`].
    full_name: String,
}
103
/// Commit section of a push payload.
#[derive(Debug, serde::Deserialize)]
struct CommitInfo {
    /// Commit identifier; a shortened form is used in logs and invocation records.
    id: String,
    /// Commit message; only its first line is logged.
    message: String,
}
109
/// Extracts the branch name from a git ref.
///
/// Returns the part after `refs/heads/` when `git_ref` starts with that
/// prefix, and `None` for anything else (tags, other ref namespaces).
fn branch_from_ref(git_ref: &str) -> Option<&str> {
    const HEADS_PREFIX: &str = "refs/heads/";

    // `split_once` cuts at the first occurrence; an empty left half means the
    // ref begins with the prefix.
    match git_ref.split_once(HEADS_PREFIX) {
        Some(("", branch)) => Some(branch),
        _ => None,
    }
}
114
115fn validate_signature(secret: &str, body: &[u8], signature_header: &str) -> bool {
117 let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
118 return false;
119 };
120
121 let Ok(expected) = hex::decode(hex_sig) else {
122 return false;
123 };
124
125 let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
126 return false;
127 };
128
129 mac.update(body);
130 mac.verify_slice(&expected).is_ok()
131}
132
133pub fn webhook_router() -> Router<Arc<AppState>> {
139 use axum::routing::get;
140 Router::new()
141 .route("/api/v1/webhooks/github", post(handle_push))
142 .route("/api/v1/webhooks", post(register).get(list))
143 .route("/api/v1/webhooks/{id}", delete(remove_webhook))
144 .route(
145 "/api/v1/webhooks/{id}/invocations",
146 get(crate::webhook_invocations::invocations),
147 )
148}
149
150pub async fn handle_push(
151 State(state): State<Arc<AppState>>,
152 headers: HeaderMap,
153 body: axum::body::Bytes,
154) -> impl IntoResponse {
155 let payload: PushPayload = match serde_json::from_slice(&body) {
157 Ok(p) => p,
158 Err(e) => {
159 warn!("Webhook: invalid payload: {e}");
160 return (StatusCode::BAD_REQUEST, format!("invalid payload: {e}")).into_response();
161 }
162 };
163
164 let repo = &payload.repository.full_name;
165 let Some(branch) = branch_from_ref(&payload.git_ref) else {
166 return (StatusCode::OK, "ignored: not a branch push".to_string()).into_response();
167 };
168
169 let commit_id = payload
170 .head_commit
171 .as_ref()
172 .map(|c| c.id.as_str())
173 .unwrap_or("unknown");
174 let commit_msg = payload
175 .head_commit
176 .as_ref()
177 .and_then(|c| c.message.lines().next())
178 .unwrap_or("");
179 let short_sha = &commit_id[..commit_id.len().min(8)];
180
181 info!("Webhook: push to {repo}#{branch} (commit {short_sha}: {commit_msg})");
182
183 let webhooks = state.webhooks.read().await;
185 let matching: Vec<WebhookConfig> = webhooks
186 .iter()
187 .filter(|w| w.repo == *repo && w.branch == branch)
188 .cloned()
189 .collect();
190 drop(webhooks);
191
192 if matching.is_empty() {
193 info!("Webhook: no config for {repo}#{branch}, ignoring");
194 return (
195 StatusCode::OK,
196 "ignored: no matching webhook config".to_string(),
197 )
198 .into_response();
199 }
200
201 let sig_header = headers
202 .get("X-Hub-Signature-256")
203 .and_then(|v| v.to_str().ok())
204 .unwrap_or("");
205
206 let mut deployed = Vec::new();
207 let mut errors = Vec::new();
208 let mut sig_failures = 0u32;
209 let mut agent_offline = false;
210
211 for wh in &matching {
212 if let Some(secret) = &wh.secret
214 && (sig_header.is_empty() || !validate_signature(secret, &body, sig_header))
215 {
216 sig_failures += 1;
217 warn!("Webhook: HMAC validation failed for {}", wh.service_name);
218 record_invocation(&state, wh, short_sha, 401, false).await;
219 continue;
220 }
221
222 if wh.infra {
223 info!("Webhook: infra push detected, spawning git pull + deploy all");
224 let state_clone = Arc::clone(&state);
225 tokio::spawn(async move {
226 match handle_infra_deploy(&state_clone).await {
227 Ok(count) => info!("Infra deploy complete: {count} services"),
228 Err(e) => error!("Webhook: infra deploy failed: {e}"),
229 }
230 });
231 deployed.push("infra (deploying)".to_string());
232 record_invocation(&state, wh, short_sha, 202, true).await;
233 continue;
234 }
235
236 info!("Webhook: triggering redeploy of {}", wh.service_name);
237 match reconciler::redeploy(&state, &wh.service_name).await {
238 Ok(()) => {
239 deployed.push(wh.service_name.clone());
240 record_invocation(&state, wh, short_sha, 200, true).await;
241 }
242 Err(e) => {
243 let offline = e.downcast_ref::<AgentOfflineError>().is_some();
244 if offline {
245 agent_offline = true;
246 }
247 error!("Webhook: redeploy of {} failed: {e}", wh.service_name);
248 errors.push(format!("{}: {e}", wh.service_name));
249 let code = if offline { 503 } else { 500 };
250 record_invocation(&state, wh, short_sha, code, false).await;
251 }
252 }
253 }
254
255 if sig_failures > 0 && deployed.is_empty() && errors.is_empty() {
257 return (StatusCode::UNAUTHORIZED, "signature validation failed").into_response();
258 }
259
260 let status = if errors.is_empty() {
261 StatusCode::OK
262 } else if deployed.is_empty() && agent_offline {
263 StatusCode::SERVICE_UNAVAILABLE
264 } else if deployed.is_empty() {
265 StatusCode::INTERNAL_SERVER_ERROR
266 } else {
267 StatusCode::PARTIAL_CONTENT
268 };
269
270 (
271 status,
272 Json(serde_json::json!({ "deployed": deployed, "errors": errors })),
273 )
274 .into_response()
275}
276
277pub async fn register(
281 State(state): State<Arc<AppState>>,
282 Json(config): Json<WebhookConfig>,
283) -> impl IntoResponse {
284 info!(
285 "Webhook: registering {}#{} -> {}",
286 config.repo, config.branch, config.service_name
287 );
288 {
289 let mut webhooks = state.webhooks.write().await;
290 webhooks.retain(|w| {
292 !(w.repo == config.repo
293 && w.branch == config.branch
294 && w.service_name == config.service_name)
295 });
296 webhooks.push(config);
297 }
298 persist(&state.webhooks).await;
299 (
300 StatusCode::CREATED,
301 Json(serde_json::json!({"status": "registered"})),
302 )
303}
304
305pub async fn list(State(state): State<Arc<AppState>>) -> impl IntoResponse {
310 use orca_core::api_types::{WebhookEntry, WebhookListResponse};
311 let webhooks = state.webhooks.read().await;
312 let invocations = state.webhook_invocations.read().await;
313 let entries: Vec<WebhookEntry> = webhooks
314 .iter()
315 .map(|w| WebhookEntry {
316 repo: w.repo.clone(),
317 service_name: w.service_name.clone(),
318 branch: w.branch.clone(),
319 has_secret: w.secret.is_some(),
320 infra: w.infra,
321 last_invocation: invocations
322 .get(&w.service_name)
323 .and_then(|q| q.back())
324 .cloned(),
325 })
326 .collect();
327 Json(WebhookListResponse { webhooks: entries })
328}
329
330pub async fn remove_webhook(
334 State(state): State<Arc<AppState>>,
335 Path(id): Path<String>,
336) -> impl IntoResponse {
337 let removed = {
338 let mut webhooks = state.webhooks.write().await;
339 let before = webhooks.len();
340 webhooks.retain(|w| w.service_name != id);
341 before - webhooks.len()
342 };
343 if removed > 0 {
344 persist(&state.webhooks).await;
345 }
346
347 if removed == 0 {
348 (
349 StatusCode::NOT_FOUND,
350 Json(serde_json::json!({"error": format!("no webhook for service '{id}'")})),
351 )
352 .into_response()
353 } else {
354 info!("Webhook: removed {removed} webhook(s) for service '{id}'");
355 (
356 StatusCode::OK,
357 Json(serde_json::json!({"status": "removed", "count": removed})),
358 )
359 .into_response()
360 }
361}
362
363async fn handle_infra_deploy(state: &AppState) -> anyhow::Result<usize> {
366 let output = tokio::process::Command::new("git")
368 .args(["pull", "--ff-only"])
369 .output()
370 .await?;
371
372 if !output.status.success() {
373 let stderr = String::from_utf8_lossy(&output.stderr);
374 anyhow::bail!("git pull failed: {stderr}");
375 }
376 let stdout = String::from_utf8_lossy(&output.stdout);
377 info!("Infra git pull: {}", stdout.trim());
378
379 let services_dir = std::path::Path::new("services");
381 let configs = if services_dir.is_dir() {
382 orca_core::config::ServicesConfig::load_dir(services_dir)?
383 } else {
384 orca_core::config::ServicesConfig::load("services.toml".as_ref())?
385 };
386
387 let count = configs.service.len();
391 let (deployed, errors) = reconciler::reconcile(state, &configs.service).await;
392
393 if let Some(store) = &state.store {
395 for config in &configs.service {
396 if deployed.contains(&config.name)
397 && let Err(e) = store.set_service(&config.name, config)
398 {
399 tracing::warn!("Failed to persist {}: {e}", config.name);
400 }
401 }
402 }
403
404 if !errors.is_empty() {
405 warn!("Infra deploy: {} errors: {:?}", errors.len(), errors);
406 }
407 info!(
408 "Infra deploy complete: {}/{} services",
409 deployed.len(),
410 count
411 );
412 Ok(count)
413}
414
// Unit tests live in the sibling file `webhook_tests.rs` and are compiled
// only for test builds.
#[cfg(test)]
#[path = "webhook_tests.rs"]
mod tests;