Skip to main content

orca_control/
webhook.rs

1//! Webhook handler for GitHub/Gitea/GitLab push events.
2//!
3//! When a push webhook fires, orca looks up the matching service and triggers
4//! a rolling redeploy (stop all instances, pull fresh image, recreate).
5
6use std::sync::Arc;
7
8use axum::extract::Path;
9use axum::extract::State;
10use axum::http::{HeaderMap, StatusCode};
11use axum::response::IntoResponse;
12use axum::routing::{delete, post};
13use axum::{Json, Router};
14use hmac::{Hmac, Mac};
15use sha2::Sha256;
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::operations::AgentOfflineError;
20use crate::reconciler;
21use crate::state::AppState;
22
23type HmacSha256 = Hmac<Sha256>;
24
25/// Configuration for a webhook trigger.
26#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
27pub struct WebhookConfig {
28    /// Repository full name, e.g. "myorg/api".
29    pub repo: String,
30    /// Orca service name to redeploy.
31    pub service_name: String,
32    /// Branch to watch (default: "main").
33    #[serde(default = "default_branch")]
34    pub branch: String,
35    /// Optional HMAC secret for signature validation.
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub secret: Option<String>,
38    /// If true, this is an infra webhook: git pull + redeploy all services.
39    #[serde(default)]
40    pub infra: bool,
41}
42
/// Serde default for the webhook branch field: the branch watched when a
/// registration does not name one.
fn default_branch() -> String {
    String::from("main")
}
46
/// Shared, async-lockable list of registered webhook configs.
///
/// Handlers in this module reach it through the `webhooks` field of
/// [`AppState`]; readers take the read lock and registration/removal take the
/// write lock.
pub type WebhookStore = Arc<RwLock<Vec<WebhookConfig>>>;
49
/// Path to the on-disk webhook config file (`~/.orca/webhooks.json`).
///
/// Honors `ORCA_WEBHOOKS_PATH` as an override (used by tests). When the
/// override is not set, the path is built from `HOME`, falling back to the
/// current directory if `HOME` is not set either.
///
/// Environment values are read as raw OS strings so that a path which is not
/// valid UTF-8 is used as-is instead of being silently ignored.
fn webhooks_path() -> std::path::PathBuf {
    if let Some(custom) = std::env::var_os("ORCA_WEBHOOKS_PATH") {
        return std::path::PathBuf::from(custom);
    }
    let home = std::env::var_os("HOME").unwrap_or_else(|| ".".into());
    std::path::PathBuf::from(home).join(".orca/webhooks.json")
}
60
61/// Load persisted webhooks from disk, returning an empty list on first run.
62pub fn new_store() -> WebhookStore {
63    let configs: Vec<WebhookConfig> = std::fs::read_to_string(webhooks_path())
64        .ok()
65        .and_then(|raw| serde_json::from_str(&raw).ok())
66        .unwrap_or_default();
67    Arc::new(RwLock::new(configs))
68}
69
70/// Persist the current webhook list to disk. Errors are logged, not returned,
71/// so they don't fail the request that triggered the change.
72async fn persist(store: &WebhookStore) {
73    let snapshot = store.read().await.clone();
74    let path = webhooks_path();
75    if let Some(parent) = path.parent() {
76        let _ = std::fs::create_dir_all(parent);
77    }
78    match serde_json::to_string_pretty(&snapshot) {
79        Ok(json) => {
80            if let Err(e) = std::fs::write(&path, json) {
81                error!("Failed to persist webhooks to {}: {e}", path.display());
82            }
83        }
84        Err(e) => error!("Failed to serialize webhooks: {e}"),
85    }
86}
87
88/// Subset of GitHub push webhook payload we care about.
89#[derive(Debug, serde::Deserialize)]
90struct PushPayload {
91    /// e.g. "refs/heads/main"
92    #[serde(rename = "ref")]
93    git_ref: String,
94    repository: RepoInfo,
95    head_commit: Option<CommitInfo>,
96}
97
98#[derive(Debug, serde::Deserialize)]
99struct RepoInfo {
100    full_name: String,
101}
102
103#[derive(Debug, serde::Deserialize)]
104struct CommitInfo {
105    id: String,
106    message: String,
107}
108
/// Return the branch name carried by a branch ref such as `"refs/heads/main"`.
///
/// Yields `None` for anything that does not start with `refs/heads/` (tags,
/// for instance). Slashes inside the branch name are preserved.
fn branch_from_ref(git_ref: &str) -> Option<&str> {
    const BRANCH_REF_PREFIX: &str = "refs/heads/";
    let branch = git_ref.strip_prefix(BRANCH_REF_PREFIX)?;
    Some(branch)
}
113
114/// Validate HMAC-SHA256 signature from the `X-Hub-Signature-256` header.
115fn validate_signature(secret: &str, body: &[u8], signature_header: &str) -> bool {
116    let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
117        return false;
118    };
119
120    let Ok(expected) = hex::decode(hex_sig) else {
121        return false;
122    };
123
124    let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
125        return false;
126    };
127
128    mac.update(body);
129    mac.verify_slice(&expected).is_ok()
130}
131
132/// Handle a GitHub/Gitea push webhook.
133///
134/// Mounted at `POST /api/v1/webhooks/github`.
135/// Build webhook routes.
136/// Build webhook routes (call before with_state on parent router).
137pub fn webhook_router() -> Router<Arc<AppState>> {
138    Router::new()
139        .route("/api/v1/webhooks/github", post(handle_push))
140        .route("/api/v1/webhooks", post(register).get(list))
141        .route("/api/v1/webhooks/{id}", delete(remove_webhook))
142}
143
144pub async fn handle_push(
145    State(state): State<Arc<AppState>>,
146    headers: HeaderMap,
147    body: axum::body::Bytes,
148) -> impl IntoResponse {
149    // Parse the payload
150    let payload: PushPayload = match serde_json::from_slice(&body) {
151        Ok(p) => p,
152        Err(e) => {
153            warn!("Webhook: invalid payload: {e}");
154            return (StatusCode::BAD_REQUEST, format!("invalid payload: {e}")).into_response();
155        }
156    };
157
158    let repo = &payload.repository.full_name;
159    let Some(branch) = branch_from_ref(&payload.git_ref) else {
160        return (StatusCode::OK, "ignored: not a branch push".to_string()).into_response();
161    };
162
163    let commit_id = payload
164        .head_commit
165        .as_ref()
166        .map(|c| c.id.as_str())
167        .unwrap_or("unknown");
168    let commit_msg = payload
169        .head_commit
170        .as_ref()
171        .and_then(|c| c.message.lines().next())
172        .unwrap_or("");
173    let short_sha = &commit_id[..commit_id.len().min(8)];
174
175    info!("Webhook: push to {repo}#{branch} (commit {short_sha}: {commit_msg})");
176
177    // Find matching webhook config
178    let webhooks = state.webhooks.read().await;
179    let matching: Vec<WebhookConfig> = webhooks
180        .iter()
181        .filter(|w| w.repo == *repo && w.branch == branch)
182        .cloned()
183        .collect();
184    drop(webhooks);
185
186    if matching.is_empty() {
187        info!("Webhook: no config for {repo}#{branch}, ignoring");
188        return (
189            StatusCode::OK,
190            "ignored: no matching webhook config".to_string(),
191        )
192            .into_response();
193    }
194
195    let sig_header = headers
196        .get("X-Hub-Signature-256")
197        .and_then(|v| v.to_str().ok())
198        .unwrap_or("");
199
200    let mut deployed = Vec::new();
201    let mut errors = Vec::new();
202    let mut sig_failures = 0u32;
203    let mut agent_offline = false;
204
205    for wh in &matching {
206        // Validate secret if configured
207        if let Some(secret) = &wh.secret
208            && (sig_header.is_empty() || !validate_signature(secret, &body, sig_header))
209        {
210            sig_failures += 1;
211            warn!("Webhook: HMAC validation failed for {}", wh.service_name);
212            continue;
213        }
214
215        if wh.infra {
216            info!("Webhook: infra push detected, spawning git pull + deploy all");
217            let state_clone = Arc::clone(&state);
218            tokio::spawn(async move {
219                match handle_infra_deploy(&state_clone).await {
220                    Ok(count) => info!("Infra deploy complete: {count} services"),
221                    Err(e) => error!("Webhook: infra deploy failed: {e}"),
222                }
223            });
224            deployed.push("infra (deploying)".to_string());
225            continue;
226        }
227
228        info!("Webhook: triggering redeploy of {}", wh.service_name);
229        match reconciler::redeploy(&state, &wh.service_name).await {
230            Ok(()) => deployed.push(wh.service_name.clone()),
231            Err(e) => {
232                if e.downcast_ref::<AgentOfflineError>().is_some() {
233                    agent_offline = true;
234                }
235                error!("Webhook: redeploy of {} failed: {e}", wh.service_name);
236                errors.push(format!("{}: {e}", wh.service_name));
237            }
238        }
239    }
240
241    // If every matching webhook failed signature validation, return 401
242    if sig_failures > 0 && deployed.is_empty() && errors.is_empty() {
243        return (StatusCode::UNAUTHORIZED, "signature validation failed").into_response();
244    }
245
246    let status = if errors.is_empty() {
247        StatusCode::OK
248    } else if deployed.is_empty() && agent_offline {
249        StatusCode::SERVICE_UNAVAILABLE
250    } else if deployed.is_empty() {
251        StatusCode::INTERNAL_SERVER_ERROR
252    } else {
253        StatusCode::PARTIAL_CONTENT
254    };
255
256    (
257        status,
258        Json(serde_json::json!({ "deployed": deployed, "errors": errors })),
259    )
260        .into_response()
261}
262
263/// Register a new webhook config.
264///
265/// Mounted at `POST /api/v1/webhooks`.
266pub async fn register(
267    State(state): State<Arc<AppState>>,
268    Json(config): Json<WebhookConfig>,
269) -> impl IntoResponse {
270    info!(
271        "Webhook: registering {}#{} -> {}",
272        config.repo, config.branch, config.service_name
273    );
274    {
275        let mut webhooks = state.webhooks.write().await;
276        // Remove existing config for same repo+branch+service to allow updates
277        webhooks.retain(|w| {
278            !(w.repo == config.repo
279                && w.branch == config.branch
280                && w.service_name == config.service_name)
281        });
282        webhooks.push(config);
283    }
284    persist(&state.webhooks).await;
285    (
286        StatusCode::CREATED,
287        Json(serde_json::json!({"status": "registered"})),
288    )
289}
290
291/// List all webhook configs.
292///
293/// Mounted at `GET /api/v1/webhooks`.
294pub async fn list(State(state): State<Arc<AppState>>) -> impl IntoResponse {
295    let webhooks = state.webhooks.read().await;
296    Json(serde_json::json!({ "webhooks": *webhooks }))
297}
298
299/// Remove a webhook by service name.
300///
301/// Mounted at `DELETE /api/v1/webhooks/{id}` where id is the service_name.
302pub async fn remove_webhook(
303    State(state): State<Arc<AppState>>,
304    Path(id): Path<String>,
305) -> impl IntoResponse {
306    let removed = {
307        let mut webhooks = state.webhooks.write().await;
308        let before = webhooks.len();
309        webhooks.retain(|w| w.service_name != id);
310        before - webhooks.len()
311    };
312    if removed > 0 {
313        persist(&state.webhooks).await;
314    }
315
316    if removed == 0 {
317        (
318            StatusCode::NOT_FOUND,
319            Json(serde_json::json!({"error": format!("no webhook for service '{id}'")})),
320        )
321            .into_response()
322    } else {
323        info!("Webhook: removed {removed} webhook(s) for service '{id}'");
324        (
325            StatusCode::OK,
326            Json(serde_json::json!({"status": "removed", "count": removed})),
327        )
328            .into_response()
329    }
330}
331
332/// Handle an infra webhook: git pull the working directory, then redeploy
333/// all services from the refreshed service.toml files.
334async fn handle_infra_deploy(state: &AppState) -> anyhow::Result<usize> {
335    // Run `git pull` in the current working directory
336    let output = tokio::process::Command::new("git")
337        .args(["pull", "--ff-only"])
338        .output()
339        .await?;
340
341    if !output.status.success() {
342        let stderr = String::from_utf8_lossy(&output.stderr);
343        anyhow::bail!("git pull failed: {stderr}");
344    }
345    let stdout = String::from_utf8_lossy(&output.stdout);
346    info!("Infra git pull: {}", stdout.trim());
347
348    // Load all services from the services/ directory
349    let services_dir = std::path::Path::new("services");
350    let configs = if services_dir.is_dir() {
351        orca_core::config::ServicesConfig::load_dir(services_dir)?
352    } else {
353        orca_core::config::ServicesConfig::load("services.toml".as_ref())?
354    };
355
356    // Secrets are resolved in service_config_to_spec() at container creation
357    // time, not here. This ensures spec_matches() compares unresolved templates
358    // and doesn't restart containers just because a token was refreshed.
359    let count = configs.service.len();
360    let (deployed, errors) = reconciler::reconcile(state, &configs.service).await;
361
362    // Persist deployed services to store
363    if let Some(store) = &state.store {
364        for config in &configs.service {
365            if deployed.contains(&config.name)
366                && let Err(e) = store.set_service(&config.name, config)
367            {
368                tracing::warn!("Failed to persist {}: {e}", config.name);
369            }
370        }
371    }
372
373    if !errors.is_empty() {
374        warn!("Infra deploy: {} errors: {:?}", errors.len(), errors);
375    }
376    info!(
377        "Infra deploy complete: {}/{} services",
378        deployed.len(),
379        count
380    );
381    Ok(count)
382}
383
384#[cfg(test)]
385#[path = "webhook_tests.rs"]
386mod tests;