Skip to main content

varpulis_cli/
api.rs

1//! REST API for SaaS pipeline management
2//!
3//! Provides RESTful endpoints for deploying and managing CEP pipelines
4//! in a multi-tenant environment.
5
6use std::convert::Infallible;
7
8use axum::extract::{Json, Path, Query, State};
9use axum::http::StatusCode;
10use axum::response::{IntoResponse, Response};
11use axum::routing::{get, post};
12use axum::Router;
13use futures_util::stream;
14use indexmap::IndexMap;
15use rustc_hash::FxBuildHasher;
16use serde::{Deserialize, Serialize};
17use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
18use varpulis_core::pagination::{PaginationMeta, PaginationParams, MAX_LIMIT};
19use varpulis_runtime::tenant::{SharedTenantManager, TenantError, TenantQuota};
20use varpulis_runtime::Event;
21
22use crate::auth::constant_time_compare;
23use crate::billing::SharedBillingState;
24
25// =============================================================================
26// Request/Response types
27// =============================================================================
28
/// JSON body accepted by `handle_deploy` (`POST /api/v1/pipelines`).
#[derive(Debug, Deserialize, Serialize)]
pub struct DeployPipelineRequest {
    /// Name given to the new pipeline; echoed back in the deploy response.
    pub name: String,
    /// Pipeline source text handed to the tenant manager for deployment.
    pub source: String,
}
34
/// JSON body returned with `201 Created` after a successful deploy.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployPipelineResponse {
    /// Identifier returned by the tenant manager for the deployed pipeline.
    pub id: String,
    /// Name supplied in the deploy request.
    pub name: String,
    /// Pipeline status; `handle_deploy` always reports `"running"`.
    pub status: String,
}
41
/// Description of a single pipeline as returned by the list and get endpoints.
#[derive(Debug, Serialize, Deserialize)]
pub struct PipelineInfo {
    /// Pipeline identifier.
    pub id: String,
    /// Pipeline name.
    pub name: String,
    /// String form of the pipeline's status.
    pub status: String,
    /// Pipeline source text.
    pub source: String,
    /// Whole seconds elapsed since the pipeline's `created_at` instant.
    pub uptime_secs: u64,
    /// Identifier of the global template behind this pipeline, if any.
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub global_template_id: Option<String>,
    /// Pipeline scope: "global", "tenant", or "own".
    #[serde(default = "default_scope")]
    pub scope_level: String,
    /// Source org for inherited pipelines (None = belongs to current org).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub inherited_from_org_id: Option<String>,
    /// Whether the pipeline is read-only (inherited from parent/global).
    #[serde(default)]
    pub read_only: bool,
}
61
/// Serde default for `PipelineInfo::scope_level`: the `"own"` scope.
fn default_scope() -> String {
    String::from("own")
}
65
/// JSON body returned by `handle_list` (`GET /api/v1/pipelines`).
#[derive(Debug, Serialize, Deserialize)]
pub struct PipelineListResponse {
    /// The page of pipelines selected by the pagination parameters.
    pub pipelines: Vec<PipelineInfo>,
    /// Total reported by the pagination metadata (`meta.total`).
    pub total: usize,
    /// Pagination metadata; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pagination: Option<PaginationMeta>,
}
73
/// JSON body returned by `handle_metrics`.
///
/// NOTE(review): `handle_metrics` fills both counters from the tenant's
/// `usage` record, not from the pipeline entry — confirm that tenant-wide
/// numbers are what clients expect here.
#[derive(Debug, Serialize, Deserialize)]
pub struct PipelineMetricsResponse {
    /// Identifier of the pipeline the request addressed.
    pub pipeline_id: String,
    /// Count of processed events.
    pub events_processed: u64,
    /// Count of emitted output events.
    pub output_events_emitted: u64,
}
80
/// A single event to inject into a pipeline (body of `handle_inject`, and the
/// element type of a batch request).
#[derive(Debug, Deserialize, Serialize)]
pub struct InjectEventRequest {
    /// Event type name used to construct the runtime `Event`.
    pub event_type: String,
    /// Event fields; each JSON value is converted with `json_to_runtime_value`
    /// before being attached to the event.
    pub fields: serde_json::Map<String, serde_json::Value>,
}
86
/// JSON body accepted by `handle_inject_batch`.
#[derive(Debug, Deserialize, Serialize)]
pub struct InjectBatchRequest {
    /// Events to inject, processed in the order given.
    pub events: Vec<InjectEventRequest>,
}
91
/// JSON body returned by `handle_inject_batch`.
#[derive(Debug, Serialize, Deserialize)]
pub struct InjectBatchResponse {
    /// Number of events that were processed without error.
    pub accepted: usize,
    /// Output events, each a flat JSON object holding `event_type` plus the
    /// event's data fields.
    pub output_events: Vec<serde_json::Value>,
    /// Time spent in the batch processing loop, in microseconds.
    pub processing_time_us: u64,
}
98
/// Request body carrying replacement pipeline source.
///
/// NOTE(review): presumably consumed by `handle_reload`
/// (`POST /api/v1/pipelines/{pipeline_id}/reload`), which is defined outside
/// the portion of the file reviewed here — confirm.
#[derive(Debug, Deserialize, Serialize)]
pub struct ReloadPipelineRequest {
    /// New pipeline source text.
    pub source: String,
}
103
/// JSON body returned by `handle_checkpoint`.
#[derive(Debug, Serialize, Deserialize)]
pub struct CheckpointResponse {
    /// Identifier of the checkpointed pipeline.
    pub pipeline_id: String,
    /// The engine checkpoint produced for the pipeline.
    pub checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
    /// Copy of `checkpoint.events_processed`, surfaced at the top level.
    pub events_processed: u64,
}
110
/// JSON body accepted by `handle_restore`.
#[derive(Debug, Deserialize, Serialize)]
pub struct RestoreRequest {
    /// Checkpoint to restore the pipeline from.
    pub checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
}
115
/// JSON body returned by `handle_restore` on success.
#[derive(Debug, Serialize, Deserialize)]
pub struct RestoreResponse {
    /// Identifier of the restored pipeline.
    pub pipeline_id: String,
    /// Always `true` in the success response built by `handle_restore`.
    pub restored: bool,
    /// The `events_processed` value taken from the supplied checkpoint.
    pub events_restored: u64,
}
122
/// Serializable error payload with a message and a code.
///
/// NOTE(review): presumably the body emitted by `error_response`, which is
/// defined outside the portion of the file reviewed here — confirm.
#[derive(Debug, Serialize)]
pub struct ApiError {
    /// Error message text.
    pub error: String,
    /// Error code string (handlers pass values such as `invalid_api_key`
    /// to `error_response`).
    pub code: String,
}
128
/// Optional `offset`/`limit` query parameters.
///
/// NOTE(review): presumably used by the dead-letter-queue listing handler
/// (`handle_dlq_get`), which is not visible here — confirm.
#[derive(Debug, Deserialize)]
pub struct DlqQueryParams {
    /// Starting offset; `None` when the parameter is absent.
    #[serde(default)]
    pub offset: Option<usize>,
    /// Maximum number of entries; `None` when the parameter is absent.
    #[serde(default)]
    pub limit: Option<usize>,
}
136
/// Response payload listing dead-letter-queue entries.
///
/// NOTE(review): the producing handler is outside this view — confirm whether
/// `total` counts all entries or only those returned.
#[derive(Debug, Serialize)]
pub struct DlqEntriesResponse {
    /// Dead-letter entries included in this response.
    pub entries: Vec<varpulis_runtime::dead_letter::DlqEntryOwned>,
    /// Entry count reported alongside the entries.
    pub total: u64,
}
142
/// Response payload for a dead-letter-queue replay request.
#[derive(Debug, Serialize)]
pub struct DlqReplayResponse {
    /// Number of entries replayed (as reported by the handler, not visible here).
    pub replayed: usize,
}
147
/// Response payload for a dead-letter-queue clear request.
#[derive(Debug, Serialize)]
pub struct DlqClearResponse {
    /// Whether the queue was cleared (as reported by the handler, not visible here).
    pub cleared: bool,
}
152
/// Usage summary for a tenant, together with its quota.
///
/// NOTE(review): presumably returned by `handle_usage` (`GET /api/v1/usage`),
/// which is outside this view — confirm.
#[derive(Debug, Serialize, Deserialize)]
pub struct UsageResponse {
    /// Identifier of the tenant the figures belong to.
    pub tenant_id: String,
    /// Count of processed events.
    pub events_processed: u64,
    /// Count of emitted output events.
    pub output_events_emitted: u64,
    /// Number of active pipelines.
    pub active_pipelines: usize,
    /// Quota limits that apply to the tenant.
    pub quota: QuotaInfo,
}
161
/// Quota limits as exposed over the API (embedded in usage and tenant responses).
#[derive(Debug, Serialize, Deserialize)]
pub struct QuotaInfo {
    /// Maximum number of pipelines.
    pub max_pipelines: usize,
    /// Maximum events per second.
    pub max_events_per_second: u64,
    /// Maximum number of streams per pipeline.
    pub max_streams_per_pipeline: usize,
}
168
169// =============================================================================
170// Pipeline Graph (Visual Builder) Request/Response types
171// =============================================================================
172
/// Request body carrying VPL text for the visual-builder endpoints.
///
/// NOTE(review): presumably consumed by `handle_pipeline_to_graph`
/// (`POST /api/v1/pipeline/graph`), which is outside this view — confirm.
#[derive(Debug, Deserialize)]
pub struct PipelineGraphRequest {
    /// VPL source text.
    pub vpl: String,
}
177
/// Response body carrying generated VPL text for the visual-builder endpoints.
///
/// NOTE(review): presumably produced by `handle_graph_to_pipeline`
/// (`POST /api/v1/pipeline/generate`), which is outside this view — confirm.
#[derive(Debug, Serialize)]
pub struct GenerateResponse {
    /// Generated VPL source text.
    pub vpl: String,
}
182
183// =============================================================================
184// Tenant Admin Request/Response types
185// =============================================================================
186
/// Request body for creating a tenant.
///
/// NOTE(review): presumably consumed by `handle_create_tenant`
/// (`POST /api/v1/tenants`), which is outside this view — confirm, including
/// the accepted `quota_tier` values.
#[derive(Debug, Deserialize, Serialize)]
pub struct CreateTenantRequest {
    /// Tenant name.
    pub name: String,
    /// Optional quota tier name; `None` when absent from the JSON.
    #[serde(default)]
    pub quota_tier: Option<String>,
}
193
/// Tenant summary as exposed over the admin API.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantResponse {
    /// Tenant identifier.
    pub id: String,
    /// Tenant name.
    pub name: String,
    /// The tenant's API key, serialized in clear text.
    pub api_key: String,
    /// Quota limits that apply to the tenant.
    pub quota: QuotaInfo,
}
201
/// Response payload listing tenants.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantListResponse {
    /// Tenants included in this response.
    pub tenants: Vec<TenantResponse>,
    /// Tenant count reported alongside the list.
    pub total: usize,
    /// Pagination metadata; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pagination: Option<PaginationMeta>,
}
209
/// Detailed view of a single tenant: identity, quota, usage and pipeline count.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantDetailResponse {
    /// Tenant identifier.
    pub id: String,
    /// Tenant name.
    pub name: String,
    /// The tenant's API key, serialized in clear text.
    pub api_key: String,
    /// Quota limits that apply to the tenant.
    pub quota: QuotaInfo,
    /// Usage counters for the tenant.
    pub usage: TenantUsageInfo,
    /// Number of pipelines the tenant has.
    pub pipeline_count: usize,
}
219
/// Usage counters embedded in `TenantDetailResponse`.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantUsageInfo {
    /// Count of processed events.
    pub events_processed: u64,
    /// Count of emitted output events.
    pub output_events_emitted: u64,
    /// Number of active pipelines.
    pub active_pipelines: usize,
}
226
227// =============================================================================
228// API Routes
229// =============================================================================
230
231/// Build a tower-http CORS layer from an optional list of allowed origins.
232///
233/// - Explicit list of origins: restrict to those origins.
234/// - A list containing `"*"`: allow any origin (must be explicitly opted into).
235/// - `None` (default): allow only localhost origins for safety.
236fn build_cors(origins: Option<Vec<String>>) -> CorsLayer {
237    let methods = AllowMethods::list([
238        axum::http::Method::GET,
239        axum::http::Method::POST,
240        axum::http::Method::PUT,
241        axum::http::Method::DELETE,
242        axum::http::Method::OPTIONS,
243    ]);
244
245    let headers = AllowHeaders::list([
246        "content-type".parse().unwrap(),
247        "x-api-key".parse().unwrap(),
248        "authorization".parse().unwrap(),
249        "x-request-id".parse().unwrap(),
250        "traceparent".parse().unwrap(),
251    ]);
252
253    let origin = match origins {
254        Some(ref list) if list.iter().any(|o| o == "*") => {
255            tracing::warn!(
256                "CORS configured with allow_any_origin — this is unsafe for production. \
257                 Set --cors-origins to restrict to specific origins."
258            );
259            AllowOrigin::any()
260        }
261        Some(ref list) if !list.is_empty() => {
262            let origins: Vec<axum::http::HeaderValue> =
263                list.iter().filter_map(|s| s.parse().ok()).collect();
264            AllowOrigin::list(origins)
265        }
266        _ => AllowOrigin::list([
267            "http://localhost:5173".parse().unwrap(),
268            "http://localhost:8080".parse().unwrap(),
269            "http://127.0.0.1:5173".parse().unwrap(),
270            "http://127.0.0.1:8080".parse().unwrap(),
271        ]),
272    };
273
274    CorsLayer::new()
275        .allow_methods(methods)
276        .allow_headers(headers)
277        .allow_origin(origin)
278}
279
/// Shared state for the API router.
///
/// Built once in `api_routes` and handed to every handler through axum's
/// `State` extractor.
#[derive(Debug, Clone)]
pub struct ApiState {
    /// Shared tenant manager; handlers take its `read()`/`write()` lock.
    pub manager: SharedTenantManager,
    /// Configured admin key, if any.
    /// NOTE(review): presumably checked by the tenant admin handlers, which
    /// are outside this view — confirm.
    pub admin_key: Option<String>,
    /// Billing state consulted by the event-injection handlers when the
    /// `saas` feature is enabled.
    pub billing_state: Option<SharedBillingState>,
    /// Database pool used to mirror pipeline creation/deletion (only with the
    /// `saas` feature).
    #[cfg(feature = "saas")]
    pub db_pool: Option<varpulis_db::PgPool>,
}
289
290/// Axum extractor for X-API-Key header.
291#[derive(Debug)]
292pub struct ApiKey(pub String);
293
294impl<S> axum::extract::FromRequestParts<S> for ApiKey
295where
296    S: Send + Sync,
297{
298    type Rejection = Response;
299
300    async fn from_request_parts(
301        parts: &mut axum::http::request::Parts,
302        _state: &S,
303    ) -> Result<Self, Self::Rejection> {
304        parts
305            .headers
306            .get("x-api-key")
307            .and_then(|v| v.to_str().ok())
308            .map(|s| Self(s.to_string()))
309            .ok_or_else(|| {
310                (
311                    StatusCode::UNAUTHORIZED,
312                    axum::Json(serde_json::json!({"error": "Missing X-API-Key header"})),
313                )
314                    .into_response()
315            })
316    }
317}
318
319/// Axum extractor for X-Admin-Key header.
320#[derive(Debug)]
321pub struct AdminKey(pub String);
322
323impl<S> axum::extract::FromRequestParts<S> for AdminKey
324where
325    S: Send + Sync,
326{
327    type Rejection = Response;
328
329    async fn from_request_parts(
330        parts: &mut axum::http::request::Parts,
331        _state: &S,
332    ) -> Result<Self, Self::Rejection> {
333        parts
334            .headers
335            .get("x-admin-key")
336            .and_then(|v| v.to_str().ok())
337            .map(|s| Self(s.to_string()))
338            .ok_or_else(|| {
339                (
340                    StatusCode::UNAUTHORIZED,
341                    axum::Json(serde_json::json!({"error": "Missing X-Admin-Key header"})),
342                )
343                    .into_response()
344            })
345    }
346}
347
348/// Build the complete API route tree
349pub fn api_routes(
350    manager: SharedTenantManager,
351    admin_key: Option<String>,
352    cors_origins: Option<Vec<String>>,
353    billing_state: Option<SharedBillingState>,
354    #[cfg(feature = "saas")] db_pool: Option<varpulis_db::PgPool>,
355) -> Router {
356    let state = ApiState {
357        manager,
358        admin_key,
359        billing_state,
360        #[cfg(feature = "saas")]
361        db_pool,
362    };
363
364    let cors = build_cors(cors_origins);
365
366    Router::new()
367        // Pipeline CRUD
368        .route("/api/v1/pipelines", post(handle_deploy).get(handle_list))
369        .route(
370            "/api/v1/pipelines/{pipeline_id}",
371            get(handle_get).delete(handle_delete),
372        )
373        // Pipeline actions
374        .route(
375            "/api/v1/pipelines/{pipeline_id}/events",
376            post(handle_inject),
377        )
378        .route(
379            "/api/v1/pipelines/{pipeline_id}/events-batch",
380            post(handle_inject_batch),
381        )
382        .route(
383            "/api/v1/pipelines/{pipeline_id}/checkpoint",
384            post(handle_checkpoint),
385        )
386        .route(
387            "/api/v1/pipelines/{pipeline_id}/restore",
388            post(handle_restore),
389        )
390        .route(
391            "/api/v1/pipelines/{pipeline_id}/metrics",
392            get(handle_metrics),
393        )
394        .route(
395            "/api/v1/pipelines/{pipeline_id}/topology",
396            get(handle_topology),
397        )
398        .route(
399            "/api/v1/pipelines/{pipeline_id}/reload",
400            post(handle_reload),
401        )
402        .route("/api/v1/usage", get(handle_usage))
403        .route("/api/v1/pipelines/{pipeline_id}/logs", get(handle_logs))
404        // DLQ routes
405        .route(
406            "/api/v1/pipelines/{pipeline_id}/dlq",
407            get(handle_dlq_get).delete(handle_dlq_clear),
408        )
409        .route(
410            "/api/v1/pipelines/{pipeline_id}/dlq/replay",
411            post(handle_dlq_replay),
412        )
413        // Pipeline graph (visual builder)
414        .route("/api/v1/pipeline/graph", post(handle_pipeline_to_graph))
415        .route("/api/v1/pipeline/generate", post(handle_graph_to_pipeline))
416        // Tenant admin routes
417        .route(
418            "/api/v1/tenants",
419            post(handle_create_tenant).get(handle_list_tenants),
420        )
421        .route(
422            "/api/v1/tenants/{tenant_id}",
423            get(handle_get_tenant).delete(handle_delete_tenant),
424        )
425        .layer(cors)
426        .with_state(state)
427}
428
429// =============================================================================
430// Handlers
431// =============================================================================
432
433async fn handle_deploy(
434    State(state): State<ApiState>,
435    ApiKey(api_key): ApiKey,
436    Json(body): Json<DeployPipelineRequest>,
437) -> Response {
438    let manager = &state.manager;
439    let mut mgr = manager.write().await;
440
441    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
442        Some(id) => id.clone(),
443        None => {
444            return error_response(
445                StatusCode::UNAUTHORIZED,
446                "invalid_api_key",
447                "Invalid API key",
448            )
449        }
450    };
451
452    let pipeline_name = body.name.clone();
453    #[cfg(feature = "saas")]
454    let vpl_source = body.source.clone();
455
456    let result = mgr
457        .deploy_pipeline_on_tenant(&tenant_id, body.name, body.source)
458        .await;
459
460    match result {
461        Ok(id) => {
462            mgr.persist_if_needed(&tenant_id);
463
464            // Sync pipeline to DB for hierarchy-aware views
465            #[cfg(feature = "saas")]
466            if let Some(ref pool) = state.db_pool {
467                if let Ok(org_uuid) = tenant_id.0.parse::<uuid::Uuid>() {
468                    if let Err(e) = varpulis_db::repo::create_scoped_pipeline(
469                        pool,
470                        org_uuid,
471                        &pipeline_name,
472                        &vpl_source,
473                        "own",
474                    )
475                    .await
476                    {
477                        tracing::warn!("Failed to sync pipeline to DB: {}", e);
478                    }
479                }
480            }
481
482            let resp = DeployPipelineResponse {
483                id,
484                name: pipeline_name,
485                status: "running".to_string(),
486            };
487            (StatusCode::CREATED, axum::Json(&resp)).into_response()
488        }
489        Err(e) => tenant_error_response(e),
490    }
491}
492
493async fn handle_list(
494    State(state): State<ApiState>,
495    ApiKey(api_key): ApiKey,
496    Query(pagination): Query<PaginationParams>,
497) -> Response {
498    let manager = &state.manager;
499    if pagination.exceeds_max() {
500        return error_response(
501            StatusCode::BAD_REQUEST,
502            "invalid_limit",
503            &format!("limit must not exceed {MAX_LIMIT}"),
504        );
505    }
506
507    let mgr = manager.read().await;
508
509    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
510        Some(id) => id.clone(),
511        None => {
512            return error_response(
513                StatusCode::UNAUTHORIZED,
514                "invalid_api_key",
515                "Invalid API key",
516            )
517        }
518    };
519
520    let tenant = match mgr.get_tenant(&tenant_id) {
521        Some(t) => t,
522        None => {
523            return error_response(
524                StatusCode::NOT_FOUND,
525                "tenant_not_found",
526                "Tenant not found",
527            )
528        }
529    };
530
531    let all_pipelines: Vec<PipelineInfo> = tenant
532        .pipelines
533        .values()
534        .map(|p| {
535            let is_global = p.global_template_id.is_some();
536            PipelineInfo {
537                id: p.id.clone(),
538                name: p.name.clone(),
539                status: p.status.to_string(),
540                source: p.source.clone(),
541                uptime_secs: p.created_at.elapsed().as_secs(),
542                global_template_id: p.global_template_id.clone(),
543                scope_level: if is_global {
544                    "global".to_string()
545                } else {
546                    "own".to_string()
547                },
548                inherited_from_org_id: None,
549                read_only: is_global,
550            }
551        })
552        .collect();
553
554    let (pipelines, meta) = pagination.paginate(all_pipelines);
555    let total = meta.total;
556    let resp = PipelineListResponse {
557        pipelines,
558        total,
559        pagination: Some(meta),
560    };
561    axum::Json(&resp).into_response()
562}
563
564async fn handle_get(
565    State(state): State<ApiState>,
566    Path(pipeline_id): Path<String>,
567    ApiKey(api_key): ApiKey,
568) -> Response {
569    let manager = &state.manager;
570    let mgr = manager.read().await;
571
572    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
573        Some(id) => id.clone(),
574        None => {
575            return error_response(
576                StatusCode::UNAUTHORIZED,
577                "invalid_api_key",
578                "Invalid API key",
579            )
580        }
581    };
582
583    let tenant = match mgr.get_tenant(&tenant_id) {
584        Some(t) => t,
585        None => {
586            return error_response(
587                StatusCode::NOT_FOUND,
588                "tenant_not_found",
589                "Tenant not found",
590            )
591        }
592    };
593
594    match tenant.pipelines.get(&pipeline_id) {
595        Some(p) => {
596            let is_global = p.global_template_id.is_some();
597            let info = PipelineInfo {
598                id: p.id.clone(),
599                name: p.name.clone(),
600                status: p.status.to_string(),
601                source: p.source.clone(),
602                uptime_secs: p.created_at.elapsed().as_secs(),
603                global_template_id: p.global_template_id.clone(),
604                scope_level: if is_global {
605                    "global".to_string()
606                } else {
607                    "own".to_string()
608                },
609                inherited_from_org_id: None,
610                read_only: is_global,
611            };
612            axum::Json(&info).into_response()
613        }
614        None => error_response(
615            StatusCode::NOT_FOUND,
616            "pipeline_not_found",
617            "Pipeline not found",
618        ),
619    }
620}
621
622async fn handle_delete(
623    State(state): State<ApiState>,
624    Path(pipeline_id): Path<String>,
625    ApiKey(api_key): ApiKey,
626) -> Response {
627    let manager = &state.manager;
628    let mut mgr = manager.write().await;
629
630    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
631        Some(id) => id.clone(),
632        None => {
633            return error_response(
634                StatusCode::UNAUTHORIZED,
635                "invalid_api_key",
636                "Invalid API key",
637            )
638        }
639    };
640
641    #[cfg(feature = "saas")]
642    let mut pipeline_name_for_db = None;
643
644    let result = {
645        let tenant = match mgr.get_tenant_mut(&tenant_id) {
646            Some(t) => t,
647            None => {
648                return error_response(
649                    StatusCode::NOT_FOUND,
650                    "tenant_not_found",
651                    "Tenant not found",
652                )
653            }
654        };
655
656        // Protect global pipelines from tenant deletion
657        if let Some(pipeline) = tenant.pipelines.get(&pipeline_id) {
658            if pipeline.global_template_id.is_some() {
659                return error_response(
660                    StatusCode::FORBIDDEN,
661                    "global_pipeline_protected",
662                    "Global pipelines can only be managed by admin",
663                );
664            }
665            #[cfg(feature = "saas")]
666            {
667                pipeline_name_for_db = Some(pipeline.name.clone());
668            }
669        }
670
671        tenant.remove_pipeline(&pipeline_id)
672    };
673
674    match result {
675        Ok(()) => {
676            mgr.persist_if_needed(&tenant_id);
677
678            // Sync deletion to DB
679            #[cfg(feature = "saas")]
680            if let Some(ref pool) = state.db_pool {
681                if let (Ok(org_uuid), Some(name)) =
682                    (tenant_id.0.parse::<uuid::Uuid>(), &pipeline_name_for_db)
683                {
684                    let _ = varpulis_db::repo::delete_pipeline_by_name(pool, org_uuid, name).await;
685                }
686            }
687
688            axum::Json(serde_json::json!({"deleted": true})).into_response()
689        }
690        Err(e) => tenant_error_response(e),
691    }
692}
693
694async fn handle_inject(
695    State(state): State<ApiState>,
696    Path(pipeline_id): Path<String>,
697    ApiKey(api_key): ApiKey,
698    Json(body): Json<InjectEventRequest>,
699) -> Response {
700    let manager = &state.manager;
701    let billing_state = &state.billing_state;
702    // Check usage limit (SaaS mode only)
703    #[cfg(feature = "saas")]
704    let mut usage_warning: Option<f64> = None;
705    #[cfg(feature = "saas")]
706    if let Some(ref bs) = billing_state {
707        if let Some(org_id) = bs.org_id_for_api_key(&api_key).await {
708            match bs.check_usage_limit(org_id, 1).await {
709                crate::billing::UsageCheckResult::Exceeded(err) => {
710                    return crate::billing::usage_limit_response(&err);
711                }
712                crate::billing::UsageCheckResult::ApproachingLimit { usage_percent } => {
713                    usage_warning = Some(usage_percent);
714                }
715                crate::billing::UsageCheckResult::Ok => {}
716            }
717            // Record the event for usage tracking
718            bs.usage.write().await.record_events(org_id, 1);
719        }
720    }
721    #[cfg(not(feature = "saas"))]
722    let _ = &billing_state;
723
724    let mut mgr = manager.write().await;
725
726    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
727        Some(id) => id.clone(),
728        None => {
729            return error_response(
730                StatusCode::UNAUTHORIZED,
731                "invalid_api_key",
732                "Invalid API key",
733            )
734        }
735    };
736
737    // Check backpressure before processing
738    if let Err(e) = mgr.check_backpressure() {
739        return tenant_error_response(e);
740    }
741
742    let mut event = Event::new(body.event_type.clone());
743    for (key, value) in &body.fields {
744        let v = json_to_runtime_value(value);
745        event = event.with_field(key.as_str(), v);
746    }
747
748    match mgr
749        .process_event_with_backpressure(&tenant_id, &pipeline_id, event)
750        .await
751    {
752        Ok(output_events) => {
753            let events_json: Vec<serde_json::Value> = output_events
754                .iter()
755                .map(|e| {
756                    let mut fields = serde_json::Map::new();
757                    for (k, v) in &e.data {
758                        fields.insert(k.to_string(), crate::websocket::value_to_json(v));
759                    }
760                    serde_json::json!({
761                        "event_type": e.event_type.to_string(),
762                        "fields": serde_json::Value::Object(fields),
763                    })
764                })
765                .collect();
766            let response = serde_json::json!({
767                "accepted": true,
768                "output_events": events_json,
769            });
770            #[cfg(feature = "saas")]
771            if let Some(pct) = usage_warning {
772                return (
773                    StatusCode::OK,
774                    [("X-Usage-Warning", format!("approaching_limit ({pct:.0}%)"))],
775                    axum::Json(response),
776                )
777                    .into_response();
778            }
779            axum::Json(response).into_response()
780        }
781        Err(e) => tenant_error_response(e),
782    }
783}
784
785async fn handle_inject_batch(
786    State(state): State<ApiState>,
787    Path(pipeline_id): Path<String>,
788    ApiKey(api_key): ApiKey,
789    Json(body): Json<InjectBatchRequest>,
790) -> Response {
791    let manager = &state.manager;
792    let billing_state = &state.billing_state;
793    let event_count = body.events.len() as i64;
794
795    // Check usage limit for the entire batch (SaaS mode only)
796    #[cfg(feature = "saas")]
797    if let Some(ref bs) = billing_state {
798        if let Some(org_id) = bs.org_id_for_api_key(&api_key).await {
799            match bs.check_usage_limit(org_id, event_count).await {
800                crate::billing::UsageCheckResult::Exceeded(err) => {
801                    return crate::billing::usage_limit_response(&err);
802                }
803                crate::billing::UsageCheckResult::ApproachingLimit { .. }
804                | crate::billing::UsageCheckResult::Ok => {}
805            }
806            // Record the batch for usage tracking
807            bs.usage.write().await.record_events(org_id, event_count);
808        }
809    }
810    #[cfg(not(feature = "saas"))]
811    let _ = (&billing_state, event_count);
812
813    let mut mgr = manager.write().await;
814
815    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
816        Some(id) => id.clone(),
817        None => {
818            return error_response(
819                StatusCode::UNAUTHORIZED,
820                "invalid_api_key",
821                "Invalid API key",
822            )
823        }
824    };
825
826    // Check backpressure before processing the batch
827    if let Err(e) = mgr.check_backpressure() {
828        return tenant_error_response(e);
829    }
830
831    let start = std::time::Instant::now();
832    let mut accepted = 0usize;
833    let mut output_events = Vec::new();
834
835    for req in body.events {
836        let mut event = Event::new(req.event_type.clone());
837        for (key, value) in &req.fields {
838            let v = json_to_runtime_value(value);
839            event = event.with_field(key.as_str(), v);
840        }
841
842        match mgr
843            .process_event_with_backpressure(&tenant_id, &pipeline_id, event)
844            .await
845        {
846            Ok(outputs) => {
847                accepted += 1;
848                for e in &outputs {
849                    let mut flat = serde_json::Map::new();
850                    flat.insert(
851                        "event_type".to_string(),
852                        serde_json::Value::String(e.event_type.to_string()),
853                    );
854                    for (k, v) in &e.data {
855                        flat.insert(k.to_string(), crate::websocket::value_to_json(v));
856                    }
857                    output_events.push(serde_json::Value::Object(flat));
858                }
859            }
860            Err(TenantError::BackpressureExceeded { .. }) => {
861                // Stop processing the rest of the batch on backpressure
862                break;
863            }
864            Err(_) => {
865                // Skip other failed events silently in batch mode
866            }
867        }
868    }
869
870    let processing_time_us = start.elapsed().as_micros() as u64;
871
872    let resp = InjectBatchResponse {
873        accepted,
874        output_events,
875        processing_time_us,
876    };
877    axum::Json(&resp).into_response()
878}
879
880async fn handle_checkpoint(
881    State(state): State<ApiState>,
882    Path(pipeline_id): Path<String>,
883    ApiKey(api_key): ApiKey,
884) -> Response {
885    let manager = &state.manager;
886    let mgr = manager.read().await;
887
888    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
889        Some(id) => id.clone(),
890        None => {
891            return error_response(
892                StatusCode::UNAUTHORIZED,
893                "invalid_api_key",
894                "Invalid API key",
895            )
896        }
897    };
898
899    let tenant = match mgr.get_tenant(&tenant_id) {
900        Some(t) => t,
901        None => {
902            return error_response(
903                StatusCode::NOT_FOUND,
904                "tenant_not_found",
905                "Tenant not found",
906            )
907        }
908    };
909
910    match tenant.checkpoint_pipeline(&pipeline_id).await {
911        Ok(checkpoint) => {
912            let resp = CheckpointResponse {
913                pipeline_id,
914                events_processed: checkpoint.events_processed,
915                checkpoint,
916            };
917            axum::Json(&resp).into_response()
918        }
919        Err(e) => tenant_error_response(e),
920    }
921}
922
923async fn handle_restore(
924    State(state): State<ApiState>,
925    Path(pipeline_id): Path<String>,
926    ApiKey(api_key): ApiKey,
927    Json(body): Json<RestoreRequest>,
928) -> Response {
929    let manager = &state.manager;
930    let mut mgr = manager.write().await;
931
932    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
933        Some(id) => id.clone(),
934        None => {
935            return error_response(
936                StatusCode::UNAUTHORIZED,
937                "invalid_api_key",
938                "Invalid API key",
939            )
940        }
941    };
942
943    let tenant = match mgr.get_tenant_mut(&tenant_id) {
944        Some(t) => t,
945        None => {
946            return error_response(
947                StatusCode::NOT_FOUND,
948                "tenant_not_found",
949                "Tenant not found",
950            )
951        }
952    };
953
954    match tenant
955        .restore_pipeline(&pipeline_id, &body.checkpoint)
956        .await
957    {
958        Ok(()) => {
959            let resp = RestoreResponse {
960                pipeline_id,
961                restored: true,
962                events_restored: body.checkpoint.events_processed,
963            };
964            axum::Json(&resp).into_response()
965        }
966        Err(e) => tenant_error_response(e),
967    }
968}
969
970async fn handle_metrics(
971    State(state): State<ApiState>,
972    Path(pipeline_id): Path<String>,
973    ApiKey(api_key): ApiKey,
974) -> Response {
975    let manager = &state.manager;
976    let mgr = manager.read().await;
977
978    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
979        Some(id) => id.clone(),
980        None => {
981            return error_response(
982                StatusCode::UNAUTHORIZED,
983                "invalid_api_key",
984                "Invalid API key",
985            )
986        }
987    };
988
989    let tenant = match mgr.get_tenant(&tenant_id) {
990        Some(t) => t,
991        None => {
992            return error_response(
993                StatusCode::NOT_FOUND,
994                "tenant_not_found",
995                "Tenant not found",
996            )
997        }
998    };
999
1000    if !tenant.pipelines.contains_key(&pipeline_id) {
1001        return error_response(
1002            StatusCode::NOT_FOUND,
1003            "pipeline_not_found",
1004            "Pipeline not found",
1005        );
1006    }
1007
1008    let resp = PipelineMetricsResponse {
1009        pipeline_id,
1010        events_processed: tenant.usage.events_processed,
1011        output_events_emitted: tenant.usage.output_events_emitted,
1012    };
1013    axum::Json(&resp).into_response()
1014}
1015
1016async fn handle_topology(
1017    State(state): State<ApiState>,
1018    Path(pipeline_id): Path<String>,
1019    ApiKey(api_key): ApiKey,
1020) -> Response {
1021    let manager = &state.manager;
1022    let mgr = manager.read().await;
1023
1024    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1025        Some(id) => id.clone(),
1026        None => {
1027            return error_response(
1028                StatusCode::UNAUTHORIZED,
1029                "invalid_api_key",
1030                "Invalid API key",
1031            )
1032        }
1033    };
1034
1035    let tenant = match mgr.get_tenant(&tenant_id) {
1036        Some(t) => t,
1037        None => {
1038            return error_response(
1039                StatusCode::NOT_FOUND,
1040                "tenant_not_found",
1041                "Tenant not found",
1042            )
1043        }
1044    };
1045
1046    let pipeline = match tenant.pipelines.get(&pipeline_id) {
1047        Some(p) => p,
1048        None => {
1049            return error_response(
1050                StatusCode::NOT_FOUND,
1051                "pipeline_not_found",
1052                "Pipeline not found",
1053            )
1054        }
1055    };
1056
1057    let engine = pipeline.engine.lock().await;
1058    let topology = engine.topology();
1059    axum::Json(&topology).into_response()
1060}
1061
1062async fn handle_reload(
1063    State(state): State<ApiState>,
1064    Path(pipeline_id): Path<String>,
1065    ApiKey(api_key): ApiKey,
1066    Json(body): Json<ReloadPipelineRequest>,
1067) -> Response {
1068    let manager = &state.manager;
1069    let mut mgr = manager.write().await;
1070
1071    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1072        Some(id) => id.clone(),
1073        None => {
1074            return error_response(
1075                StatusCode::UNAUTHORIZED,
1076                "invalid_api_key",
1077                "Invalid API key",
1078            )
1079        }
1080    };
1081
1082    let result = {
1083        let tenant = match mgr.get_tenant_mut(&tenant_id) {
1084            Some(t) => t,
1085            None => {
1086                return error_response(
1087                    StatusCode::NOT_FOUND,
1088                    "tenant_not_found",
1089                    "Tenant not found",
1090                )
1091            }
1092        };
1093
1094        // Protect global pipelines from tenant reload
1095        if let Some(pipeline) = tenant.pipelines.get(&pipeline_id) {
1096            if pipeline.global_template_id.is_some() {
1097                return error_response(
1098                    StatusCode::FORBIDDEN,
1099                    "global_pipeline_protected",
1100                    "Global pipelines can only be managed by admin",
1101                );
1102            }
1103        }
1104
1105        tenant.reload_pipeline(&pipeline_id, body.source).await
1106    };
1107
1108    match result {
1109        Ok(()) => {
1110            mgr.persist_if_needed(&tenant_id);
1111            axum::Json(serde_json::json!({"reloaded": true})).into_response()
1112        }
1113        Err(e) => tenant_error_response(e),
1114    }
1115}
1116
1117async fn handle_usage(State(state): State<ApiState>, ApiKey(api_key): ApiKey) -> Response {
1118    let manager = &state.manager;
1119    let mgr = manager.read().await;
1120
1121    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1122        Some(id) => id.clone(),
1123        None => {
1124            return error_response(
1125                StatusCode::UNAUTHORIZED,
1126                "invalid_api_key",
1127                "Invalid API key",
1128            )
1129        }
1130    };
1131
1132    let tenant = match mgr.get_tenant(&tenant_id) {
1133        Some(t) => t,
1134        None => {
1135            return error_response(
1136                StatusCode::NOT_FOUND,
1137                "tenant_not_found",
1138                "Tenant not found",
1139            )
1140        }
1141    };
1142
1143    let resp = UsageResponse {
1144        tenant_id: tenant.id.to_string(),
1145        events_processed: tenant.usage.events_processed,
1146        output_events_emitted: tenant.usage.output_events_emitted,
1147        active_pipelines: tenant.usage.active_pipelines,
1148        quota: QuotaInfo {
1149            max_pipelines: tenant.quota.max_pipelines,
1150            max_events_per_second: tenant.quota.max_events_per_second,
1151            max_streams_per_pipeline: tenant.quota.max_streams_per_pipeline,
1152        },
1153    };
1154    axum::Json(&resp).into_response()
1155}
1156
1157/// Handle SSE log streaming for a pipeline
1158async fn handle_logs(
1159    State(state): State<ApiState>,
1160    Path(pipeline_id): Path<String>,
1161    ApiKey(api_key): ApiKey,
1162) -> Response {
1163    let manager = &state.manager;
1164    let mgr = manager.read().await;
1165
1166    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1167        Some(id) => id.clone(),
1168        None => return error_response(StatusCode::UNAUTHORIZED, "invalid_key", "Invalid API key"),
1169    };
1170
1171    // Verify tenant owns this pipeline
1172    let tenant = match mgr.get_tenant(&tenant_id) {
1173        Some(t) => t,
1174        None => {
1175            return error_response(
1176                StatusCode::NOT_FOUND,
1177                "tenant_not_found",
1178                "Tenant not found",
1179            )
1180        }
1181    };
1182
1183    let rx: tokio::sync::broadcast::Receiver<Event> =
1184        match tenant.subscribe_pipeline_logs(&pipeline_id) {
1185            Ok(rx) => rx,
1186            Err(_) => {
1187                return error_response(
1188                    StatusCode::NOT_FOUND,
1189                    "pipeline_not_found",
1190                    &format!("Pipeline {pipeline_id} not found"),
1191                )
1192            }
1193        };
1194
1195    drop(mgr); // Release the read lock before streaming
1196
1197    // Create SSE stream from broadcast receiver using futures unfold
1198    let stream = stream::unfold(rx, |mut rx| async move {
1199        match rx.recv().await {
1200            Ok(event) => {
1201                let data: serde_json::Map<String, serde_json::Value> = event
1202                    .data
1203                    .iter()
1204                    .map(|(k, v): (&std::sync::Arc<str>, &varpulis_core::Value)| {
1205                        (k.to_string(), json_from_value(v))
1206                    })
1207                    .collect();
1208                let json = serde_json::to_string(&LogEvent {
1209                    event_type: event.event_type.to_string(),
1210                    timestamp: event.timestamp.to_rfc3339(),
1211                    data,
1212                })
1213                .unwrap_or_default();
1214                let sse = axum::response::sse::Event::default().data(json);
1215                Some((Ok::<_, Infallible>(sse), rx))
1216            }
1217            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1218                let msg = format!("{{\"warning\":\"skipped {n} events\"}}");
1219                let sse = axum::response::sse::Event::default()
1220                    .event("warning")
1221                    .data(msg);
1222                Some((Ok(sse), rx))
1223            }
1224            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
1225        }
1226    });
1227
1228    axum::response::sse::Sse::new(stream)
1229        .keep_alive(axum::response::sse::KeepAlive::default())
1230        .into_response()
1231}
1232
/// JSON payload that `handle_logs` serializes into the data of each SSE
/// message.
#[derive(Serialize)]
struct LogEvent {
    /// String form of the source event's `event_type`.
    event_type: String,
    /// Source event timestamp formatted with `to_rfc3339()`.
    timestamp: String,
    /// Event fields converted to JSON via `json_from_value`.
    data: serde_json::Map<String, serde_json::Value>,
}
1239
1240fn json_from_value(v: &varpulis_core::Value) -> serde_json::Value {
1241    match v {
1242        varpulis_core::Value::Null => serde_json::Value::Null,
1243        varpulis_core::Value::Bool(b) => serde_json::Value::Bool(*b),
1244        varpulis_core::Value::Int(i) => serde_json::json!(*i),
1245        varpulis_core::Value::Float(f) => serde_json::json!(*f),
1246        varpulis_core::Value::Str(s) => serde_json::Value::String(s.to_string()),
1247        varpulis_core::Value::Timestamp(ns) => serde_json::json!(*ns),
1248        varpulis_core::Value::Duration(ns) => serde_json::json!(*ns),
1249        varpulis_core::Value::Array(arr) => {
1250            serde_json::Value::Array(arr.iter().map(json_from_value).collect())
1251        }
1252        varpulis_core::Value::Map(map) => {
1253            let obj: serde_json::Map<String, serde_json::Value> = map
1254                .iter()
1255                .map(|(k, v)| (k.to_string(), json_from_value(v)))
1256                .collect();
1257            serde_json::Value::Object(obj)
1258        }
1259    }
1260}
1261
1262// =============================================================================
1263// DLQ Handlers
1264// =============================================================================
1265
1266async fn handle_dlq_get(
1267    State(state): State<ApiState>,
1268    Path(pipeline_id): Path<String>,
1269    ApiKey(api_key): ApiKey,
1270    Query(params): Query<DlqQueryParams>,
1271) -> Response {
1272    let manager = &state.manager;
1273    let mgr = manager.read().await;
1274
1275    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1276        Some(id) => id.clone(),
1277        None => {
1278            return error_response(
1279                StatusCode::UNAUTHORIZED,
1280                "invalid_api_key",
1281                "Invalid API key",
1282            )
1283        }
1284    };
1285
1286    let tenant = match mgr.get_tenant(&tenant_id) {
1287        Some(t) => t,
1288        None => {
1289            return error_response(
1290                StatusCode::NOT_FOUND,
1291                "tenant_not_found",
1292                "Tenant not found",
1293            )
1294        }
1295    };
1296
1297    let pipeline = match tenant.pipelines.get(&pipeline_id) {
1298        Some(p) => p,
1299        None => {
1300            return error_response(
1301                StatusCode::NOT_FOUND,
1302                "pipeline_not_found",
1303                "Pipeline not found",
1304            )
1305        }
1306    };
1307
1308    let engine = pipeline.engine.lock().await;
1309    let dlq = match engine.dlq() {
1310        Some(d) => d,
1311        None => {
1312            let resp = DlqEntriesResponse {
1313                entries: Vec::new(),
1314                total: 0,
1315            };
1316            return axum::Json(&resp).into_response();
1317        }
1318    };
1319
1320    let offset = params.offset.unwrap_or(0);
1321    let limit = params.limit.unwrap_or(100).min(1000);
1322
1323    match dlq.read_entries(offset, limit) {
1324        Ok(entries) => {
1325            let resp = DlqEntriesResponse {
1326                total: dlq.line_count(),
1327                entries,
1328            };
1329            axum::Json(&resp).into_response()
1330        }
1331        Err(e) => error_response(
1332            StatusCode::INTERNAL_SERVER_ERROR,
1333            "dlq_read_error",
1334            &format!("Failed to read DLQ: {e}"),
1335        ),
1336    }
1337}
1338
1339async fn handle_dlq_replay(
1340    State(state): State<ApiState>,
1341    Path(pipeline_id): Path<String>,
1342    ApiKey(api_key): ApiKey,
1343) -> Response {
1344    let manager = &state.manager;
1345    let mgr = manager.read().await;
1346
1347    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1348        Some(id) => id.clone(),
1349        None => {
1350            return error_response(
1351                StatusCode::UNAUTHORIZED,
1352                "invalid_api_key",
1353                "Invalid API key",
1354            )
1355        }
1356    };
1357
1358    let tenant = match mgr.get_tenant(&tenant_id) {
1359        Some(t) => t,
1360        None => {
1361            return error_response(
1362                StatusCode::NOT_FOUND,
1363                "tenant_not_found",
1364                "Tenant not found",
1365            )
1366        }
1367    };
1368
1369    let pipeline = match tenant.pipelines.get(&pipeline_id) {
1370        Some(p) => p,
1371        None => {
1372            return error_response(
1373                StatusCode::NOT_FOUND,
1374                "pipeline_not_found",
1375                "Pipeline not found",
1376            )
1377        }
1378    };
1379
1380    // Read all DLQ entries
1381    let entries = {
1382        let engine = pipeline.engine.lock().await;
1383        let dlq = match engine.dlq() {
1384            Some(d) => d,
1385            None => {
1386                let resp = DlqReplayResponse { replayed: 0 };
1387                return axum::Json(&resp).into_response();
1388            }
1389        };
1390        // Read all entries (up to a reasonable limit)
1391        match dlq.read_entries(0, 100_000) {
1392            Ok(entries) => entries,
1393            Err(e) => {
1394                return error_response(
1395                    StatusCode::INTERNAL_SERVER_ERROR,
1396                    "dlq_read_error",
1397                    &format!("Failed to read DLQ: {e}"),
1398                )
1399            }
1400        }
1401    };
1402
1403    // Replay each entry as an event into the pipeline engine
1404    let mut replayed = 0usize;
1405    {
1406        let mut engine = pipeline.engine.lock().await;
1407        for entry in &entries {
1408            // Reconstruct event from the DLQ entry
1409            let event_type = entry
1410                .event
1411                .get("event_type")
1412                .and_then(|v| v.as_str())
1413                .unwrap_or("unknown");
1414            let mut event = Event::new(event_type);
1415            if let Some(data) = entry.event.get("data").and_then(|v| v.as_object()) {
1416                for (k, v) in data {
1417                    let rv = json_to_runtime_value(v);
1418                    event = event.with_field(k.as_str(), rv);
1419                }
1420            }
1421            if engine.process(event).await.is_ok() {
1422                replayed += 1;
1423            }
1424        }
1425    }
1426
1427    let resp = DlqReplayResponse { replayed };
1428    axum::Json(&resp).into_response()
1429}
1430
1431async fn handle_dlq_clear(
1432    State(state): State<ApiState>,
1433    Path(pipeline_id): Path<String>,
1434    ApiKey(api_key): ApiKey,
1435) -> Response {
1436    let manager = &state.manager;
1437    let mgr = manager.read().await;
1438
1439    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1440        Some(id) => id.clone(),
1441        None => {
1442            return error_response(
1443                StatusCode::UNAUTHORIZED,
1444                "invalid_api_key",
1445                "Invalid API key",
1446            )
1447        }
1448    };
1449
1450    let tenant = match mgr.get_tenant(&tenant_id) {
1451        Some(t) => t,
1452        None => {
1453            return error_response(
1454                StatusCode::NOT_FOUND,
1455                "tenant_not_found",
1456                "Tenant not found",
1457            )
1458        }
1459    };
1460
1461    let pipeline = match tenant.pipelines.get(&pipeline_id) {
1462        Some(p) => p,
1463        None => {
1464            return error_response(
1465                StatusCode::NOT_FOUND,
1466                "pipeline_not_found",
1467                "Pipeline not found",
1468            )
1469        }
1470    };
1471
1472    let engine = pipeline.engine.lock().await;
1473    match engine.dlq() {
1474        Some(dlq) => match dlq.clear() {
1475            Ok(()) => {
1476                let resp = DlqClearResponse { cleared: true };
1477                axum::Json(&resp).into_response()
1478            }
1479            Err(e) => error_response(
1480                StatusCode::INTERNAL_SERVER_ERROR,
1481                "dlq_clear_error",
1482                &format!("Failed to clear DLQ: {e}"),
1483            ),
1484        },
1485        None => {
1486            let resp = DlqClearResponse { cleared: true };
1487            axum::Json(&resp).into_response()
1488        }
1489    }
1490}
1491
1492// =============================================================================
1493// Tenant Admin Routes
1494// =============================================================================
1495
1496// Tenant admin routes are now part of the main Router in api_routes()
1497
1498#[allow(clippy::result_large_err)]
1499fn validate_admin_key(provided: &str, configured: &Option<String>) -> Result<(), Response> {
1500    match configured {
1501        None => Err(error_response(
1502            StatusCode::FORBIDDEN,
1503            "admin_disabled",
1504            "Admin API is disabled (no --api-key configured)",
1505        )),
1506        Some(key) => {
1507            if constant_time_compare(key, provided) {
1508                Ok(())
1509            } else {
1510                Err(error_response(
1511                    StatusCode::UNAUTHORIZED,
1512                    "invalid_admin_key",
1513                    "Invalid admin key",
1514                ))
1515            }
1516        }
1517    }
1518}
1519
1520fn quota_from_tier(tier: Option<&str>) -> TenantQuota {
1521    match tier {
1522        Some("free") => TenantQuota::free(),
1523        Some("pro") => TenantQuota::pro(),
1524        Some("enterprise") => TenantQuota::enterprise(),
1525        _ => TenantQuota::default(),
1526    }
1527}
1528
1529async fn handle_create_tenant(
1530    State(state): State<ApiState>,
1531    AdminKey(admin_key): AdminKey,
1532    Json(body): Json<CreateTenantRequest>,
1533) -> Response {
1534    let manager = &state.manager;
1535    let configured_key = &state.admin_key;
1536    if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1537        return resp;
1538    }
1539
1540    let api_key = uuid::Uuid::new_v4().to_string();
1541    let quota = quota_from_tier(body.quota_tier.as_deref());
1542
1543    let mut mgr = manager.write().await;
1544    match mgr.create_tenant(body.name.clone(), api_key.clone(), quota.clone()) {
1545        Ok(tenant_id) => {
1546            let resp = TenantResponse {
1547                id: tenant_id.as_str().to_string(),
1548                name: body.name,
1549                api_key,
1550                quota: QuotaInfo {
1551                    max_pipelines: quota.max_pipelines,
1552                    max_events_per_second: quota.max_events_per_second,
1553                    max_streams_per_pipeline: quota.max_streams_per_pipeline,
1554                },
1555            };
1556            (StatusCode::CREATED, axum::Json(&resp)).into_response()
1557        }
1558        Err(e) => tenant_error_response(e),
1559    }
1560}
1561
1562async fn handle_list_tenants(
1563    State(state): State<ApiState>,
1564    AdminKey(admin_key): AdminKey,
1565    Query(pagination): Query<PaginationParams>,
1566) -> Response {
1567    let manager = &state.manager;
1568    let configured_key = &state.admin_key;
1569    if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1570        return resp;
1571    }
1572
1573    if pagination.exceeds_max() {
1574        return error_response(
1575            StatusCode::BAD_REQUEST,
1576            "invalid_limit",
1577            &format!("limit must not exceed {MAX_LIMIT}"),
1578        );
1579    }
1580
1581    let mgr = manager.read().await;
1582    let all_tenants: Vec<TenantResponse> = mgr
1583        .list_tenants()
1584        .iter()
1585        .map(|t| TenantResponse {
1586            id: t.id.as_str().to_string(),
1587            name: t.name.clone(),
1588            api_key: format!("{}...", &t.api_key_hash[..8]),
1589            quota: QuotaInfo {
1590                max_pipelines: t.quota.max_pipelines,
1591                max_events_per_second: t.quota.max_events_per_second,
1592                max_streams_per_pipeline: t.quota.max_streams_per_pipeline,
1593            },
1594        })
1595        .collect();
1596    let (tenants, meta) = pagination.paginate(all_tenants);
1597    let total = meta.total;
1598    let resp = TenantListResponse {
1599        tenants,
1600        total,
1601        pagination: Some(meta),
1602    };
1603    axum::Json(&resp).into_response()
1604}
1605
1606async fn handle_get_tenant(
1607    State(state): State<ApiState>,
1608    Path(tenant_id_str): Path<String>,
1609    AdminKey(admin_key): AdminKey,
1610) -> Response {
1611    let manager = &state.manager;
1612    let configured_key = &state.admin_key;
1613    if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1614        return resp;
1615    }
1616
1617    let mgr = manager.read().await;
1618    let tenant_id = varpulis_runtime::TenantId::new(&tenant_id_str);
1619    match mgr.get_tenant(&tenant_id) {
1620        Some(t) => {
1621            let resp = TenantDetailResponse {
1622                id: t.id.as_str().to_string(),
1623                name: t.name.clone(),
1624                api_key: format!("{}...", &t.api_key_hash[..8]),
1625                quota: QuotaInfo {
1626                    max_pipelines: t.quota.max_pipelines,
1627                    max_events_per_second: t.quota.max_events_per_second,
1628                    max_streams_per_pipeline: t.quota.max_streams_per_pipeline,
1629                },
1630                usage: TenantUsageInfo {
1631                    events_processed: t.usage.events_processed,
1632                    output_events_emitted: t.usage.output_events_emitted,
1633                    active_pipelines: t.usage.active_pipelines,
1634                },
1635                pipeline_count: t.pipelines.len(),
1636            };
1637            axum::Json(&resp).into_response()
1638        }
1639        None => error_response(
1640            StatusCode::NOT_FOUND,
1641            "tenant_not_found",
1642            "Tenant not found",
1643        ),
1644    }
1645}
1646
1647async fn handle_delete_tenant(
1648    State(state): State<ApiState>,
1649    Path(tenant_id_str): Path<String>,
1650    AdminKey(admin_key): AdminKey,
1651) -> Response {
1652    let manager = &state.manager;
1653    let configured_key = &state.admin_key;
1654    if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1655        return resp;
1656    }
1657
1658    let mut mgr = manager.write().await;
1659    let tenant_id = varpulis_runtime::TenantId::new(&tenant_id_str);
1660    match mgr.remove_tenant(&tenant_id) {
1661        Ok(()) => axum::Json(serde_json::json!({"deleted": true})).into_response(),
1662        Err(e) => tenant_error_response(e),
1663    }
1664}
1665
1666// =============================================================================
1667// Pipeline Graph Handlers (Visual Builder)
1668// =============================================================================
1669
1670async fn handle_pipeline_to_graph(Json(body): Json<PipelineGraphRequest>) -> Response {
1671    match varpulis_parser::parse(&body.vpl) {
1672        Ok(program) => {
1673            let graph = varpulis_runtime::engine::graph::program_to_graph(&program);
1674            (StatusCode::OK, axum::Json(graph)).into_response()
1675        }
1676        Err(e) => error_response(
1677            StatusCode::BAD_REQUEST,
1678            "parse_error",
1679            &format!("Failed to parse VPL: {e}"),
1680        ),
1681    }
1682}
1683
1684async fn handle_graph_to_pipeline(
1685    Json(graph): Json<varpulis_runtime::engine::graph::PipelineGraph>,
1686) -> Response {
1687    let vpl = varpulis_runtime::engine::graph::graph_to_vpl(&graph);
1688    (StatusCode::OK, axum::Json(GenerateResponse { vpl })).into_response()
1689}
1690
1691// =============================================================================
1692// Helpers
1693// =============================================================================
1694
1695fn error_response(status: StatusCode, code: &str, message: &str) -> Response {
1696    let body = ApiError {
1697        error: message.to_string(),
1698        code: code.to_string(),
1699    };
1700    (status, axum::Json(body)).into_response()
1701}
1702
1703fn tenant_error_response(err: TenantError) -> Response {
1704    // BackpressureExceeded needs a Retry-After header, handle it specially
1705    if let TenantError::BackpressureExceeded { current, max } = &err {
1706        let body = serde_json::json!({
1707            "error": format!("queue depth {current} exceeds maximum {max}"),
1708            "code": "queue_depth_exceeded",
1709            "retry_after": 1,
1710        });
1711        return (
1712            StatusCode::TOO_MANY_REQUESTS,
1713            [("Retry-After", "1"), ("Content-Type", "application/json")],
1714            serde_json::to_string(&body).unwrap_or_default(),
1715        )
1716            .into_response();
1717    }
1718
1719    let (status, code) = match &err {
1720        TenantError::NotFound(_) => (StatusCode::NOT_FOUND, "not_found"),
1721        TenantError::PipelineNotFound(_) => (StatusCode::NOT_FOUND, "pipeline_not_found"),
1722        TenantError::QuotaExceeded(_) => (StatusCode::TOO_MANY_REQUESTS, "quota_exceeded"),
1723        TenantError::RateLimitExceeded => (StatusCode::TOO_MANY_REQUESTS, "rate_limited"),
1724        TenantError::BackpressureExceeded { .. } => unreachable!(),
1725        TenantError::ParseError(_) => (StatusCode::BAD_REQUEST, "parse_error"),
1726        TenantError::EngineError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "engine_error"),
1727        TenantError::AlreadyExists(_) => (StatusCode::CONFLICT, "already_exists"),
1728    };
1729    error_response(status, code, &err.to_string())
1730}
1731
1732fn json_to_runtime_value(v: &serde_json::Value) -> varpulis_core::Value {
1733    match v {
1734        serde_json::Value::Null => varpulis_core::Value::Null,
1735        serde_json::Value::Bool(b) => varpulis_core::Value::Bool(*b),
1736        serde_json::Value::Number(n) => {
1737            if let Some(i) = n.as_i64() {
1738                varpulis_core::Value::Int(i)
1739            } else if let Some(f) = n.as_f64() {
1740                varpulis_core::Value::Float(f)
1741            } else {
1742                varpulis_core::Value::Null
1743            }
1744        }
1745        serde_json::Value::String(s) => varpulis_core::Value::Str(s.clone().into()),
1746        serde_json::Value::Array(arr) => {
1747            varpulis_core::Value::array(arr.iter().map(json_to_runtime_value).collect())
1748        }
1749        serde_json::Value::Object(map) => {
1750            let mut m: IndexMap<std::sync::Arc<str>, varpulis_core::Value, FxBuildHasher> =
1751                IndexMap::with_hasher(FxBuildHasher);
1752            for (k, v) in map {
1753                m.insert(k.as_str().into(), json_to_runtime_value(v));
1754            }
1755            varpulis_core::Value::map(m)
1756        }
1757    }
1758}
1759
1760#[cfg(test)]
1761mod tests {
1762    use std::sync::Arc;
1763
1764    use axum::body::Body;
1765    use axum::http::Request;
1766    use tokio::sync::RwLock;
1767    use tower::ServiceExt;
1768    use varpulis_runtime::tenant::{TenantManager, TenantQuota};
1769
1770    use super::*;
1771
    /// Test response wrapper for axum integration tests.
    ///
    /// Holds the status, headers and fully collected body of one response.
    struct TestResponse {
        /// HTTP status of the response.
        status: StatusCode,
        /// Complete response body bytes.
        body: bytes::Bytes,
        /// Response headers.
        headers: axum::http::HeaderMap,
    }
1778
    impl TestResponse {
        /// Returns the HTTP status of the response.
        fn status(&self) -> StatusCode {
            self.status
        }
        /// Returns the response body as raw bytes.
        fn body(&self) -> &[u8] {
            &self.body
        }
        /// Returns the response headers.
        fn headers(&self) -> &axum::http::HeaderMap {
            &self.headers
        }
    }
1790
    /// Test request builder for axum integration tests.
    ///
    /// Collects method, path, headers and an optional body; `reply` turns
    /// them into a request and runs it against a `Router`.
    struct TestRequestBuilder {
        /// HTTP method name, e.g. `"GET"`.
        method: String,
        /// Request URI.
        path: String,
        /// Header name/value pairs, applied in insertion order.
        headers: Vec<(String, String)>,
        /// Request body; `None` sends an empty body.
        body: Option<String>,
    }
1798
1799    impl TestRequestBuilder {
1800        fn new() -> Self {
1801            Self {
1802                method: "GET".to_string(),
1803                path: "/".to_string(),
1804                headers: Vec::new(),
1805                body: None,
1806            }
1807        }
1808        fn method(mut self, m: &str) -> Self {
1809            self.method = m.to_string();
1810            self
1811        }
1812        fn path(mut self, p: &str) -> Self {
1813            self.path = p.to_string();
1814            self
1815        }
1816        fn header(mut self, k: &str, v: &str) -> Self {
1817            self.headers.push((k.to_string(), v.to_string()));
1818            self
1819        }
1820        fn json<T: serde::Serialize>(mut self, body: &T) -> Self {
1821            self.body = Some(serde_json::to_string(body).unwrap());
1822            self.headers
1823                .push(("content-type".to_string(), "application/json".to_string()));
1824            self
1825        }
1826        async fn reply(self, app: &Router) -> TestResponse {
1827            let mut builder = Request::builder()
1828                .method(self.method.as_str())
1829                .uri(&self.path);
1830            for (k, v) in &self.headers {
1831                builder = builder.header(k.as_str(), v.as_str());
1832            }
1833            let body = match self.body {
1834                Some(b) => Body::from(b),
1835                None => Body::empty(),
1836            };
1837            let req = builder.body(body).unwrap();
1838            let resp = app.clone().oneshot(req).await.unwrap();
1839            let status = resp.status();
1840            let headers = resp.headers().clone();
1841            let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1842                .await
1843                .unwrap();
1844            TestResponse {
1845                status,
1846                body,
1847                headers,
1848            }
1849        }
1850    }
1851
    /// Starts building a test request; shorthand for
    /// `TestRequestBuilder::new()`.
    fn test_request() -> TestRequestBuilder {
        TestRequestBuilder::new()
    }
1856
1857    async fn setup_test_manager() -> SharedTenantManager {
1858        let mut mgr = TenantManager::new();
1859        let id = mgr
1860            .create_tenant(
1861                "Test Corp".into(),
1862                "test-key-123".into(),
1863                TenantQuota::default(),
1864            )
1865            .unwrap();
1866
1867        // Deploy a pipeline
1868        let tenant = mgr.get_tenant_mut(&id).unwrap();
1869        tenant
1870            .deploy_pipeline(
1871                "Test Pipeline".into(),
1872                "stream A = SensorReading .where(x > 1)".into(),
1873            )
1874            .await
1875            .unwrap();
1876
1877        Arc::new(RwLock::new(mgr))
1878    }
1879
1880    #[tokio::test]
1881    async fn test_deploy_pipeline() {
1882        let mgr = setup_test_manager().await;
1883        let routes = api_routes(mgr, None, None, None);
1884
1885        let resp = test_request()
1886            .method("POST")
1887            .path("/api/v1/pipelines")
1888            .header("x-api-key", "test-key-123")
1889            .json(&DeployPipelineRequest {
1890                name: "New Pipeline".into(),
1891                source: "stream B = Events .where(y > 10)".into(),
1892            })
1893            .reply(&routes)
1894            .await;
1895
1896        assert_eq!(resp.status(), StatusCode::CREATED);
1897        let body: DeployPipelineResponse = serde_json::from_slice(resp.body()).unwrap();
1898        assert_eq!(body.name, "New Pipeline");
1899        assert_eq!(body.status, "running");
1900    }
1901
1902    #[tokio::test]
1903    async fn test_deploy_invalid_api_key() {
1904        let mgr = setup_test_manager().await;
1905        let routes = api_routes(mgr, None, None, None);
1906
1907        let resp = test_request()
1908            .method("POST")
1909            .path("/api/v1/pipelines")
1910            .header("x-api-key", "wrong-key")
1911            .json(&DeployPipelineRequest {
1912                name: "Bad".into(),
1913                source: "stream X = Y .where(z > 1)".into(),
1914            })
1915            .reply(&routes)
1916            .await;
1917
1918        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1919    }
1920
1921    #[tokio::test]
1922    async fn test_deploy_invalid_vpl() {
1923        let mgr = setup_test_manager().await;
1924        let routes = api_routes(mgr, None, None, None);
1925
1926        let resp = test_request()
1927            .method("POST")
1928            .path("/api/v1/pipelines")
1929            .header("x-api-key", "test-key-123")
1930            .json(&DeployPipelineRequest {
1931                name: "Bad VPL".into(),
1932                source: "this is not valid {{{".into(),
1933            })
1934            .reply(&routes)
1935            .await;
1936
1937        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1938    }
1939
1940    #[tokio::test]
1941    async fn test_list_pipelines() {
1942        let mgr = setup_test_manager().await;
1943        let routes = api_routes(mgr, None, None, None);
1944
1945        let resp = test_request()
1946            .method("GET")
1947            .path("/api/v1/pipelines")
1948            .header("x-api-key", "test-key-123")
1949            .reply(&routes)
1950            .await;
1951
1952        assert_eq!(resp.status(), StatusCode::OK);
1953        let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
1954        assert_eq!(body.total, 1);
1955        assert_eq!(body.pipelines[0].name, "Test Pipeline");
1956    }
1957
1958    #[tokio::test]
1959    async fn test_usage_endpoint() {
1960        let mgr = setup_test_manager().await;
1961        let routes = api_routes(mgr, None, None, None);
1962
1963        let resp = test_request()
1964            .method("GET")
1965            .path("/api/v1/usage")
1966            .header("x-api-key", "test-key-123")
1967            .reply(&routes)
1968            .await;
1969
1970        assert_eq!(resp.status(), StatusCode::OK);
1971        let body: UsageResponse = serde_json::from_slice(resp.body()).unwrap();
1972        assert_eq!(body.active_pipelines, 1);
1973    }
1974
1975    #[tokio::test]
1976    async fn test_inject_event() {
1977        let mgr = setup_test_manager().await;
1978
1979        // Get pipeline ID
1980        let pipeline_id = {
1981            let m = mgr.read().await;
1982            let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
1983            let tenant = m.get_tenant(&tid).unwrap();
1984            tenant.pipelines.keys().next().unwrap().clone()
1985        };
1986
1987        let routes = api_routes(mgr, None, None, None);
1988
1989        let resp = test_request()
1990            .method("POST")
1991            .path(&format!("/api/v1/pipelines/{pipeline_id}/events"))
1992            .header("x-api-key", "test-key-123")
1993            .json(&InjectEventRequest {
1994                event_type: "SensorReading".into(),
1995                fields: {
1996                    let mut m = serde_json::Map::new();
1997                    m.insert(
1998                        "x".into(),
1999                        serde_json::Value::Number(serde_json::Number::from(42)),
2000                    );
2001                    m
2002                },
2003            })
2004            .reply(&routes)
2005            .await;
2006
2007        assert_eq!(resp.status(), StatusCode::OK);
2008    }
2009
2010    #[test]
2011    fn test_json_to_runtime_value() {
2012        assert_eq!(
2013            json_to_runtime_value(&serde_json::json!(null)),
2014            varpulis_core::Value::Null
2015        );
2016        assert_eq!(
2017            json_to_runtime_value(&serde_json::json!(true)),
2018            varpulis_core::Value::Bool(true)
2019        );
2020        assert_eq!(
2021            json_to_runtime_value(&serde_json::json!(42)),
2022            varpulis_core::Value::Int(42)
2023        );
2024        assert_eq!(
2025            json_to_runtime_value(&serde_json::json!(1.23)),
2026            varpulis_core::Value::Float(1.23)
2027        );
2028        assert_eq!(
2029            json_to_runtime_value(&serde_json::json!("hello")),
2030            varpulis_core::Value::Str("hello".into())
2031        );
2032    }
2033
2034    #[test]
2035    fn test_error_response_format() {
2036        let resp = error_response(StatusCode::BAD_REQUEST, "test_error", "Something failed");
2037        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2038    }
2039
2040    #[test]
2041    fn test_tenant_error_mapping() {
2042        let resp = tenant_error_response(TenantError::NotFound("t1".into()));
2043        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2044
2045        let resp = tenant_error_response(TenantError::RateLimitExceeded);
2046        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2047
2048        let parse_err = varpulis_parser::parse("INVALID{{{").unwrap_err();
2049        let resp = tenant_error_response(TenantError::ParseError(parse_err));
2050        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2051    }
2052
2053    // =========================================================================
2054    // Tenant Admin API tests
2055    // =========================================================================
2056
2057    fn setup_admin_routes(admin_key: Option<&str>) -> (SharedTenantManager, Router) {
2058        let mgr = Arc::new(RwLock::new(TenantManager::new()));
2059        let key = admin_key.map(|k| k.to_string());
2060        let routes = api_routes(mgr.clone(), key, None, None);
2061        (mgr, routes)
2062    }
2063
2064    #[tokio::test]
2065    async fn test_create_tenant() {
2066        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2067
2068        let resp = test_request()
2069            .method("POST")
2070            .path("/api/v1/tenants")
2071            .header("x-admin-key", "admin-secret")
2072            .json(&CreateTenantRequest {
2073                name: "Acme Corp".into(),
2074                quota_tier: None,
2075            })
2076            .reply(&routes)
2077            .await;
2078
2079        assert_eq!(resp.status(), StatusCode::CREATED);
2080        let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
2081        assert_eq!(body.name, "Acme Corp");
2082        assert!(!body.api_key.is_empty());
2083        assert!(!body.id.is_empty());
2084    }
2085
2086    #[tokio::test]
2087    async fn test_list_tenants_admin() {
2088        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2089
2090        // Create two tenants
2091        for name in &["Tenant A", "Tenant B"] {
2092            test_request()
2093                .method("POST")
2094                .path("/api/v1/tenants")
2095                .header("x-admin-key", "admin-secret")
2096                .json(&CreateTenantRequest {
2097                    name: name.to_string(),
2098                    quota_tier: None,
2099                })
2100                .reply(&routes)
2101                .await;
2102        }
2103
2104        let resp = test_request()
2105            .method("GET")
2106            .path("/api/v1/tenants")
2107            .header("x-admin-key", "admin-secret")
2108            .reply(&routes)
2109            .await;
2110
2111        assert_eq!(resp.status(), StatusCode::OK);
2112        let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2113        assert_eq!(body.total, 2);
2114    }
2115
2116    #[tokio::test]
2117    async fn test_get_tenant_admin() {
2118        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2119
2120        // Create a tenant
2121        let create_resp = test_request()
2122            .method("POST")
2123            .path("/api/v1/tenants")
2124            .header("x-admin-key", "admin-secret")
2125            .json(&CreateTenantRequest {
2126                name: "Detail Corp".into(),
2127                quota_tier: Some("pro".into()),
2128            })
2129            .reply(&routes)
2130            .await;
2131
2132        let created: TenantResponse = serde_json::from_slice(create_resp.body()).unwrap();
2133
2134        let resp = test_request()
2135            .method("GET")
2136            .path(&format!("/api/v1/tenants/{}", created.id))
2137            .header("x-admin-key", "admin-secret")
2138            .reply(&routes)
2139            .await;
2140
2141        assert_eq!(resp.status(), StatusCode::OK);
2142        let body: TenantDetailResponse = serde_json::from_slice(resp.body()).unwrap();
2143        assert_eq!(body.name, "Detail Corp");
2144        assert_eq!(body.pipeline_count, 0);
2145        // Pro tier quotas
2146        assert_eq!(body.quota.max_pipelines, 20);
2147    }
2148
2149    #[tokio::test]
2150    async fn test_delete_tenant_admin() {
2151        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2152
2153        // Create then delete
2154        let create_resp = test_request()
2155            .method("POST")
2156            .path("/api/v1/tenants")
2157            .header("x-admin-key", "admin-secret")
2158            .json(&CreateTenantRequest {
2159                name: "Doomed".into(),
2160                quota_tier: None,
2161            })
2162            .reply(&routes)
2163            .await;
2164        let created: TenantResponse = serde_json::from_slice(create_resp.body()).unwrap();
2165
2166        let resp = test_request()
2167            .method("DELETE")
2168            .path(&format!("/api/v1/tenants/{}", created.id))
2169            .header("x-admin-key", "admin-secret")
2170            .reply(&routes)
2171            .await;
2172
2173        assert_eq!(resp.status(), StatusCode::OK);
2174
2175        // Verify tenant is gone
2176        let list_resp = test_request()
2177            .method("GET")
2178            .path("/api/v1/tenants")
2179            .header("x-admin-key", "admin-secret")
2180            .reply(&routes)
2181            .await;
2182        let body: TenantListResponse = serde_json::from_slice(list_resp.body()).unwrap();
2183        assert_eq!(body.total, 0);
2184    }
2185
2186    #[tokio::test]
2187    async fn test_invalid_admin_key() {
2188        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2189
2190        let resp = test_request()
2191            .method("GET")
2192            .path("/api/v1/tenants")
2193            .header("x-admin-key", "wrong-key")
2194            .reply(&routes)
2195            .await;
2196
2197        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2198    }
2199
2200    #[tokio::test]
2201    async fn test_no_admin_key_configured() {
2202        let (_mgr, routes) = setup_admin_routes(None);
2203
2204        let resp = test_request()
2205            .method("GET")
2206            .path("/api/v1/tenants")
2207            .header("x-admin-key", "anything")
2208            .reply(&routes)
2209            .await;
2210
2211        assert_eq!(resp.status(), StatusCode::FORBIDDEN);
2212    }
2213
2214    #[tokio::test]
2215    async fn test_create_tenant_tier_selection() {
2216        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2217
2218        // Free tier
2219        let resp = test_request()
2220            .method("POST")
2221            .path("/api/v1/tenants")
2222            .header("x-admin-key", "admin-secret")
2223            .json(&CreateTenantRequest {
2224                name: "Free User".into(),
2225                quota_tier: Some("free".into()),
2226            })
2227            .reply(&routes)
2228            .await;
2229        let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
2230        assert_eq!(body.quota.max_pipelines, 5); // free tier
2231
2232        // Enterprise tier
2233        let resp = test_request()
2234            .method("POST")
2235            .path("/api/v1/tenants")
2236            .header("x-admin-key", "admin-secret")
2237            .json(&CreateTenantRequest {
2238                name: "Enterprise User".into(),
2239                quota_tier: Some("enterprise".into()),
2240            })
2241            .reply(&routes)
2242            .await;
2243        let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
2244        assert_eq!(body.quota.max_pipelines, 1000); // enterprise tier
2245    }
2246
2247    // =========================================================================
2248    // Pipeline CRUD handler tests
2249    // =========================================================================
2250
2251    /// Helper: get the first pipeline ID from the test manager
2252    async fn get_first_pipeline_id(mgr: &SharedTenantManager) -> String {
2253        let m = mgr.read().await;
2254        let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
2255        let tenant = m.get_tenant(&tid).unwrap();
2256        tenant.pipelines.keys().next().unwrap().clone()
2257    }
2258
2259    #[tokio::test]
2260    async fn test_get_single_pipeline() {
2261        let mgr = setup_test_manager().await;
2262        let pipeline_id = get_first_pipeline_id(&mgr).await;
2263        let routes = api_routes(mgr, None, None, None);
2264
2265        let resp = test_request()
2266            .method("GET")
2267            .path(&format!("/api/v1/pipelines/{pipeline_id}"))
2268            .header("x-api-key", "test-key-123")
2269            .reply(&routes)
2270            .await;
2271
2272        assert_eq!(resp.status(), StatusCode::OK);
2273        let body: PipelineInfo = serde_json::from_slice(resp.body()).unwrap();
2274        assert_eq!(body.id, pipeline_id);
2275        assert_eq!(body.name, "Test Pipeline");
2276        assert_eq!(body.status, "running");
2277        assert!(body.source.contains("SensorReading"));
2278    }
2279
2280    #[tokio::test]
2281    async fn test_get_pipeline_not_found() {
2282        let mgr = setup_test_manager().await;
2283        let routes = api_routes(mgr, None, None, None);
2284
2285        let resp = test_request()
2286            .method("GET")
2287            .path("/api/v1/pipelines/nonexistent-id")
2288            .header("x-api-key", "test-key-123")
2289            .reply(&routes)
2290            .await;
2291
2292        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2293    }
2294
2295    #[tokio::test]
2296    async fn test_delete_pipeline_api() {
2297        let mgr = setup_test_manager().await;
2298        let pipeline_id = get_first_pipeline_id(&mgr).await;
2299        let routes = api_routes(mgr.clone(), None, None, None);
2300
2301        let resp = test_request()
2302            .method("DELETE")
2303            .path(&format!("/api/v1/pipelines/{pipeline_id}"))
2304            .header("x-api-key", "test-key-123")
2305            .reply(&routes)
2306            .await;
2307
2308        assert_eq!(resp.status(), StatusCode::OK);
2309        let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2310        assert_eq!(body["deleted"], true);
2311
2312        // Verify it's gone
2313        let list_resp = test_request()
2314            .method("GET")
2315            .path("/api/v1/pipelines")
2316            .header("x-api-key", "test-key-123")
2317            .reply(&routes)
2318            .await;
2319        let list: PipelineListResponse = serde_json::from_slice(list_resp.body()).unwrap();
2320        assert_eq!(list.total, 0);
2321    }
2322
2323    #[tokio::test]
2324    async fn test_delete_pipeline_not_found() {
2325        let mgr = setup_test_manager().await;
2326        let routes = api_routes(mgr, None, None, None);
2327
2328        let resp = test_request()
2329            .method("DELETE")
2330            .path("/api/v1/pipelines/nonexistent-id")
2331            .header("x-api-key", "test-key-123")
2332            .reply(&routes)
2333            .await;
2334
2335        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2336    }
2337
2338    // =========================================================================
2339    // Batch inject handler tests
2340    // =========================================================================
2341
2342    #[tokio::test]
2343    async fn test_inject_batch() {
2344        let mgr = setup_test_manager().await;
2345        let pipeline_id = get_first_pipeline_id(&mgr).await;
2346        let routes = api_routes(mgr, None, None, None);
2347
2348        let resp = test_request()
2349            .method("POST")
2350            .path(&format!("/api/v1/pipelines/{pipeline_id}/events-batch"))
2351            .header("x-api-key", "test-key-123")
2352            .json(&InjectBatchRequest {
2353                events: vec![
2354                    InjectEventRequest {
2355                        event_type: "SensorReading".into(),
2356                        fields: {
2357                            let mut m = serde_json::Map::new();
2358                            m.insert("x".into(), serde_json::json!(5));
2359                            m
2360                        },
2361                    },
2362                    InjectEventRequest {
2363                        event_type: "SensorReading".into(),
2364                        fields: {
2365                            let mut m = serde_json::Map::new();
2366                            m.insert("x".into(), serde_json::json!(10));
2367                            m
2368                        },
2369                    },
2370                ],
2371            })
2372            .reply(&routes)
2373            .await;
2374
2375        assert_eq!(resp.status(), StatusCode::OK);
2376        let body: InjectBatchResponse = serde_json::from_slice(resp.body()).unwrap();
2377        assert_eq!(body.accepted, 2);
2378        assert!(body.processing_time_us > 0);
2379    }
2380
2381    #[tokio::test]
2382    async fn test_inject_batch_invalid_pipeline() {
2383        let mgr = setup_test_manager().await;
2384        let routes = api_routes(mgr, None, None, None);
2385
2386        // Batch mode silently skips failed events (including nonexistent pipeline)
2387        let resp = test_request()
2388            .method("POST")
2389            .path("/api/v1/pipelines/nonexistent/events-batch")
2390            .header("x-api-key", "test-key-123")
2391            .json(&InjectBatchRequest {
2392                events: vec![InjectEventRequest {
2393                    event_type: "Test".into(),
2394                    fields: serde_json::Map::new(),
2395                }],
2396            })
2397            .reply(&routes)
2398            .await;
2399
2400        // Returns 200 but accepted=0 since pipeline doesn't exist
2401        assert_eq!(resp.status(), StatusCode::OK);
2402        let body: InjectBatchResponse = serde_json::from_slice(resp.body()).unwrap();
2403        assert_eq!(body.accepted, 0);
2404    }
2405
2406    // =========================================================================
2407    // Checkpoint/Restore handler tests
2408    // =========================================================================
2409
2410    #[tokio::test]
2411    async fn test_checkpoint_pipeline() {
2412        let mgr = setup_test_manager().await;
2413        let pipeline_id = get_first_pipeline_id(&mgr).await;
2414        let routes = api_routes(mgr, None, None, None);
2415
2416        let resp = test_request()
2417            .method("POST")
2418            .path(&format!("/api/v1/pipelines/{pipeline_id}/checkpoint"))
2419            .header("x-api-key", "test-key-123")
2420            .reply(&routes)
2421            .await;
2422
2423        assert_eq!(resp.status(), StatusCode::OK);
2424        let body: CheckpointResponse = serde_json::from_slice(resp.body()).unwrap();
2425        assert_eq!(body.pipeline_id, pipeline_id);
2426    }
2427
2428    #[tokio::test]
2429    async fn test_checkpoint_not_found() {
2430        let mgr = setup_test_manager().await;
2431        let routes = api_routes(mgr, None, None, None);
2432
2433        let resp = test_request()
2434            .method("POST")
2435            .path("/api/v1/pipelines/nonexistent/checkpoint")
2436            .header("x-api-key", "test-key-123")
2437            .reply(&routes)
2438            .await;
2439
2440        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2441    }
2442
2443    #[tokio::test]
2444    async fn test_restore_pipeline() {
2445        let mgr = setup_test_manager().await;
2446        let pipeline_id = get_first_pipeline_id(&mgr).await;
2447        let routes = api_routes(mgr, None, None, None);
2448
2449        // First checkpoint
2450        let cp_resp = test_request()
2451            .method("POST")
2452            .path(&format!("/api/v1/pipelines/{pipeline_id}/checkpoint"))
2453            .header("x-api-key", "test-key-123")
2454            .reply(&routes)
2455            .await;
2456        let cp: CheckpointResponse = serde_json::from_slice(cp_resp.body()).unwrap();
2457
2458        // Then restore
2459        let resp = test_request()
2460            .method("POST")
2461            .path(&format!("/api/v1/pipelines/{pipeline_id}/restore"))
2462            .header("x-api-key", "test-key-123")
2463            .json(&RestoreRequest {
2464                checkpoint: cp.checkpoint,
2465            })
2466            .reply(&routes)
2467            .await;
2468
2469        assert_eq!(resp.status(), StatusCode::OK);
2470        let body: RestoreResponse = serde_json::from_slice(resp.body()).unwrap();
2471        assert_eq!(body.pipeline_id, pipeline_id);
2472        assert!(body.restored);
2473    }
2474
2475    #[tokio::test]
2476    async fn test_restore_not_found() {
2477        let mgr = setup_test_manager().await;
2478        let routes = api_routes(mgr, None, None, None);
2479
2480        let checkpoint = varpulis_runtime::persistence::EngineCheckpoint {
2481            version: varpulis_runtime::persistence::CHECKPOINT_VERSION,
2482            window_states: std::collections::HashMap::new(),
2483            sase_states: std::collections::HashMap::new(),
2484            join_states: std::collections::HashMap::new(),
2485            variables: std::collections::HashMap::new(),
2486            events_processed: 0,
2487            output_events_emitted: 0,
2488            watermark_state: None,
2489            distinct_states: std::collections::HashMap::new(),
2490            limit_states: std::collections::HashMap::new(),
2491        };
2492
2493        let resp = test_request()
2494            .method("POST")
2495            .path("/api/v1/pipelines/nonexistent/restore")
2496            .header("x-api-key", "test-key-123")
2497            .json(&RestoreRequest { checkpoint })
2498            .reply(&routes)
2499            .await;
2500
2501        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2502    }
2503
2504    // =========================================================================
2505    // Metrics handler tests
2506    // =========================================================================
2507
2508    #[tokio::test]
2509    async fn test_metrics_endpoint() {
2510        let mgr = setup_test_manager().await;
2511        let pipeline_id = get_first_pipeline_id(&mgr).await;
2512        let routes = api_routes(mgr, None, None, None);
2513
2514        let resp = test_request()
2515            .method("GET")
2516            .path(&format!("/api/v1/pipelines/{pipeline_id}/metrics"))
2517            .header("x-api-key", "test-key-123")
2518            .reply(&routes)
2519            .await;
2520
2521        assert_eq!(resp.status(), StatusCode::OK);
2522        let body: PipelineMetricsResponse = serde_json::from_slice(resp.body()).unwrap();
2523        assert_eq!(body.pipeline_id, pipeline_id);
2524    }
2525
2526    #[tokio::test]
2527    async fn test_metrics_not_found() {
2528        let mgr = setup_test_manager().await;
2529        let routes = api_routes(mgr, None, None, None);
2530
2531        let resp = test_request()
2532            .method("GET")
2533            .path("/api/v1/pipelines/nonexistent/metrics")
2534            .header("x-api-key", "test-key-123")
2535            .reply(&routes)
2536            .await;
2537
2538        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2539    }
2540
2541    // =========================================================================
2542    // Reload handler tests
2543    // =========================================================================
2544
2545    #[tokio::test]
2546    async fn test_reload_pipeline() {
2547        let mgr = setup_test_manager().await;
2548        let pipeline_id = get_first_pipeline_id(&mgr).await;
2549        let routes = api_routes(mgr, None, None, None);
2550
2551        let resp = test_request()
2552            .method("POST")
2553            .path(&format!("/api/v1/pipelines/{pipeline_id}/reload"))
2554            .header("x-api-key", "test-key-123")
2555            .json(&ReloadPipelineRequest {
2556                source: "stream B = Events .where(y > 10)".into(),
2557            })
2558            .reply(&routes)
2559            .await;
2560
2561        assert_eq!(resp.status(), StatusCode::OK);
2562        let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2563        assert_eq!(body["reloaded"], true);
2564    }
2565
2566    #[tokio::test]
2567    async fn test_reload_invalid_vpl() {
2568        let mgr = setup_test_manager().await;
2569        let pipeline_id = get_first_pipeline_id(&mgr).await;
2570        let routes = api_routes(mgr, None, None, None);
2571
2572        let resp = test_request()
2573            .method("POST")
2574            .path(&format!("/api/v1/pipelines/{pipeline_id}/reload"))
2575            .header("x-api-key", "test-key-123")
2576            .json(&ReloadPipelineRequest {
2577                source: "not valid {{{".into(),
2578            })
2579            .reply(&routes)
2580            .await;
2581
2582        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2583    }
2584
2585    #[tokio::test]
2586    async fn test_reload_not_found() {
2587        let mgr = setup_test_manager().await;
2588        let routes = api_routes(mgr, None, None, None);
2589
2590        let resp = test_request()
2591            .method("POST")
2592            .path("/api/v1/pipelines/nonexistent/reload")
2593            .header("x-api-key", "test-key-123")
2594            .json(&ReloadPipelineRequest {
2595                source: "stream B = Events .where(y > 10)".into(),
2596            })
2597            .reply(&routes)
2598            .await;
2599
2600        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2601    }
2602
2603    // =========================================================================
2604    // Logs (SSE) handler tests
2605    // =========================================================================
2606
2607    #[tokio::test]
2608    async fn test_logs_invalid_pipeline() {
2609        let mgr = setup_test_manager().await;
2610        let routes = api_routes(mgr, None, None, None);
2611
2612        let resp = test_request()
2613            .method("GET")
2614            .path("/api/v1/pipelines/nonexistent/logs")
2615            .header("x-api-key", "test-key-123")
2616            .reply(&routes)
2617            .await;
2618
2619        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2620    }
2621
2622    #[tokio::test]
2623    async fn test_logs_invalid_api_key() {
2624        let mgr = setup_test_manager().await;
2625        let pipeline_id = get_first_pipeline_id(&mgr).await;
2626        let routes = api_routes(mgr, None, None, None);
2627
2628        let resp = test_request()
2629            .method("GET")
2630            .path(&format!("/api/v1/pipelines/{pipeline_id}/logs"))
2631            .header("x-api-key", "wrong-key")
2632            .reply(&routes)
2633            .await;
2634
2635        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2636    }
2637
2638    // =========================================================================
2639    // json_to_runtime_value extended tests
2640    // =========================================================================
2641
2642    #[test]
2643    fn test_json_to_runtime_value_array() {
2644        let arr = serde_json::json!([1, "hello", true]);
2645        let val = json_to_runtime_value(&arr);
2646        match val {
2647            varpulis_core::Value::Array(a) => {
2648                assert_eq!(a.len(), 3);
2649                assert_eq!(a[0], varpulis_core::Value::Int(1));
2650                assert_eq!(a[1], varpulis_core::Value::Str("hello".into()));
2651                assert_eq!(a[2], varpulis_core::Value::Bool(true));
2652            }
2653            _ => panic!("Expected Array"),
2654        }
2655    }
2656
2657    #[test]
2658    fn test_json_to_runtime_value_object() {
2659        let obj = serde_json::json!({"key": "val", "num": 42});
2660        let val = json_to_runtime_value(&obj);
2661        match val {
2662            varpulis_core::Value::Map(m) => {
2663                assert_eq!(m.len(), 2);
2664            }
2665            _ => panic!("Expected Map"),
2666        }
2667    }
2668
2669    #[test]
2670    fn test_json_from_value_roundtrip() {
2671        use varpulis_core::Value;
2672        assert_eq!(json_from_value(&Value::Null), serde_json::json!(null));
2673        assert_eq!(json_from_value(&Value::Bool(true)), serde_json::json!(true));
2674        assert_eq!(json_from_value(&Value::Int(42)), serde_json::json!(42));
2675        assert_eq!(
2676            json_from_value(&Value::Float(2.71)),
2677            serde_json::json!(2.71)
2678        );
2679        assert_eq!(
2680            json_from_value(&Value::Str("hi".into())),
2681            serde_json::json!("hi")
2682        );
2683        assert_eq!(
2684            json_from_value(&Value::Timestamp(1000000)),
2685            serde_json::json!(1000000)
2686        );
2687        assert_eq!(
2688            json_from_value(&Value::Duration(5000)),
2689            serde_json::json!(5000)
2690        );
2691    }
2692
2693    // =========================================================================
2694    // Additional tenant_error_response coverage
2695    // =========================================================================
2696
2697    #[test]
2698    fn test_tenant_error_all_variants() {
2699        let resp = tenant_error_response(TenantError::PipelineNotFound("p1".into()));
2700        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2701
2702        let resp = tenant_error_response(TenantError::QuotaExceeded("max pipelines".into()));
2703        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2704
2705        let resp = tenant_error_response(TenantError::EngineError(
2706            varpulis_runtime::EngineError::Pipeline("boom".into()),
2707        ));
2708        assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
2709
2710        let resp = tenant_error_response(TenantError::AlreadyExists("t1".into()));
2711        assert_eq!(resp.status(), StatusCode::CONFLICT);
2712
2713        let resp = tenant_error_response(TenantError::BackpressureExceeded {
2714            current: 50000,
2715            max: 50000,
2716        });
2717        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2718        assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2719    }
2720
2721    // =========================================================================
2722    // Pagination tests
2723    // =========================================================================
2724
2725    #[tokio::test]
2726    async fn test_list_pipelines_default_pagination() {
2727        let mgr = setup_test_manager().await;
2728        let routes = api_routes(mgr, None, None, None);
2729
2730        let resp = test_request()
2731            .method("GET")
2732            .path("/api/v1/pipelines")
2733            .header("x-api-key", "test-key-123")
2734            .reply(&routes)
2735            .await;
2736
2737        assert_eq!(resp.status(), StatusCode::OK);
2738        let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2739        assert_eq!(body.total, 1);
2740        let pagination = body.pagination.unwrap();
2741        assert_eq!(pagination.total, 1);
2742        assert_eq!(pagination.offset, 0);
2743        assert_eq!(pagination.limit, 50);
2744        assert!(!pagination.has_more);
2745    }
2746
2747    #[tokio::test]
2748    async fn test_list_pipelines_with_pagination_params() {
2749        let mgr = setup_test_manager().await;
2750
2751        // Deploy two more pipelines
2752        {
2753            let mut m = mgr.write().await;
2754            let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
2755            let tenant = m.get_tenant_mut(&tid).unwrap();
2756            tenant
2757                .deploy_pipeline(
2758                    "Pipeline B".into(),
2759                    "stream B = Events .where(y > 2)".into(),
2760                )
2761                .await
2762                .unwrap();
2763            tenant
2764                .deploy_pipeline(
2765                    "Pipeline C".into(),
2766                    "stream C = Events .where(z > 3)".into(),
2767                )
2768                .await
2769                .unwrap();
2770        }
2771
2772        let routes = api_routes(mgr, None, None, None);
2773
2774        // First page: limit=1, offset=0
2775        let resp = test_request()
2776            .method("GET")
2777            .path("/api/v1/pipelines?limit=1&offset=0")
2778            .header("x-api-key", "test-key-123")
2779            .reply(&routes)
2780            .await;
2781
2782        assert_eq!(resp.status(), StatusCode::OK);
2783        let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2784        assert_eq!(body.pipelines.len(), 1);
2785        assert_eq!(body.total, 3);
2786        let pagination = body.pagination.unwrap();
2787        assert!(pagination.has_more);
2788        assert_eq!(pagination.limit, 1);
2789
2790        // Second page: limit=1, offset=2
2791        let resp = test_request()
2792            .method("GET")
2793            .path("/api/v1/pipelines?limit=1&offset=2")
2794            .header("x-api-key", "test-key-123")
2795            .reply(&routes)
2796            .await;
2797
2798        let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2799        assert_eq!(body.pipelines.len(), 1);
2800        assert_eq!(body.total, 3);
2801        assert!(!body.pagination.unwrap().has_more);
2802    }
2803
2804    #[tokio::test]
2805    async fn test_list_pipelines_limit_exceeds_max() {
2806        let mgr = setup_test_manager().await;
2807        let routes = api_routes(mgr, None, None, None);
2808
2809        let resp = test_request()
2810            .method("GET")
2811            .path("/api/v1/pipelines?limit=1001")
2812            .header("x-api-key", "test-key-123")
2813            .reply(&routes)
2814            .await;
2815
2816        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2817    }
2818
2819    #[tokio::test]
2820    async fn test_list_tenants_with_pagination() {
2821        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2822
2823        // Create 3 tenants
2824        for name in &["T1", "T2", "T3"] {
2825            test_request()
2826                .method("POST")
2827                .path("/api/v1/tenants")
2828                .header("x-admin-key", "admin-secret")
2829                .json(&CreateTenantRequest {
2830                    name: name.to_string(),
2831                    quota_tier: None,
2832                })
2833                .reply(&routes)
2834                .await;
2835        }
2836
2837        // Page through with limit=2
2838        let resp = test_request()
2839            .method("GET")
2840            .path("/api/v1/tenants?limit=2&offset=0")
2841            .header("x-admin-key", "admin-secret")
2842            .reply(&routes)
2843            .await;
2844
2845        assert_eq!(resp.status(), StatusCode::OK);
2846        let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2847        assert_eq!(body.tenants.len(), 2);
2848        assert_eq!(body.total, 3);
2849        assert!(body.pagination.unwrap().has_more);
2850
2851        // Last page
2852        let resp = test_request()
2853            .method("GET")
2854            .path("/api/v1/tenants?limit=2&offset=2")
2855            .header("x-admin-key", "admin-secret")
2856            .reply(&routes)
2857            .await;
2858
2859        let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2860        assert_eq!(body.tenants.len(), 1);
2861        assert!(!body.pagination.unwrap().has_more);
2862    }
2863
2864    #[tokio::test]
2865    async fn test_inject_backpressure_429() {
2866        use std::sync::atomic::Ordering;
2867
2868        let mut mgr = TenantManager::new();
2869        mgr.set_max_queue_depth(5);
2870        let id = mgr
2871            .create_tenant(
2872                "BP Corp".into(),
2873                "bp-key-123".into(),
2874                TenantQuota::default(),
2875            )
2876            .unwrap();
2877
2878        let tenant = mgr.get_tenant_mut(&id).unwrap();
2879        let pid = tenant
2880            .deploy_pipeline(
2881                "BP Pipeline".into(),
2882                "stream A = SensorReading .where(x > 1)".into(),
2883            )
2884            .await
2885            .unwrap();
2886
2887        // Simulate queue being full
2888        mgr.pending_events_counter().store(5, Ordering::Relaxed);
2889
2890        let shared = Arc::new(RwLock::new(mgr));
2891        let routes = api_routes(shared, None, None, None);
2892
2893        let resp = test_request()
2894            .method("POST")
2895            .path(&format!("/api/v1/pipelines/{pid}/events"))
2896            .header("x-api-key", "bp-key-123")
2897            .json(&InjectEventRequest {
2898                event_type: "SensorReading".into(),
2899                fields: serde_json::Map::new(),
2900            })
2901            .reply(&routes)
2902            .await;
2903
2904        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2905        // Check Retry-After header
2906        assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2907        // Check response body
2908        let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2909        assert_eq!(body["code"], "queue_depth_exceeded");
2910    }
2911
2912    #[tokio::test]
2913    async fn test_inject_batch_backpressure_429() {
2914        use std::sync::atomic::Ordering;
2915
2916        let mut mgr = TenantManager::new();
2917        mgr.set_max_queue_depth(5);
2918        let id = mgr
2919            .create_tenant(
2920                "BP Batch Corp".into(),
2921                "bp-batch-key".into(),
2922                TenantQuota::default(),
2923            )
2924            .unwrap();
2925
2926        let tenant = mgr.get_tenant_mut(&id).unwrap();
2927        let pid = tenant
2928            .deploy_pipeline(
2929                "BP Batch Pipeline".into(),
2930                "stream A = SensorReading .where(x > 1)".into(),
2931            )
2932            .await
2933            .unwrap();
2934
2935        // Simulate queue being full
2936        mgr.pending_events_counter().store(5, Ordering::Relaxed);
2937
2938        let shared = Arc::new(RwLock::new(mgr));
2939        let routes = api_routes(shared, None, None, None);
2940
2941        let resp = test_request()
2942            .method("POST")
2943            .path(&format!("/api/v1/pipelines/{pid}/events-batch"))
2944            .header("x-api-key", "bp-batch-key")
2945            .json(&InjectBatchRequest {
2946                events: vec![InjectEventRequest {
2947                    event_type: "SensorReading".into(),
2948                    fields: serde_json::Map::new(),
2949                }],
2950            })
2951            .reply(&routes)
2952            .await;
2953
2954        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2955        assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2956    }
2957}