1use std::sync::Arc;
7
8use axum::extract::Path;
9use axum::extract::State;
10use axum::http::{HeaderMap, StatusCode};
11use axum::response::IntoResponse;
12use axum::routing::{delete, post};
13use axum::{Json, Router};
14use hmac::{Hmac, Mac};
15use sha2::Sha256;
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::reconciler;
20use crate::state::AppState;
21
22type HmacSha256 = Hmac<Sha256>;
23
24#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
26pub struct WebhookConfig {
27 pub repo: String,
29 pub service_name: String,
31 #[serde(default = "default_branch")]
33 pub branch: String,
34 #[serde(default, skip_serializing_if = "Option::is_none")]
36 pub secret: Option<String>,
37 #[serde(default)]
39 pub infra: bool,
40}
41
42fn default_branch() -> String {
43 "main".to_string()
44}
45
46pub type WebhookStore = Arc<RwLock<Vec<WebhookConfig>>>;
48
49fn webhooks_path() -> std::path::PathBuf {
53 if let Ok(p) = std::env::var("ORCA_WEBHOOKS_PATH") {
54 return std::path::PathBuf::from(p);
55 }
56 let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
57 std::path::PathBuf::from(home).join(".orca/webhooks.json")
58}
59
60pub fn new_store() -> WebhookStore {
62 let configs: Vec<WebhookConfig> = std::fs::read_to_string(webhooks_path())
63 .ok()
64 .and_then(|raw| serde_json::from_str(&raw).ok())
65 .unwrap_or_default();
66 Arc::new(RwLock::new(configs))
67}
68
69async fn persist(store: &WebhookStore) {
72 let snapshot = store.read().await.clone();
73 let path = webhooks_path();
74 if let Some(parent) = path.parent() {
75 let _ = std::fs::create_dir_all(parent);
76 }
77 match serde_json::to_string_pretty(&snapshot) {
78 Ok(json) => {
79 if let Err(e) = std::fs::write(&path, json) {
80 error!("Failed to persist webhooks to {}: {e}", path.display());
81 }
82 }
83 Err(e) => error!("Failed to serialize webhooks: {e}"),
84 }
85}
86
87#[derive(Debug, serde::Deserialize)]
89struct PushPayload {
90 #[serde(rename = "ref")]
92 git_ref: String,
93 repository: RepoInfo,
94 head_commit: Option<CommitInfo>,
95}
96
97#[derive(Debug, serde::Deserialize)]
98struct RepoInfo {
99 full_name: String,
100}
101
102#[derive(Debug, serde::Deserialize)]
103struct CommitInfo {
104 id: String,
105 message: String,
106}
107
108fn branch_from_ref(git_ref: &str) -> Option<&str> {
110 git_ref.strip_prefix("refs/heads/")
111}
112
113fn validate_signature(secret: &str, body: &[u8], signature_header: &str) -> bool {
115 let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
116 return false;
117 };
118
119 let Ok(expected) = hex::decode(hex_sig) else {
120 return false;
121 };
122
123 let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
124 return false;
125 };
126
127 mac.update(body);
128 mac.verify_slice(&expected).is_ok()
129}
130
131pub fn webhook_router() -> Router<Arc<AppState>> {
137 Router::new()
138 .route("/api/v1/webhooks/github", post(handle_push))
139 .route("/api/v1/webhooks", post(register).get(list))
140 .route("/api/v1/webhooks/{id}", delete(remove_webhook))
141}
142
143pub async fn handle_push(
144 State(state): State<Arc<AppState>>,
145 headers: HeaderMap,
146 body: axum::body::Bytes,
147) -> impl IntoResponse {
148 let payload: PushPayload = match serde_json::from_slice(&body) {
150 Ok(p) => p,
151 Err(e) => {
152 warn!("Webhook: invalid payload: {e}");
153 return (StatusCode::BAD_REQUEST, format!("invalid payload: {e}")).into_response();
154 }
155 };
156
157 let repo = &payload.repository.full_name;
158 let Some(branch) = branch_from_ref(&payload.git_ref) else {
159 return (StatusCode::OK, "ignored: not a branch push".to_string()).into_response();
160 };
161
162 let commit_id = payload
163 .head_commit
164 .as_ref()
165 .map(|c| c.id.as_str())
166 .unwrap_or("unknown");
167 let commit_msg = payload
168 .head_commit
169 .as_ref()
170 .and_then(|c| c.message.lines().next())
171 .unwrap_or("");
172 let short_sha = &commit_id[..commit_id.len().min(8)];
173
174 info!("Webhook: push to {repo}#{branch} (commit {short_sha}: {commit_msg})");
175
176 let webhooks = state.webhooks.read().await;
178 let matching: Vec<WebhookConfig> = webhooks
179 .iter()
180 .filter(|w| w.repo == *repo && w.branch == branch)
181 .cloned()
182 .collect();
183 drop(webhooks);
184
185 if matching.is_empty() {
186 info!("Webhook: no config for {repo}#{branch}, ignoring");
187 return (
188 StatusCode::OK,
189 "ignored: no matching webhook config".to_string(),
190 )
191 .into_response();
192 }
193
194 let sig_header = headers
195 .get("X-Hub-Signature-256")
196 .and_then(|v| v.to_str().ok())
197 .unwrap_or("");
198
199 let mut deployed = Vec::new();
200 let mut errors = Vec::new();
201 let mut sig_failures = 0u32;
202
203 for wh in &matching {
204 if let Some(secret) = &wh.secret
206 && (sig_header.is_empty() || !validate_signature(secret, &body, sig_header))
207 {
208 sig_failures += 1;
209 warn!("Webhook: HMAC validation failed for {}", wh.service_name);
210 continue;
211 }
212
213 if wh.infra {
214 info!("Webhook: infra push detected, running git pull + deploy all");
215 match handle_infra_deploy(&state).await {
216 Ok(count) => {
217 deployed.push(format!("infra ({count} services)"));
218 }
219 Err(e) => {
220 error!("Webhook: infra deploy failed: {e}");
221 errors.push(format!("infra: {e}"));
222 }
223 }
224 continue;
225 }
226
227 info!("Webhook: triggering redeploy of {}", wh.service_name);
228 match reconciler::redeploy(&state, &wh.service_name).await {
229 Ok(()) => deployed.push(wh.service_name.clone()),
230 Err(e) => {
231 error!("Webhook: redeploy of {} failed: {e}", wh.service_name);
232 errors.push(format!("{}: {e}", wh.service_name));
233 }
234 }
235 }
236
237 if sig_failures > 0 && deployed.is_empty() && errors.is_empty() {
239 return (StatusCode::UNAUTHORIZED, "signature validation failed").into_response();
240 }
241
242 let status = if errors.is_empty() {
243 StatusCode::OK
244 } else if deployed.is_empty() {
245 StatusCode::INTERNAL_SERVER_ERROR
246 } else {
247 StatusCode::PARTIAL_CONTENT
248 };
249
250 (
251 status,
252 Json(serde_json::json!({ "deployed": deployed, "errors": errors })),
253 )
254 .into_response()
255}
256
257pub async fn register(
261 State(state): State<Arc<AppState>>,
262 Json(config): Json<WebhookConfig>,
263) -> impl IntoResponse {
264 info!(
265 "Webhook: registering {}#{} -> {}",
266 config.repo, config.branch, config.service_name
267 );
268 {
269 let mut webhooks = state.webhooks.write().await;
270 webhooks.retain(|w| {
272 !(w.repo == config.repo
273 && w.branch == config.branch
274 && w.service_name == config.service_name)
275 });
276 webhooks.push(config);
277 }
278 persist(&state.webhooks).await;
279 (
280 StatusCode::CREATED,
281 Json(serde_json::json!({"status": "registered"})),
282 )
283}
284
285pub async fn list(State(state): State<Arc<AppState>>) -> impl IntoResponse {
289 let webhooks = state.webhooks.read().await;
290 Json(serde_json::json!({ "webhooks": *webhooks }))
291}
292
293pub async fn remove_webhook(
297 State(state): State<Arc<AppState>>,
298 Path(id): Path<String>,
299) -> impl IntoResponse {
300 let removed = {
301 let mut webhooks = state.webhooks.write().await;
302 let before = webhooks.len();
303 webhooks.retain(|w| w.service_name != id);
304 before - webhooks.len()
305 };
306 if removed > 0 {
307 persist(&state.webhooks).await;
308 }
309
310 if removed == 0 {
311 (
312 StatusCode::NOT_FOUND,
313 Json(serde_json::json!({"error": format!("no webhook for service '{id}'")})),
314 )
315 .into_response()
316 } else {
317 info!("Webhook: removed {removed} webhook(s) for service '{id}'");
318 (
319 StatusCode::OK,
320 Json(serde_json::json!({"status": "removed", "count": removed})),
321 )
322 .into_response()
323 }
324}
325
326async fn handle_infra_deploy(state: &AppState) -> anyhow::Result<usize> {
329 let output = tokio::process::Command::new("git")
331 .args(["pull", "--ff-only"])
332 .output()
333 .await?;
334
335 if !output.status.success() {
336 let stderr = String::from_utf8_lossy(&output.stderr);
337 anyhow::bail!("git pull failed: {stderr}");
338 }
339 let stdout = String::from_utf8_lossy(&output.stdout);
340 info!("Infra git pull: {}", stdout.trim());
341
342 let services_dir = std::path::Path::new("services");
344 let configs = if services_dir.is_dir() {
345 orca_core::config::ServicesConfig::load_dir(services_dir)?
346 } else {
347 orca_core::config::ServicesConfig::load("services.toml".as_ref())?
348 };
349
350 let count = configs.service.len();
354 let (deployed, errors) = reconciler::reconcile(state, &configs.service).await;
355
356 if let Some(store) = &state.store {
358 for config in &configs.service {
359 if deployed.contains(&config.name)
360 && let Err(e) = store.set_service(&config.name, config)
361 {
362 tracing::warn!("Failed to persist {}: {e}", config.name);
363 }
364 }
365 }
366
367 if !errors.is_empty() {
368 warn!("Infra deploy: {} errors: {:?}", errors.len(), errors);
369 }
370 info!(
371 "Infra deploy complete: {}/{} services",
372 deployed.len(),
373 count
374 );
375 Ok(count)
376}
377
378#[cfg(test)]
379#[path = "webhook_tests.rs"]
380mod tests;