Skip to main content

orca_control/
webhook.rs

1//! Webhook handler for GitHub/Gitea/GitLab push events.
2//!
3//! When a push webhook fires, orca looks up the matching service and triggers
4//! a rolling redeploy (stop all instances, pull fresh image, recreate).
5
6use std::sync::Arc;
7
8use axum::extract::Path;
9use axum::extract::State;
10use axum::http::{HeaderMap, StatusCode};
11use axum::response::IntoResponse;
12use axum::routing::{delete, post};
13use axum::{Json, Router};
14use hmac::{Hmac, Mac};
15use sha2::Sha256;
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::operations::AgentOfflineError;
20use crate::reconciler;
21use crate::state::AppState;
22use crate::webhook_invocations::record_invocation;
23
24type HmacSha256 = Hmac<Sha256>;
25
26/// Configuration for a webhook trigger.
27#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
28pub struct WebhookConfig {
29    /// Repository full name, e.g. "myorg/api".
30    pub repo: String,
31    /// Orca service name to redeploy.
32    pub service_name: String,
33    /// Branch to watch (default: "main").
34    #[serde(default = "default_branch")]
35    pub branch: String,
36    /// Optional HMAC secret for signature validation.
37    #[serde(default, skip_serializing_if = "Option::is_none")]
38    pub secret: Option<String>,
39    /// If true, this is an infra webhook: git pull + redeploy all services.
40    #[serde(default)]
41    pub infra: bool,
42}
43
44fn default_branch() -> String {
45    "main".to_string()
46}
47
48/// Shared webhook config store, stored in [`AppState`] extension.
49pub type WebhookStore = Arc<RwLock<Vec<WebhookConfig>>>;
50
51/// Path to the on-disk webhook config file (under `~/.orca`).
52///
53/// Honors `ORCA_WEBHOOKS_PATH` as an override for tests.
54fn webhooks_path() -> std::path::PathBuf {
55    if let Ok(p) = std::env::var("ORCA_WEBHOOKS_PATH") {
56        return std::path::PathBuf::from(p);
57    }
58    let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
59    std::path::PathBuf::from(home).join(".orca/webhooks.json")
60}
61
62/// Load persisted webhooks from disk, returning an empty list on first run.
63pub fn new_store() -> WebhookStore {
64    let configs: Vec<WebhookConfig> = std::fs::read_to_string(webhooks_path())
65        .ok()
66        .and_then(|raw| serde_json::from_str(&raw).ok())
67        .unwrap_or_default();
68    Arc::new(RwLock::new(configs))
69}
70
71/// Persist the current webhook list to disk. Errors are logged, not returned,
72/// so they don't fail the request that triggered the change.
73async fn persist(store: &WebhookStore) {
74    let snapshot = store.read().await.clone();
75    let path = webhooks_path();
76    if let Some(parent) = path.parent() {
77        let _ = std::fs::create_dir_all(parent);
78    }
79    match serde_json::to_string_pretty(&snapshot) {
80        Ok(json) => {
81            if let Err(e) = std::fs::write(&path, json) {
82                error!("Failed to persist webhooks to {}: {e}", path.display());
83            }
84        }
85        Err(e) => error!("Failed to serialize webhooks: {e}"),
86    }
87}
88
89/// Subset of GitHub push webhook payload we care about.
90#[derive(Debug, serde::Deserialize)]
91struct PushPayload {
92    /// e.g. "refs/heads/main"
93    #[serde(rename = "ref")]
94    git_ref: String,
95    repository: RepoInfo,
96    head_commit: Option<CommitInfo>,
97}
98
99#[derive(Debug, serde::Deserialize)]
100struct RepoInfo {
101    full_name: String,
102}
103
104#[derive(Debug, serde::Deserialize)]
105struct CommitInfo {
106    id: String,
107    message: String,
108}
109
110/// Extract branch name from a git ref like "refs/heads/main".
111fn branch_from_ref(git_ref: &str) -> Option<&str> {
112    git_ref.strip_prefix("refs/heads/")
113}
114
115/// Validate HMAC-SHA256 signature from the `X-Hub-Signature-256` header.
116fn validate_signature(secret: &str, body: &[u8], signature_header: &str) -> bool {
117    let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
118        return false;
119    };
120
121    let Ok(expected) = hex::decode(hex_sig) else {
122        return false;
123    };
124
125    let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
126        return false;
127    };
128
129    mac.update(body);
130    mac.verify_slice(&expected).is_ok()
131}
132
133/// Handle a GitHub/Gitea push webhook.
134///
135/// Mounted at `POST /api/v1/webhooks/github`.
136/// Build webhook routes.
137/// Build webhook routes (call before with_state on parent router).
138pub fn webhook_router() -> Router<Arc<AppState>> {
139    use axum::routing::get;
140    Router::new()
141        .route("/api/v1/webhooks/github", post(handle_push))
142        .route("/api/v1/webhooks", post(register).get(list))
143        .route("/api/v1/webhooks/{id}", delete(remove_webhook))
144        .route(
145            "/api/v1/webhooks/{id}/invocations",
146            get(crate::webhook_invocations::invocations),
147        )
148}
149
150pub async fn handle_push(
151    State(state): State<Arc<AppState>>,
152    headers: HeaderMap,
153    body: axum::body::Bytes,
154) -> impl IntoResponse {
155    // Parse the payload
156    let payload: PushPayload = match serde_json::from_slice(&body) {
157        Ok(p) => p,
158        Err(e) => {
159            warn!("Webhook: invalid payload: {e}");
160            return (StatusCode::BAD_REQUEST, format!("invalid payload: {e}")).into_response();
161        }
162    };
163
164    let repo = &payload.repository.full_name;
165    let Some(branch) = branch_from_ref(&payload.git_ref) else {
166        return (StatusCode::OK, "ignored: not a branch push".to_string()).into_response();
167    };
168
169    let commit_id = payload
170        .head_commit
171        .as_ref()
172        .map(|c| c.id.as_str())
173        .unwrap_or("unknown");
174    let commit_msg = payload
175        .head_commit
176        .as_ref()
177        .and_then(|c| c.message.lines().next())
178        .unwrap_or("");
179    let short_sha = &commit_id[..commit_id.len().min(8)];
180
181    info!("Webhook: push to {repo}#{branch} (commit {short_sha}: {commit_msg})");
182
183    // Find matching webhook config
184    let webhooks = state.webhooks.read().await;
185    let matching: Vec<WebhookConfig> = webhooks
186        .iter()
187        .filter(|w| w.repo == *repo && w.branch == branch)
188        .cloned()
189        .collect();
190    drop(webhooks);
191
192    if matching.is_empty() {
193        info!("Webhook: no config for {repo}#{branch}, ignoring");
194        return (
195            StatusCode::OK,
196            "ignored: no matching webhook config".to_string(),
197        )
198            .into_response();
199    }
200
201    let sig_header = headers
202        .get("X-Hub-Signature-256")
203        .and_then(|v| v.to_str().ok())
204        .unwrap_or("");
205
206    let mut deployed = Vec::new();
207    let mut errors = Vec::new();
208    let mut sig_failures = 0u32;
209    let mut agent_offline = false;
210
211    for wh in &matching {
212        // Validate secret if configured
213        if let Some(secret) = &wh.secret
214            && (sig_header.is_empty() || !validate_signature(secret, &body, sig_header))
215        {
216            sig_failures += 1;
217            warn!("Webhook: HMAC validation failed for {}", wh.service_name);
218            record_invocation(&state, wh, short_sha, 401, false).await;
219            continue;
220        }
221
222        if wh.infra {
223            info!("Webhook: infra push detected, spawning git pull + deploy all");
224            let state_clone = Arc::clone(&state);
225            tokio::spawn(async move {
226                match handle_infra_deploy(&state_clone).await {
227                    Ok(count) => info!("Infra deploy complete: {count} services"),
228                    Err(e) => error!("Webhook: infra deploy failed: {e}"),
229                }
230            });
231            deployed.push("infra (deploying)".to_string());
232            record_invocation(&state, wh, short_sha, 202, true).await;
233            continue;
234        }
235
236        info!("Webhook: triggering redeploy of {}", wh.service_name);
237        match reconciler::redeploy(&state, &wh.service_name).await {
238            Ok(()) => {
239                deployed.push(wh.service_name.clone());
240                record_invocation(&state, wh, short_sha, 200, true).await;
241            }
242            Err(e) => {
243                let offline = e.downcast_ref::<AgentOfflineError>().is_some();
244                if offline {
245                    agent_offline = true;
246                }
247                error!("Webhook: redeploy of {} failed: {e}", wh.service_name);
248                errors.push(format!("{}: {e}", wh.service_name));
249                let code = if offline { 503 } else { 500 };
250                record_invocation(&state, wh, short_sha, code, false).await;
251            }
252        }
253    }
254
255    // If every matching webhook failed signature validation, return 401
256    if sig_failures > 0 && deployed.is_empty() && errors.is_empty() {
257        return (StatusCode::UNAUTHORIZED, "signature validation failed").into_response();
258    }
259
260    let status = if errors.is_empty() {
261        StatusCode::OK
262    } else if deployed.is_empty() && agent_offline {
263        StatusCode::SERVICE_UNAVAILABLE
264    } else if deployed.is_empty() {
265        StatusCode::INTERNAL_SERVER_ERROR
266    } else {
267        StatusCode::PARTIAL_CONTENT
268    };
269
270    (
271        status,
272        Json(serde_json::json!({ "deployed": deployed, "errors": errors })),
273    )
274        .into_response()
275}
276
277/// Register a new webhook config.
278///
279/// Mounted at `POST /api/v1/webhooks`.
280pub async fn register(
281    State(state): State<Arc<AppState>>,
282    Json(config): Json<WebhookConfig>,
283) -> impl IntoResponse {
284    info!(
285        "Webhook: registering {}#{} -> {}",
286        config.repo, config.branch, config.service_name
287    );
288    {
289        let mut webhooks = state.webhooks.write().await;
290        // Remove existing config for same repo+branch+service to allow updates
291        webhooks.retain(|w| {
292            !(w.repo == config.repo
293                && w.branch == config.branch
294                && w.service_name == config.service_name)
295        });
296        webhooks.push(config);
297    }
298    persist(&state.webhooks).await;
299    (
300        StatusCode::CREATED,
301        Json(serde_json::json!({"status": "registered"})),
302    )
303}
304
305/// List all webhook configs.
306///
307/// Mounted at `GET /api/v1/webhooks`. Each entry carries the most recent
308/// invocation (if any) so the TUI dashboard renders without an N+1 fetch.
309pub async fn list(State(state): State<Arc<AppState>>) -> impl IntoResponse {
310    use orca_core::api_types::{WebhookEntry, WebhookListResponse};
311    let webhooks = state.webhooks.read().await;
312    let invocations = state.webhook_invocations.read().await;
313    let entries: Vec<WebhookEntry> = webhooks
314        .iter()
315        .map(|w| WebhookEntry {
316            repo: w.repo.clone(),
317            service_name: w.service_name.clone(),
318            branch: w.branch.clone(),
319            has_secret: w.secret.is_some(),
320            infra: w.infra,
321            last_invocation: invocations
322                .get(&w.service_name)
323                .and_then(|q| q.back())
324                .cloned(),
325        })
326        .collect();
327    Json(WebhookListResponse { webhooks: entries })
328}
329
330/// Remove a webhook by service name.
331///
332/// Mounted at `DELETE /api/v1/webhooks/{id}` where id is the service_name.
333pub async fn remove_webhook(
334    State(state): State<Arc<AppState>>,
335    Path(id): Path<String>,
336) -> impl IntoResponse {
337    let removed = {
338        let mut webhooks = state.webhooks.write().await;
339        let before = webhooks.len();
340        webhooks.retain(|w| w.service_name != id);
341        before - webhooks.len()
342    };
343    if removed > 0 {
344        persist(&state.webhooks).await;
345    }
346
347    if removed == 0 {
348        (
349            StatusCode::NOT_FOUND,
350            Json(serde_json::json!({"error": format!("no webhook for service '{id}'")})),
351        )
352            .into_response()
353    } else {
354        info!("Webhook: removed {removed} webhook(s) for service '{id}'");
355        (
356            StatusCode::OK,
357            Json(serde_json::json!({"status": "removed", "count": removed})),
358        )
359            .into_response()
360    }
361}
362
363/// Handle an infra webhook: git pull the working directory, then redeploy
364/// all services from the refreshed service.toml files.
365async fn handle_infra_deploy(state: &AppState) -> anyhow::Result<usize> {
366    // Run `git pull` in the current working directory
367    let output = tokio::process::Command::new("git")
368        .args(["pull", "--ff-only"])
369        .output()
370        .await?;
371
372    if !output.status.success() {
373        let stderr = String::from_utf8_lossy(&output.stderr);
374        anyhow::bail!("git pull failed: {stderr}");
375    }
376    let stdout = String::from_utf8_lossy(&output.stdout);
377    info!("Infra git pull: {}", stdout.trim());
378
379    // Load all services from the services/ directory
380    let services_dir = std::path::Path::new("services");
381    let configs = if services_dir.is_dir() {
382        orca_core::config::ServicesConfig::load_dir(services_dir)?
383    } else {
384        orca_core::config::ServicesConfig::load("services.toml".as_ref())?
385    };
386
387    // Secrets are resolved in service_config_to_spec() at container creation
388    // time, not here. This ensures spec_matches() compares unresolved templates
389    // and doesn't restart containers just because a token was refreshed.
390    let count = configs.service.len();
391    let (deployed, errors) = reconciler::reconcile(state, &configs.service).await;
392
393    // Persist deployed services to store
394    if let Some(store) = &state.store {
395        for config in &configs.service {
396            if deployed.contains(&config.name)
397                && let Err(e) = store.set_service(&config.name, config)
398            {
399                tracing::warn!("Failed to persist {}: {e}", config.name);
400            }
401        }
402    }
403
404    if !errors.is_empty() {
405        warn!("Infra deploy: {} errors: {:?}", errors.len(), errors);
406    }
407    info!(
408        "Infra deploy complete: {}/{} services",
409        deployed.len(),
410        count
411    );
412    Ok(count)
413}
414
415#[cfg(test)]
416#[path = "webhook_tests.rs"]
417mod tests;