Skip to main content

varpulis_cli/
api.rs

1//! REST API for SaaS pipeline management
2//!
3//! Provides RESTful endpoints for deploying and managing CEP pipelines
4//! in a multi-tenant environment.
5
6use std::convert::Infallible;
7
8use axum::extract::{Json, Path, Query, State};
9use axum::http::StatusCode;
10use axum::response::{IntoResponse, Response};
11use axum::routing::{get, post};
12use axum::Router;
13use futures_util::stream;
14use indexmap::IndexMap;
15use rustc_hash::FxBuildHasher;
16use serde::{Deserialize, Serialize};
17use tower_http::cors::{Any, CorsLayer};
18use varpulis_core::pagination::{PaginationMeta, PaginationParams, MAX_LIMIT};
19use varpulis_runtime::tenant::{SharedTenantManager, TenantError, TenantQuota};
20use varpulis_runtime::Event;
21
22use crate::auth::constant_time_compare;
23use crate::billing::SharedBillingState;
24
25// =============================================================================
26// Request/Response types
27// =============================================================================
28
29#[derive(Debug, Deserialize, Serialize)]
30pub struct DeployPipelineRequest {
31    pub name: String,
32    pub source: String,
33}
34
35#[derive(Debug, Serialize, Deserialize)]
36pub struct DeployPipelineResponse {
37    pub id: String,
38    pub name: String,
39    pub status: String,
40}
41
42#[derive(Debug, Serialize, Deserialize)]
43pub struct PipelineInfo {
44    pub id: String,
45    pub name: String,
46    pub status: String,
47    pub source: String,
48    pub uptime_secs: u64,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub global_template_id: Option<String>,
51    /// Pipeline scope: "global", "tenant", or "own".
52    #[serde(default = "default_scope")]
53    pub scope_level: String,
54    /// Source org for inherited pipelines (None = belongs to current org).
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub inherited_from_org_id: Option<String>,
57    /// Whether the pipeline is read-only (inherited from parent/global).
58    #[serde(default)]
59    pub read_only: bool,
60}
61
/// Serde default for [`PipelineInfo::scope_level`]: the pipeline belongs to the
/// caller's own organisation.
fn default_scope() -> String {
    String::from("own")
}
65
66#[derive(Debug, Serialize, Deserialize)]
67pub struct PipelineListResponse {
68    pub pipelines: Vec<PipelineInfo>,
69    pub total: usize,
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub pagination: Option<PaginationMeta>,
72}
73
74#[derive(Debug, Serialize, Deserialize)]
75pub struct PipelineMetricsResponse {
76    pub pipeline_id: String,
77    pub events_processed: u64,
78    pub output_events_emitted: u64,
79}
80
81#[derive(Debug, Deserialize, Serialize)]
82pub struct InjectEventRequest {
83    pub event_type: String,
84    pub fields: serde_json::Map<String, serde_json::Value>,
85}
86
87#[derive(Debug, Deserialize, Serialize)]
88pub struct InjectBatchRequest {
89    pub events: Vec<InjectEventRequest>,
90}
91
92#[derive(Debug, Serialize, Deserialize)]
93pub struct InjectBatchResponse {
94    pub accepted: usize,
95    pub output_events: Vec<serde_json::Value>,
96    pub processing_time_us: u64,
97}
98
99#[derive(Debug, Deserialize, Serialize)]
100pub struct ReloadPipelineRequest {
101    pub source: String,
102}
103
104#[derive(Debug, Serialize, Deserialize)]
105pub struct CheckpointResponse {
106    pub pipeline_id: String,
107    pub checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
108    pub events_processed: u64,
109}
110
111#[derive(Debug, Deserialize, Serialize)]
112pub struct RestoreRequest {
113    pub checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
114}
115
116#[derive(Debug, Serialize, Deserialize)]
117pub struct RestoreResponse {
118    pub pipeline_id: String,
119    pub restored: bool,
120    pub events_restored: u64,
121}
122
123#[derive(Debug, Serialize)]
124pub struct ApiError {
125    pub error: String,
126    pub code: String,
127}
128
129#[derive(Debug, Deserialize)]
130pub struct DlqQueryParams {
131    #[serde(default)]
132    pub offset: Option<usize>,
133    #[serde(default)]
134    pub limit: Option<usize>,
135}
136
137#[derive(Debug, Serialize)]
138pub struct DlqEntriesResponse {
139    pub entries: Vec<varpulis_runtime::dead_letter::DlqEntryOwned>,
140    pub total: u64,
141}
142
143#[derive(Debug, Serialize)]
144pub struct DlqReplayResponse {
145    pub replayed: usize,
146}
147
148#[derive(Debug, Serialize)]
149pub struct DlqClearResponse {
150    pub cleared: bool,
151}
152
153#[derive(Debug, Serialize, Deserialize)]
154pub struct UsageResponse {
155    pub tenant_id: String,
156    pub events_processed: u64,
157    pub output_events_emitted: u64,
158    pub active_pipelines: usize,
159    pub quota: QuotaInfo,
160}
161
162#[derive(Debug, Serialize, Deserialize)]
163pub struct QuotaInfo {
164    pub max_pipelines: usize,
165    pub max_events_per_second: u64,
166    pub max_streams_per_pipeline: usize,
167}
168
169// =============================================================================
170// Tenant Admin Request/Response types
171// =============================================================================
172
173#[derive(Debug, Deserialize, Serialize)]
174pub struct CreateTenantRequest {
175    pub name: String,
176    #[serde(default)]
177    pub quota_tier: Option<String>,
178}
179
180#[derive(Debug, Serialize, Deserialize)]
181pub struct TenantResponse {
182    pub id: String,
183    pub name: String,
184    pub api_key: String,
185    pub quota: QuotaInfo,
186}
187
188#[derive(Debug, Serialize, Deserialize)]
189pub struct TenantListResponse {
190    pub tenants: Vec<TenantResponse>,
191    pub total: usize,
192    #[serde(skip_serializing_if = "Option::is_none")]
193    pub pagination: Option<PaginationMeta>,
194}
195
196#[derive(Debug, Serialize, Deserialize)]
197pub struct TenantDetailResponse {
198    pub id: String,
199    pub name: String,
200    pub api_key: String,
201    pub quota: QuotaInfo,
202    pub usage: TenantUsageInfo,
203    pub pipeline_count: usize,
204}
205
206#[derive(Debug, Serialize, Deserialize)]
207pub struct TenantUsageInfo {
208    pub events_processed: u64,
209    pub output_events_emitted: u64,
210    pub active_pipelines: usize,
211}
212
213// =============================================================================
214// API Routes
215// =============================================================================
216
217/// Build a tower-http CORS layer from an optional list of allowed origins.
218///
219/// - `None` or a list containing `"*"`: allow any origin (backward-compatible default).
220/// - Otherwise: restrict to the given origins.
221fn build_cors(origins: Option<Vec<String>>) -> CorsLayer {
222    use axum::http::{HeaderValue, Method};
223
224    let base = CorsLayer::new()
225        .allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
226        .allow_headers([
227            "content-type".parse().unwrap(),
228            "x-api-key".parse().unwrap(),
229            "authorization".parse().unwrap(),
230        ]);
231
232    match origins {
233        Some(ref list) if !list.is_empty() && !list.iter().any(|o| o == "*") => {
234            let origins: Vec<HeaderValue> = list.iter().filter_map(|s| s.parse().ok()).collect();
235            base.allow_origin(origins)
236        }
237        _ => base.allow_origin(Any),
238    }
239}
240
241/// Shared state for the API router.
242#[derive(Debug, Clone)]
243pub struct ApiState {
244    pub manager: SharedTenantManager,
245    pub admin_key: Option<String>,
246    pub billing_state: Option<SharedBillingState>,
247    #[cfg(feature = "saas")]
248    pub db_pool: Option<varpulis_db::PgPool>,
249}
250
251/// Axum extractor for X-API-Key header.
252#[derive(Debug)]
253pub struct ApiKey(pub String);
254
255impl<S> axum::extract::FromRequestParts<S> for ApiKey
256where
257    S: Send + Sync,
258{
259    type Rejection = Response;
260
261    async fn from_request_parts(
262        parts: &mut axum::http::request::Parts,
263        _state: &S,
264    ) -> Result<Self, Self::Rejection> {
265        parts
266            .headers
267            .get("x-api-key")
268            .and_then(|v| v.to_str().ok())
269            .map(|s| Self(s.to_string()))
270            .ok_or_else(|| {
271                (
272                    StatusCode::UNAUTHORIZED,
273                    axum::Json(serde_json::json!({"error": "Missing X-API-Key header"})),
274                )
275                    .into_response()
276            })
277    }
278}
279
280/// Axum extractor for X-Admin-Key header.
281#[derive(Debug)]
282pub struct AdminKey(pub String);
283
284impl<S> axum::extract::FromRequestParts<S> for AdminKey
285where
286    S: Send + Sync,
287{
288    type Rejection = Response;
289
290    async fn from_request_parts(
291        parts: &mut axum::http::request::Parts,
292        _state: &S,
293    ) -> Result<Self, Self::Rejection> {
294        parts
295            .headers
296            .get("x-admin-key")
297            .and_then(|v| v.to_str().ok())
298            .map(|s| Self(s.to_string()))
299            .ok_or_else(|| {
300                (
301                    StatusCode::UNAUTHORIZED,
302                    axum::Json(serde_json::json!({"error": "Missing X-Admin-Key header"})),
303                )
304                    .into_response()
305            })
306    }
307}
308
309/// Build the complete API route tree
310pub fn api_routes(
311    manager: SharedTenantManager,
312    admin_key: Option<String>,
313    cors_origins: Option<Vec<String>>,
314    billing_state: Option<SharedBillingState>,
315    #[cfg(feature = "saas")] db_pool: Option<varpulis_db::PgPool>,
316) -> Router {
317    let state = ApiState {
318        manager,
319        admin_key,
320        billing_state,
321        #[cfg(feature = "saas")]
322        db_pool,
323    };
324
325    let cors = build_cors(cors_origins);
326
327    Router::new()
328        // Pipeline CRUD
329        .route("/api/v1/pipelines", post(handle_deploy).get(handle_list))
330        .route(
331            "/api/v1/pipelines/{pipeline_id}",
332            get(handle_get).delete(handle_delete),
333        )
334        // Pipeline actions
335        .route(
336            "/api/v1/pipelines/{pipeline_id}/events",
337            post(handle_inject),
338        )
339        .route(
340            "/api/v1/pipelines/{pipeline_id}/events-batch",
341            post(handle_inject_batch),
342        )
343        .route(
344            "/api/v1/pipelines/{pipeline_id}/checkpoint",
345            post(handle_checkpoint),
346        )
347        .route(
348            "/api/v1/pipelines/{pipeline_id}/restore",
349            post(handle_restore),
350        )
351        .route(
352            "/api/v1/pipelines/{pipeline_id}/metrics",
353            get(handle_metrics),
354        )
355        .route(
356            "/api/v1/pipelines/{pipeline_id}/topology",
357            get(handle_topology),
358        )
359        .route(
360            "/api/v1/pipelines/{pipeline_id}/reload",
361            post(handle_reload),
362        )
363        .route("/api/v1/usage", get(handle_usage))
364        .route("/api/v1/pipelines/{pipeline_id}/logs", get(handle_logs))
365        // DLQ routes
366        .route(
367            "/api/v1/pipelines/{pipeline_id}/dlq",
368            get(handle_dlq_get).delete(handle_dlq_clear),
369        )
370        .route(
371            "/api/v1/pipelines/{pipeline_id}/dlq/replay",
372            post(handle_dlq_replay),
373        )
374        // Tenant admin routes
375        .route(
376            "/api/v1/tenants",
377            post(handle_create_tenant).get(handle_list_tenants),
378        )
379        .route(
380            "/api/v1/tenants/{tenant_id}",
381            get(handle_get_tenant).delete(handle_delete_tenant),
382        )
383        .layer(cors)
384        .with_state(state)
385}
386
387// =============================================================================
388// Handlers
389// =============================================================================
390
391async fn handle_deploy(
392    State(state): State<ApiState>,
393    ApiKey(api_key): ApiKey,
394    Json(body): Json<DeployPipelineRequest>,
395) -> Response {
396    let manager = &state.manager;
397    let mut mgr = manager.write().await;
398
399    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
400        Some(id) => id.clone(),
401        None => {
402            return error_response(
403                StatusCode::UNAUTHORIZED,
404                "invalid_api_key",
405                "Invalid API key",
406            )
407        }
408    };
409
410    let pipeline_name = body.name.clone();
411    #[cfg(feature = "saas")]
412    let vpl_source = body.source.clone();
413
414    let result = mgr
415        .deploy_pipeline_on_tenant(&tenant_id, body.name, body.source)
416        .await;
417
418    match result {
419        Ok(id) => {
420            mgr.persist_if_needed(&tenant_id);
421
422            // Sync pipeline to DB for hierarchy-aware views
423            #[cfg(feature = "saas")]
424            if let Some(ref pool) = state.db_pool {
425                if let Ok(org_uuid) = tenant_id.0.parse::<uuid::Uuid>() {
426                    if let Err(e) = varpulis_db::repo::create_scoped_pipeline(
427                        pool,
428                        org_uuid,
429                        &pipeline_name,
430                        &vpl_source,
431                        "own",
432                    )
433                    .await
434                    {
435                        tracing::warn!("Failed to sync pipeline to DB: {}", e);
436                    }
437                }
438            }
439
440            let resp = DeployPipelineResponse {
441                id,
442                name: pipeline_name,
443                status: "running".to_string(),
444            };
445            (StatusCode::CREATED, axum::Json(&resp)).into_response()
446        }
447        Err(e) => tenant_error_response(e),
448    }
449}
450
451async fn handle_list(
452    State(state): State<ApiState>,
453    ApiKey(api_key): ApiKey,
454    Query(pagination): Query<PaginationParams>,
455) -> Response {
456    let manager = &state.manager;
457    if pagination.exceeds_max() {
458        return error_response(
459            StatusCode::BAD_REQUEST,
460            "invalid_limit",
461            &format!("limit must not exceed {MAX_LIMIT}"),
462        );
463    }
464
465    let mgr = manager.read().await;
466
467    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
468        Some(id) => id.clone(),
469        None => {
470            return error_response(
471                StatusCode::UNAUTHORIZED,
472                "invalid_api_key",
473                "Invalid API key",
474            )
475        }
476    };
477
478    let tenant = match mgr.get_tenant(&tenant_id) {
479        Some(t) => t,
480        None => {
481            return error_response(
482                StatusCode::NOT_FOUND,
483                "tenant_not_found",
484                "Tenant not found",
485            )
486        }
487    };
488
489    let all_pipelines: Vec<PipelineInfo> = tenant
490        .pipelines
491        .values()
492        .map(|p| {
493            let is_global = p.global_template_id.is_some();
494            PipelineInfo {
495                id: p.id.clone(),
496                name: p.name.clone(),
497                status: p.status.to_string(),
498                source: p.source.clone(),
499                uptime_secs: p.created_at.elapsed().as_secs(),
500                global_template_id: p.global_template_id.clone(),
501                scope_level: if is_global {
502                    "global".to_string()
503                } else {
504                    "own".to_string()
505                },
506                inherited_from_org_id: None,
507                read_only: is_global,
508            }
509        })
510        .collect();
511
512    let (pipelines, meta) = pagination.paginate(all_pipelines);
513    let total = meta.total;
514    let resp = PipelineListResponse {
515        pipelines,
516        total,
517        pagination: Some(meta),
518    };
519    axum::Json(&resp).into_response()
520}
521
522async fn handle_get(
523    State(state): State<ApiState>,
524    Path(pipeline_id): Path<String>,
525    ApiKey(api_key): ApiKey,
526) -> Response {
527    let manager = &state.manager;
528    let mgr = manager.read().await;
529
530    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
531        Some(id) => id.clone(),
532        None => {
533            return error_response(
534                StatusCode::UNAUTHORIZED,
535                "invalid_api_key",
536                "Invalid API key",
537            )
538        }
539    };
540
541    let tenant = match mgr.get_tenant(&tenant_id) {
542        Some(t) => t,
543        None => {
544            return error_response(
545                StatusCode::NOT_FOUND,
546                "tenant_not_found",
547                "Tenant not found",
548            )
549        }
550    };
551
552    match tenant.pipelines.get(&pipeline_id) {
553        Some(p) => {
554            let is_global = p.global_template_id.is_some();
555            let info = PipelineInfo {
556                id: p.id.clone(),
557                name: p.name.clone(),
558                status: p.status.to_string(),
559                source: p.source.clone(),
560                uptime_secs: p.created_at.elapsed().as_secs(),
561                global_template_id: p.global_template_id.clone(),
562                scope_level: if is_global {
563                    "global".to_string()
564                } else {
565                    "own".to_string()
566                },
567                inherited_from_org_id: None,
568                read_only: is_global,
569            };
570            axum::Json(&info).into_response()
571        }
572        None => error_response(
573            StatusCode::NOT_FOUND,
574            "pipeline_not_found",
575            "Pipeline not found",
576        ),
577    }
578}
579
580async fn handle_delete(
581    State(state): State<ApiState>,
582    Path(pipeline_id): Path<String>,
583    ApiKey(api_key): ApiKey,
584) -> Response {
585    let manager = &state.manager;
586    let mut mgr = manager.write().await;
587
588    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
589        Some(id) => id.clone(),
590        None => {
591            return error_response(
592                StatusCode::UNAUTHORIZED,
593                "invalid_api_key",
594                "Invalid API key",
595            )
596        }
597    };
598
599    #[cfg(feature = "saas")]
600    let mut pipeline_name_for_db = None;
601
602    let result = {
603        let tenant = match mgr.get_tenant_mut(&tenant_id) {
604            Some(t) => t,
605            None => {
606                return error_response(
607                    StatusCode::NOT_FOUND,
608                    "tenant_not_found",
609                    "Tenant not found",
610                )
611            }
612        };
613
614        // Protect global pipelines from tenant deletion
615        if let Some(pipeline) = tenant.pipelines.get(&pipeline_id) {
616            if pipeline.global_template_id.is_some() {
617                return error_response(
618                    StatusCode::FORBIDDEN,
619                    "global_pipeline_protected",
620                    "Global pipelines can only be managed by admin",
621                );
622            }
623            #[cfg(feature = "saas")]
624            {
625                pipeline_name_for_db = Some(pipeline.name.clone());
626            }
627        }
628
629        tenant.remove_pipeline(&pipeline_id)
630    };
631
632    match result {
633        Ok(()) => {
634            mgr.persist_if_needed(&tenant_id);
635
636            // Sync deletion to DB
637            #[cfg(feature = "saas")]
638            if let Some(ref pool) = state.db_pool {
639                if let (Ok(org_uuid), Some(name)) =
640                    (tenant_id.0.parse::<uuid::Uuid>(), &pipeline_name_for_db)
641                {
642                    let _ = varpulis_db::repo::delete_pipeline_by_name(pool, org_uuid, name).await;
643                }
644            }
645
646            axum::Json(serde_json::json!({"deleted": true})).into_response()
647        }
648        Err(e) => tenant_error_response(e),
649    }
650}
651
652async fn handle_inject(
653    State(state): State<ApiState>,
654    Path(pipeline_id): Path<String>,
655    ApiKey(api_key): ApiKey,
656    Json(body): Json<InjectEventRequest>,
657) -> Response {
658    let manager = &state.manager;
659    let billing_state = &state.billing_state;
660    // Check usage limit (SaaS mode only)
661    #[cfg(feature = "saas")]
662    let mut usage_warning: Option<f64> = None;
663    #[cfg(feature = "saas")]
664    if let Some(ref bs) = billing_state {
665        if let Some(org_id) = bs.org_id_for_api_key(&api_key).await {
666            match bs.check_usage_limit(org_id, 1).await {
667                crate::billing::UsageCheckResult::Exceeded(err) => {
668                    return crate::billing::usage_limit_response(&err);
669                }
670                crate::billing::UsageCheckResult::ApproachingLimit { usage_percent } => {
671                    usage_warning = Some(usage_percent);
672                }
673                crate::billing::UsageCheckResult::Ok => {}
674            }
675            // Record the event for usage tracking
676            bs.usage.write().await.record_events(org_id, 1);
677        }
678    }
679    #[cfg(not(feature = "saas"))]
680    let _ = &billing_state;
681
682    let mut mgr = manager.write().await;
683
684    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
685        Some(id) => id.clone(),
686        None => {
687            return error_response(
688                StatusCode::UNAUTHORIZED,
689                "invalid_api_key",
690                "Invalid API key",
691            )
692        }
693    };
694
695    // Check backpressure before processing
696    if let Err(e) = mgr.check_backpressure() {
697        return tenant_error_response(e);
698    }
699
700    let mut event = Event::new(body.event_type.clone());
701    for (key, value) in &body.fields {
702        let v = json_to_runtime_value(value);
703        event = event.with_field(key.as_str(), v);
704    }
705
706    match mgr
707        .process_event_with_backpressure(&tenant_id, &pipeline_id, event)
708        .await
709    {
710        Ok(output_events) => {
711            let events_json: Vec<serde_json::Value> = output_events
712                .iter()
713                .map(|e| {
714                    let mut fields = serde_json::Map::new();
715                    for (k, v) in &e.data {
716                        fields.insert(k.to_string(), crate::websocket::value_to_json(v));
717                    }
718                    serde_json::json!({
719                        "event_type": e.event_type.to_string(),
720                        "fields": serde_json::Value::Object(fields),
721                    })
722                })
723                .collect();
724            let response = serde_json::json!({
725                "accepted": true,
726                "output_events": events_json,
727            });
728            #[cfg(feature = "saas")]
729            if let Some(pct) = usage_warning {
730                return (
731                    StatusCode::OK,
732                    [("X-Usage-Warning", format!("approaching_limit ({pct:.0}%)"))],
733                    axum::Json(response),
734                )
735                    .into_response();
736            }
737            axum::Json(response).into_response()
738        }
739        Err(e) => tenant_error_response(e),
740    }
741}
742
743async fn handle_inject_batch(
744    State(state): State<ApiState>,
745    Path(pipeline_id): Path<String>,
746    ApiKey(api_key): ApiKey,
747    Json(body): Json<InjectBatchRequest>,
748) -> Response {
749    let manager = &state.manager;
750    let billing_state = &state.billing_state;
751    let event_count = body.events.len() as i64;
752
753    // Check usage limit for the entire batch (SaaS mode only)
754    #[cfg(feature = "saas")]
755    if let Some(ref bs) = billing_state {
756        if let Some(org_id) = bs.org_id_for_api_key(&api_key).await {
757            match bs.check_usage_limit(org_id, event_count).await {
758                crate::billing::UsageCheckResult::Exceeded(err) => {
759                    return crate::billing::usage_limit_response(&err);
760                }
761                crate::billing::UsageCheckResult::ApproachingLimit { .. }
762                | crate::billing::UsageCheckResult::Ok => {}
763            }
764            // Record the batch for usage tracking
765            bs.usage.write().await.record_events(org_id, event_count);
766        }
767    }
768    #[cfg(not(feature = "saas"))]
769    let _ = (&billing_state, event_count);
770
771    let mut mgr = manager.write().await;
772
773    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
774        Some(id) => id.clone(),
775        None => {
776            return error_response(
777                StatusCode::UNAUTHORIZED,
778                "invalid_api_key",
779                "Invalid API key",
780            )
781        }
782    };
783
784    // Check backpressure before processing the batch
785    if let Err(e) = mgr.check_backpressure() {
786        return tenant_error_response(e);
787    }
788
789    let start = std::time::Instant::now();
790    let mut accepted = 0usize;
791    let mut output_events = Vec::new();
792
793    for req in body.events {
794        let mut event = Event::new(req.event_type.clone());
795        for (key, value) in &req.fields {
796            let v = json_to_runtime_value(value);
797            event = event.with_field(key.as_str(), v);
798        }
799
800        match mgr
801            .process_event_with_backpressure(&tenant_id, &pipeline_id, event)
802            .await
803        {
804            Ok(outputs) => {
805                accepted += 1;
806                for e in &outputs {
807                    let mut flat = serde_json::Map::new();
808                    flat.insert(
809                        "event_type".to_string(),
810                        serde_json::Value::String(e.event_type.to_string()),
811                    );
812                    for (k, v) in &e.data {
813                        flat.insert(k.to_string(), crate::websocket::value_to_json(v));
814                    }
815                    output_events.push(serde_json::Value::Object(flat));
816                }
817            }
818            Err(TenantError::BackpressureExceeded { .. }) => {
819                // Stop processing the rest of the batch on backpressure
820                break;
821            }
822            Err(_) => {
823                // Skip other failed events silently in batch mode
824            }
825        }
826    }
827
828    let processing_time_us = start.elapsed().as_micros() as u64;
829
830    let resp = InjectBatchResponse {
831        accepted,
832        output_events,
833        processing_time_us,
834    };
835    axum::Json(&resp).into_response()
836}
837
838async fn handle_checkpoint(
839    State(state): State<ApiState>,
840    Path(pipeline_id): Path<String>,
841    ApiKey(api_key): ApiKey,
842) -> Response {
843    let manager = &state.manager;
844    let mgr = manager.read().await;
845
846    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
847        Some(id) => id.clone(),
848        None => {
849            return error_response(
850                StatusCode::UNAUTHORIZED,
851                "invalid_api_key",
852                "Invalid API key",
853            )
854        }
855    };
856
857    let tenant = match mgr.get_tenant(&tenant_id) {
858        Some(t) => t,
859        None => {
860            return error_response(
861                StatusCode::NOT_FOUND,
862                "tenant_not_found",
863                "Tenant not found",
864            )
865        }
866    };
867
868    match tenant.checkpoint_pipeline(&pipeline_id).await {
869        Ok(checkpoint) => {
870            let resp = CheckpointResponse {
871                pipeline_id,
872                events_processed: checkpoint.events_processed,
873                checkpoint,
874            };
875            axum::Json(&resp).into_response()
876        }
877        Err(e) => tenant_error_response(e),
878    }
879}
880
881async fn handle_restore(
882    State(state): State<ApiState>,
883    Path(pipeline_id): Path<String>,
884    ApiKey(api_key): ApiKey,
885    Json(body): Json<RestoreRequest>,
886) -> Response {
887    let manager = &state.manager;
888    let mut mgr = manager.write().await;
889
890    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
891        Some(id) => id.clone(),
892        None => {
893            return error_response(
894                StatusCode::UNAUTHORIZED,
895                "invalid_api_key",
896                "Invalid API key",
897            )
898        }
899    };
900
901    let tenant = match mgr.get_tenant_mut(&tenant_id) {
902        Some(t) => t,
903        None => {
904            return error_response(
905                StatusCode::NOT_FOUND,
906                "tenant_not_found",
907                "Tenant not found",
908            )
909        }
910    };
911
912    match tenant
913        .restore_pipeline(&pipeline_id, &body.checkpoint)
914        .await
915    {
916        Ok(()) => {
917            let resp = RestoreResponse {
918                pipeline_id,
919                restored: true,
920                events_restored: body.checkpoint.events_processed,
921            };
922            axum::Json(&resp).into_response()
923        }
924        Err(e) => tenant_error_response(e),
925    }
926}
927
928async fn handle_metrics(
929    State(state): State<ApiState>,
930    Path(pipeline_id): Path<String>,
931    ApiKey(api_key): ApiKey,
932) -> Response {
933    let manager = &state.manager;
934    let mgr = manager.read().await;
935
936    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
937        Some(id) => id.clone(),
938        None => {
939            return error_response(
940                StatusCode::UNAUTHORIZED,
941                "invalid_api_key",
942                "Invalid API key",
943            )
944        }
945    };
946
947    let tenant = match mgr.get_tenant(&tenant_id) {
948        Some(t) => t,
949        None => {
950            return error_response(
951                StatusCode::NOT_FOUND,
952                "tenant_not_found",
953                "Tenant not found",
954            )
955        }
956    };
957
958    if !tenant.pipelines.contains_key(&pipeline_id) {
959        return error_response(
960            StatusCode::NOT_FOUND,
961            "pipeline_not_found",
962            "Pipeline not found",
963        );
964    }
965
966    let resp = PipelineMetricsResponse {
967        pipeline_id,
968        events_processed: tenant.usage.events_processed,
969        output_events_emitted: tenant.usage.output_events_emitted,
970    };
971    axum::Json(&resp).into_response()
972}
973
974async fn handle_topology(
975    State(state): State<ApiState>,
976    Path(pipeline_id): Path<String>,
977    ApiKey(api_key): ApiKey,
978) -> Response {
979    let manager = &state.manager;
980    let mgr = manager.read().await;
981
982    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
983        Some(id) => id.clone(),
984        None => {
985            return error_response(
986                StatusCode::UNAUTHORIZED,
987                "invalid_api_key",
988                "Invalid API key",
989            )
990        }
991    };
992
993    let tenant = match mgr.get_tenant(&tenant_id) {
994        Some(t) => t,
995        None => {
996            return error_response(
997                StatusCode::NOT_FOUND,
998                "tenant_not_found",
999                "Tenant not found",
1000            )
1001        }
1002    };
1003
1004    let pipeline = match tenant.pipelines.get(&pipeline_id) {
1005        Some(p) => p,
1006        None => {
1007            return error_response(
1008                StatusCode::NOT_FOUND,
1009                "pipeline_not_found",
1010                "Pipeline not found",
1011            )
1012        }
1013    };
1014
1015    let engine = pipeline.engine.lock().await;
1016    let topology = engine.topology();
1017    axum::Json(&topology).into_response()
1018}
1019
1020async fn handle_reload(
1021    State(state): State<ApiState>,
1022    Path(pipeline_id): Path<String>,
1023    ApiKey(api_key): ApiKey,
1024    Json(body): Json<ReloadPipelineRequest>,
1025) -> Response {
1026    let manager = &state.manager;
1027    let mut mgr = manager.write().await;
1028
1029    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1030        Some(id) => id.clone(),
1031        None => {
1032            return error_response(
1033                StatusCode::UNAUTHORIZED,
1034                "invalid_api_key",
1035                "Invalid API key",
1036            )
1037        }
1038    };
1039
1040    let result = {
1041        let tenant = match mgr.get_tenant_mut(&tenant_id) {
1042            Some(t) => t,
1043            None => {
1044                return error_response(
1045                    StatusCode::NOT_FOUND,
1046                    "tenant_not_found",
1047                    "Tenant not found",
1048                )
1049            }
1050        };
1051
1052        // Protect global pipelines from tenant reload
1053        if let Some(pipeline) = tenant.pipelines.get(&pipeline_id) {
1054            if pipeline.global_template_id.is_some() {
1055                return error_response(
1056                    StatusCode::FORBIDDEN,
1057                    "global_pipeline_protected",
1058                    "Global pipelines can only be managed by admin",
1059                );
1060            }
1061        }
1062
1063        tenant.reload_pipeline(&pipeline_id, body.source).await
1064    };
1065
1066    match result {
1067        Ok(()) => {
1068            mgr.persist_if_needed(&tenant_id);
1069            axum::Json(serde_json::json!({"reloaded": true})).into_response()
1070        }
1071        Err(e) => tenant_error_response(e),
1072    }
1073}
1074
1075async fn handle_usage(State(state): State<ApiState>, ApiKey(api_key): ApiKey) -> Response {
1076    let manager = &state.manager;
1077    let mgr = manager.read().await;
1078
1079    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1080        Some(id) => id.clone(),
1081        None => {
1082            return error_response(
1083                StatusCode::UNAUTHORIZED,
1084                "invalid_api_key",
1085                "Invalid API key",
1086            )
1087        }
1088    };
1089
1090    let tenant = match mgr.get_tenant(&tenant_id) {
1091        Some(t) => t,
1092        None => {
1093            return error_response(
1094                StatusCode::NOT_FOUND,
1095                "tenant_not_found",
1096                "Tenant not found",
1097            )
1098        }
1099    };
1100
1101    let resp = UsageResponse {
1102        tenant_id: tenant.id.to_string(),
1103        events_processed: tenant.usage.events_processed,
1104        output_events_emitted: tenant.usage.output_events_emitted,
1105        active_pipelines: tenant.usage.active_pipelines,
1106        quota: QuotaInfo {
1107            max_pipelines: tenant.quota.max_pipelines,
1108            max_events_per_second: tenant.quota.max_events_per_second,
1109            max_streams_per_pipeline: tenant.quota.max_streams_per_pipeline,
1110        },
1111    };
1112    axum::Json(&resp).into_response()
1113}
1114
1115/// Handle SSE log streaming for a pipeline
1116async fn handle_logs(
1117    State(state): State<ApiState>,
1118    Path(pipeline_id): Path<String>,
1119    ApiKey(api_key): ApiKey,
1120) -> Response {
1121    let manager = &state.manager;
1122    let mgr = manager.read().await;
1123
1124    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1125        Some(id) => id.clone(),
1126        None => return error_response(StatusCode::UNAUTHORIZED, "invalid_key", "Invalid API key"),
1127    };
1128
1129    // Verify tenant owns this pipeline
1130    let tenant = match mgr.get_tenant(&tenant_id) {
1131        Some(t) => t,
1132        None => {
1133            return error_response(
1134                StatusCode::NOT_FOUND,
1135                "tenant_not_found",
1136                "Tenant not found",
1137            )
1138        }
1139    };
1140
1141    let rx: tokio::sync::broadcast::Receiver<Event> =
1142        match tenant.subscribe_pipeline_logs(&pipeline_id) {
1143            Ok(rx) => rx,
1144            Err(_) => {
1145                return error_response(
1146                    StatusCode::NOT_FOUND,
1147                    "pipeline_not_found",
1148                    &format!("Pipeline {pipeline_id} not found"),
1149                )
1150            }
1151        };
1152
1153    drop(mgr); // Release the read lock before streaming
1154
1155    // Create SSE stream from broadcast receiver using futures unfold
1156    let stream = stream::unfold(rx, |mut rx| async move {
1157        match rx.recv().await {
1158            Ok(event) => {
1159                let data: serde_json::Map<String, serde_json::Value> = event
1160                    .data
1161                    .iter()
1162                    .map(|(k, v): (&std::sync::Arc<str>, &varpulis_core::Value)| {
1163                        (k.to_string(), json_from_value(v))
1164                    })
1165                    .collect();
1166                let json = serde_json::to_string(&LogEvent {
1167                    event_type: event.event_type.to_string(),
1168                    timestamp: event.timestamp.to_rfc3339(),
1169                    data,
1170                })
1171                .unwrap_or_default();
1172                let sse = axum::response::sse::Event::default().data(json);
1173                Some((Ok::<_, Infallible>(sse), rx))
1174            }
1175            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1176                let msg = format!("{{\"warning\":\"skipped {n} events\"}}");
1177                let sse = axum::response::sse::Event::default()
1178                    .event("warning")
1179                    .data(msg);
1180                Some((Ok(sse), rx))
1181            }
1182            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
1183        }
1184    });
1185
1186    axum::response::sse::Sse::new(stream)
1187        .keep_alive(axum::response::sse::KeepAlive::default())
1188        .into_response()
1189}
1190
/// JSON payload sent as the `data` of each SSE message in `handle_logs`.
#[derive(Serialize)]
struct LogEvent {
    /// Event type name, taken from the runtime event's `event_type`.
    event_type: String,
    /// Event timestamp rendered with `to_rfc3339()`.
    timestamp: String,
    /// Event fields converted to JSON values via `json_from_value`.
    data: serde_json::Map<String, serde_json::Value>,
}
1197
1198fn json_from_value(v: &varpulis_core::Value) -> serde_json::Value {
1199    match v {
1200        varpulis_core::Value::Null => serde_json::Value::Null,
1201        varpulis_core::Value::Bool(b) => serde_json::Value::Bool(*b),
1202        varpulis_core::Value::Int(i) => serde_json::json!(*i),
1203        varpulis_core::Value::Float(f) => serde_json::json!(*f),
1204        varpulis_core::Value::Str(s) => serde_json::Value::String(s.to_string()),
1205        varpulis_core::Value::Timestamp(ns) => serde_json::json!(*ns),
1206        varpulis_core::Value::Duration(ns) => serde_json::json!(*ns),
1207        varpulis_core::Value::Array(arr) => {
1208            serde_json::Value::Array(arr.iter().map(json_from_value).collect())
1209        }
1210        varpulis_core::Value::Map(map) => {
1211            let obj: serde_json::Map<String, serde_json::Value> = map
1212                .iter()
1213                .map(|(k, v)| (k.to_string(), json_from_value(v)))
1214                .collect();
1215            serde_json::Value::Object(obj)
1216        }
1217    }
1218}
1219
1220// =============================================================================
1221// DLQ Handlers
1222// =============================================================================
1223
1224async fn handle_dlq_get(
1225    State(state): State<ApiState>,
1226    Path(pipeline_id): Path<String>,
1227    ApiKey(api_key): ApiKey,
1228    Query(params): Query<DlqQueryParams>,
1229) -> Response {
1230    let manager = &state.manager;
1231    let mgr = manager.read().await;
1232
1233    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1234        Some(id) => id.clone(),
1235        None => {
1236            return error_response(
1237                StatusCode::UNAUTHORIZED,
1238                "invalid_api_key",
1239                "Invalid API key",
1240            )
1241        }
1242    };
1243
1244    let tenant = match mgr.get_tenant(&tenant_id) {
1245        Some(t) => t,
1246        None => {
1247            return error_response(
1248                StatusCode::NOT_FOUND,
1249                "tenant_not_found",
1250                "Tenant not found",
1251            )
1252        }
1253    };
1254
1255    let pipeline = match tenant.pipelines.get(&pipeline_id) {
1256        Some(p) => p,
1257        None => {
1258            return error_response(
1259                StatusCode::NOT_FOUND,
1260                "pipeline_not_found",
1261                "Pipeline not found",
1262            )
1263        }
1264    };
1265
1266    let engine = pipeline.engine.lock().await;
1267    let dlq = match engine.dlq() {
1268        Some(d) => d,
1269        None => {
1270            let resp = DlqEntriesResponse {
1271                entries: Vec::new(),
1272                total: 0,
1273            };
1274            return axum::Json(&resp).into_response();
1275        }
1276    };
1277
1278    let offset = params.offset.unwrap_or(0);
1279    let limit = params.limit.unwrap_or(100).min(1000);
1280
1281    match dlq.read_entries(offset, limit) {
1282        Ok(entries) => {
1283            let resp = DlqEntriesResponse {
1284                total: dlq.line_count(),
1285                entries,
1286            };
1287            axum::Json(&resp).into_response()
1288        }
1289        Err(e) => error_response(
1290            StatusCode::INTERNAL_SERVER_ERROR,
1291            "dlq_read_error",
1292            &format!("Failed to read DLQ: {e}"),
1293        ),
1294    }
1295}
1296
1297async fn handle_dlq_replay(
1298    State(state): State<ApiState>,
1299    Path(pipeline_id): Path<String>,
1300    ApiKey(api_key): ApiKey,
1301) -> Response {
1302    let manager = &state.manager;
1303    let mgr = manager.read().await;
1304
1305    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1306        Some(id) => id.clone(),
1307        None => {
1308            return error_response(
1309                StatusCode::UNAUTHORIZED,
1310                "invalid_api_key",
1311                "Invalid API key",
1312            )
1313        }
1314    };
1315
1316    let tenant = match mgr.get_tenant(&tenant_id) {
1317        Some(t) => t,
1318        None => {
1319            return error_response(
1320                StatusCode::NOT_FOUND,
1321                "tenant_not_found",
1322                "Tenant not found",
1323            )
1324        }
1325    };
1326
1327    let pipeline = match tenant.pipelines.get(&pipeline_id) {
1328        Some(p) => p,
1329        None => {
1330            return error_response(
1331                StatusCode::NOT_FOUND,
1332                "pipeline_not_found",
1333                "Pipeline not found",
1334            )
1335        }
1336    };
1337
1338    // Read all DLQ entries
1339    let entries = {
1340        let engine = pipeline.engine.lock().await;
1341        let dlq = match engine.dlq() {
1342            Some(d) => d,
1343            None => {
1344                let resp = DlqReplayResponse { replayed: 0 };
1345                return axum::Json(&resp).into_response();
1346            }
1347        };
1348        // Read all entries (up to a reasonable limit)
1349        match dlq.read_entries(0, 100_000) {
1350            Ok(entries) => entries,
1351            Err(e) => {
1352                return error_response(
1353                    StatusCode::INTERNAL_SERVER_ERROR,
1354                    "dlq_read_error",
1355                    &format!("Failed to read DLQ: {e}"),
1356                )
1357            }
1358        }
1359    };
1360
1361    // Replay each entry as an event into the pipeline engine
1362    let mut replayed = 0usize;
1363    {
1364        let mut engine = pipeline.engine.lock().await;
1365        for entry in &entries {
1366            // Reconstruct event from the DLQ entry
1367            let event_type = entry
1368                .event
1369                .get("event_type")
1370                .and_then(|v| v.as_str())
1371                .unwrap_or("unknown");
1372            let mut event = Event::new(event_type);
1373            if let Some(data) = entry.event.get("data").and_then(|v| v.as_object()) {
1374                for (k, v) in data {
1375                    let rv = json_to_runtime_value(v);
1376                    event = event.with_field(k.as_str(), rv);
1377                }
1378            }
1379            if engine.process(event).await.is_ok() {
1380                replayed += 1;
1381            }
1382        }
1383    }
1384
1385    let resp = DlqReplayResponse { replayed };
1386    axum::Json(&resp).into_response()
1387}
1388
1389async fn handle_dlq_clear(
1390    State(state): State<ApiState>,
1391    Path(pipeline_id): Path<String>,
1392    ApiKey(api_key): ApiKey,
1393) -> Response {
1394    let manager = &state.manager;
1395    let mgr = manager.read().await;
1396
1397    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1398        Some(id) => id.clone(),
1399        None => {
1400            return error_response(
1401                StatusCode::UNAUTHORIZED,
1402                "invalid_api_key",
1403                "Invalid API key",
1404            )
1405        }
1406    };
1407
1408    let tenant = match mgr.get_tenant(&tenant_id) {
1409        Some(t) => t,
1410        None => {
1411            return error_response(
1412                StatusCode::NOT_FOUND,
1413                "tenant_not_found",
1414                "Tenant not found",
1415            )
1416        }
1417    };
1418
1419    let pipeline = match tenant.pipelines.get(&pipeline_id) {
1420        Some(p) => p,
1421        None => {
1422            return error_response(
1423                StatusCode::NOT_FOUND,
1424                "pipeline_not_found",
1425                "Pipeline not found",
1426            )
1427        }
1428    };
1429
1430    let engine = pipeline.engine.lock().await;
1431    match engine.dlq() {
1432        Some(dlq) => match dlq.clear() {
1433            Ok(()) => {
1434                let resp = DlqClearResponse { cleared: true };
1435                axum::Json(&resp).into_response()
1436            }
1437            Err(e) => error_response(
1438                StatusCode::INTERNAL_SERVER_ERROR,
1439                "dlq_clear_error",
1440                &format!("Failed to clear DLQ: {e}"),
1441            ),
1442        },
1443        None => {
1444            let resp = DlqClearResponse { cleared: true };
1445            axum::Json(&resp).into_response()
1446        }
1447    }
1448}
1449
1450// =============================================================================
1451// Tenant Admin Routes
1452// =============================================================================
1453
1454// Tenant admin routes are now part of the main Router in api_routes()
1455
1456#[allow(clippy::result_large_err)]
1457fn validate_admin_key(provided: &str, configured: &Option<String>) -> Result<(), Response> {
1458    match configured {
1459        None => Err(error_response(
1460            StatusCode::FORBIDDEN,
1461            "admin_disabled",
1462            "Admin API is disabled (no --api-key configured)",
1463        )),
1464        Some(key) => {
1465            if constant_time_compare(key, provided) {
1466                Ok(())
1467            } else {
1468                Err(error_response(
1469                    StatusCode::UNAUTHORIZED,
1470                    "invalid_admin_key",
1471                    "Invalid admin key",
1472                ))
1473            }
1474        }
1475    }
1476}
1477
1478fn quota_from_tier(tier: Option<&str>) -> TenantQuota {
1479    match tier {
1480        Some("free") => TenantQuota::free(),
1481        Some("pro") => TenantQuota::pro(),
1482        Some("enterprise") => TenantQuota::enterprise(),
1483        _ => TenantQuota::default(),
1484    }
1485}
1486
1487async fn handle_create_tenant(
1488    State(state): State<ApiState>,
1489    AdminKey(admin_key): AdminKey,
1490    Json(body): Json<CreateTenantRequest>,
1491) -> Response {
1492    let manager = &state.manager;
1493    let configured_key = &state.admin_key;
1494    if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1495        return resp;
1496    }
1497
1498    let api_key = uuid::Uuid::new_v4().to_string();
1499    let quota = quota_from_tier(body.quota_tier.as_deref());
1500
1501    let mut mgr = manager.write().await;
1502    match mgr.create_tenant(body.name.clone(), api_key.clone(), quota.clone()) {
1503        Ok(tenant_id) => {
1504            let resp = TenantResponse {
1505                id: tenant_id.as_str().to_string(),
1506                name: body.name,
1507                api_key,
1508                quota: QuotaInfo {
1509                    max_pipelines: quota.max_pipelines,
1510                    max_events_per_second: quota.max_events_per_second,
1511                    max_streams_per_pipeline: quota.max_streams_per_pipeline,
1512                },
1513            };
1514            (StatusCode::CREATED, axum::Json(&resp)).into_response()
1515        }
1516        Err(e) => tenant_error_response(e),
1517    }
1518}
1519
1520async fn handle_list_tenants(
1521    State(state): State<ApiState>,
1522    AdminKey(admin_key): AdminKey,
1523    Query(pagination): Query<PaginationParams>,
1524) -> Response {
1525    let manager = &state.manager;
1526    let configured_key = &state.admin_key;
1527    if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1528        return resp;
1529    }
1530
1531    if pagination.exceeds_max() {
1532        return error_response(
1533            StatusCode::BAD_REQUEST,
1534            "invalid_limit",
1535            &format!("limit must not exceed {MAX_LIMIT}"),
1536        );
1537    }
1538
1539    let mgr = manager.read().await;
1540    let all_tenants: Vec<TenantResponse> = mgr
1541        .list_tenants()
1542        .iter()
1543        .map(|t| TenantResponse {
1544            id: t.id.as_str().to_string(),
1545            name: t.name.clone(),
1546            api_key: format!("{}...", &t.api_key_hash[..8]),
1547            quota: QuotaInfo {
1548                max_pipelines: t.quota.max_pipelines,
1549                max_events_per_second: t.quota.max_events_per_second,
1550                max_streams_per_pipeline: t.quota.max_streams_per_pipeline,
1551            },
1552        })
1553        .collect();
1554    let (tenants, meta) = pagination.paginate(all_tenants);
1555    let total = meta.total;
1556    let resp = TenantListResponse {
1557        tenants,
1558        total,
1559        pagination: Some(meta),
1560    };
1561    axum::Json(&resp).into_response()
1562}
1563
1564async fn handle_get_tenant(
1565    State(state): State<ApiState>,
1566    Path(tenant_id_str): Path<String>,
1567    AdminKey(admin_key): AdminKey,
1568) -> Response {
1569    let manager = &state.manager;
1570    let configured_key = &state.admin_key;
1571    if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1572        return resp;
1573    }
1574
1575    let mgr = manager.read().await;
1576    let tenant_id = varpulis_runtime::TenantId::new(&tenant_id_str);
1577    match mgr.get_tenant(&tenant_id) {
1578        Some(t) => {
1579            let resp = TenantDetailResponse {
1580                id: t.id.as_str().to_string(),
1581                name: t.name.clone(),
1582                api_key: format!("{}...", &t.api_key_hash[..8]),
1583                quota: QuotaInfo {
1584                    max_pipelines: t.quota.max_pipelines,
1585                    max_events_per_second: t.quota.max_events_per_second,
1586                    max_streams_per_pipeline: t.quota.max_streams_per_pipeline,
1587                },
1588                usage: TenantUsageInfo {
1589                    events_processed: t.usage.events_processed,
1590                    output_events_emitted: t.usage.output_events_emitted,
1591                    active_pipelines: t.usage.active_pipelines,
1592                },
1593                pipeline_count: t.pipelines.len(),
1594            };
1595            axum::Json(&resp).into_response()
1596        }
1597        None => error_response(
1598            StatusCode::NOT_FOUND,
1599            "tenant_not_found",
1600            "Tenant not found",
1601        ),
1602    }
1603}
1604
1605async fn handle_delete_tenant(
1606    State(state): State<ApiState>,
1607    Path(tenant_id_str): Path<String>,
1608    AdminKey(admin_key): AdminKey,
1609) -> Response {
1610    let manager = &state.manager;
1611    let configured_key = &state.admin_key;
1612    if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1613        return resp;
1614    }
1615
1616    let mut mgr = manager.write().await;
1617    let tenant_id = varpulis_runtime::TenantId::new(&tenant_id_str);
1618    match mgr.remove_tenant(&tenant_id) {
1619        Ok(()) => axum::Json(serde_json::json!({"deleted": true})).into_response(),
1620        Err(e) => tenant_error_response(e),
1621    }
1622}
1623
1624// =============================================================================
1625// Helpers
1626// =============================================================================
1627
1628fn error_response(status: StatusCode, code: &str, message: &str) -> Response {
1629    let body = ApiError {
1630        error: message.to_string(),
1631        code: code.to_string(),
1632    };
1633    (status, axum::Json(body)).into_response()
1634}
1635
1636fn tenant_error_response(err: TenantError) -> Response {
1637    // BackpressureExceeded needs a Retry-After header, handle it specially
1638    if let TenantError::BackpressureExceeded { current, max } = &err {
1639        let body = serde_json::json!({
1640            "error": format!("queue depth {current} exceeds maximum {max}"),
1641            "code": "queue_depth_exceeded",
1642            "retry_after": 1,
1643        });
1644        return (
1645            StatusCode::TOO_MANY_REQUESTS,
1646            [("Retry-After", "1"), ("Content-Type", "application/json")],
1647            serde_json::to_string(&body).unwrap_or_default(),
1648        )
1649            .into_response();
1650    }
1651
1652    let (status, code) = match &err {
1653        TenantError::NotFound(_) => (StatusCode::NOT_FOUND, "not_found"),
1654        TenantError::PipelineNotFound(_) => (StatusCode::NOT_FOUND, "pipeline_not_found"),
1655        TenantError::QuotaExceeded(_) => (StatusCode::TOO_MANY_REQUESTS, "quota_exceeded"),
1656        TenantError::RateLimitExceeded => (StatusCode::TOO_MANY_REQUESTS, "rate_limited"),
1657        TenantError::BackpressureExceeded { .. } => unreachable!(),
1658        TenantError::ParseError(_) => (StatusCode::BAD_REQUEST, "parse_error"),
1659        TenantError::EngineError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "engine_error"),
1660        TenantError::AlreadyExists(_) => (StatusCode::CONFLICT, "already_exists"),
1661    };
1662    error_response(status, code, &err.to_string())
1663}
1664
1665fn json_to_runtime_value(v: &serde_json::Value) -> varpulis_core::Value {
1666    match v {
1667        serde_json::Value::Null => varpulis_core::Value::Null,
1668        serde_json::Value::Bool(b) => varpulis_core::Value::Bool(*b),
1669        serde_json::Value::Number(n) => {
1670            if let Some(i) = n.as_i64() {
1671                varpulis_core::Value::Int(i)
1672            } else if let Some(f) = n.as_f64() {
1673                varpulis_core::Value::Float(f)
1674            } else {
1675                varpulis_core::Value::Null
1676            }
1677        }
1678        serde_json::Value::String(s) => varpulis_core::Value::Str(s.clone().into()),
1679        serde_json::Value::Array(arr) => {
1680            varpulis_core::Value::array(arr.iter().map(json_to_runtime_value).collect())
1681        }
1682        serde_json::Value::Object(map) => {
1683            let mut m: IndexMap<std::sync::Arc<str>, varpulis_core::Value, FxBuildHasher> =
1684                IndexMap::with_hasher(FxBuildHasher);
1685            for (k, v) in map {
1686                m.insert(k.as_str().into(), json_to_runtime_value(v));
1687            }
1688            varpulis_core::Value::map(m)
1689        }
1690    }
1691}
1692
1693#[cfg(test)]
1694mod tests {
1695    use std::sync::Arc;
1696
1697    use axum::body::Body;
1698    use axum::http::Request;
1699    use tokio::sync::RwLock;
1700    use tower::ServiceExt;
1701    use varpulis_runtime::tenant::{TenantManager, TenantQuota};
1702
1703    use super::*;
1704
1705    /// Test response wrapper for axum integration tests.
1706    struct TestResponse {
1707        status: StatusCode,
1708        body: bytes::Bytes,
1709        headers: axum::http::HeaderMap,
1710    }
1711
1712    impl TestResponse {
1713        fn status(&self) -> StatusCode {
1714            self.status
1715        }
1716        fn body(&self) -> &[u8] {
1717            &self.body
1718        }
1719        fn headers(&self) -> &axum::http::HeaderMap {
1720            &self.headers
1721        }
1722    }
1723
1724    /// Test request builder for axum integration tests.
1725    struct TestRequestBuilder {
1726        method: String,
1727        path: String,
1728        headers: Vec<(String, String)>,
1729        body: Option<String>,
1730    }
1731
1732    impl TestRequestBuilder {
1733        fn new() -> Self {
1734            Self {
1735                method: "GET".to_string(),
1736                path: "/".to_string(),
1737                headers: Vec::new(),
1738                body: None,
1739            }
1740        }
1741        fn method(mut self, m: &str) -> Self {
1742            self.method = m.to_string();
1743            self
1744        }
1745        fn path(mut self, p: &str) -> Self {
1746            self.path = p.to_string();
1747            self
1748        }
1749        fn header(mut self, k: &str, v: &str) -> Self {
1750            self.headers.push((k.to_string(), v.to_string()));
1751            self
1752        }
1753        fn json<T: serde::Serialize>(mut self, body: &T) -> Self {
1754            self.body = Some(serde_json::to_string(body).unwrap());
1755            self.headers
1756                .push(("content-type".to_string(), "application/json".to_string()));
1757            self
1758        }
1759        async fn reply(self, app: &Router) -> TestResponse {
1760            let mut builder = Request::builder()
1761                .method(self.method.as_str())
1762                .uri(&self.path);
1763            for (k, v) in &self.headers {
1764                builder = builder.header(k.as_str(), v.as_str());
1765            }
1766            let body = match self.body {
1767                Some(b) => Body::from(b),
1768                None => Body::empty(),
1769            };
1770            let req = builder.body(body).unwrap();
1771            let resp = app.clone().oneshot(req).await.unwrap();
1772            let status = resp.status();
1773            let headers = resp.headers().clone();
1774            let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1775                .await
1776                .unwrap();
1777            TestResponse {
1778                status,
1779                body,
1780                headers,
1781            }
1782        }
1783    }
1784
    /// Shorthand for `TestRequestBuilder::new()`: starts a request builder
    /// with its defaults (`GET /`, no headers, no body).
    fn test_request() -> TestRequestBuilder {
        TestRequestBuilder::new()
    }
1789
1790    async fn setup_test_manager() -> SharedTenantManager {
1791        let mut mgr = TenantManager::new();
1792        let id = mgr
1793            .create_tenant(
1794                "Test Corp".into(),
1795                "test-key-123".into(),
1796                TenantQuota::default(),
1797            )
1798            .unwrap();
1799
1800        // Deploy a pipeline
1801        let tenant = mgr.get_tenant_mut(&id).unwrap();
1802        tenant
1803            .deploy_pipeline(
1804                "Test Pipeline".into(),
1805                "stream A = SensorReading .where(x > 1)".into(),
1806            )
1807            .await
1808            .unwrap();
1809
1810        Arc::new(RwLock::new(mgr))
1811    }
1812
1813    #[tokio::test]
1814    async fn test_deploy_pipeline() {
1815        let mgr = setup_test_manager().await;
1816        let routes = api_routes(mgr, None, None, None);
1817
1818        let resp = test_request()
1819            .method("POST")
1820            .path("/api/v1/pipelines")
1821            .header("x-api-key", "test-key-123")
1822            .json(&DeployPipelineRequest {
1823                name: "New Pipeline".into(),
1824                source: "stream B = Events .where(y > 10)".into(),
1825            })
1826            .reply(&routes)
1827            .await;
1828
1829        assert_eq!(resp.status(), StatusCode::CREATED);
1830        let body: DeployPipelineResponse = serde_json::from_slice(resp.body()).unwrap();
1831        assert_eq!(body.name, "New Pipeline");
1832        assert_eq!(body.status, "running");
1833    }
1834
1835    #[tokio::test]
1836    async fn test_deploy_invalid_api_key() {
1837        let mgr = setup_test_manager().await;
1838        let routes = api_routes(mgr, None, None, None);
1839
1840        let resp = test_request()
1841            .method("POST")
1842            .path("/api/v1/pipelines")
1843            .header("x-api-key", "wrong-key")
1844            .json(&DeployPipelineRequest {
1845                name: "Bad".into(),
1846                source: "stream X = Y .where(z > 1)".into(),
1847            })
1848            .reply(&routes)
1849            .await;
1850
1851        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1852    }
1853
1854    #[tokio::test]
1855    async fn test_deploy_invalid_vpl() {
1856        let mgr = setup_test_manager().await;
1857        let routes = api_routes(mgr, None, None, None);
1858
1859        let resp = test_request()
1860            .method("POST")
1861            .path("/api/v1/pipelines")
1862            .header("x-api-key", "test-key-123")
1863            .json(&DeployPipelineRequest {
1864                name: "Bad VPL".into(),
1865                source: "this is not valid {{{".into(),
1866            })
1867            .reply(&routes)
1868            .await;
1869
1870        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1871    }
1872
1873    #[tokio::test]
1874    async fn test_list_pipelines() {
1875        let mgr = setup_test_manager().await;
1876        let routes = api_routes(mgr, None, None, None);
1877
1878        let resp = test_request()
1879            .method("GET")
1880            .path("/api/v1/pipelines")
1881            .header("x-api-key", "test-key-123")
1882            .reply(&routes)
1883            .await;
1884
1885        assert_eq!(resp.status(), StatusCode::OK);
1886        let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
1887        assert_eq!(body.total, 1);
1888        assert_eq!(body.pipelines[0].name, "Test Pipeline");
1889    }
1890
1891    #[tokio::test]
1892    async fn test_usage_endpoint() {
1893        let mgr = setup_test_manager().await;
1894        let routes = api_routes(mgr, None, None, None);
1895
1896        let resp = test_request()
1897            .method("GET")
1898            .path("/api/v1/usage")
1899            .header("x-api-key", "test-key-123")
1900            .reply(&routes)
1901            .await;
1902
1903        assert_eq!(resp.status(), StatusCode::OK);
1904        let body: UsageResponse = serde_json::from_slice(resp.body()).unwrap();
1905        assert_eq!(body.active_pipelines, 1);
1906    }
1907
1908    #[tokio::test]
1909    async fn test_inject_event() {
1910        let mgr = setup_test_manager().await;
1911
1912        // Get pipeline ID
1913        let pipeline_id = {
1914            let m = mgr.read().await;
1915            let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
1916            let tenant = m.get_tenant(&tid).unwrap();
1917            tenant.pipelines.keys().next().unwrap().clone()
1918        };
1919
1920        let routes = api_routes(mgr, None, None, None);
1921
1922        let resp = test_request()
1923            .method("POST")
1924            .path(&format!("/api/v1/pipelines/{pipeline_id}/events"))
1925            .header("x-api-key", "test-key-123")
1926            .json(&InjectEventRequest {
1927                event_type: "SensorReading".into(),
1928                fields: {
1929                    let mut m = serde_json::Map::new();
1930                    m.insert(
1931                        "x".into(),
1932                        serde_json::Value::Number(serde_json::Number::from(42)),
1933                    );
1934                    m
1935                },
1936            })
1937            .reply(&routes)
1938            .await;
1939
1940        assert_eq!(resp.status(), StatusCode::OK);
1941    }
1942
1943    #[test]
1944    fn test_json_to_runtime_value() {
1945        assert_eq!(
1946            json_to_runtime_value(&serde_json::json!(null)),
1947            varpulis_core::Value::Null
1948        );
1949        assert_eq!(
1950            json_to_runtime_value(&serde_json::json!(true)),
1951            varpulis_core::Value::Bool(true)
1952        );
1953        assert_eq!(
1954            json_to_runtime_value(&serde_json::json!(42)),
1955            varpulis_core::Value::Int(42)
1956        );
1957        assert_eq!(
1958            json_to_runtime_value(&serde_json::json!(1.23)),
1959            varpulis_core::Value::Float(1.23)
1960        );
1961        assert_eq!(
1962            json_to_runtime_value(&serde_json::json!("hello")),
1963            varpulis_core::Value::Str("hello".into())
1964        );
1965    }
1966
1967    #[test]
1968    fn test_error_response_format() {
1969        let resp = error_response(StatusCode::BAD_REQUEST, "test_error", "Something failed");
1970        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1971    }
1972
1973    #[test]
1974    fn test_tenant_error_mapping() {
1975        let resp = tenant_error_response(TenantError::NotFound("t1".into()));
1976        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1977
1978        let resp = tenant_error_response(TenantError::RateLimitExceeded);
1979        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
1980
1981        let parse_err = varpulis_parser::parse("INVALID{{{").unwrap_err();
1982        let resp = tenant_error_response(TenantError::ParseError(parse_err));
1983        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1984    }
1985
1986    // =========================================================================
1987    // Tenant Admin API tests
1988    // =========================================================================
1989
1990    fn setup_admin_routes(admin_key: Option<&str>) -> (SharedTenantManager, Router) {
1991        let mgr = Arc::new(RwLock::new(TenantManager::new()));
1992        let key = admin_key.map(|k| k.to_string());
1993        let routes = api_routes(mgr.clone(), key, None, None);
1994        (mgr, routes)
1995    }
1996
1997    #[tokio::test]
1998    async fn test_create_tenant() {
1999        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2000
2001        let resp = test_request()
2002            .method("POST")
2003            .path("/api/v1/tenants")
2004            .header("x-admin-key", "admin-secret")
2005            .json(&CreateTenantRequest {
2006                name: "Acme Corp".into(),
2007                quota_tier: None,
2008            })
2009            .reply(&routes)
2010            .await;
2011
2012        assert_eq!(resp.status(), StatusCode::CREATED);
2013        let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
2014        assert_eq!(body.name, "Acme Corp");
2015        assert!(!body.api_key.is_empty());
2016        assert!(!body.id.is_empty());
2017    }
2018
2019    #[tokio::test]
2020    async fn test_list_tenants_admin() {
2021        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2022
2023        // Create two tenants
2024        for name in &["Tenant A", "Tenant B"] {
2025            test_request()
2026                .method("POST")
2027                .path("/api/v1/tenants")
2028                .header("x-admin-key", "admin-secret")
2029                .json(&CreateTenantRequest {
2030                    name: name.to_string(),
2031                    quota_tier: None,
2032                })
2033                .reply(&routes)
2034                .await;
2035        }
2036
2037        let resp = test_request()
2038            .method("GET")
2039            .path("/api/v1/tenants")
2040            .header("x-admin-key", "admin-secret")
2041            .reply(&routes)
2042            .await;
2043
2044        assert_eq!(resp.status(), StatusCode::OK);
2045        let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2046        assert_eq!(body.total, 2);
2047    }
2048
2049    #[tokio::test]
2050    async fn test_get_tenant_admin() {
2051        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2052
2053        // Create a tenant
2054        let create_resp = test_request()
2055            .method("POST")
2056            .path("/api/v1/tenants")
2057            .header("x-admin-key", "admin-secret")
2058            .json(&CreateTenantRequest {
2059                name: "Detail Corp".into(),
2060                quota_tier: Some("pro".into()),
2061            })
2062            .reply(&routes)
2063            .await;
2064
2065        let created: TenantResponse = serde_json::from_slice(create_resp.body()).unwrap();
2066
2067        let resp = test_request()
2068            .method("GET")
2069            .path(&format!("/api/v1/tenants/{}", created.id))
2070            .header("x-admin-key", "admin-secret")
2071            .reply(&routes)
2072            .await;
2073
2074        assert_eq!(resp.status(), StatusCode::OK);
2075        let body: TenantDetailResponse = serde_json::from_slice(resp.body()).unwrap();
2076        assert_eq!(body.name, "Detail Corp");
2077        assert_eq!(body.pipeline_count, 0);
2078        // Pro tier quotas
2079        assert_eq!(body.quota.max_pipelines, 20);
2080    }
2081
2082    #[tokio::test]
2083    async fn test_delete_tenant_admin() {
2084        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2085
2086        // Create then delete
2087        let create_resp = test_request()
2088            .method("POST")
2089            .path("/api/v1/tenants")
2090            .header("x-admin-key", "admin-secret")
2091            .json(&CreateTenantRequest {
2092                name: "Doomed".into(),
2093                quota_tier: None,
2094            })
2095            .reply(&routes)
2096            .await;
2097        let created: TenantResponse = serde_json::from_slice(create_resp.body()).unwrap();
2098
2099        let resp = test_request()
2100            .method("DELETE")
2101            .path(&format!("/api/v1/tenants/{}", created.id))
2102            .header("x-admin-key", "admin-secret")
2103            .reply(&routes)
2104            .await;
2105
2106        assert_eq!(resp.status(), StatusCode::OK);
2107
2108        // Verify tenant is gone
2109        let list_resp = test_request()
2110            .method("GET")
2111            .path("/api/v1/tenants")
2112            .header("x-admin-key", "admin-secret")
2113            .reply(&routes)
2114            .await;
2115        let body: TenantListResponse = serde_json::from_slice(list_resp.body()).unwrap();
2116        assert_eq!(body.total, 0);
2117    }
2118
2119    #[tokio::test]
2120    async fn test_invalid_admin_key() {
2121        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2122
2123        let resp = test_request()
2124            .method("GET")
2125            .path("/api/v1/tenants")
2126            .header("x-admin-key", "wrong-key")
2127            .reply(&routes)
2128            .await;
2129
2130        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2131    }
2132
2133    #[tokio::test]
2134    async fn test_no_admin_key_configured() {
2135        let (_mgr, routes) = setup_admin_routes(None);
2136
2137        let resp = test_request()
2138            .method("GET")
2139            .path("/api/v1/tenants")
2140            .header("x-admin-key", "anything")
2141            .reply(&routes)
2142            .await;
2143
2144        assert_eq!(resp.status(), StatusCode::FORBIDDEN);
2145    }
2146
2147    #[tokio::test]
2148    async fn test_create_tenant_tier_selection() {
2149        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2150
2151        // Free tier
2152        let resp = test_request()
2153            .method("POST")
2154            .path("/api/v1/tenants")
2155            .header("x-admin-key", "admin-secret")
2156            .json(&CreateTenantRequest {
2157                name: "Free User".into(),
2158                quota_tier: Some("free".into()),
2159            })
2160            .reply(&routes)
2161            .await;
2162        let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
2163        assert_eq!(body.quota.max_pipelines, 5); // free tier
2164
2165        // Enterprise tier
2166        let resp = test_request()
2167            .method("POST")
2168            .path("/api/v1/tenants")
2169            .header("x-admin-key", "admin-secret")
2170            .json(&CreateTenantRequest {
2171                name: "Enterprise User".into(),
2172                quota_tier: Some("enterprise".into()),
2173            })
2174            .reply(&routes)
2175            .await;
2176        let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
2177        assert_eq!(body.quota.max_pipelines, 1000); // enterprise tier
2178    }
2179
2180    // =========================================================================
2181    // Pipeline CRUD handler tests
2182    // =========================================================================
2183
2184    /// Helper: get the first pipeline ID from the test manager
2185    async fn get_first_pipeline_id(mgr: &SharedTenantManager) -> String {
2186        let m = mgr.read().await;
2187        let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
2188        let tenant = m.get_tenant(&tid).unwrap();
2189        tenant.pipelines.keys().next().unwrap().clone()
2190    }
2191
2192    #[tokio::test]
2193    async fn test_get_single_pipeline() {
2194        let mgr = setup_test_manager().await;
2195        let pipeline_id = get_first_pipeline_id(&mgr).await;
2196        let routes = api_routes(mgr, None, None, None);
2197
2198        let resp = test_request()
2199            .method("GET")
2200            .path(&format!("/api/v1/pipelines/{pipeline_id}"))
2201            .header("x-api-key", "test-key-123")
2202            .reply(&routes)
2203            .await;
2204
2205        assert_eq!(resp.status(), StatusCode::OK);
2206        let body: PipelineInfo = serde_json::from_slice(resp.body()).unwrap();
2207        assert_eq!(body.id, pipeline_id);
2208        assert_eq!(body.name, "Test Pipeline");
2209        assert_eq!(body.status, "running");
2210        assert!(body.source.contains("SensorReading"));
2211    }
2212
2213    #[tokio::test]
2214    async fn test_get_pipeline_not_found() {
2215        let mgr = setup_test_manager().await;
2216        let routes = api_routes(mgr, None, None, None);
2217
2218        let resp = test_request()
2219            .method("GET")
2220            .path("/api/v1/pipelines/nonexistent-id")
2221            .header("x-api-key", "test-key-123")
2222            .reply(&routes)
2223            .await;
2224
2225        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2226    }
2227
2228    #[tokio::test]
2229    async fn test_delete_pipeline_api() {
2230        let mgr = setup_test_manager().await;
2231        let pipeline_id = get_first_pipeline_id(&mgr).await;
2232        let routes = api_routes(mgr.clone(), None, None, None);
2233
2234        let resp = test_request()
2235            .method("DELETE")
2236            .path(&format!("/api/v1/pipelines/{pipeline_id}"))
2237            .header("x-api-key", "test-key-123")
2238            .reply(&routes)
2239            .await;
2240
2241        assert_eq!(resp.status(), StatusCode::OK);
2242        let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2243        assert_eq!(body["deleted"], true);
2244
2245        // Verify it's gone
2246        let list_resp = test_request()
2247            .method("GET")
2248            .path("/api/v1/pipelines")
2249            .header("x-api-key", "test-key-123")
2250            .reply(&routes)
2251            .await;
2252        let list: PipelineListResponse = serde_json::from_slice(list_resp.body()).unwrap();
2253        assert_eq!(list.total, 0);
2254    }
2255
2256    #[tokio::test]
2257    async fn test_delete_pipeline_not_found() {
2258        let mgr = setup_test_manager().await;
2259        let routes = api_routes(mgr, None, None, None);
2260
2261        let resp = test_request()
2262            .method("DELETE")
2263            .path("/api/v1/pipelines/nonexistent-id")
2264            .header("x-api-key", "test-key-123")
2265            .reply(&routes)
2266            .await;
2267
2268        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2269    }
2270
2271    // =========================================================================
2272    // Batch inject handler tests
2273    // =========================================================================
2274
2275    #[tokio::test]
2276    async fn test_inject_batch() {
2277        let mgr = setup_test_manager().await;
2278        let pipeline_id = get_first_pipeline_id(&mgr).await;
2279        let routes = api_routes(mgr, None, None, None);
2280
2281        let resp = test_request()
2282            .method("POST")
2283            .path(&format!("/api/v1/pipelines/{pipeline_id}/events-batch"))
2284            .header("x-api-key", "test-key-123")
2285            .json(&InjectBatchRequest {
2286                events: vec![
2287                    InjectEventRequest {
2288                        event_type: "SensorReading".into(),
2289                        fields: {
2290                            let mut m = serde_json::Map::new();
2291                            m.insert("x".into(), serde_json::json!(5));
2292                            m
2293                        },
2294                    },
2295                    InjectEventRequest {
2296                        event_type: "SensorReading".into(),
2297                        fields: {
2298                            let mut m = serde_json::Map::new();
2299                            m.insert("x".into(), serde_json::json!(10));
2300                            m
2301                        },
2302                    },
2303                ],
2304            })
2305            .reply(&routes)
2306            .await;
2307
2308        assert_eq!(resp.status(), StatusCode::OK);
2309        let body: InjectBatchResponse = serde_json::from_slice(resp.body()).unwrap();
2310        assert_eq!(body.accepted, 2);
2311        assert!(body.processing_time_us > 0);
2312    }
2313
2314    #[tokio::test]
2315    async fn test_inject_batch_invalid_pipeline() {
2316        let mgr = setup_test_manager().await;
2317        let routes = api_routes(mgr, None, None, None);
2318
2319        // Batch mode silently skips failed events (including nonexistent pipeline)
2320        let resp = test_request()
2321            .method("POST")
2322            .path("/api/v1/pipelines/nonexistent/events-batch")
2323            .header("x-api-key", "test-key-123")
2324            .json(&InjectBatchRequest {
2325                events: vec![InjectEventRequest {
2326                    event_type: "Test".into(),
2327                    fields: serde_json::Map::new(),
2328                }],
2329            })
2330            .reply(&routes)
2331            .await;
2332
2333        // Returns 200 but accepted=0 since pipeline doesn't exist
2334        assert_eq!(resp.status(), StatusCode::OK);
2335        let body: InjectBatchResponse = serde_json::from_slice(resp.body()).unwrap();
2336        assert_eq!(body.accepted, 0);
2337    }
2338
2339    // =========================================================================
2340    // Checkpoint/Restore handler tests
2341    // =========================================================================
2342
2343    #[tokio::test]
2344    async fn test_checkpoint_pipeline() {
2345        let mgr = setup_test_manager().await;
2346        let pipeline_id = get_first_pipeline_id(&mgr).await;
2347        let routes = api_routes(mgr, None, None, None);
2348
2349        let resp = test_request()
2350            .method("POST")
2351            .path(&format!("/api/v1/pipelines/{pipeline_id}/checkpoint"))
2352            .header("x-api-key", "test-key-123")
2353            .reply(&routes)
2354            .await;
2355
2356        assert_eq!(resp.status(), StatusCode::OK);
2357        let body: CheckpointResponse = serde_json::from_slice(resp.body()).unwrap();
2358        assert_eq!(body.pipeline_id, pipeline_id);
2359    }
2360
2361    #[tokio::test]
2362    async fn test_checkpoint_not_found() {
2363        let mgr = setup_test_manager().await;
2364        let routes = api_routes(mgr, None, None, None);
2365
2366        let resp = test_request()
2367            .method("POST")
2368            .path("/api/v1/pipelines/nonexistent/checkpoint")
2369            .header("x-api-key", "test-key-123")
2370            .reply(&routes)
2371            .await;
2372
2373        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2374    }
2375
2376    #[tokio::test]
2377    async fn test_restore_pipeline() {
2378        let mgr = setup_test_manager().await;
2379        let pipeline_id = get_first_pipeline_id(&mgr).await;
2380        let routes = api_routes(mgr, None, None, None);
2381
2382        // First checkpoint
2383        let cp_resp = test_request()
2384            .method("POST")
2385            .path(&format!("/api/v1/pipelines/{pipeline_id}/checkpoint"))
2386            .header("x-api-key", "test-key-123")
2387            .reply(&routes)
2388            .await;
2389        let cp: CheckpointResponse = serde_json::from_slice(cp_resp.body()).unwrap();
2390
2391        // Then restore
2392        let resp = test_request()
2393            .method("POST")
2394            .path(&format!("/api/v1/pipelines/{pipeline_id}/restore"))
2395            .header("x-api-key", "test-key-123")
2396            .json(&RestoreRequest {
2397                checkpoint: cp.checkpoint,
2398            })
2399            .reply(&routes)
2400            .await;
2401
2402        assert_eq!(resp.status(), StatusCode::OK);
2403        let body: RestoreResponse = serde_json::from_slice(resp.body()).unwrap();
2404        assert_eq!(body.pipeline_id, pipeline_id);
2405        assert!(body.restored);
2406    }
2407
2408    #[tokio::test]
2409    async fn test_restore_not_found() {
2410        let mgr = setup_test_manager().await;
2411        let routes = api_routes(mgr, None, None, None);
2412
2413        let checkpoint = varpulis_runtime::persistence::EngineCheckpoint {
2414            version: varpulis_runtime::persistence::CHECKPOINT_VERSION,
2415            window_states: std::collections::HashMap::new(),
2416            sase_states: std::collections::HashMap::new(),
2417            join_states: std::collections::HashMap::new(),
2418            variables: std::collections::HashMap::new(),
2419            events_processed: 0,
2420            output_events_emitted: 0,
2421            watermark_state: None,
2422            distinct_states: std::collections::HashMap::new(),
2423            limit_states: std::collections::HashMap::new(),
2424        };
2425
2426        let resp = test_request()
2427            .method("POST")
2428            .path("/api/v1/pipelines/nonexistent/restore")
2429            .header("x-api-key", "test-key-123")
2430            .json(&RestoreRequest { checkpoint })
2431            .reply(&routes)
2432            .await;
2433
2434        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2435    }
2436
2437    // =========================================================================
2438    // Metrics handler tests
2439    // =========================================================================
2440
2441    #[tokio::test]
2442    async fn test_metrics_endpoint() {
2443        let mgr = setup_test_manager().await;
2444        let pipeline_id = get_first_pipeline_id(&mgr).await;
2445        let routes = api_routes(mgr, None, None, None);
2446
2447        let resp = test_request()
2448            .method("GET")
2449            .path(&format!("/api/v1/pipelines/{pipeline_id}/metrics"))
2450            .header("x-api-key", "test-key-123")
2451            .reply(&routes)
2452            .await;
2453
2454        assert_eq!(resp.status(), StatusCode::OK);
2455        let body: PipelineMetricsResponse = serde_json::from_slice(resp.body()).unwrap();
2456        assert_eq!(body.pipeline_id, pipeline_id);
2457    }
2458
2459    #[tokio::test]
2460    async fn test_metrics_not_found() {
2461        let mgr = setup_test_manager().await;
2462        let routes = api_routes(mgr, None, None, None);
2463
2464        let resp = test_request()
2465            .method("GET")
2466            .path("/api/v1/pipelines/nonexistent/metrics")
2467            .header("x-api-key", "test-key-123")
2468            .reply(&routes)
2469            .await;
2470
2471        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2472    }
2473
2474    // =========================================================================
2475    // Reload handler tests
2476    // =========================================================================
2477
2478    #[tokio::test]
2479    async fn test_reload_pipeline() {
2480        let mgr = setup_test_manager().await;
2481        let pipeline_id = get_first_pipeline_id(&mgr).await;
2482        let routes = api_routes(mgr, None, None, None);
2483
2484        let resp = test_request()
2485            .method("POST")
2486            .path(&format!("/api/v1/pipelines/{pipeline_id}/reload"))
2487            .header("x-api-key", "test-key-123")
2488            .json(&ReloadPipelineRequest {
2489                source: "stream B = Events .where(y > 10)".into(),
2490            })
2491            .reply(&routes)
2492            .await;
2493
2494        assert_eq!(resp.status(), StatusCode::OK);
2495        let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2496        assert_eq!(body["reloaded"], true);
2497    }
2498
2499    #[tokio::test]
2500    async fn test_reload_invalid_vpl() {
2501        let mgr = setup_test_manager().await;
2502        let pipeline_id = get_first_pipeline_id(&mgr).await;
2503        let routes = api_routes(mgr, None, None, None);
2504
2505        let resp = test_request()
2506            .method("POST")
2507            .path(&format!("/api/v1/pipelines/{pipeline_id}/reload"))
2508            .header("x-api-key", "test-key-123")
2509            .json(&ReloadPipelineRequest {
2510                source: "not valid {{{".into(),
2511            })
2512            .reply(&routes)
2513            .await;
2514
2515        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2516    }
2517
2518    #[tokio::test]
2519    async fn test_reload_not_found() {
2520        let mgr = setup_test_manager().await;
2521        let routes = api_routes(mgr, None, None, None);
2522
2523        let resp = test_request()
2524            .method("POST")
2525            .path("/api/v1/pipelines/nonexistent/reload")
2526            .header("x-api-key", "test-key-123")
2527            .json(&ReloadPipelineRequest {
2528                source: "stream B = Events .where(y > 10)".into(),
2529            })
2530            .reply(&routes)
2531            .await;
2532
2533        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2534    }
2535
2536    // =========================================================================
2537    // Logs (SSE) handler tests
2538    // =========================================================================
2539
2540    #[tokio::test]
2541    async fn test_logs_invalid_pipeline() {
2542        let mgr = setup_test_manager().await;
2543        let routes = api_routes(mgr, None, None, None);
2544
2545        let resp = test_request()
2546            .method("GET")
2547            .path("/api/v1/pipelines/nonexistent/logs")
2548            .header("x-api-key", "test-key-123")
2549            .reply(&routes)
2550            .await;
2551
2552        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2553    }
2554
2555    #[tokio::test]
2556    async fn test_logs_invalid_api_key() {
2557        let mgr = setup_test_manager().await;
2558        let pipeline_id = get_first_pipeline_id(&mgr).await;
2559        let routes = api_routes(mgr, None, None, None);
2560
2561        let resp = test_request()
2562            .method("GET")
2563            .path(&format!("/api/v1/pipelines/{pipeline_id}/logs"))
2564            .header("x-api-key", "wrong-key")
2565            .reply(&routes)
2566            .await;
2567
2568        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2569    }
2570
2571    // =========================================================================
2572    // json_to_runtime_value extended tests
2573    // =========================================================================
2574
2575    #[test]
2576    fn test_json_to_runtime_value_array() {
2577        let arr = serde_json::json!([1, "hello", true]);
2578        let val = json_to_runtime_value(&arr);
2579        match val {
2580            varpulis_core::Value::Array(a) => {
2581                assert_eq!(a.len(), 3);
2582                assert_eq!(a[0], varpulis_core::Value::Int(1));
2583                assert_eq!(a[1], varpulis_core::Value::Str("hello".into()));
2584                assert_eq!(a[2], varpulis_core::Value::Bool(true));
2585            }
2586            _ => panic!("Expected Array"),
2587        }
2588    }
2589
2590    #[test]
2591    fn test_json_to_runtime_value_object() {
2592        let obj = serde_json::json!({"key": "val", "num": 42});
2593        let val = json_to_runtime_value(&obj);
2594        match val {
2595            varpulis_core::Value::Map(m) => {
2596                assert_eq!(m.len(), 2);
2597            }
2598            _ => panic!("Expected Map"),
2599        }
2600    }
2601
2602    #[test]
2603    fn test_json_from_value_roundtrip() {
2604        use varpulis_core::Value;
2605        assert_eq!(json_from_value(&Value::Null), serde_json::json!(null));
2606        assert_eq!(json_from_value(&Value::Bool(true)), serde_json::json!(true));
2607        assert_eq!(json_from_value(&Value::Int(42)), serde_json::json!(42));
2608        assert_eq!(
2609            json_from_value(&Value::Float(2.71)),
2610            serde_json::json!(2.71)
2611        );
2612        assert_eq!(
2613            json_from_value(&Value::Str("hi".into())),
2614            serde_json::json!("hi")
2615        );
2616        assert_eq!(
2617            json_from_value(&Value::Timestamp(1000000)),
2618            serde_json::json!(1000000)
2619        );
2620        assert_eq!(
2621            json_from_value(&Value::Duration(5000)),
2622            serde_json::json!(5000)
2623        );
2624    }
2625
2626    // =========================================================================
2627    // Additional tenant_error_response coverage
2628    // =========================================================================
2629
2630    #[test]
2631    fn test_tenant_error_all_variants() {
2632        let resp = tenant_error_response(TenantError::PipelineNotFound("p1".into()));
2633        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2634
2635        let resp = tenant_error_response(TenantError::QuotaExceeded("max pipelines".into()));
2636        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2637
2638        let resp = tenant_error_response(TenantError::EngineError(
2639            varpulis_runtime::EngineError::Pipeline("boom".into()),
2640        ));
2641        assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
2642
2643        let resp = tenant_error_response(TenantError::AlreadyExists("t1".into()));
2644        assert_eq!(resp.status(), StatusCode::CONFLICT);
2645
2646        let resp = tenant_error_response(TenantError::BackpressureExceeded {
2647            current: 50000,
2648            max: 50000,
2649        });
2650        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2651        assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2652    }
2653
2654    // =========================================================================
2655    // Pagination tests
2656    // =========================================================================
2657
2658    #[tokio::test]
2659    async fn test_list_pipelines_default_pagination() {
2660        let mgr = setup_test_manager().await;
2661        let routes = api_routes(mgr, None, None, None);
2662
2663        let resp = test_request()
2664            .method("GET")
2665            .path("/api/v1/pipelines")
2666            .header("x-api-key", "test-key-123")
2667            .reply(&routes)
2668            .await;
2669
2670        assert_eq!(resp.status(), StatusCode::OK);
2671        let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2672        assert_eq!(body.total, 1);
2673        let pagination = body.pagination.unwrap();
2674        assert_eq!(pagination.total, 1);
2675        assert_eq!(pagination.offset, 0);
2676        assert_eq!(pagination.limit, 50);
2677        assert!(!pagination.has_more);
2678    }
2679
2680    #[tokio::test]
2681    async fn test_list_pipelines_with_pagination_params() {
2682        let mgr = setup_test_manager().await;
2683
2684        // Deploy two more pipelines
2685        {
2686            let mut m = mgr.write().await;
2687            let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
2688            let tenant = m.get_tenant_mut(&tid).unwrap();
2689            tenant
2690                .deploy_pipeline(
2691                    "Pipeline B".into(),
2692                    "stream B = Events .where(y > 2)".into(),
2693                )
2694                .await
2695                .unwrap();
2696            tenant
2697                .deploy_pipeline(
2698                    "Pipeline C".into(),
2699                    "stream C = Events .where(z > 3)".into(),
2700                )
2701                .await
2702                .unwrap();
2703        }
2704
2705        let routes = api_routes(mgr, None, None, None);
2706
2707        // First page: limit=1, offset=0
2708        let resp = test_request()
2709            .method("GET")
2710            .path("/api/v1/pipelines?limit=1&offset=0")
2711            .header("x-api-key", "test-key-123")
2712            .reply(&routes)
2713            .await;
2714
2715        assert_eq!(resp.status(), StatusCode::OK);
2716        let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2717        assert_eq!(body.pipelines.len(), 1);
2718        assert_eq!(body.total, 3);
2719        let pagination = body.pagination.unwrap();
2720        assert!(pagination.has_more);
2721        assert_eq!(pagination.limit, 1);
2722
2723        // Second page: limit=1, offset=2
2724        let resp = test_request()
2725            .method("GET")
2726            .path("/api/v1/pipelines?limit=1&offset=2")
2727            .header("x-api-key", "test-key-123")
2728            .reply(&routes)
2729            .await;
2730
2731        let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2732        assert_eq!(body.pipelines.len(), 1);
2733        assert_eq!(body.total, 3);
2734        assert!(!body.pagination.unwrap().has_more);
2735    }
2736
2737    #[tokio::test]
2738    async fn test_list_pipelines_limit_exceeds_max() {
2739        let mgr = setup_test_manager().await;
2740        let routes = api_routes(mgr, None, None, None);
2741
2742        let resp = test_request()
2743            .method("GET")
2744            .path("/api/v1/pipelines?limit=1001")
2745            .header("x-api-key", "test-key-123")
2746            .reply(&routes)
2747            .await;
2748
2749        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2750    }
2751
2752    #[tokio::test]
2753    async fn test_list_tenants_with_pagination() {
2754        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2755
2756        // Create 3 tenants
2757        for name in &["T1", "T2", "T3"] {
2758            test_request()
2759                .method("POST")
2760                .path("/api/v1/tenants")
2761                .header("x-admin-key", "admin-secret")
2762                .json(&CreateTenantRequest {
2763                    name: name.to_string(),
2764                    quota_tier: None,
2765                })
2766                .reply(&routes)
2767                .await;
2768        }
2769
2770        // Page through with limit=2
2771        let resp = test_request()
2772            .method("GET")
2773            .path("/api/v1/tenants?limit=2&offset=0")
2774            .header("x-admin-key", "admin-secret")
2775            .reply(&routes)
2776            .await;
2777
2778        assert_eq!(resp.status(), StatusCode::OK);
2779        let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2780        assert_eq!(body.tenants.len(), 2);
2781        assert_eq!(body.total, 3);
2782        assert!(body.pagination.unwrap().has_more);
2783
2784        // Last page
2785        let resp = test_request()
2786            .method("GET")
2787            .path("/api/v1/tenants?limit=2&offset=2")
2788            .header("x-admin-key", "admin-secret")
2789            .reply(&routes)
2790            .await;
2791
2792        let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2793        assert_eq!(body.tenants.len(), 1);
2794        assert!(!body.pagination.unwrap().has_more);
2795    }
2796
2797    #[tokio::test]
2798    async fn test_inject_backpressure_429() {
2799        use std::sync::atomic::Ordering;
2800
2801        let mut mgr = TenantManager::new();
2802        mgr.set_max_queue_depth(5);
2803        let id = mgr
2804            .create_tenant(
2805                "BP Corp".into(),
2806                "bp-key-123".into(),
2807                TenantQuota::default(),
2808            )
2809            .unwrap();
2810
2811        let tenant = mgr.get_tenant_mut(&id).unwrap();
2812        let pid = tenant
2813            .deploy_pipeline(
2814                "BP Pipeline".into(),
2815                "stream A = SensorReading .where(x > 1)".into(),
2816            )
2817            .await
2818            .unwrap();
2819
2820        // Simulate queue being full
2821        mgr.pending_events_counter().store(5, Ordering::Relaxed);
2822
2823        let shared = Arc::new(RwLock::new(mgr));
2824        let routes = api_routes(shared, None, None, None);
2825
2826        let resp = test_request()
2827            .method("POST")
2828            .path(&format!("/api/v1/pipelines/{pid}/events"))
2829            .header("x-api-key", "bp-key-123")
2830            .json(&InjectEventRequest {
2831                event_type: "SensorReading".into(),
2832                fields: serde_json::Map::new(),
2833            })
2834            .reply(&routes)
2835            .await;
2836
2837        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2838        // Check Retry-After header
2839        assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2840        // Check response body
2841        let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2842        assert_eq!(body["code"], "queue_depth_exceeded");
2843    }
2844
2845    #[tokio::test]
2846    async fn test_inject_batch_backpressure_429() {
2847        use std::sync::atomic::Ordering;
2848
2849        let mut mgr = TenantManager::new();
2850        mgr.set_max_queue_depth(5);
2851        let id = mgr
2852            .create_tenant(
2853                "BP Batch Corp".into(),
2854                "bp-batch-key".into(),
2855                TenantQuota::default(),
2856            )
2857            .unwrap();
2858
2859        let tenant = mgr.get_tenant_mut(&id).unwrap();
2860        let pid = tenant
2861            .deploy_pipeline(
2862                "BP Batch Pipeline".into(),
2863                "stream A = SensorReading .where(x > 1)".into(),
2864            )
2865            .await
2866            .unwrap();
2867
2868        // Simulate queue being full
2869        mgr.pending_events_counter().store(5, Ordering::Relaxed);
2870
2871        let shared = Arc::new(RwLock::new(mgr));
2872        let routes = api_routes(shared, None, None, None);
2873
2874        let resp = test_request()
2875            .method("POST")
2876            .path(&format!("/api/v1/pipelines/{pid}/events-batch"))
2877            .header("x-api-key", "bp-batch-key")
2878            .json(&InjectBatchRequest {
2879                events: vec![InjectEventRequest {
2880                    event_type: "SensorReading".into(),
2881                    fields: serde_json::Map::new(),
2882                }],
2883            })
2884            .reply(&routes)
2885            .await;
2886
2887        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2888        assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2889    }
2890}