1use std::sync::Arc;
7
8use axum::extract::Path;
9use axum::extract::State;
10use axum::http::{HeaderMap, StatusCode};
11use axum::response::IntoResponse;
12use axum::routing::{delete, post};
13use axum::{Json, Router};
14use hmac::{Hmac, Mac};
15use sha2::Sha256;
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::operations::AgentOfflineError;
20use crate::reconciler;
21use crate::state::AppState;
22
23type HmacSha256 = Hmac<Sha256>;
24
25#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
27pub struct WebhookConfig {
28 pub repo: String,
30 pub service_name: String,
32 #[serde(default = "default_branch")]
34 pub branch: String,
35 #[serde(default, skip_serializing_if = "Option::is_none")]
37 pub secret: Option<String>,
38 #[serde(default)]
40 pub infra: bool,
41}
42
43fn default_branch() -> String {
44 "main".to_string()
45}
46
47pub type WebhookStore = Arc<RwLock<Vec<WebhookConfig>>>;
49
50fn webhooks_path() -> std::path::PathBuf {
54 if let Ok(p) = std::env::var("ORCA_WEBHOOKS_PATH") {
55 return std::path::PathBuf::from(p);
56 }
57 let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
58 std::path::PathBuf::from(home).join(".orca/webhooks.json")
59}
60
61pub fn new_store() -> WebhookStore {
63 let configs: Vec<WebhookConfig> = std::fs::read_to_string(webhooks_path())
64 .ok()
65 .and_then(|raw| serde_json::from_str(&raw).ok())
66 .unwrap_or_default();
67 Arc::new(RwLock::new(configs))
68}
69
70async fn persist(store: &WebhookStore) {
73 let snapshot = store.read().await.clone();
74 let path = webhooks_path();
75 if let Some(parent) = path.parent() {
76 let _ = std::fs::create_dir_all(parent);
77 }
78 match serde_json::to_string_pretty(&snapshot) {
79 Ok(json) => {
80 if let Err(e) = std::fs::write(&path, json) {
81 error!("Failed to persist webhooks to {}: {e}", path.display());
82 }
83 }
84 Err(e) => error!("Failed to serialize webhooks: {e}"),
85 }
86}
87
88#[derive(Debug, serde::Deserialize)]
90struct PushPayload {
91 #[serde(rename = "ref")]
93 git_ref: String,
94 repository: RepoInfo,
95 head_commit: Option<CommitInfo>,
96}
97
98#[derive(Debug, serde::Deserialize)]
99struct RepoInfo {
100 full_name: String,
101}
102
103#[derive(Debug, serde::Deserialize)]
104struct CommitInfo {
105 id: String,
106 message: String,
107}
108
109fn branch_from_ref(git_ref: &str) -> Option<&str> {
111 git_ref.strip_prefix("refs/heads/")
112}
113
114fn validate_signature(secret: &str, body: &[u8], signature_header: &str) -> bool {
116 let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
117 return false;
118 };
119
120 let Ok(expected) = hex::decode(hex_sig) else {
121 return false;
122 };
123
124 let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
125 return false;
126 };
127
128 mac.update(body);
129 mac.verify_slice(&expected).is_ok()
130}
131
132pub fn webhook_router() -> Router<Arc<AppState>> {
138 Router::new()
139 .route("/api/v1/webhooks/github", post(handle_push))
140 .route("/api/v1/webhooks", post(register).get(list))
141 .route("/api/v1/webhooks/{id}", delete(remove_webhook))
142}
143
144pub async fn handle_push(
145 State(state): State<Arc<AppState>>,
146 headers: HeaderMap,
147 body: axum::body::Bytes,
148) -> impl IntoResponse {
149 let payload: PushPayload = match serde_json::from_slice(&body) {
151 Ok(p) => p,
152 Err(e) => {
153 warn!("Webhook: invalid payload: {e}");
154 return (StatusCode::BAD_REQUEST, format!("invalid payload: {e}")).into_response();
155 }
156 };
157
158 let repo = &payload.repository.full_name;
159 let Some(branch) = branch_from_ref(&payload.git_ref) else {
160 return (StatusCode::OK, "ignored: not a branch push".to_string()).into_response();
161 };
162
163 let commit_id = payload
164 .head_commit
165 .as_ref()
166 .map(|c| c.id.as_str())
167 .unwrap_or("unknown");
168 let commit_msg = payload
169 .head_commit
170 .as_ref()
171 .and_then(|c| c.message.lines().next())
172 .unwrap_or("");
173 let short_sha = &commit_id[..commit_id.len().min(8)];
174
175 info!("Webhook: push to {repo}#{branch} (commit {short_sha}: {commit_msg})");
176
177 let webhooks = state.webhooks.read().await;
179 let matching: Vec<WebhookConfig> = webhooks
180 .iter()
181 .filter(|w| w.repo == *repo && w.branch == branch)
182 .cloned()
183 .collect();
184 drop(webhooks);
185
186 if matching.is_empty() {
187 info!("Webhook: no config for {repo}#{branch}, ignoring");
188 return (
189 StatusCode::OK,
190 "ignored: no matching webhook config".to_string(),
191 )
192 .into_response();
193 }
194
195 let sig_header = headers
196 .get("X-Hub-Signature-256")
197 .and_then(|v| v.to_str().ok())
198 .unwrap_or("");
199
200 let mut deployed = Vec::new();
201 let mut errors = Vec::new();
202 let mut sig_failures = 0u32;
203 let mut agent_offline = false;
204
205 for wh in &matching {
206 if let Some(secret) = &wh.secret
208 && (sig_header.is_empty() || !validate_signature(secret, &body, sig_header))
209 {
210 sig_failures += 1;
211 warn!("Webhook: HMAC validation failed for {}", wh.service_name);
212 continue;
213 }
214
215 if wh.infra {
216 info!("Webhook: infra push detected, spawning git pull + deploy all");
217 let state_clone = Arc::clone(&state);
218 tokio::spawn(async move {
219 match handle_infra_deploy(&state_clone).await {
220 Ok(count) => info!("Infra deploy complete: {count} services"),
221 Err(e) => error!("Webhook: infra deploy failed: {e}"),
222 }
223 });
224 deployed.push("infra (deploying)".to_string());
225 continue;
226 }
227
228 info!("Webhook: triggering redeploy of {}", wh.service_name);
229 match reconciler::redeploy(&state, &wh.service_name).await {
230 Ok(()) => deployed.push(wh.service_name.clone()),
231 Err(e) => {
232 if e.downcast_ref::<AgentOfflineError>().is_some() {
233 agent_offline = true;
234 }
235 error!("Webhook: redeploy of {} failed: {e}", wh.service_name);
236 errors.push(format!("{}: {e}", wh.service_name));
237 }
238 }
239 }
240
241 if sig_failures > 0 && deployed.is_empty() && errors.is_empty() {
243 return (StatusCode::UNAUTHORIZED, "signature validation failed").into_response();
244 }
245
246 let status = if errors.is_empty() {
247 StatusCode::OK
248 } else if deployed.is_empty() && agent_offline {
249 StatusCode::SERVICE_UNAVAILABLE
250 } else if deployed.is_empty() {
251 StatusCode::INTERNAL_SERVER_ERROR
252 } else {
253 StatusCode::PARTIAL_CONTENT
254 };
255
256 (
257 status,
258 Json(serde_json::json!({ "deployed": deployed, "errors": errors })),
259 )
260 .into_response()
261}
262
263pub async fn register(
267 State(state): State<Arc<AppState>>,
268 Json(config): Json<WebhookConfig>,
269) -> impl IntoResponse {
270 info!(
271 "Webhook: registering {}#{} -> {}",
272 config.repo, config.branch, config.service_name
273 );
274 {
275 let mut webhooks = state.webhooks.write().await;
276 webhooks.retain(|w| {
278 !(w.repo == config.repo
279 && w.branch == config.branch
280 && w.service_name == config.service_name)
281 });
282 webhooks.push(config);
283 }
284 persist(&state.webhooks).await;
285 (
286 StatusCode::CREATED,
287 Json(serde_json::json!({"status": "registered"})),
288 )
289}
290
291pub async fn list(State(state): State<Arc<AppState>>) -> impl IntoResponse {
295 let webhooks = state.webhooks.read().await;
296 Json(serde_json::json!({ "webhooks": *webhooks }))
297}
298
299pub async fn remove_webhook(
303 State(state): State<Arc<AppState>>,
304 Path(id): Path<String>,
305) -> impl IntoResponse {
306 let removed = {
307 let mut webhooks = state.webhooks.write().await;
308 let before = webhooks.len();
309 webhooks.retain(|w| w.service_name != id);
310 before - webhooks.len()
311 };
312 if removed > 0 {
313 persist(&state.webhooks).await;
314 }
315
316 if removed == 0 {
317 (
318 StatusCode::NOT_FOUND,
319 Json(serde_json::json!({"error": format!("no webhook for service '{id}'")})),
320 )
321 .into_response()
322 } else {
323 info!("Webhook: removed {removed} webhook(s) for service '{id}'");
324 (
325 StatusCode::OK,
326 Json(serde_json::json!({"status": "removed", "count": removed})),
327 )
328 .into_response()
329 }
330}
331
332async fn handle_infra_deploy(state: &AppState) -> anyhow::Result<usize> {
335 let output = tokio::process::Command::new("git")
337 .args(["pull", "--ff-only"])
338 .output()
339 .await?;
340
341 if !output.status.success() {
342 let stderr = String::from_utf8_lossy(&output.stderr);
343 anyhow::bail!("git pull failed: {stderr}");
344 }
345 let stdout = String::from_utf8_lossy(&output.stdout);
346 info!("Infra git pull: {}", stdout.trim());
347
348 let services_dir = std::path::Path::new("services");
350 let configs = if services_dir.is_dir() {
351 orca_core::config::ServicesConfig::load_dir(services_dir)?
352 } else {
353 orca_core::config::ServicesConfig::load("services.toml".as_ref())?
354 };
355
356 let count = configs.service.len();
360 let (deployed, errors) = reconciler::reconcile(state, &configs.service).await;
361
362 if let Some(store) = &state.store {
364 for config in &configs.service {
365 if deployed.contains(&config.name)
366 && let Err(e) = store.set_service(&config.name, config)
367 {
368 tracing::warn!("Failed to persist {}: {e}", config.name);
369 }
370 }
371 }
372
373 if !errors.is_empty() {
374 warn!("Infra deploy: {} errors: {:?}", errors.len(), errors);
375 }
376 info!(
377 "Infra deploy complete: {}/{} services",
378 deployed.len(),
379 count
380 );
381 Ok(count)
382}
383
384#[cfg(test)]
385#[path = "webhook_tests.rs"]
386mod tests;