// mockforge_registry_server/handlers/snapshots.rs
//! Time Travel snapshot handlers (cloud-enablement task #10 / Phase 1).
//!
//! Phase 1 surface only — capture-trigger + read paths + delete. The
//! actual capture worker (consumes 'capturing' rows from the test_runs
//! queue with kind='snapshot_capture') and restore worker land in
//! follow-up slices.
//!
//! Routes:
//!   GET    /api/v1/workspaces/{workspace_id}/snapshots
//!   POST   /api/v1/workspaces/{workspace_id}/snapshots         (trigger capture)
//!   GET    /api/v1/snapshots/{id}
//!   GET    /api/v1/snapshots/{id}/diff
//!   POST   /api/v1/snapshots/{id}/restore
//!   DELETE /api/v1/snapshots/{id}

14use axum::{
15    extract::{Path, Query, State},
16    http::HeaderMap,
17    Json,
18};
19use chrono::{Duration, Utc};
20use mockforge_registry_core::models::snapshot::CreateSnapshot;
21use serde::Deserialize;
22use uuid::Uuid;
23
24use crate::{
25    error::{ApiError, ApiResult},
26    handlers::usage::effective_limits,
27    middleware::{resolve_org_context, AuthUser},
28    models::{CloudWorkspace, Snapshot, UsageCounter},
29    AppState,
30};
31
/// Default page size for snapshot listings when the caller omits `limit`.
const DEFAULT_LIMIT: i64 = 100;
/// Hard cap on `limit`; larger requests are clamped in `list_snapshots`.
const MAX_LIMIT: i64 = 500;

/// Query parameters for `GET .../workspaces/{workspace_id}/snapshots`.
#[derive(Debug, Deserialize)]
pub struct ListSnapshotsQuery {
    // Maximum rows to return. Clamped to 1..=MAX_LIMIT; falls back to
    // DEFAULT_LIMIT when absent.
    #[serde(default)]
    pub limit: Option<i64>,
}
40
41/// `GET /api/v1/workspaces/{workspace_id}/snapshots`
42pub async fn list_snapshots(
43    State(state): State<AppState>,
44    AuthUser(user_id): AuthUser,
45    Path(workspace_id): Path<Uuid>,
46    Query(query): Query<ListSnapshotsQuery>,
47    headers: HeaderMap,
48) -> ApiResult<Json<Vec<Snapshot>>> {
49    authorize_workspace(&state, user_id, &headers, workspace_id).await?;
50    let limit = query.limit.unwrap_or(DEFAULT_LIMIT).clamp(1, MAX_LIMIT);
51    let snapshots = Snapshot::list_by_workspace(state.db.pool(), workspace_id, limit)
52        .await
53        .map_err(ApiError::Database)?;
54    Ok(Json(snapshots))
55}
56
/// Request body for `POST .../workspaces/{workspace_id}/snapshots`.
#[derive(Debug, Deserialize)]
pub struct CaptureSnapshotRequest {
    // Optional human-readable snapshot name.
    #[serde(default)]
    pub name: Option<String>,
    // Optional free-form description.
    #[serde(default)]
    pub description: Option<String>,
    // Optional link to the hosted deployment this capture belongs to.
    #[serde(default)]
    pub hosted_deployment_id: Option<Uuid>,
    /// Defaults to "manual". Other valid values: "schedule", "pre_chaos",
    /// "pre_restore" — used by internal callers, not external API users.
    /// The public handler rejects anything other than "manual".
    #[serde(default)]
    pub triggered_by: Option<String>,
}
70
/// `POST /api/v1/workspaces/{workspace_id}/snapshots`
///
/// Inserts a row in `capturing` state and (eventually) enqueues the
/// capture worker. Worker enqueue is a follow-up slice; the row alone
/// is enough for the UI to render an in-progress capture.
///
/// In the current slice the capture runs synchronously right here:
/// build the workspace manifest, store it inline or upload it to the
/// blob backend depending on size, then flip the row to ready (or
/// failed on error).
///
/// # Errors
/// - `ResourceLimitExceeded` when the plan disables snapshots
///   (`max_snapshots == 0`) or the count limit is reached.
/// - `InvalidRequest` when `triggered_by` is anything but "manual".
/// - `Database` when manifest build or the final status update fails
///   (the row is marked `failed` on a best-effort basis first).
pub async fn capture_snapshot(
    State(state): State<AppState>,
    AuthUser(user_id): AuthUser,
    Path(workspace_id): Path<Uuid>,
    headers: HeaderMap,
    Json(request): Json<CaptureSnapshotRequest>,
) -> ApiResult<Json<Snapshot>> {
    let ctx = authorize_workspace(&state, user_id, &headers, workspace_id).await?;

    // Plan-limit checks. 0 means "feature disabled"; a positive value is
    // a hard cap. NOTE(review): a negative value skips both checks — it
    // appears to mean "unlimited"; confirm against the plan-limit schema.
    let limits = effective_limits(&state, &ctx.org).await?;
    let max_snapshots = limits.get("max_snapshots").and_then(|v| v.as_i64()).unwrap_or(0);
    if max_snapshots == 0 {
        return Err(ApiError::ResourceLimitExceeded(
            "Time Travel snapshots are not enabled on this plan".into(),
        ));
    }
    if max_snapshots > 0 {
        let used = Snapshot::count_by_workspace(state.db.pool(), workspace_id)
            .await
            .map_err(ApiError::Database)?;
        if used >= max_snapshots {
            return Err(ApiError::ResourceLimitExceeded(format!(
                "Snapshot limit reached ({used}/{max_snapshots}). Delete an old \
                 snapshot or upgrade your plan."
            )));
        }
    }

    // triggered_by validation. Only `manual` is accepted on the public
    // route; the schedule worker / chaos/restore hooks call the model
    // directly and don't go through this handler.
    let triggered_by = request.triggered_by.as_deref().unwrap_or("manual");
    if triggered_by != "manual" {
        return Err(ApiError::InvalidRequest(
            "triggered_by must be 'manual' for user-initiated captures".into(),
        ));
    }

    // expires_at = created_at + plan retention days. A non-positive
    // retention value means "keep forever" (no expiry stamped).
    let retention_days =
        limits.get("snapshot_retention_days").and_then(|v| v.as_i64()).unwrap_or(7);
    let expires_at = if retention_days > 0 {
        Some(Utc::now() + Duration::days(retention_days))
    } else {
        None
    };

    // Create the row in 'capturing' state so the UI can show progress
    // even if the steps below take a moment.
    let snapshot = Snapshot::create(
        state.db.pool(),
        CreateSnapshot {
            workspace_id,
            hosted_deployment_id: request.hosted_deployment_id,
            name: request.name.as_deref(),
            description: request.description.as_deref(),
            triggered_by,
            triggered_by_user: Some(user_id),
            expires_at,
        },
    )
    .await
    .map_err(ApiError::Database)?;

    // Capture the workspace state synchronously. Sub-second on a
    // typical workspace, so the request stays interactive without a
    // background worker round-trip.
    //
    // Manifests under INLINE_THRESHOLD bytes ride along in
    // snapshots.manifest as before. Larger manifests get uploaded to
    // the storage backend (S3 or local fallback) and the row carries
    // a real storage_url; the inline manifest column keeps a small
    // summary stub so the UI's quick-look still has something without
    // a follow-up fetch.
    const INLINE_THRESHOLD: i64 = 256 * 1024; // 256 KB
    let (manifest, size_bytes) = match build_workspace_manifest(state.db.pool(), workspace_id).await
    {
        Ok((m, s)) => (m, s),
        Err(e) => {
            tracing::error!(snapshot_id = %snapshot.id, error = %e, "manifest build failed");
            // Flip status to 'failed' so list_by_workspace reflects reality.
            // Best-effort: a failed status write is ignored because the
            // original error is what the caller needs to see.
            let _ = Snapshot::mark_failed(state.db.pool(), snapshot.id).await;
            return Err(ApiError::Database(e));
        }
    };

    let (storage_url, stored_manifest) = if size_bytes > INLINE_THRESHOLD {
        // Upload the full blob; keep a small summary on the row so
        // listings + diffs that don't need the full payload stay fast.
        // The `external: true` marker tells resolve_manifest to fetch
        // the blob instead of trusting the inline stub.
        let bytes = serde_json::to_vec(&manifest).unwrap_or_default();
        match state.storage.upload_snapshot_blob(workspace_id, snapshot.id, bytes).await {
            Ok(url) => {
                let summary = manifest
                    .get("counts")
                    .cloned()
                    .map(|c| serde_json::json!({ "counts": c, "external": true }))
                    .unwrap_or_else(|| serde_json::json!({ "external": true }));
                (url, summary)
            }
            Err(e) => {
                // Upload failure degrades to the inline path rather than
                // failing the whole capture.
                tracing::warn!(
                    snapshot_id = %snapshot.id,
                    error = %e,
                    "snapshot blob upload failed; falling back to inline manifest",
                );
                (format!("inline-manifest://snapshot/{}", snapshot.id), manifest)
            }
        }
    } else {
        // Small manifest: store inline; storage_url is a sentinel scheme.
        (format!("inline-manifest://snapshot/{}", snapshot.id), manifest)
    };
    match Snapshot::mark_ready(
        state.db.pool(),
        snapshot.id,
        &storage_url,
        size_bytes,
        &stored_manifest,
    )
    .await
    {
        Ok(Some(ready)) => {
            // Storage metering is a gauge (set_snapshot_bytes) not a
            // counter; updating it correctly requires reading the
            // current size_bytes sum across all ready snapshots, which
            // would race with other captures landing concurrently. The
            // usage_threshold_checker worker reconciles the gauge from
            // the source of truth, so we leave it alone here.
            let _ = ctx; // ctx no longer load-bearing post-mark_ready
            Ok(Json(ready))
        }
        Ok(None) => Ok(Json(snapshot)), // already terminal — return what we have
        Err(e) => {
            // Couldn't record readiness; mark failed (best-effort) and
            // surface the database error.
            let _ = Snapshot::mark_failed(state.db.pool(), snapshot.id).await;
            Err(ApiError::Database(e))
        }
    }
}
212
213/// Build a JSON manifest of the workspace's authoritative state.
214/// Includes the resources a "restore" would want to recreate: services,
215/// fixtures, scenarios, environments, federation links, folders.
216/// Returns (manifest, byte_count) so the caller can bill storage usage.
217async fn build_workspace_manifest(
218    pool: &sqlx::PgPool,
219    workspace_id: Uuid,
220) -> sqlx::Result<(serde_json::Value, i64)> {
221    use mockforge_registry_core::models::{
222        flow::Flow, mock_environment::MockEnvironment, ChaosCampaign,
223    };
224
225    // Each list is best-effort — if a resource family fails to load we
226    // log + include an empty array. A partial snapshot is more useful
227    // than no snapshot at all, and `restored_partial: true` in the
228    // manifest tells a future restore worker to be cautious.
229    let mut partial = false;
230    let environments = match MockEnvironment::list_by_workspace(pool, workspace_id).await {
231        Ok(rows) => rows,
232        Err(e) => {
233            tracing::warn!(workspace_id = %workspace_id, error = %e, "snapshot: mock_environments fetch failed");
234            partial = true;
235            Vec::new()
236        }
237    };
238    let flows = match Flow::list_by_workspace(pool, workspace_id, None).await {
239        Ok(rows) => rows,
240        Err(e) => {
241            tracing::warn!(workspace_id = %workspace_id, error = %e, "snapshot: flows fetch failed");
242            partial = true;
243            Vec::new()
244        }
245    };
246    let chaos = match ChaosCampaign::list_by_workspace(pool, workspace_id).await {
247        Ok(rows) => rows,
248        Err(e) => {
249            tracing::warn!(workspace_id = %workspace_id, error = %e, "snapshot: chaos fetch failed");
250            partial = true;
251            Vec::new()
252        }
253    };
254
255    // Raw services / fixtures table dumps via sqlx so we don't need a
256    // model API for every column — the manifest is forward-compatible
257    // because new columns just appear in the JSON.
258    let services = sqlx::query_as::<_, (Uuid, serde_json::Value)>(
259        "SELECT id, to_jsonb(s) AS doc FROM services s WHERE workspace_id = $1",
260    )
261    .bind(workspace_id)
262    .fetch_all(pool)
263    .await
264    .unwrap_or_else(|e| {
265        tracing::warn!(workspace_id = %workspace_id, error = %e, "snapshot: services fetch failed");
266        partial = true;
267        Vec::new()
268    });
269    let fixtures = sqlx::query_as::<_, (Uuid, serde_json::Value)>(
270        "SELECT id, to_jsonb(f) AS doc FROM fixtures f WHERE workspace_id = $1",
271    )
272    .bind(workspace_id)
273    .fetch_all(pool)
274    .await
275    .unwrap_or_else(|e| {
276        tracing::warn!(workspace_id = %workspace_id, error = %e, "snapshot: fixtures fetch failed");
277        partial = true;
278        Vec::new()
279    });
280
281    let manifest = serde_json::json!({
282        "schema_version": 1,
283        "captured_at": Utc::now(),
284        "workspace_id": workspace_id,
285        "partial": partial,
286        "counts": {
287            "services": services.len(),
288            "fixtures": fixtures.len(),
289            "environments": environments.len(),
290            "flows": flows.len(),
291            "chaos_campaigns": chaos.len(),
292        },
293        "services": services.into_iter().map(|(_, doc)| doc).collect::<Vec<_>>(),
294        "fixtures": fixtures.into_iter().map(|(_, doc)| doc).collect::<Vec<_>>(),
295        "environments": environments,
296        "flows": flows,
297        "chaos_campaigns": chaos,
298    });
299
300    let bytes = manifest.to_string().len() as i64;
301    Ok((manifest, bytes))
302}
303
304/// `GET /api/v1/snapshots/{id}`
305pub async fn get_snapshot(
306    State(state): State<AppState>,
307    AuthUser(user_id): AuthUser,
308    Path(id): Path<Uuid>,
309    headers: HeaderMap,
310) -> ApiResult<Json<Snapshot>> {
311    let snapshot = load_authorized_snapshot(&state, user_id, &headers, id).await?;
312    Ok(Json(snapshot))
313}
314
// NOTE: the endpoint description below documents `diff_snapshot`, not
// this query struct — kept here as a plain comment rather than a
// misleading `///` doc attached to `DiffQuery`.
//
// `GET /api/v1/snapshots/{id}/diff?against=current`
//
// Compares the snapshot's manifest against either the workspace's
// current state (`against=current`, default) or another snapshot
// (`against=<other_snapshot_id>`). Returns per-resource counts of
// added/removed/changed plus the actual diff lists so the UI can
// render a side-by-side review before the user commits to a restore.

/// Query parameters for the snapshot diff endpoint.
#[derive(Debug, Deserialize)]
pub struct DiffQuery {
    // "current" (default) or another snapshot's UUID.
    #[serde(default)]
    pub against: Option<String>,
}
327
/// Per-resource-family diff lists for one manifest key.
#[derive(Debug, serde::Serialize)]
pub struct ResourceDiff {
    // Present in the "against" side but not in the snapshot.
    pub added: Vec<serde_json::Value>,
    // Present in the snapshot but not in the "against" side.
    pub removed: Vec<serde_json::Value>,
    // Same id on both sides with differing content.
    pub changed: Vec<DiffPair>,
}

/// Before/after pair for a resource whose content changed.
#[derive(Debug, serde::Serialize)]
pub struct DiffPair {
    pub from: serde_json::Value,
    pub to: serde_json::Value,
}

/// Response body for `GET /api/v1/snapshots/{id}/diff`.
#[derive(Debug, serde::Serialize)]
pub struct SnapshotDiff {
    pub snapshot_id: Uuid,
    // "current" or "snapshot", matching the `against` query parameter.
    pub against_kind: String,
    // Set only when diffing against another snapshot.
    pub against_snapshot_id: Option<Uuid>,
    pub services: ResourceDiff,
    pub fixtures: ResourceDiff,
    pub flows: ResourceDiff,
    pub environments: ResourceDiff,
    pub chaos_campaigns: ResourceDiff,
}
352
353pub async fn diff_snapshot(
354    State(state): State<AppState>,
355    AuthUser(user_id): AuthUser,
356    Path(id): Path<Uuid>,
357    Query(query): Query<DiffQuery>,
358    headers: HeaderMap,
359) -> ApiResult<Json<SnapshotDiff>> {
360    let snapshot = load_authorized_snapshot(&state, user_id, &headers, id).await?;
361    let snapshot_manifest = resolve_manifest(&state, &snapshot).await;
362
363    let against_str = query.against.as_deref().unwrap_or("current");
364    let (against_kind, against_id, against_manifest) = if against_str == "current" {
365        let (m, _) = build_workspace_manifest(state.db.pool(), snapshot.workspace_id)
366            .await
367            .map_err(ApiError::Database)?;
368        ("current".to_string(), None, m)
369    } else {
370        let other_id = Uuid::parse_str(against_str).map_err(|_| {
371            ApiError::InvalidRequest("'against' must be 'current' or a snapshot UUID".into())
372        })?;
373        let other = load_authorized_snapshot(&state, user_id, &headers, other_id).await?;
374        if other.workspace_id != snapshot.workspace_id {
375            return Err(ApiError::InvalidRequest(
376                "Cannot diff snapshots across different workspaces".into(),
377            ));
378        }
379        let m = resolve_manifest(&state, &other).await;
380        ("snapshot".to_string(), Some(other_id), m)
381    };
382
383    Ok(Json(SnapshotDiff {
384        snapshot_id: snapshot.id,
385        against_kind,
386        against_snapshot_id: against_id,
387        services: diff_resource(&snapshot_manifest, &against_manifest, "services"),
388        fixtures: diff_resource(&snapshot_manifest, &against_manifest, "fixtures"),
389        flows: diff_resource(&snapshot_manifest, &against_manifest, "flows"),
390        environments: diff_resource(&snapshot_manifest, &against_manifest, "environments"),
391        chaos_campaigns: diff_resource(&snapshot_manifest, &against_manifest, "chaos_campaigns"),
392    }))
393}
394
395/// Resolve a snapshot's manifest. Falls back through three sources:
396/// 1. The inline `snapshots.manifest` column (small workspaces or pre-S3 rows).
397/// 2. The blob at `storage_url` if it points at the storage backend
398///    (newer rows where manifest exceeded INLINE_THRESHOLD).
399/// 3. Empty object if neither path produces JSON.
400async fn resolve_manifest(state: &AppState, snapshot: &Snapshot) -> serde_json::Value {
401    let inline = snapshot.manifest.clone().unwrap_or_else(|| serde_json::json!({}));
402
403    // If inline carries the full manifest (no `external: true` marker)
404    // we can use it as-is. The capture handler stamps that marker on
405    // rows whose manifest was uploaded out-of-line.
406    let is_external = inline.get("external").and_then(|v| v.as_bool()).unwrap_or(false);
407    if !is_external {
408        return inline;
409    }
410
411    // External: fetch the blob via the storage backend. Fall back to
412    // the inline summary when the fetch fails so the diff/restore
413    // path still produces something rather than 500ing.
414    match state.storage.read_snapshot_blob(snapshot.workspace_id, snapshot.id).await {
415        Ok(bytes) => match serde_json::from_slice::<serde_json::Value>(&bytes) {
416            Ok(v) => v,
417            Err(e) => {
418                tracing::warn!(
419                    snapshot_id = %snapshot.id,
420                    error = %e,
421                    "snapshot blob is not valid JSON; falling back to inline summary",
422                );
423                inline
424            }
425        },
426        Err(e) => {
427            tracing::warn!(
428                snapshot_id = %snapshot.id,
429                error = %e,
430                "snapshot blob read failed; falling back to inline summary",
431            );
432            inline
433        }
434    }
435}
436
437/// Diff one resource family between two manifests by `id`. Resources
438/// in `from` but not `to` are "removed"; resources in `to` but not
439/// `from` are "added"; same id with different content is "changed".
440fn diff_resource(from: &serde_json::Value, to: &serde_json::Value, key: &str) -> ResourceDiff {
441    let from_list = from.get(key).and_then(|v| v.as_array()).cloned().unwrap_or_default();
442    let to_list = to.get(key).and_then(|v| v.as_array()).cloned().unwrap_or_default();
443
444    let from_by_id: std::collections::HashMap<String, serde_json::Value> = from_list
445        .iter()
446        .filter_map(|v| v.get("id").and_then(|i| i.as_str()).map(|s| (s.to_string(), v.clone())))
447        .collect();
448    let to_by_id: std::collections::HashMap<String, serde_json::Value> = to_list
449        .iter()
450        .filter_map(|v| v.get("id").and_then(|i| i.as_str()).map(|s| (s.to_string(), v.clone())))
451        .collect();
452
453    let mut added = Vec::new();
454    let mut removed = Vec::new();
455    let mut changed = Vec::new();
456
457    for (id, v) in &from_by_id {
458        match to_by_id.get(id) {
459            None => removed.push(v.clone()),
460            Some(other) if other != v => changed.push(DiffPair {
461                from: v.clone(),
462                to: other.clone(),
463            }),
464            Some(_) => {} // identical
465        }
466    }
467    for (id, v) in &to_by_id {
468        if !from_by_id.contains_key(id) {
469            added.push(v.clone());
470        }
471    }
472
473    ResourceDiff {
474        added,
475        removed,
476        changed,
477    }
478}
479
/// `POST /api/v1/snapshots/{id}/restore`
///
/// Best-effort restore from a snapshot manifest. Writes mock_environments
/// and chaos_campaigns from the manifest into the snapshot's workspace.
/// Existing rows with the same name are skipped (idempotent on repeat
/// runs); rows in the workspace but not in the manifest are left in
/// place — restore is additive, not destructive.
///
/// Services + fixtures + flows are NOT restored automatically; their
/// FK chains and version history make a safe automated restore
/// non-trivial. The diff endpoint already shows what's missing so an
/// operator can copy those out manually if needed.
///
/// Returns a summary JSON with created/skipped counts per family and a
/// list of per-row errors (individual failures do not abort the run).
pub async fn restore_snapshot(
    State(state): State<AppState>,
    AuthUser(user_id): AuthUser,
    Path(id): Path<Uuid>,
    headers: HeaderMap,
) -> ApiResult<Json<serde_json::Value>> {
    use mockforge_registry_core::models::chaos::CreateChaosCampaign;
    use mockforge_registry_core::models::mock_environment::{MockEnvironment, MockEnvironmentName};
    use mockforge_registry_core::models::ChaosCampaign;

    let snapshot = load_authorized_snapshot(&state, user_id, &headers, id).await?;
    let manifest = resolve_manifest(&state, &snapshot).await;
    // resolve_manifest degrades to `{}` when nothing is available; treat
    // an empty (or non-object) manifest as "nothing to restore".
    if manifest.as_object().map(|o| o.is_empty()).unwrap_or(true) {
        return Err(ApiError::InvalidRequest("Snapshot has no manifest to restore".into()));
    }

    let pool = state.db.pool();
    let workspace_id = snapshot.workspace_id;
    // Per-family outcome counters plus an error list; individual row
    // failures are accumulated, never fatal.
    let mut envs_created = 0u32;
    let mut envs_skipped = 0u32;
    let mut chaos_created = 0u32;
    let mut chaos_skipped = 0u32;
    let mut errors: Vec<serde_json::Value> = Vec::new();

    // Mock environments — keyed on name, restoring is just creating
    // when the name is free. We skip when one already exists.
    if let Some(envs) = manifest.get("environments").and_then(|v| v.as_array()) {
        for env in envs {
            let name_str = env.get("name").and_then(|v| v.as_str()).unwrap_or("");
            // MockEnvironmentName::from_str returns Option — only the
            // fixed dev|test|prod names are valid.
            let parsed = match MockEnvironmentName::from_str(name_str) {
                Some(n) => n,
                None => {
                    errors.push(serde_json::json!({
                        "kind": "environment",
                        "name": name_str,
                        "error": "invalid name (must be dev|test|prod)",
                    }));
                    continue;
                }
            };
            // Skip when the name is already taken; a lookup failure is
            // recorded and the row skipped rather than created blindly.
            match MockEnvironment::find_by_workspace_and_name(pool, workspace_id, parsed).await {
                Ok(Some(_)) => {
                    envs_skipped += 1;
                    continue;
                }
                Ok(None) => {}
                Err(e) => {
                    errors.push(serde_json::json!({
                        "kind": "environment",
                        "name": name_str,
                        "error": format!("lookup failed: {e}"),
                    }));
                    continue;
                }
            }
            // Carry the three config blobs over verbatim; absent keys
            // become None.
            let reality = env.get("reality_config").cloned();
            let chaos = env.get("chaos_config").cloned();
            let drift = env.get("drift_budget_config").cloned();
            match MockEnvironment::create(pool, workspace_id, parsed, reality, chaos, drift).await {
                Ok(_) => envs_created += 1,
                Err(e) => errors.push(serde_json::json!({
                    "kind": "environment",
                    "name": name_str,
                    "error": format!("create failed: {e}"),
                })),
            }
        }
    }

    // Chaos campaigns — keyed on name within workspace. Same merge rule.
    if let Some(camps) = manifest.get("chaos_campaigns").and_then(|v| v.as_array()) {
        // One upfront listing gives a name set for the skip check,
        // instead of a per-row existence query.
        let existing = ChaosCampaign::list_by_workspace(pool, workspace_id)
            .await
            .map_err(ApiError::Database)?;
        let existing_names: std::collections::HashSet<String> =
            existing.into_iter().map(|c| c.name).collect();

        for c in camps {
            let name = c.get("name").and_then(|v| v.as_str()).unwrap_or("");
            // Nameless entries can't be keyed — silently dropped.
            if name.is_empty() {
                continue;
            }
            if existing_names.contains(name) {
                chaos_skipped += 1;
                continue;
            }
            // Fall back to safe defaults for missing fields so a sparse
            // manifest entry still produces a creatable campaign.
            let target_kind = c.get("target_kind").and_then(|v| v.as_str()).unwrap_or("external");
            let target_ref = c.get("target_ref").and_then(|v| v.as_str()).unwrap_or("");
            let cfg = c.get("config").cloned().unwrap_or_else(|| serde_json::json!({}));
            let safety = c.get("safety_config").cloned().unwrap_or_else(|| serde_json::json!({}));
            let description = c.get("description").and_then(|v| v.as_str());
            match ChaosCampaign::create(
                pool,
                CreateChaosCampaign {
                    workspace_id,
                    name,
                    description,
                    target_kind,
                    target_ref,
                    config: &cfg,
                    safety_config: &safety,
                    created_by: Some(user_id),
                },
            )
            .await
            {
                Ok(_) => chaos_created += 1,
                Err(e) => errors.push(serde_json::json!({
                    "kind": "chaos_campaign",
                    "name": name,
                    "error": format!("create failed: {e}"),
                })),
            }
        }
    }

    Ok(Json(serde_json::json!({
        "snapshot_id": snapshot.id,
        "workspace_id": workspace_id,
        "environments": { "created": envs_created, "skipped_existing": envs_skipped },
        "chaos_campaigns": { "created": chaos_created, "skipped_existing": chaos_skipped },
        "errors": errors,
        "note": "services, fixtures, and flows are not auto-restored; \
                 review the diff endpoint and recreate them manually.",
    })))
}
618
619/// `DELETE /api/v1/snapshots/{id}`
620///
621/// Removes the row. Re-syncs the `usage_counters.snapshot_bytes_stored`
622/// gauge so the dashboard meter reflects reality immediately. Blob
623/// reclaim from object storage happens asynchronously in a follow-up
624/// slice (the worker reads orphaned storage_url values).
625pub async fn delete_snapshot(
626    State(state): State<AppState>,
627    AuthUser(user_id): AuthUser,
628    Path(id): Path<Uuid>,
629    headers: HeaderMap,
630) -> ApiResult<Json<serde_json::Value>> {
631    let snapshot = load_authorized_snapshot(&state, user_id, &headers, id).await?;
632    let workspace_id = snapshot.workspace_id;
633
634    let deleted = Snapshot::delete(state.db.pool(), id).await.map_err(ApiError::Database)?;
635    if !deleted {
636        return Err(ApiError::InvalidRequest("Snapshot not found".into()));
637    }
638
639    // Re-sync the storage gauge for the org.
640    let workspace = CloudWorkspace::find_by_id(state.db.pool(), workspace_id)
641        .await?
642        .ok_or_else(|| ApiError::InvalidRequest("Workspace not found".into()))?;
643    let bytes = Snapshot::sum_ready_bytes_by_workspace(state.db.pool(), workspace_id)
644        .await
645        .map_err(ApiError::Database)?;
646    UsageCounter::set_snapshot_bytes(state.db.pool(), workspace.org_id, bytes)
647        .await
648        .map_err(ApiError::Database)?;
649
650    Ok(Json(serde_json::json!({ "deleted": true })))
651}
652
653/// Verify caller belongs to the workspace's org.
654async fn authorize_workspace(
655    state: &AppState,
656    user_id: Uuid,
657    headers: &HeaderMap,
658    workspace_id: Uuid,
659) -> ApiResult<crate::middleware::org_context::OrgContext> {
660    let workspace = CloudWorkspace::find_by_id(state.db.pool(), workspace_id)
661        .await?
662        .ok_or_else(|| ApiError::InvalidRequest("Workspace not found".into()))?;
663    let ctx = resolve_org_context(state, user_id, headers, None)
664        .await
665        .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
666    if ctx.org_id != workspace.org_id {
667        return Err(ApiError::InvalidRequest("Workspace not found".into()));
668    }
669    Ok(ctx)
670}
671
672/// Fetch a snapshot and verify caller belongs to its workspace's org.
673async fn load_authorized_snapshot(
674    state: &AppState,
675    user_id: Uuid,
676    headers: &HeaderMap,
677    id: Uuid,
678) -> ApiResult<Snapshot> {
679    let snapshot = Snapshot::find_by_id(state.db.pool(), id)
680        .await
681        .map_err(ApiError::Database)?
682        .ok_or_else(|| ApiError::InvalidRequest("Snapshot not found".into()))?;
683    let workspace = CloudWorkspace::find_by_id(state.db.pool(), snapshot.workspace_id)
684        .await?
685        .ok_or_else(|| ApiError::InvalidRequest("Snapshot not found".into()))?;
686    let ctx = resolve_org_context(state, user_id, headers, None)
687        .await
688        .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
689    if ctx.org_id != workspace.org_id {
690        return Err(ApiError::InvalidRequest("Snapshot not found".into()));
691    }
692    Ok(snapshot)
693}