1use std::collections::HashSet;
37
38use axum::{
39 extract::{Path, State},
40 http::HeaderMap,
41 Json,
42};
43use chrono::{DateTime, Utc};
44use serde::{Deserialize, Serialize};
45use uuid::Uuid;
46
47use mockforge_registry_core::models::{
48 feature_usage::{FeatureType, FeatureUsage, PluginInvokeAggregateRow},
49 hosted_mock_plugin::AttachHostedMockPlugin,
50 AuditEventType, HostedMockPlugin,
51};
52
53use crate::{
54 error::{ApiError, ApiResult},
55 middleware::{
56 permission_check::PermissionChecker, permissions::Permission, resolve_org_context, AuthUser,
57 },
58 AppState,
59};
60
/// Top-level keys accepted in a plugin attachment's `permissions` object.
///
/// `validate_permissions` rejects any object that contains a key outside this list.
const PERMISSION_SECTIONS: &[&str] = &["egress", "env", "request", "response", "storage"];
65
/// Upper bound, in bytes, on the JSON-serialized `permissions` payload (32 KiB).
///
/// Enforced by `validate_permissions` against the output of `serde_json::to_vec`.
const MAX_PERMISSIONS_BYTES: usize = 32 * 1024;
70
/// JSON body accepted by [`attach_plugin`].
#[derive(Debug, Deserialize)]
pub struct AttachRequest {
    /// Name of the plugin to attach; resolved through the store by name.
    pub plugin_name: String,
    /// Version of the plugin to attach; it must exist and must not be yanked.
    pub version: String,
    /// Plugin configuration stored with the attachment. Defaults to `{}` when omitted.
    #[serde(default = "empty_object")]
    pub config: serde_json::Value,
    /// Permissions object stored with the attachment. Defaults to `{}` when omitted and is
    /// checked by `validate_permissions` before anything is persisted.
    #[serde(default = "empty_object")]
    pub permissions: serde_json::Value,
    /// Whether the attachment is enabled. Defaults to `true` when omitted.
    #[serde(default = "default_enabled")]
    pub enabled: bool,
}
91
92fn empty_object() -> serde_json::Value {
93 serde_json::json!({})
94}
95
/// Serde default for `AttachRequest::enabled`: attachments are enabled unless the request
/// says otherwise.
fn default_enabled() -> bool {
    true
}
99
/// JSON body accepted by [`update_attachment`]; every field is optional.
///
/// A field that is omitted deserializes to `None`, in which case `update_attachment` keeps
/// the value already stored on the attachment.
#[derive(Debug, Deserialize)]
pub struct UpdateAttachmentRequest {
    /// Replacement permissions object; validated by `validate_permissions` when present.
    #[serde(default)]
    pub permissions: Option<serde_json::Value>,
    /// New enabled flag.
    #[serde(default)]
    pub enabled: Option<bool>,
    /// Replacement plugin configuration.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
115
/// API representation of a plugin attachment, built from a `HostedMockPlugin` row.
#[derive(Debug, Serialize)]
pub struct AttachmentResponse {
    /// Identifier of the attachment row.
    pub id: Uuid,
    /// Deployment the plugin is attached to.
    pub deployment_id: Uuid,
    /// Identifier of the attached plugin.
    pub plugin_id: Uuid,
    /// Identifier of the attached plugin version.
    pub plugin_version_id: Uuid,
    /// Stored plugin configuration (the row's `config_json`).
    pub config: serde_json::Value,
    /// Stored permissions object (the row's `permissions_json`).
    pub permissions: serde_json::Value,
    /// Whether the attachment is currently enabled.
    pub enabled: bool,
    /// Value of the row's `attached_at` column.
    pub attached_at: DateTime<Utc>,
    /// Value of the row's `updated_at` column.
    pub updated_at: DateTime<Utc>,
}
128
129impl From<HostedMockPlugin> for AttachmentResponse {
130 fn from(row: HostedMockPlugin) -> Self {
131 Self {
132 id: row.id,
133 deployment_id: row.deployment_id,
134 plugin_id: row.plugin_id,
135 plugin_version_id: row.plugin_version_id,
136 config: row.config_json,
137 permissions: row.permissions_json,
138 enabled: row.enabled,
139 attached_at: row.attached_at,
140 updated_at: row.updated_at,
141 }
142 }
143}
144
145pub async fn list_attachments(
149 State(state): State<AppState>,
150 AuthUser(user_id): AuthUser,
151 Path(deployment_id): Path<Uuid>,
152 headers: HeaderMap,
153) -> ApiResult<Json<Vec<AttachmentResponse>>> {
154 authorize_deployment(&state, user_id, &headers, deployment_id, Permission::HostedMockUpdate)
155 .await?;
156
157 let rows = HostedMockPlugin::list_by_deployment(state.db.pool(), deployment_id)
158 .await
159 .map_err(ApiError::Database)?;
160
161 Ok(Json(rows.into_iter().map(AttachmentResponse::from).collect()))
162}
163
164pub async fn attach_plugin(
166 State(state): State<AppState>,
167 AuthUser(user_id): AuthUser,
168 Path(deployment_id): Path<Uuid>,
169 headers: HeaderMap,
170 Json(request): Json<AttachRequest>,
171) -> ApiResult<Json<AttachmentResponse>> {
172 let org_ctx = authorize_deployment(
173 &state,
174 user_id,
175 &headers,
176 deployment_id,
177 Permission::HostedMockUpdate,
178 )
179 .await?;
180
181 validate_permissions(&request.permissions)?;
184
185 let plugin = state
188 .store
189 .find_plugin_by_name(&request.plugin_name)
190 .await
191 .map_err(|e| ApiError::Database(sqlx::Error::Protocol(e.to_string())))?
192 .ok_or_else(|| {
193 ApiError::InvalidRequest(format!("Plugin '{}' not found", request.plugin_name))
194 })?;
195
196 let plugin_version = state
197 .store
198 .find_plugin_version(plugin.id, &request.version)
199 .await
200 .map_err(|e| ApiError::Database(sqlx::Error::Protocol(e.to_string())))?
201 .ok_or_else(|| {
202 ApiError::InvalidRequest(format!(
203 "Plugin '{}' has no version '{}'",
204 request.plugin_name, request.version
205 ))
206 })?;
207
208 if plugin_version.yanked {
209 return Err(ApiError::InvalidRequest(format!(
210 "Plugin '{}' version '{}' is yanked and cannot be attached",
211 request.plugin_name, request.version
212 )));
213 }
214
215 enforce_plan_limit(&state, &org_ctx, deployment_id, plugin.id).await?;
219
220 let row = HostedMockPlugin::attach(
221 state.db.pool(),
222 AttachHostedMockPlugin {
223 deployment_id,
224 plugin_id: plugin.id,
225 plugin_version_id: plugin_version.id,
226 config_json: &request.config,
227 permissions_json: &request.permissions,
228 enabled: request.enabled,
229 attached_by: Some(user_id),
230 },
231 )
232 .await
233 .map_err(ApiError::Database)?;
234
235 state
239 .store
240 .record_feature_usage(
241 org_ctx.org_id,
242 Some(user_id),
243 FeatureType::PluginAttach,
244 Some(serde_json::json!({
245 "deployment_id": deployment_id,
246 "plugin_id": plugin.id,
247 "plugin_name": plugin.name,
248 "version": request.version,
249 })),
250 )
251 .await;
252
253 let (ip_address, user_agent) = client_metadata(&headers);
254 state
255 .store
256 .record_audit_event(
257 org_ctx.org_id,
258 Some(user_id),
259 AuditEventType::PluginAttached,
260 format!(
261 "Plugin '{}@{}' attached to deployment {}",
262 plugin.name, request.version, deployment_id
263 ),
264 Some(serde_json::json!({
265 "deployment_id": deployment_id,
266 "plugin_id": plugin.id,
267 "plugin_name": plugin.name,
268 "version": request.version,
269 "permissions": request.permissions,
270 })),
271 ip_address.as_deref(),
272 user_agent.as_deref(),
273 )
274 .await;
275
276 Ok(Json(AttachmentResponse::from(row)))
277}
278
279pub async fn update_attachment(
281 State(state): State<AppState>,
282 AuthUser(user_id): AuthUser,
283 Path((deployment_id, attachment_id)): Path<(Uuid, Uuid)>,
284 headers: HeaderMap,
285 Json(request): Json<UpdateAttachmentRequest>,
286) -> ApiResult<Json<AttachmentResponse>> {
287 authorize_deployment(&state, user_id, &headers, deployment_id, Permission::HostedMockUpdate)
288 .await?;
289
290 let existing = load_authorized_attachment(&state, deployment_id, attachment_id).await?;
294
295 if let Some(ref new_perms) = request.permissions {
297 validate_permissions(new_perms)?;
298 }
299
300 let row = HostedMockPlugin::attach(
305 state.db.pool(),
306 AttachHostedMockPlugin {
307 deployment_id,
308 plugin_id: existing.plugin_id,
309 plugin_version_id: existing.plugin_version_id,
310 config_json: request.config.as_ref().unwrap_or(&existing.config_json),
311 permissions_json: request.permissions.as_ref().unwrap_or(&existing.permissions_json),
312 enabled: request.enabled.unwrap_or(existing.enabled),
313 attached_by: Some(user_id),
314 },
315 )
316 .await
317 .map_err(ApiError::Database)?;
318
319 Ok(Json(AttachmentResponse::from(row)))
320}
321
322pub async fn detach_plugin(
324 State(state): State<AppState>,
325 AuthUser(user_id): AuthUser,
326 Path((deployment_id, attachment_id)): Path<(Uuid, Uuid)>,
327 headers: HeaderMap,
328) -> ApiResult<Json<serde_json::Value>> {
329 let org_ctx = authorize_deployment(
330 &state,
331 user_id,
332 &headers,
333 deployment_id,
334 Permission::HostedMockUpdate,
335 )
336 .await?;
337
338 let existing = load_authorized_attachment(&state, deployment_id, attachment_id).await?;
339
340 let deleted = HostedMockPlugin::delete(state.db.pool(), attachment_id)
341 .await
342 .map_err(ApiError::Database)?;
343 if !deleted {
344 return Ok(Json(serde_json::json!({ "deleted": false })));
347 }
348
349 state
350 .store
351 .record_feature_usage(
352 org_ctx.org_id,
353 Some(user_id),
354 FeatureType::PluginDetach,
355 Some(serde_json::json!({
356 "deployment_id": deployment_id,
357 "plugin_id": existing.plugin_id,
358 })),
359 )
360 .await;
361
362 let (ip_address, user_agent) = client_metadata(&headers);
363 state
364 .store
365 .record_audit_event(
366 org_ctx.org_id,
367 Some(user_id),
368 AuditEventType::PluginDetached,
369 format!(
370 "Plugin attachment {} detached from deployment {}",
371 attachment_id, deployment_id
372 ),
373 Some(serde_json::json!({
374 "deployment_id": deployment_id,
375 "plugin_id": existing.plugin_id,
376 "attachment_id": attachment_id,
377 })),
378 ip_address.as_deref(),
379 user_agent.as_deref(),
380 )
381 .await;
382
383 Ok(Json(serde_json::json!({ "deleted": true })))
384}
385
/// One aggregated usage row in the plugin-usage response, serialized with camelCase keys.
///
/// Built from a `PluginInvokeAggregateRow` by `into_usage_entry`. Optional fields are left
/// out of the JSON entirely when they are `None`.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PluginUsageEntry {
    /// Attachment the usage belongs to; `None` when the aggregate row carries no id or the id
    /// is not a valid UUID.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub attachment_id: Option<Uuid>,
    /// Plugin name as reported by the aggregate row, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub plugin_name: Option<String>,
    /// Plugin version as reported by the aggregate row, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub plugin_version: Option<String>,
    /// Aggregated invoke time for this entry (presumably milliseconds, per the field name).
    pub invoke_ms: i64,
    /// Peak memory reported by the aggregate row (presumably megabytes, per the field name).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub memory_peak_mb: Option<i64>,
}
413
/// Response body of [`get_plugin_usage`], serialized with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DeploymentPluginUsageResponse {
    /// Start of the reporting window, as returned by `current_billing_period`.
    pub period_start: DateTime<Utc>,
    /// End of the reporting window, as returned by `current_billing_period`.
    pub period_end: DateTime<Utc>,
    /// One entry per aggregate row returned for the deployment.
    pub by_plugin: Vec<PluginUsageEntry>,
    /// Sum of `invoke_ms` across every entry in `by_plugin`.
    pub deployment_total_invoke_ms: i64,
    /// Value of `max_plugin_invoke_ms_per_month` in the org's limits, or `0` when it is
    /// missing or not an integer.
    pub plan_limit_invoke_ms_per_month: i64,
    /// Value of `max_plugin_memory_mb` in the org's limits, or `0` when it is missing or not
    /// an integer.
    pub plan_limit_memory_mb: i64,
}
433
434pub async fn get_plugin_usage(
447 State(state): State<AppState>,
448 AuthUser(user_id): AuthUser,
449 Path(deployment_id): Path<Uuid>,
450 headers: HeaderMap,
451) -> ApiResult<Json<DeploymentPluginUsageResponse>> {
452 let org_ctx =
453 authorize_deployment(&state, user_id, &headers, deployment_id, Permission::HostedMockRead)
454 .await?;
455
456 let (period_start, period_end) = current_billing_period();
457
458 let rows = FeatureUsage::aggregate_plugin_invoke_ms_by_deployment(
459 state.db.pool(),
460 org_ctx.org_id,
461 deployment_id,
462 period_start,
463 )
464 .await
465 .map_err(ApiError::Database)?;
466
467 let by_plugin: Vec<PluginUsageEntry> = rows.into_iter().map(into_usage_entry).collect();
468 let deployment_total_invoke_ms: i64 = by_plugin.iter().map(|p| p.invoke_ms).sum();
469
470 let limits = &org_ctx.org.limits_json;
471 let plan_limit_invoke_ms_per_month = limits
472 .get("max_plugin_invoke_ms_per_month")
473 .and_then(|v| v.as_i64())
474 .unwrap_or(0);
475 let plan_limit_memory_mb =
476 limits.get("max_plugin_memory_mb").and_then(|v| v.as_i64()).unwrap_or(0);
477
478 Ok(Json(DeploymentPluginUsageResponse {
479 period_start,
480 period_end,
481 by_plugin,
482 deployment_total_invoke_ms,
483 plan_limit_invoke_ms_per_month,
484 plan_limit_memory_mb,
485 }))
486}
487
488fn into_usage_entry(row: PluginInvokeAggregateRow) -> PluginUsageEntry {
492 PluginUsageEntry {
493 attachment_id: row.attachment_id.as_deref().and_then(|s| Uuid::parse_str(s).ok()),
494 plugin_name: row.plugin_name,
495 plugin_version: row.plugin_version,
496 invoke_ms: row.invoke_ms,
497 memory_peak_mb: row.memory_peak_mb,
498 }
499}
500
501fn current_billing_period() -> (DateTime<Utc>, DateTime<Utc>) {
505 use chrono::{Datelike, NaiveDate, TimeZone};
506 let now = Utc::now();
507 let start_date = NaiveDate::from_ymd_opt(now.year(), now.month(), 1)
508 .expect("month/year are valid by construction");
509 let (next_year, next_month) = if now.month() == 12 {
510 (now.year() + 1, 1)
511 } else {
512 (now.year(), now.month() + 1)
513 };
514 let end_date = NaiveDate::from_ymd_opt(next_year, next_month, 1)
515 .expect("next month/year are valid by construction");
516 let start = Utc.from_utc_datetime(&start_date.and_hms_opt(0, 0, 0).expect("midnight is valid"));
517 let end = Utc.from_utc_datetime(&end_date.and_hms_opt(0, 0, 0).expect("midnight is valid"));
518 (start, end)
519}
520
521async fn authorize_deployment(
534 state: &AppState,
535 user_id: Uuid,
536 headers: &HeaderMap,
537 deployment_id: Uuid,
538 permission: Permission,
539) -> ApiResult<crate::middleware::org_context::OrgContext> {
540 let org_ctx = resolve_org_context(state, user_id, headers, None)
541 .await
542 .map_err(|_| ApiError::InvalidRequest("Organization not found".into()))?;
543
544 let checker = PermissionChecker::new(state);
545 checker.require_permission(user_id, org_ctx.org_id, permission).await?;
546
547 let deployment = state
548 .store
549 .find_hosted_mock_by_id(deployment_id)
550 .await?
551 .ok_or_else(|| ApiError::InvalidRequest("Deployment not found".into()))?;
552 if deployment.org_id != org_ctx.org_id {
553 return Err(ApiError::InvalidRequest("Deployment not found".into()));
554 }
555
556 Ok(org_ctx)
557}
558
559async fn load_authorized_attachment(
563 state: &AppState,
564 deployment_id: Uuid,
565 attachment_id: Uuid,
566) -> ApiResult<HostedMockPlugin> {
567 let row = HostedMockPlugin::find_by_id(state.db.pool(), attachment_id)
568 .await
569 .map_err(ApiError::Database)?
570 .ok_or_else(|| ApiError::InvalidRequest("Plugin attachment not found".into()))?;
571 if row.deployment_id != deployment_id {
572 return Err(ApiError::InvalidRequest("Plugin attachment not found".into()));
573 }
574 Ok(row)
575}
576
577fn validate_permissions(value: &serde_json::Value) -> ApiResult<()> {
584 let obj = value
585 .as_object()
586 .ok_or_else(|| ApiError::InvalidRequest("permissions must be a JSON object".into()))?;
587
588 let allowed: HashSet<&str> = PERMISSION_SECTIONS.iter().copied().collect();
589 for key in obj.keys() {
590 if !allowed.contains(key.as_str()) {
591 return Err(ApiError::InvalidRequest(format!(
592 "permissions: unknown top-level key '{}' (allowed: {})",
593 key,
594 PERMISSION_SECTIONS.join(", "),
595 )));
596 }
597 }
598
599 let serialized = serde_json::to_vec(value)
600 .map_err(|e| ApiError::InvalidRequest(format!("permissions failed to serialize: {}", e)))?;
601 if serialized.len() > MAX_PERMISSIONS_BYTES {
602 return Err(ApiError::InvalidRequest(format!(
603 "permissions payload too large: {} bytes (max {} bytes)",
604 serialized.len(),
605 MAX_PERMISSIONS_BYTES,
606 )));
607 }
608
609 Ok(())
610}
611
612async fn enforce_plan_limit(
616 state: &AppState,
617 org_ctx: &crate::middleware::org_context::OrgContext,
618 deployment_id: Uuid,
619 plugin_id: Uuid,
620) -> ApiResult<()> {
621 let limits = &org_ctx.org.limits_json;
622 let max = limits.get("max_plugins_per_mock").and_then(|v| v.as_i64()).unwrap_or(0);
625 if max < 0 {
626 return Ok(());
627 }
628
629 let already_attached = HostedMockPlugin::list_by_deployment(state.db.pool(), deployment_id)
631 .await
632 .map_err(ApiError::Database)?
633 .iter()
634 .any(|p| p.plugin_id == plugin_id && p.enabled);
635 if already_attached {
636 return Ok(());
637 }
638
639 let active = HostedMockPlugin::count_active_by_deployment(state.db.pool(), deployment_id)
640 .await
641 .map_err(ApiError::Database)?;
642 if active >= max {
643 return Err(ApiError::InvalidRequest(format!(
644 "Plugin attachment limit reached: your plan allows {} active plugins per hosted mock. Upgrade to attach more.",
645 max
646 )));
647 }
648 Ok(())
649}
650
651fn client_metadata(headers: &HeaderMap) -> (Option<String>, Option<String>) {
652 let ip = headers
653 .get("X-Forwarded-For")
654 .or_else(|| headers.get("X-Real-IP"))
655 .and_then(|h| h.to_str().ok())
656 .map(|s| s.split(',').next().unwrap_or(s).trim().to_string());
657 let ua = headers.get("User-Agent").and_then(|h| h.to_str().ok()).map(|s| s.to_string());
658 (ip, ua)
659}
660
// Unit tests for the pure helpers in this module: permission validation, billing-period
// computation, and aggregate-row conversion.
#[cfg(test)]
mod tests {
    use super::*;

    // An empty permissions object is valid.
    #[test]
    fn validate_permissions_accepts_empty_object() {
        let v = serde_json::json!({});
        assert!(validate_permissions(&v).is_ok());
    }

    // Every section named in `PERMISSION_SECTIONS` is accepted, whatever it contains.
    #[test]
    fn validate_permissions_accepts_known_sections() {
        let v = serde_json::json!({
            "egress": { "allow": ["*.stripe.com"] },
            "env": { "read": ["MY_FLAG"] },
            "request": { "read_body": true },
            "response": { "modify_body": true },
            "storage": { "kv_namespace": null },
        });
        assert!(validate_permissions(&v).is_ok());
    }

    // A top-level key outside the allow-list is rejected and named in the error message.
    #[test]
    fn validate_permissions_rejects_unknown_key() {
        let v = serde_json::json!({ "filesystem": { "read": "/etc" } });
        let err = validate_permissions(&v).unwrap_err();
        match err {
            ApiError::InvalidRequest(msg) => {
                assert!(msg.contains("unknown top-level key 'filesystem'"));
            }
            other => panic!("expected InvalidRequest, got {:?}", other),
        }
    }

    // Anything that is not a JSON object (here: an array) is rejected.
    #[test]
    fn validate_permissions_rejects_non_object() {
        let v = serde_json::json!(["not", "an", "object"]);
        let err = validate_permissions(&v).unwrap_err();
        assert!(matches!(err, ApiError::InvalidRequest(_)));
    }

    // A payload whose serialized form exceeds `MAX_PERMISSIONS_BYTES` is rejected.
    #[test]
    fn validate_permissions_rejects_oversized_payload() {
        let large = "x".repeat(MAX_PERMISSIONS_BYTES + 100);
        // The oversized string sits under an allowed key so only the size check can fail.
        let v = serde_json::json!({ "egress": { "allow": [large] } });
        let err = validate_permissions(&v).unwrap_err();
        match err {
            ApiError::InvalidRequest(msg) => assert!(msg.contains("too large")),
            other => panic!("expected InvalidRequest, got {:?}", other),
        }
    }

    // The billing period runs from midnight on the 1st to midnight on the next 1st.
    #[test]
    fn current_billing_period_starts_at_month_boundary() {
        let (start, end) = current_billing_period();
        // Both bounds fall exactly on midnight.
        assert_eq!(start.time(), chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
        assert_eq!(end.time(), chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
        // Both bounds fall on the first day of a month.
        assert_eq!(chrono::Datelike::day(&start), 1);
        assert_eq!(chrono::Datelike::day(&end), 1);
        assert!(end > start);
        // A calendar month spans between 28 and 31 days.
        let span = end - start;
        assert!(
            span.num_days() >= 28 && span.num_days() <= 31,
            "span = {} days",
            span.num_days()
        );
    }

    // A malformed attachment id is dropped while the other fields are carried over.
    #[test]
    fn into_usage_entry_drops_malformed_attachment_id() {
        let row = PluginInvokeAggregateRow {
            attachment_id: Some("not-a-uuid".to_string()),
            plugin_name: Some("foo".to_string()),
            plugin_version: Some("1.0.0".to_string()),
            invoke_ms: 100,
            memory_peak_mb: Some(42),
        };
        let entry = into_usage_entry(row);
        assert_eq!(entry.attachment_id, None);
        assert_eq!(entry.plugin_name, Some("foo".to_string()));
        assert_eq!(entry.invoke_ms, 100);
        assert_eq!(entry.memory_peak_mb, Some(42));
    }

    // A well-formed attachment id is parsed back into the same UUID.
    #[test]
    fn into_usage_entry_parses_well_formed_attachment_id() {
        let id = Uuid::new_v4();
        let row = PluginInvokeAggregateRow {
            attachment_id: Some(id.to_string()),
            plugin_name: None,
            plugin_version: None,
            invoke_ms: 0,
            memory_peak_mb: None,
        };
        let entry = into_usage_entry(row);
        assert_eq!(entry.attachment_id, Some(id));
        assert_eq!(entry.invoke_ms, 0);
        assert_eq!(entry.memory_peak_mb, None);
    }
}