Skip to main content

orca_control/
webhook.rs

1//! Webhook handler for GitHub/Gitea/GitLab push events.
2//!
3//! When a push webhook fires, orca looks up the matching service and triggers
4//! a rolling redeploy (stop all instances, pull fresh image, recreate).
5
6use std::sync::Arc;
7
8use axum::extract::Path;
9use axum::extract::State;
10use axum::http::{HeaderMap, StatusCode};
11use axum::response::IntoResponse;
12use axum::routing::{delete, post};
13use axum::{Json, Router};
14use hmac::{Hmac, Mac};
15use sha2::Sha256;
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::reconciler;
20use crate::state::AppState;
21
22type HmacSha256 = Hmac<Sha256>;
23
24/// Configuration for a webhook trigger.
25#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
26pub struct WebhookConfig {
27    /// Repository full name, e.g. "myorg/api".
28    pub repo: String,
29    /// Orca service name to redeploy.
30    pub service_name: String,
31    /// Branch to watch (default: "main").
32    #[serde(default = "default_branch")]
33    pub branch: String,
34    /// Optional HMAC secret for signature validation.
35    #[serde(default, skip_serializing_if = "Option::is_none")]
36    pub secret: Option<String>,
37    /// If true, this is an infra webhook: git pull + redeploy all services.
38    #[serde(default)]
39    pub infra: bool,
40}
41
42fn default_branch() -> String {
43    "main".to_string()
44}
45
46/// Shared webhook config store, stored in [`AppState`] extension.
47pub type WebhookStore = Arc<RwLock<Vec<WebhookConfig>>>;
48
49/// Path to the on-disk webhook config file (under `~/.orca`).
50///
51/// Honors `ORCA_WEBHOOKS_PATH` as an override for tests.
52fn webhooks_path() -> std::path::PathBuf {
53    if let Ok(p) = std::env::var("ORCA_WEBHOOKS_PATH") {
54        return std::path::PathBuf::from(p);
55    }
56    let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
57    std::path::PathBuf::from(home).join(".orca/webhooks.json")
58}
59
60/// Load persisted webhooks from disk, returning an empty list on first run.
61pub fn new_store() -> WebhookStore {
62    let configs: Vec<WebhookConfig> = std::fs::read_to_string(webhooks_path())
63        .ok()
64        .and_then(|raw| serde_json::from_str(&raw).ok())
65        .unwrap_or_default();
66    Arc::new(RwLock::new(configs))
67}
68
69/// Persist the current webhook list to disk. Errors are logged, not returned,
70/// so they don't fail the request that triggered the change.
71async fn persist(store: &WebhookStore) {
72    let snapshot = store.read().await.clone();
73    let path = webhooks_path();
74    if let Some(parent) = path.parent() {
75        let _ = std::fs::create_dir_all(parent);
76    }
77    match serde_json::to_string_pretty(&snapshot) {
78        Ok(json) => {
79            if let Err(e) = std::fs::write(&path, json) {
80                error!("Failed to persist webhooks to {}: {e}", path.display());
81            }
82        }
83        Err(e) => error!("Failed to serialize webhooks: {e}"),
84    }
85}
86
87/// Subset of GitHub push webhook payload we care about.
88#[derive(Debug, serde::Deserialize)]
89struct PushPayload {
90    /// e.g. "refs/heads/main"
91    #[serde(rename = "ref")]
92    git_ref: String,
93    repository: RepoInfo,
94    head_commit: Option<CommitInfo>,
95}
96
97#[derive(Debug, serde::Deserialize)]
98struct RepoInfo {
99    full_name: String,
100}
101
102#[derive(Debug, serde::Deserialize)]
103struct CommitInfo {
104    id: String,
105    message: String,
106}
107
108/// Extract branch name from a git ref like "refs/heads/main".
109fn branch_from_ref(git_ref: &str) -> Option<&str> {
110    git_ref.strip_prefix("refs/heads/")
111}
112
113/// Validate HMAC-SHA256 signature from the `X-Hub-Signature-256` header.
114fn validate_signature(secret: &str, body: &[u8], signature_header: &str) -> bool {
115    let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
116        return false;
117    };
118
119    let Ok(expected) = hex::decode(hex_sig) else {
120        return false;
121    };
122
123    let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
124        return false;
125    };
126
127    mac.update(body);
128    mac.verify_slice(&expected).is_ok()
129}
130
131/// Handle a GitHub/Gitea push webhook.
132///
133/// Mounted at `POST /api/v1/webhooks/github`.
134/// Build webhook routes.
135/// Build webhook routes (call before with_state on parent router).
136pub fn webhook_router() -> Router<Arc<AppState>> {
137    Router::new()
138        .route("/api/v1/webhooks/github", post(handle_push))
139        .route("/api/v1/webhooks", post(register).get(list))
140        .route("/api/v1/webhooks/{id}", delete(remove_webhook))
141}
142
143pub async fn handle_push(
144    State(state): State<Arc<AppState>>,
145    headers: HeaderMap,
146    body: axum::body::Bytes,
147) -> impl IntoResponse {
148    // Parse the payload
149    let payload: PushPayload = match serde_json::from_slice(&body) {
150        Ok(p) => p,
151        Err(e) => {
152            warn!("Webhook: invalid payload: {e}");
153            return (StatusCode::BAD_REQUEST, format!("invalid payload: {e}")).into_response();
154        }
155    };
156
157    let repo = &payload.repository.full_name;
158    let Some(branch) = branch_from_ref(&payload.git_ref) else {
159        return (StatusCode::OK, "ignored: not a branch push".to_string()).into_response();
160    };
161
162    let commit_id = payload
163        .head_commit
164        .as_ref()
165        .map(|c| c.id.as_str())
166        .unwrap_or("unknown");
167    let commit_msg = payload
168        .head_commit
169        .as_ref()
170        .and_then(|c| c.message.lines().next())
171        .unwrap_or("");
172    let short_sha = &commit_id[..commit_id.len().min(8)];
173
174    info!("Webhook: push to {repo}#{branch} (commit {short_sha}: {commit_msg})");
175
176    // Find matching webhook config
177    let webhooks = state.webhooks.read().await;
178    let matching: Vec<WebhookConfig> = webhooks
179        .iter()
180        .filter(|w| w.repo == *repo && w.branch == branch)
181        .cloned()
182        .collect();
183    drop(webhooks);
184
185    if matching.is_empty() {
186        info!("Webhook: no config for {repo}#{branch}, ignoring");
187        return (
188            StatusCode::OK,
189            "ignored: no matching webhook config".to_string(),
190        )
191            .into_response();
192    }
193
194    let sig_header = headers
195        .get("X-Hub-Signature-256")
196        .and_then(|v| v.to_str().ok())
197        .unwrap_or("");
198
199    let mut deployed = Vec::new();
200    let mut errors = Vec::new();
201    let mut sig_failures = 0u32;
202
203    for wh in &matching {
204        // Validate secret if configured
205        if let Some(secret) = &wh.secret
206            && (sig_header.is_empty() || !validate_signature(secret, &body, sig_header))
207        {
208            sig_failures += 1;
209            warn!("Webhook: HMAC validation failed for {}", wh.service_name);
210            continue;
211        }
212
213        if wh.infra {
214            info!("Webhook: infra push detected, running git pull + deploy all");
215            match handle_infra_deploy(&state).await {
216                Ok(count) => {
217                    deployed.push(format!("infra ({count} services)"));
218                }
219                Err(e) => {
220                    error!("Webhook: infra deploy failed: {e}");
221                    errors.push(format!("infra: {e}"));
222                }
223            }
224            continue;
225        }
226
227        info!("Webhook: triggering redeploy of {}", wh.service_name);
228        match reconciler::redeploy(&state, &wh.service_name).await {
229            Ok(()) => deployed.push(wh.service_name.clone()),
230            Err(e) => {
231                error!("Webhook: redeploy of {} failed: {e}", wh.service_name);
232                errors.push(format!("{}: {e}", wh.service_name));
233            }
234        }
235    }
236
237    // If every matching webhook failed signature validation, return 401
238    if sig_failures > 0 && deployed.is_empty() && errors.is_empty() {
239        return (StatusCode::UNAUTHORIZED, "signature validation failed").into_response();
240    }
241
242    let status = if errors.is_empty() {
243        StatusCode::OK
244    } else if deployed.is_empty() {
245        StatusCode::INTERNAL_SERVER_ERROR
246    } else {
247        StatusCode::PARTIAL_CONTENT
248    };
249
250    (
251        status,
252        Json(serde_json::json!({ "deployed": deployed, "errors": errors })),
253    )
254        .into_response()
255}
256
257/// Register a new webhook config.
258///
259/// Mounted at `POST /api/v1/webhooks`.
260pub async fn register(
261    State(state): State<Arc<AppState>>,
262    Json(config): Json<WebhookConfig>,
263) -> impl IntoResponse {
264    info!(
265        "Webhook: registering {}#{} -> {}",
266        config.repo, config.branch, config.service_name
267    );
268    {
269        let mut webhooks = state.webhooks.write().await;
270        // Remove existing config for same repo+branch+service to allow updates
271        webhooks.retain(|w| {
272            !(w.repo == config.repo
273                && w.branch == config.branch
274                && w.service_name == config.service_name)
275        });
276        webhooks.push(config);
277    }
278    persist(&state.webhooks).await;
279    (
280        StatusCode::CREATED,
281        Json(serde_json::json!({"status": "registered"})),
282    )
283}
284
285/// List all webhook configs.
286///
287/// Mounted at `GET /api/v1/webhooks`.
288pub async fn list(State(state): State<Arc<AppState>>) -> impl IntoResponse {
289    let webhooks = state.webhooks.read().await;
290    Json(serde_json::json!({ "webhooks": *webhooks }))
291}
292
293/// Remove a webhook by service name.
294///
295/// Mounted at `DELETE /api/v1/webhooks/{id}` where id is the service_name.
296pub async fn remove_webhook(
297    State(state): State<Arc<AppState>>,
298    Path(id): Path<String>,
299) -> impl IntoResponse {
300    let removed = {
301        let mut webhooks = state.webhooks.write().await;
302        let before = webhooks.len();
303        webhooks.retain(|w| w.service_name != id);
304        before - webhooks.len()
305    };
306    if removed > 0 {
307        persist(&state.webhooks).await;
308    }
309
310    if removed == 0 {
311        (
312            StatusCode::NOT_FOUND,
313            Json(serde_json::json!({"error": format!("no webhook for service '{id}'")})),
314        )
315            .into_response()
316    } else {
317        info!("Webhook: removed {removed} webhook(s) for service '{id}'");
318        (
319            StatusCode::OK,
320            Json(serde_json::json!({"status": "removed", "count": removed})),
321        )
322            .into_response()
323    }
324}
325
326/// Handle an infra webhook: git pull the working directory, then redeploy
327/// all services from the refreshed service.toml files.
328async fn handle_infra_deploy(state: &AppState) -> anyhow::Result<usize> {
329    // Run `git pull` in the current working directory
330    let output = tokio::process::Command::new("git")
331        .args(["pull", "--ff-only"])
332        .output()
333        .await?;
334
335    if !output.status.success() {
336        let stderr = String::from_utf8_lossy(&output.stderr);
337        anyhow::bail!("git pull failed: {stderr}");
338    }
339    let stdout = String::from_utf8_lossy(&output.stdout);
340    info!("Infra git pull: {}", stdout.trim());
341
342    // Load all services from the services/ directory
343    let services_dir = std::path::Path::new("services");
344    let configs = if services_dir.is_dir() {
345        orca_core::config::ServicesConfig::load_dir(services_dir)?
346    } else {
347        orca_core::config::ServicesConfig::load("services.toml".as_ref())?
348    };
349
350    // Secrets are resolved in service_config_to_spec() at container creation
351    // time, not here. This ensures spec_matches() compares unresolved templates
352    // and doesn't restart containers just because a token was refreshed.
353    let count = configs.service.len();
354    let (deployed, errors) = reconciler::reconcile(state, &configs.service).await;
355
356    // Persist deployed services to store
357    if let Some(store) = &state.store {
358        for config in &configs.service {
359            if deployed.contains(&config.name)
360                && let Err(e) = store.set_service(&config.name, config)
361            {
362                tracing::warn!("Failed to persist {}: {e}", config.name);
363            }
364        }
365    }
366
367    if !errors.is_empty() {
368        warn!("Infra deploy: {} errors: {:?}", errors.len(), errors);
369    }
370    info!(
371        "Infra deploy complete: {}/{} services",
372        deployed.len(),
373        count
374    );
375    Ok(count)
376}
377
378#[cfg(test)]
379#[path = "webhook_tests.rs"]
380mod tests;