1use axum::{
15 extract::{Path, Query, State},
16 http::HeaderMap,
17 Json,
18};
19use chrono::{Duration, Utc};
20use mockforge_registry_core::models::snapshot::CreateSnapshot;
21use serde::Deserialize;
22use uuid::Uuid;
23
24use crate::{
25 error::{ApiError, ApiResult},
26 handlers::usage::effective_limits,
27 middleware::{resolve_org_context, AuthUser},
28 models::{CloudWorkspace, Snapshot, UsageCounter},
29 AppState,
30};
31
/// Page size `list_snapshots` uses when the request does not supply `limit`.
const DEFAULT_LIMIT: i64 = 100;
/// Upper bound that `list_snapshots` clamps the effective `limit` to.
const MAX_LIMIT: i64 = 500;
34
/// Query-string parameters accepted by `list_snapshots`.
#[derive(Debug, Deserialize)]
pub struct ListSnapshotsQuery {
    /// Maximum number of snapshots to return. `list_snapshots` substitutes
    /// `DEFAULT_LIMIT` when this is absent and clamps the result to
    /// `1..=MAX_LIMIT`.
    #[serde(default)]
    pub limit: Option<i64>,
}
40
41pub async fn list_snapshots(
43 State(state): State<AppState>,
44 AuthUser(user_id): AuthUser,
45 Path(workspace_id): Path<Uuid>,
46 Query(query): Query<ListSnapshotsQuery>,
47 headers: HeaderMap,
48) -> ApiResult<Json<Vec<Snapshot>>> {
49 authorize_workspace(&state, user_id, &headers, workspace_id).await?;
50 let limit = query.limit.unwrap_or(DEFAULT_LIMIT).clamp(1, MAX_LIMIT);
51 let snapshots = Snapshot::list_by_workspace(state.db.pool(), workspace_id, limit)
52 .await
53 .map_err(ApiError::Database)?;
54 Ok(Json(snapshots))
55}
56
/// JSON body accepted by `capture_snapshot`. Every field is optional.
#[derive(Debug, Deserialize)]
pub struct CaptureSnapshotRequest {
    /// Optional snapshot name, forwarded to `CreateSnapshot::name`.
    #[serde(default)]
    pub name: Option<String>,
    /// Optional description, forwarded to `CreateSnapshot::description`.
    #[serde(default)]
    pub description: Option<String>,
    /// Optional hosted deployment id, forwarded to
    /// `CreateSnapshot::hosted_deployment_id`.
    #[serde(default)]
    pub hosted_deployment_id: Option<Uuid>,
    /// Trigger label. `capture_snapshot` treats a missing value as `"manual"`
    /// and rejects any other value.
    #[serde(default)]
    pub triggered_by: Option<String>,
}
70
71pub async fn capture_snapshot(
77 State(state): State<AppState>,
78 AuthUser(user_id): AuthUser,
79 Path(workspace_id): Path<Uuid>,
80 headers: HeaderMap,
81 Json(request): Json<CaptureSnapshotRequest>,
82) -> ApiResult<Json<Snapshot>> {
83 let ctx = authorize_workspace(&state, user_id, &headers, workspace_id).await?;
84
85 let limits = effective_limits(&state, &ctx.org).await?;
87 let max_snapshots = limits.get("max_snapshots").and_then(|v| v.as_i64()).unwrap_or(0);
88 if max_snapshots == 0 {
89 return Err(ApiError::ResourceLimitExceeded(
90 "Time Travel snapshots are not enabled on this plan".into(),
91 ));
92 }
93 if max_snapshots > 0 {
94 let used = Snapshot::count_by_workspace(state.db.pool(), workspace_id)
95 .await
96 .map_err(ApiError::Database)?;
97 if used >= max_snapshots {
98 return Err(ApiError::ResourceLimitExceeded(format!(
99 "Snapshot limit reached ({used}/{max_snapshots}). Delete an old \
100 snapshot or upgrade your plan."
101 )));
102 }
103 }
104
105 let triggered_by = request.triggered_by.as_deref().unwrap_or("manual");
109 if triggered_by != "manual" {
110 return Err(ApiError::InvalidRequest(
111 "triggered_by must be 'manual' for user-initiated captures".into(),
112 ));
113 }
114
115 let retention_days =
117 limits.get("snapshot_retention_days").and_then(|v| v.as_i64()).unwrap_or(7);
118 let expires_at = if retention_days > 0 {
119 Some(Utc::now() + Duration::days(retention_days))
120 } else {
121 None
122 };
123
124 let snapshot = Snapshot::create(
125 state.db.pool(),
126 CreateSnapshot {
127 workspace_id,
128 hosted_deployment_id: request.hosted_deployment_id,
129 name: request.name.as_deref(),
130 description: request.description.as_deref(),
131 triggered_by,
132 triggered_by_user: Some(user_id),
133 expires_at,
134 },
135 )
136 .await
137 .map_err(ApiError::Database)?;
138
139 const INLINE_THRESHOLD: i64 = 256 * 1024; let (manifest, size_bytes) = match build_workspace_manifest(state.db.pool(), workspace_id).await
151 {
152 Ok((m, s)) => (m, s),
153 Err(e) => {
154 tracing::error!(snapshot_id = %snapshot.id, error = %e, "manifest build failed");
155 let _ = Snapshot::mark_failed(state.db.pool(), snapshot.id).await;
157 return Err(ApiError::Database(e));
158 }
159 };
160
161 let (storage_url, stored_manifest) = if size_bytes > INLINE_THRESHOLD {
162 let bytes = serde_json::to_vec(&manifest).unwrap_or_default();
165 match state.storage.upload_snapshot_blob(workspace_id, snapshot.id, bytes).await {
166 Ok(url) => {
167 let summary = manifest
168 .get("counts")
169 .cloned()
170 .map(|c| serde_json::json!({ "counts": c, "external": true }))
171 .unwrap_or_else(|| serde_json::json!({ "external": true }));
172 (url, summary)
173 }
174 Err(e) => {
175 tracing::warn!(
176 snapshot_id = %snapshot.id,
177 error = %e,
178 "snapshot blob upload failed; falling back to inline manifest",
179 );
180 (format!("inline-manifest://snapshot/{}", snapshot.id), manifest)
181 }
182 }
183 } else {
184 (format!("inline-manifest://snapshot/{}", snapshot.id), manifest)
185 };
186 match Snapshot::mark_ready(
187 state.db.pool(),
188 snapshot.id,
189 &storage_url,
190 size_bytes,
191 &stored_manifest,
192 )
193 .await
194 {
195 Ok(Some(ready)) => {
196 let _ = ctx; Ok(Json(ready))
204 }
205 Ok(None) => Ok(Json(snapshot)), Err(e) => {
207 let _ = Snapshot::mark_failed(state.db.pool(), snapshot.id).await;
208 Err(ApiError::Database(e))
209 }
210 }
211}
212
213async fn build_workspace_manifest(
218 pool: &sqlx::PgPool,
219 workspace_id: Uuid,
220) -> sqlx::Result<(serde_json::Value, i64)> {
221 use mockforge_registry_core::models::{
222 flow::Flow, mock_environment::MockEnvironment, ChaosCampaign,
223 };
224
225 let mut partial = false;
230 let environments = match MockEnvironment::list_by_workspace(pool, workspace_id).await {
231 Ok(rows) => rows,
232 Err(e) => {
233 tracing::warn!(workspace_id = %workspace_id, error = %e, "snapshot: mock_environments fetch failed");
234 partial = true;
235 Vec::new()
236 }
237 };
238 let flows = match Flow::list_by_workspace(pool, workspace_id, None).await {
239 Ok(rows) => rows,
240 Err(e) => {
241 tracing::warn!(workspace_id = %workspace_id, error = %e, "snapshot: flows fetch failed");
242 partial = true;
243 Vec::new()
244 }
245 };
246 let chaos = match ChaosCampaign::list_by_workspace(pool, workspace_id).await {
247 Ok(rows) => rows,
248 Err(e) => {
249 tracing::warn!(workspace_id = %workspace_id, error = %e, "snapshot: chaos fetch failed");
250 partial = true;
251 Vec::new()
252 }
253 };
254
255 let services = sqlx::query_as::<_, (Uuid, serde_json::Value)>(
259 "SELECT id, to_jsonb(s) AS doc FROM services s WHERE workspace_id = $1",
260 )
261 .bind(workspace_id)
262 .fetch_all(pool)
263 .await
264 .unwrap_or_else(|e| {
265 tracing::warn!(workspace_id = %workspace_id, error = %e, "snapshot: services fetch failed");
266 partial = true;
267 Vec::new()
268 });
269 let fixtures = sqlx::query_as::<_, (Uuid, serde_json::Value)>(
270 "SELECT id, to_jsonb(f) AS doc FROM fixtures f WHERE workspace_id = $1",
271 )
272 .bind(workspace_id)
273 .fetch_all(pool)
274 .await
275 .unwrap_or_else(|e| {
276 tracing::warn!(workspace_id = %workspace_id, error = %e, "snapshot: fixtures fetch failed");
277 partial = true;
278 Vec::new()
279 });
280
281 let manifest = serde_json::json!({
282 "schema_version": 1,
283 "captured_at": Utc::now(),
284 "workspace_id": workspace_id,
285 "partial": partial,
286 "counts": {
287 "services": services.len(),
288 "fixtures": fixtures.len(),
289 "environments": environments.len(),
290 "flows": flows.len(),
291 "chaos_campaigns": chaos.len(),
292 },
293 "services": services.into_iter().map(|(_, doc)| doc).collect::<Vec<_>>(),
294 "fixtures": fixtures.into_iter().map(|(_, doc)| doc).collect::<Vec<_>>(),
295 "environments": environments,
296 "flows": flows,
297 "chaos_campaigns": chaos,
298 });
299
300 let bytes = manifest.to_string().len() as i64;
301 Ok((manifest, bytes))
302}
303
304pub async fn get_snapshot(
306 State(state): State<AppState>,
307 AuthUser(user_id): AuthUser,
308 Path(id): Path<Uuid>,
309 headers: HeaderMap,
310) -> ApiResult<Json<Snapshot>> {
311 let snapshot = load_authorized_snapshot(&state, user_id, &headers, id).await?;
312 Ok(Json(snapshot))
313}
314
/// Query-string parameters accepted by `diff_snapshot`.
#[derive(Debug, Deserialize)]
pub struct DiffQuery {
    /// What to compare the snapshot against: the literal `"current"` (also
    /// used when absent) or the UUID of another snapshot.
    #[serde(default)]
    pub against: Option<String>,
}
327
/// Differences for one resource kind, as produced by `diff_resource`.
#[derive(Debug, serde::Serialize)]
pub struct ResourceDiff {
    /// Items present only on the "to" side of the comparison.
    pub added: Vec<serde_json::Value>,
    /// Items present only on the "from" side of the comparison.
    pub removed: Vec<serde_json::Value>,
    /// Items present on both sides (same `id`) whose JSON differs.
    pub changed: Vec<DiffPair>,
}
334
/// The two versions of an item that exists on both sides of a diff.
#[derive(Debug, serde::Serialize)]
pub struct DiffPair {
    /// The item as it appears on the "from" side.
    pub from: serde_json::Value,
    /// The item as it appears on the "to" side.
    pub to: serde_json::Value,
}
340
/// Response body of `diff_snapshot`: one `ResourceDiff` per resource kind.
#[derive(Debug, serde::Serialize)]
pub struct SnapshotDiff {
    /// Id of the snapshot used as the "from" side.
    pub snapshot_id: Uuid,
    /// `"current"` when compared against the live workspace, `"snapshot"`
    /// when compared against another snapshot.
    pub against_kind: String,
    /// Id of the other snapshot; `None` when `against_kind` is `"current"`.
    pub against_snapshot_id: Option<Uuid>,
    /// Diff of the manifest's `"services"` list.
    pub services: ResourceDiff,
    /// Diff of the manifest's `"fixtures"` list.
    pub fixtures: ResourceDiff,
    /// Diff of the manifest's `"flows"` list.
    pub flows: ResourceDiff,
    /// Diff of the manifest's `"environments"` list.
    pub environments: ResourceDiff,
    /// Diff of the manifest's `"chaos_campaigns"` list.
    pub chaos_campaigns: ResourceDiff,
}
352
353pub async fn diff_snapshot(
354 State(state): State<AppState>,
355 AuthUser(user_id): AuthUser,
356 Path(id): Path<Uuid>,
357 Query(query): Query<DiffQuery>,
358 headers: HeaderMap,
359) -> ApiResult<Json<SnapshotDiff>> {
360 let snapshot = load_authorized_snapshot(&state, user_id, &headers, id).await?;
361 let snapshot_manifest = resolve_manifest(&state, &snapshot).await;
362
363 let against_str = query.against.as_deref().unwrap_or("current");
364 let (against_kind, against_id, against_manifest) = if against_str == "current" {
365 let (m, _) = build_workspace_manifest(state.db.pool(), snapshot.workspace_id)
366 .await
367 .map_err(ApiError::Database)?;
368 ("current".to_string(), None, m)
369 } else {
370 let other_id = Uuid::parse_str(against_str).map_err(|_| {
371 ApiError::InvalidRequest("'against' must be 'current' or a snapshot UUID".into())
372 })?;
373 let other = load_authorized_snapshot(&state, user_id, &headers, other_id).await?;
374 if other.workspace_id != snapshot.workspace_id {
375 return Err(ApiError::InvalidRequest(
376 "Cannot diff snapshots across different workspaces".into(),
377 ));
378 }
379 let m = resolve_manifest(&state, &other).await;
380 ("snapshot".to_string(), Some(other_id), m)
381 };
382
383 Ok(Json(SnapshotDiff {
384 snapshot_id: snapshot.id,
385 against_kind,
386 against_snapshot_id: against_id,
387 services: diff_resource(&snapshot_manifest, &against_manifest, "services"),
388 fixtures: diff_resource(&snapshot_manifest, &against_manifest, "fixtures"),
389 flows: diff_resource(&snapshot_manifest, &against_manifest, "flows"),
390 environments: diff_resource(&snapshot_manifest, &against_manifest, "environments"),
391 chaos_campaigns: diff_resource(&snapshot_manifest, &against_manifest, "chaos_campaigns"),
392 }))
393}
394
395async fn resolve_manifest(state: &AppState, snapshot: &Snapshot) -> serde_json::Value {
401 let inline = snapshot.manifest.clone().unwrap_or_else(|| serde_json::json!({}));
402
403 let is_external = inline.get("external").and_then(|v| v.as_bool()).unwrap_or(false);
407 if !is_external {
408 return inline;
409 }
410
411 match state.storage.read_snapshot_blob(snapshot.workspace_id, snapshot.id).await {
415 Ok(bytes) => match serde_json::from_slice::<serde_json::Value>(&bytes) {
416 Ok(v) => v,
417 Err(e) => {
418 tracing::warn!(
419 snapshot_id = %snapshot.id,
420 error = %e,
421 "snapshot blob is not valid JSON; falling back to inline summary",
422 );
423 inline
424 }
425 },
426 Err(e) => {
427 tracing::warn!(
428 snapshot_id = %snapshot.id,
429 error = %e,
430 "snapshot blob read failed; falling back to inline summary",
431 );
432 inline
433 }
434 }
435}
436
437fn diff_resource(from: &serde_json::Value, to: &serde_json::Value, key: &str) -> ResourceDiff {
441 let from_list = from.get(key).and_then(|v| v.as_array()).cloned().unwrap_or_default();
442 let to_list = to.get(key).and_then(|v| v.as_array()).cloned().unwrap_or_default();
443
444 let from_by_id: std::collections::HashMap<String, serde_json::Value> = from_list
445 .iter()
446 .filter_map(|v| v.get("id").and_then(|i| i.as_str()).map(|s| (s.to_string(), v.clone())))
447 .collect();
448 let to_by_id: std::collections::HashMap<String, serde_json::Value> = to_list
449 .iter()
450 .filter_map(|v| v.get("id").and_then(|i| i.as_str()).map(|s| (s.to_string(), v.clone())))
451 .collect();
452
453 let mut added = Vec::new();
454 let mut removed = Vec::new();
455 let mut changed = Vec::new();
456
457 for (id, v) in &from_by_id {
458 match to_by_id.get(id) {
459 None => removed.push(v.clone()),
460 Some(other) if other != v => changed.push(DiffPair {
461 from: v.clone(),
462 to: other.clone(),
463 }),
464 Some(_) => {} }
466 }
467 for (id, v) in &to_by_id {
468 if !from_by_id.contains_key(id) {
469 added.push(v.clone());
470 }
471 }
472
473 ResourceDiff {
474 added,
475 removed,
476 changed,
477 }
478}
479
480pub async fn restore_snapshot(
493 State(state): State<AppState>,
494 AuthUser(user_id): AuthUser,
495 Path(id): Path<Uuid>,
496 headers: HeaderMap,
497) -> ApiResult<Json<serde_json::Value>> {
498 use mockforge_registry_core::models::chaos::CreateChaosCampaign;
499 use mockforge_registry_core::models::mock_environment::{MockEnvironment, MockEnvironmentName};
500 use mockforge_registry_core::models::ChaosCampaign;
501
502 let snapshot = load_authorized_snapshot(&state, user_id, &headers, id).await?;
503 let manifest = resolve_manifest(&state, &snapshot).await;
504 if manifest.as_object().map(|o| o.is_empty()).unwrap_or(true) {
505 return Err(ApiError::InvalidRequest("Snapshot has no manifest to restore".into()));
506 }
507
508 let pool = state.db.pool();
509 let workspace_id = snapshot.workspace_id;
510 let mut envs_created = 0u32;
511 let mut envs_skipped = 0u32;
512 let mut chaos_created = 0u32;
513 let mut chaos_skipped = 0u32;
514 let mut errors: Vec<serde_json::Value> = Vec::new();
515
516 if let Some(envs) = manifest.get("environments").and_then(|v| v.as_array()) {
519 for env in envs {
520 let name_str = env.get("name").and_then(|v| v.as_str()).unwrap_or("");
521 let parsed = match MockEnvironmentName::from_str(name_str) {
522 Some(n) => n,
523 None => {
524 errors.push(serde_json::json!({
525 "kind": "environment",
526 "name": name_str,
527 "error": "invalid name (must be dev|test|prod)",
528 }));
529 continue;
530 }
531 };
532 match MockEnvironment::find_by_workspace_and_name(pool, workspace_id, parsed).await {
533 Ok(Some(_)) => {
534 envs_skipped += 1;
535 continue;
536 }
537 Ok(None) => {}
538 Err(e) => {
539 errors.push(serde_json::json!({
540 "kind": "environment",
541 "name": name_str,
542 "error": format!("lookup failed: {e}"),
543 }));
544 continue;
545 }
546 }
547 let reality = env.get("reality_config").cloned();
548 let chaos = env.get("chaos_config").cloned();
549 let drift = env.get("drift_budget_config").cloned();
550 match MockEnvironment::create(pool, workspace_id, parsed, reality, chaos, drift).await {
551 Ok(_) => envs_created += 1,
552 Err(e) => errors.push(serde_json::json!({
553 "kind": "environment",
554 "name": name_str,
555 "error": format!("create failed: {e}"),
556 })),
557 }
558 }
559 }
560
561 if let Some(camps) = manifest.get("chaos_campaigns").and_then(|v| v.as_array()) {
563 let existing = ChaosCampaign::list_by_workspace(pool, workspace_id)
564 .await
565 .map_err(ApiError::Database)?;
566 let existing_names: std::collections::HashSet<String> =
567 existing.into_iter().map(|c| c.name).collect();
568
569 for c in camps {
570 let name = c.get("name").and_then(|v| v.as_str()).unwrap_or("");
571 if name.is_empty() {
572 continue;
573 }
574 if existing_names.contains(name) {
575 chaos_skipped += 1;
576 continue;
577 }
578 let target_kind = c.get("target_kind").and_then(|v| v.as_str()).unwrap_or("external");
579 let target_ref = c.get("target_ref").and_then(|v| v.as_str()).unwrap_or("");
580 let cfg = c.get("config").cloned().unwrap_or_else(|| serde_json::json!({}));
581 let safety = c.get("safety_config").cloned().unwrap_or_else(|| serde_json::json!({}));
582 let description = c.get("description").and_then(|v| v.as_str());
583 match ChaosCampaign::create(
584 pool,
585 CreateChaosCampaign {
586 workspace_id,
587 name,
588 description,
589 target_kind,
590 target_ref,
591 config: &cfg,
592 safety_config: &safety,
593 created_by: Some(user_id),
594 },
595 )
596 .await
597 {
598 Ok(_) => chaos_created += 1,
599 Err(e) => errors.push(serde_json::json!({
600 "kind": "chaos_campaign",
601 "name": name,
602 "error": format!("create failed: {e}"),
603 })),
604 }
605 }
606 }
607
608 Ok(Json(serde_json::json!({
609 "snapshot_id": snapshot.id,
610 "workspace_id": workspace_id,
611 "environments": { "created": envs_created, "skipped_existing": envs_skipped },
612 "chaos_campaigns": { "created": chaos_created, "skipped_existing": chaos_skipped },
613 "errors": errors,
614 "note": "services, fixtures, and flows are not auto-restored; \
615 review the diff endpoint and recreate them manually.",
616 })))
617}
618
619pub async fn delete_snapshot(
626 State(state): State<AppState>,
627 AuthUser(user_id): AuthUser,
628 Path(id): Path<Uuid>,
629 headers: HeaderMap,
630) -> ApiResult<Json<serde_json::Value>> {
631 let snapshot = load_authorized_snapshot(&state, user_id, &headers, id).await?;
632 let workspace_id = snapshot.workspace_id;
633
634 let deleted = Snapshot::delete(state.db.pool(), id).await.map_err(ApiError::Database)?;
635 if !deleted {
636 return Err(ApiError::InvalidRequest("Snapshot not found".into()));
637 }
638
639 let workspace = CloudWorkspace::find_by_id(state.db.pool(), workspace_id)
641 .await?
642 .ok_or_else(|| ApiError::InvalidRequest("Workspace not found".into()))?;
643 let bytes = Snapshot::sum_ready_bytes_by_workspace(state.db.pool(), workspace_id)
644 .await
645 .map_err(ApiError::Database)?;
646 UsageCounter::set_snapshot_bytes(state.db.pool(), workspace.org_id, bytes)
647 .await
648 .map_err(ApiError::Database)?;
649
650 Ok(Json(serde_json::json!({ "deleted": true })))
651}
652
653async fn authorize_workspace(
655 state: &AppState,
656 user_id: Uuid,
657 headers: &HeaderMap,
658 workspace_id: Uuid,
659) -> ApiResult<crate::middleware::org_context::OrgContext> {
660 let workspace = CloudWorkspace::find_by_id(state.db.pool(), workspace_id)
661 .await?
662 .ok_or_else(|| ApiError::InvalidRequest("Workspace not found".into()))?;
663 let ctx = resolve_org_context(state, user_id, headers, None)
664 .await
665 .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
666 if ctx.org_id != workspace.org_id {
667 return Err(ApiError::InvalidRequest("Workspace not found".into()));
668 }
669 Ok(ctx)
670}
671
672async fn load_authorized_snapshot(
674 state: &AppState,
675 user_id: Uuid,
676 headers: &HeaderMap,
677 id: Uuid,
678) -> ApiResult<Snapshot> {
679 let snapshot = Snapshot::find_by_id(state.db.pool(), id)
680 .await
681 .map_err(ApiError::Database)?
682 .ok_or_else(|| ApiError::InvalidRequest("Snapshot not found".into()))?;
683 let workspace = CloudWorkspace::find_by_id(state.db.pool(), snapshot.workspace_id)
684 .await?
685 .ok_or_else(|| ApiError::InvalidRequest("Snapshot not found".into()))?;
686 let ctx = resolve_org_context(state, user_id, headers, None)
687 .await
688 .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
689 if ctx.org_id != workspace.org_id {
690 return Err(ApiError::InvalidRequest("Snapshot not found".into()));
691 }
692 Ok(snapshot)
693}