Skip to main content

varpulis_cli/
api.rs

1//! REST API for SaaS pipeline management
2//!
3//! Provides RESTful endpoints for deploying and managing CEP pipelines
4//! in a multi-tenant environment.
5
6use crate::auth::constant_time_compare;
7use axum::extract::{Json, Path, Query, State};
8use axum::http::StatusCode;
9use axum::response::{IntoResponse, Response};
10use axum::routing::{get, post};
11use axum::Router;
12use futures_util::stream;
13use indexmap::IndexMap;
14use rustc_hash::FxBuildHasher;
15use serde::{Deserialize, Serialize};
16use std::convert::Infallible;
17use tower_http::cors::{Any, CorsLayer};
18use varpulis_runtime::tenant::{SharedTenantManager, TenantError, TenantQuota};
19use varpulis_runtime::Event;
20
21use varpulis_core::pagination::{PaginationMeta, PaginationParams, MAX_LIMIT};
22
23use crate::billing::SharedBillingState;
24
25// =============================================================================
26// Request/Response types
27// =============================================================================
28
/// JSON body accepted by `POST /api/v1/pipelines` (`handle_deploy`).
#[derive(Debug, Deserialize, Serialize)]
pub struct DeployPipelineRequest {
    /// Pipeline name; forwarded to the tenant manager and echoed in the response.
    pub name: String,
    /// Pipeline source text handed to the tenant manager for deployment.
    pub source: String,
}
34
/// Body returned with `201 Created` after a successful deploy.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployPipelineResponse {
    /// Identifier returned by the tenant manager for the new pipeline.
    pub id: String,
    /// Name copied from the deploy request.
    pub name: String,
    /// Status string; `handle_deploy` always sets this to `"running"`.
    pub status: String,
}
41
/// Description of one pipeline, as returned by the list and get endpoints.
#[derive(Debug, Serialize, Deserialize)]
pub struct PipelineInfo {
    /// Pipeline identifier.
    pub id: String,
    /// Pipeline name.
    pub name: String,
    /// Pipeline status rendered through its `to_string()` form.
    pub status: String,
    /// Source text the pipeline was deployed with.
    pub source: String,
    /// Whole seconds elapsed since the pipeline's `created_at` instant.
    pub uptime_secs: u64,
}
50
/// Body returned by `GET /api/v1/pipelines`.
#[derive(Debug, Serialize, Deserialize)]
pub struct PipelineListResponse {
    /// The page of pipelines selected by the pagination parameters.
    pub pipelines: Vec<PipelineInfo>,
    /// Total reported by the pagination metadata (`meta.total`).
    pub total: usize,
    /// Pagination metadata; left out of the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pagination: Option<PaginationMeta>,
}
58
/// Body returned by `GET /api/v1/pipelines/{pipeline_id}/metrics`.
///
/// NOTE(review): `handle_metrics` fills both counters from the tenant's
/// `usage` record, not from the individual pipeline — confirm whether
/// per-pipeline numbers are intended.
#[derive(Debug, Serialize, Deserialize)]
pub struct PipelineMetricsResponse {
    /// Pipeline id taken from the request path.
    pub pipeline_id: String,
    /// Count of processed events.
    pub events_processed: u64,
    /// Count of emitted output events.
    pub output_events_emitted: u64,
}
65
/// A single event to inject into a pipeline.
#[derive(Debug, Deserialize, Serialize)]
pub struct InjectEventRequest {
    /// Event type name used to construct the runtime `Event`.
    pub event_type: String,
    /// Event fields; each JSON value is converted with `json_to_runtime_value`
    /// and attached to the event under its key.
    pub fields: serde_json::Map<String, serde_json::Value>,
}
71
/// JSON body accepted by the `events-batch` endpoint.
#[derive(Debug, Deserialize, Serialize)]
pub struct InjectBatchRequest {
    /// Events to inject; `handle_inject_batch` processes them in list order.
    pub events: Vec<InjectEventRequest>,
}
76
/// Result of a batch injection.
#[derive(Debug, Serialize, Deserialize)]
pub struct InjectBatchResponse {
    /// Number of input events that were processed without error.
    pub accepted: usize,
    /// Output events as flat JSON objects: an `event_type` key plus the
    /// event's data fields at the same level.
    pub output_events: Vec<serde_json::Value>,
    /// Time spent processing the batch, in microseconds.
    pub processing_time_us: u64,
}
83
/// JSON body accepted by `POST /api/v1/pipelines/{pipeline_id}/reload`.
#[derive(Debug, Deserialize, Serialize)]
pub struct ReloadPipelineRequest {
    /// Replacement source text passed to the tenant's `reload_pipeline`.
    pub source: String,
}
88
/// Body returned by the checkpoint endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct CheckpointResponse {
    /// Pipeline id taken from the request path.
    pub pipeline_id: String,
    /// The checkpoint produced by the tenant for this pipeline.
    pub checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
    /// Copy of `checkpoint.events_processed`, surfaced at the top level.
    pub events_processed: u64,
}
95
/// JSON body accepted by the restore endpoint.
#[derive(Debug, Deserialize, Serialize)]
pub struct RestoreRequest {
    /// Checkpoint to restore the pipeline from.
    pub checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
}
100
/// Body returned after a successful restore.
#[derive(Debug, Serialize, Deserialize)]
pub struct RestoreResponse {
    /// Pipeline id taken from the request path.
    pub pipeline_id: String,
    /// `handle_restore` sets this to `true`; failures return an error body instead.
    pub restored: bool,
    /// `events_processed` value of the checkpoint that was supplied.
    pub events_restored: u64,
}
107
/// Serializable error payload.
///
/// NOTE(review): presumably what `error_response` emits — that helper is
/// defined outside this view; confirm.
#[derive(Debug, Serialize)]
pub struct ApiError {
    /// Error message text.
    pub error: String,
    /// Error code string.
    pub code: String,
}
113
/// Query-string parameters accepted by the DLQ listing handler.
///
/// Both fields are optional; `#[serde(default)]` makes a missing parameter
/// deserialize as `None`.
#[derive(Debug, Deserialize)]
pub struct DlqQueryParams {
    /// Optional offset (presumably the number of entries to skip — confirm in the handler).
    #[serde(default)]
    pub offset: Option<usize>,
    /// Optional limit (presumably the maximum number of entries returned — confirm in the handler).
    #[serde(default)]
    pub limit: Option<usize>,
}
121
/// Response payload carrying dead-letter-queue entries.
#[derive(Debug, Serialize)]
pub struct DlqEntriesResponse {
    /// Dead-letter entries included in this response.
    pub entries: Vec<varpulis_runtime::dead_letter::DlqEntryOwned>,
    /// Total entry count (presumably for the whole queue, not only this page — confirm in the handler).
    pub total: u64,
}
127
/// Response payload for a dead-letter-queue replay request.
#[derive(Debug, Serialize)]
pub struct DlqReplayResponse {
    /// Number of entries replayed.
    pub replayed: usize,
}
132
/// Response payload for a dead-letter-queue clear request.
#[derive(Debug, Serialize)]
pub struct DlqClearResponse {
    /// Whether the queue was cleared.
    pub cleared: bool,
}
137
/// Body returned by `GET /api/v1/usage` for the tenant that owns the API key.
#[derive(Debug, Serialize, Deserialize)]
pub struct UsageResponse {
    /// Tenant id rendered through `to_string()`.
    pub tenant_id: String,
    /// Copied from the tenant's `usage.events_processed`.
    pub events_processed: u64,
    /// Copied from the tenant's `usage.output_events_emitted`.
    pub output_events_emitted: u64,
    /// Copied from the tenant's `usage.active_pipelines`.
    pub active_pipelines: usize,
    /// The tenant's quota limits.
    pub quota: QuotaInfo,
}
146
/// Quota limits exposed over the API; `handle_usage` copies them
/// field-for-field from the tenant's `quota`.
#[derive(Debug, Serialize, Deserialize)]
pub struct QuotaInfo {
    /// Maximum number of pipelines.
    pub max_pipelines: usize,
    /// Maximum events per second.
    pub max_events_per_second: u64,
    /// Maximum streams per pipeline.
    pub max_streams_per_pipeline: usize,
}
153
154// =============================================================================
155// Tenant Admin Request/Response types
156// =============================================================================
157
/// JSON body for creating a tenant via the admin API.
#[derive(Debug, Deserialize, Serialize)]
pub struct CreateTenantRequest {
    /// Tenant name.
    pub name: String,
    /// Optional quota tier name; deserializes as `None` when absent.
    /// NOTE(review): the accepted tier names are decided by the handler,
    /// which is outside this view.
    #[serde(default)]
    pub quota_tier: Option<String>,
}
164
/// Tenant summary returned by the admin API.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantResponse {
    /// Tenant identifier.
    pub id: String,
    /// Tenant name.
    pub name: String,
    /// The tenant's API key. This is serialized into the response body, so
    /// responses of this type carry a credential.
    pub api_key: String,
    /// The tenant's quota limits.
    pub quota: QuotaInfo,
}
172
/// Tenant listing returned by the admin API.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantListResponse {
    /// Tenants included in this response.
    pub tenants: Vec<TenantResponse>,
    /// Total tenant count.
    pub total: usize,
    /// Pagination metadata; left out of the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pagination: Option<PaginationMeta>,
}
180
/// Detailed view of one tenant returned by the admin API.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantDetailResponse {
    /// Tenant identifier.
    pub id: String,
    /// Tenant name.
    pub name: String,
    /// The tenant's API key. This is serialized into the response body, so
    /// responses of this type carry a credential.
    pub api_key: String,
    /// The tenant's quota limits.
    pub quota: QuotaInfo,
    /// The tenant's usage counters.
    pub usage: TenantUsageInfo,
    /// Number of pipelines belonging to the tenant.
    pub pipeline_count: usize,
}
190
/// Usage counters embedded in [`TenantDetailResponse`].
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantUsageInfo {
    /// Count of processed events.
    pub events_processed: u64,
    /// Count of emitted output events.
    pub output_events_emitted: u64,
    /// Count of active pipelines.
    pub active_pipelines: usize,
}
197
198// =============================================================================
199// API Routes
200// =============================================================================
201
202/// Build a tower-http CORS layer from an optional list of allowed origins.
203///
204/// - `None` or a list containing `"*"`: allow any origin (backward-compatible default).
205/// - Otherwise: restrict to the given origins.
206fn build_cors(origins: Option<Vec<String>>) -> CorsLayer {
207    use axum::http::{HeaderValue, Method};
208
209    let base = CorsLayer::new()
210        .allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
211        .allow_headers([
212            "content-type".parse().unwrap(),
213            "x-api-key".parse().unwrap(),
214            "authorization".parse().unwrap(),
215        ]);
216
217    match origins {
218        Some(ref list) if !list.is_empty() && !list.iter().any(|o| o == "*") => {
219            let origins: Vec<HeaderValue> = list.iter().filter_map(|s| s.parse().ok()).collect();
220            base.allow_origin(origins)
221        }
222        _ => base.allow_origin(Any),
223    }
224}
225
/// Shared state for the API router.
///
/// Cloned into every handler through axum's `State` extractor.
#[derive(Debug, Clone)]
pub struct ApiState {
    /// Tenant manager; handlers take its async read or write lock.
    pub manager: SharedTenantManager,
    /// Optional admin key (presumably compared against `X-Admin-Key` by the
    /// tenant admin handlers, which are outside this view — confirm).
    pub admin_key: Option<String>,
    /// Optional billing state; consulted by the inject handlers only when
    /// the `saas` feature is enabled.
    pub billing_state: Option<SharedBillingState>,
}
233
234/// Axum extractor for X-API-Key header.
235#[derive(Debug)]
236pub struct ApiKey(pub String);
237
238impl<S> axum::extract::FromRequestParts<S> for ApiKey
239where
240    S: Send + Sync,
241{
242    type Rejection = Response;
243
244    async fn from_request_parts(
245        parts: &mut axum::http::request::Parts,
246        _state: &S,
247    ) -> Result<Self, Self::Rejection> {
248        parts
249            .headers
250            .get("x-api-key")
251            .and_then(|v| v.to_str().ok())
252            .map(|s| Self(s.to_string()))
253            .ok_or_else(|| {
254                (
255                    StatusCode::UNAUTHORIZED,
256                    axum::Json(serde_json::json!({"error": "Missing X-API-Key header"})),
257                )
258                    .into_response()
259            })
260    }
261}
262
263/// Axum extractor for X-Admin-Key header.
264#[derive(Debug)]
265pub struct AdminKey(pub String);
266
267impl<S> axum::extract::FromRequestParts<S> for AdminKey
268where
269    S: Send + Sync,
270{
271    type Rejection = Response;
272
273    async fn from_request_parts(
274        parts: &mut axum::http::request::Parts,
275        _state: &S,
276    ) -> Result<Self, Self::Rejection> {
277        parts
278            .headers
279            .get("x-admin-key")
280            .and_then(|v| v.to_str().ok())
281            .map(|s| Self(s.to_string()))
282            .ok_or_else(|| {
283                (
284                    StatusCode::UNAUTHORIZED,
285                    axum::Json(serde_json::json!({"error": "Missing X-Admin-Key header"})),
286                )
287                    .into_response()
288            })
289    }
290}
291
292/// Build the complete API route tree
293pub fn api_routes(
294    manager: SharedTenantManager,
295    admin_key: Option<String>,
296    cors_origins: Option<Vec<String>>,
297    billing_state: Option<SharedBillingState>,
298) -> Router {
299    let state = ApiState {
300        manager,
301        admin_key,
302        billing_state,
303    };
304
305    let cors = build_cors(cors_origins);
306
307    Router::new()
308        // Pipeline CRUD
309        .route("/api/v1/pipelines", post(handle_deploy).get(handle_list))
310        .route(
311            "/api/v1/pipelines/{pipeline_id}",
312            get(handle_get).delete(handle_delete),
313        )
314        // Pipeline actions
315        .route(
316            "/api/v1/pipelines/{pipeline_id}/events",
317            post(handle_inject),
318        )
319        .route(
320            "/api/v1/pipelines/{pipeline_id}/events-batch",
321            post(handle_inject_batch),
322        )
323        .route(
324            "/api/v1/pipelines/{pipeline_id}/checkpoint",
325            post(handle_checkpoint),
326        )
327        .route(
328            "/api/v1/pipelines/{pipeline_id}/restore",
329            post(handle_restore),
330        )
331        .route(
332            "/api/v1/pipelines/{pipeline_id}/metrics",
333            get(handle_metrics),
334        )
335        .route(
336            "/api/v1/pipelines/{pipeline_id}/reload",
337            post(handle_reload),
338        )
339        .route("/api/v1/usage", get(handle_usage))
340        .route("/api/v1/pipelines/{pipeline_id}/logs", get(handle_logs))
341        // DLQ routes
342        .route(
343            "/api/v1/pipelines/{pipeline_id}/dlq",
344            get(handle_dlq_get).delete(handle_dlq_clear),
345        )
346        .route(
347            "/api/v1/pipelines/{pipeline_id}/dlq/replay",
348            post(handle_dlq_replay),
349        )
350        // Tenant admin routes
351        .route(
352            "/api/v1/tenants",
353            post(handle_create_tenant).get(handle_list_tenants),
354        )
355        .route(
356            "/api/v1/tenants/{tenant_id}",
357            get(handle_get_tenant).delete(handle_delete_tenant),
358        )
359        .layer(cors)
360        .with_state(state)
361}
362
363// =============================================================================
364// Handlers
365// =============================================================================
366
367async fn handle_deploy(
368    State(state): State<ApiState>,
369    ApiKey(api_key): ApiKey,
370    Json(body): Json<DeployPipelineRequest>,
371) -> Response {
372    let manager = &state.manager;
373    let mut mgr = manager.write().await;
374
375    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
376        Some(id) => id.clone(),
377        None => {
378            return error_response(
379                StatusCode::UNAUTHORIZED,
380                "invalid_api_key",
381                "Invalid API key",
382            )
383        }
384    };
385
386    let result = mgr
387        .deploy_pipeline_on_tenant(&tenant_id, body.name.clone(), body.source)
388        .await;
389
390    match result {
391        Ok(id) => {
392            mgr.persist_if_needed(&tenant_id);
393            let resp = DeployPipelineResponse {
394                id,
395                name: body.name,
396                status: "running".to_string(),
397            };
398            (StatusCode::CREATED, axum::Json(&resp)).into_response()
399        }
400        Err(e) => tenant_error_response(e),
401    }
402}
403
404async fn handle_list(
405    State(state): State<ApiState>,
406    ApiKey(api_key): ApiKey,
407    Query(pagination): Query<PaginationParams>,
408) -> Response {
409    let manager = &state.manager;
410    if pagination.exceeds_max() {
411        return error_response(
412            StatusCode::BAD_REQUEST,
413            "invalid_limit",
414            &format!("limit must not exceed {MAX_LIMIT}"),
415        );
416    }
417
418    let mgr = manager.read().await;
419
420    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
421        Some(id) => id.clone(),
422        None => {
423            return error_response(
424                StatusCode::UNAUTHORIZED,
425                "invalid_api_key",
426                "Invalid API key",
427            )
428        }
429    };
430
431    let tenant = match mgr.get_tenant(&tenant_id) {
432        Some(t) => t,
433        None => {
434            return error_response(
435                StatusCode::NOT_FOUND,
436                "tenant_not_found",
437                "Tenant not found",
438            )
439        }
440    };
441
442    let all_pipelines: Vec<PipelineInfo> = tenant
443        .pipelines
444        .values()
445        .map(|p| PipelineInfo {
446            id: p.id.clone(),
447            name: p.name.clone(),
448            status: p.status.to_string(),
449            source: p.source.clone(),
450            uptime_secs: p.created_at.elapsed().as_secs(),
451        })
452        .collect();
453
454    let (pipelines, meta) = pagination.paginate(all_pipelines);
455    let total = meta.total;
456    let resp = PipelineListResponse {
457        pipelines,
458        total,
459        pagination: Some(meta),
460    };
461    axum::Json(&resp).into_response()
462}
463
464async fn handle_get(
465    State(state): State<ApiState>,
466    Path(pipeline_id): Path<String>,
467    ApiKey(api_key): ApiKey,
468) -> Response {
469    let manager = &state.manager;
470    let mgr = manager.read().await;
471
472    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
473        Some(id) => id.clone(),
474        None => {
475            return error_response(
476                StatusCode::UNAUTHORIZED,
477                "invalid_api_key",
478                "Invalid API key",
479            )
480        }
481    };
482
483    let tenant = match mgr.get_tenant(&tenant_id) {
484        Some(t) => t,
485        None => {
486            return error_response(
487                StatusCode::NOT_FOUND,
488                "tenant_not_found",
489                "Tenant not found",
490            )
491        }
492    };
493
494    match tenant.pipelines.get(&pipeline_id) {
495        Some(p) => {
496            let info = PipelineInfo {
497                id: p.id.clone(),
498                name: p.name.clone(),
499                status: p.status.to_string(),
500                source: p.source.clone(),
501                uptime_secs: p.created_at.elapsed().as_secs(),
502            };
503            axum::Json(&info).into_response()
504        }
505        None => error_response(
506            StatusCode::NOT_FOUND,
507            "pipeline_not_found",
508            "Pipeline not found",
509        ),
510    }
511}
512
513async fn handle_delete(
514    State(state): State<ApiState>,
515    Path(pipeline_id): Path<String>,
516    ApiKey(api_key): ApiKey,
517) -> Response {
518    let manager = &state.manager;
519    let mut mgr = manager.write().await;
520
521    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
522        Some(id) => id.clone(),
523        None => {
524            return error_response(
525                StatusCode::UNAUTHORIZED,
526                "invalid_api_key",
527                "Invalid API key",
528            )
529        }
530    };
531
532    let result = {
533        let tenant = match mgr.get_tenant_mut(&tenant_id) {
534            Some(t) => t,
535            None => {
536                return error_response(
537                    StatusCode::NOT_FOUND,
538                    "tenant_not_found",
539                    "Tenant not found",
540                )
541            }
542        };
543        tenant.remove_pipeline(&pipeline_id)
544    };
545
546    match result {
547        Ok(()) => {
548            mgr.persist_if_needed(&tenant_id);
549            axum::Json(serde_json::json!({"deleted": true})).into_response()
550        }
551        Err(e) => tenant_error_response(e),
552    }
553}
554
555async fn handle_inject(
556    State(state): State<ApiState>,
557    Path(pipeline_id): Path<String>,
558    ApiKey(api_key): ApiKey,
559    Json(body): Json<InjectEventRequest>,
560) -> Response {
561    let manager = &state.manager;
562    let billing_state = &state.billing_state;
563    // Check usage limit (SaaS mode only)
564    #[cfg(feature = "saas")]
565    if let Some(ref bs) = billing_state {
566        if let Some(org_id) = bs.org_id_for_api_key(&api_key).await {
567            if let Err(err) = bs.check_usage_limit(org_id, 1).await {
568                return crate::billing::usage_limit_response(&err);
569            }
570            // Record the event for usage tracking
571            bs.usage.write().await.record_events(org_id, 1);
572        }
573    }
574    #[cfg(not(feature = "saas"))]
575    let _ = &billing_state;
576
577    let mut mgr = manager.write().await;
578
579    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
580        Some(id) => id.clone(),
581        None => {
582            return error_response(
583                StatusCode::UNAUTHORIZED,
584                "invalid_api_key",
585                "Invalid API key",
586            )
587        }
588    };
589
590    // Check backpressure before processing
591    if let Err(e) = mgr.check_backpressure() {
592        return tenant_error_response(e);
593    }
594
595    let mut event = Event::new(body.event_type.clone());
596    for (key, value) in &body.fields {
597        let v = json_to_runtime_value(value);
598        event = event.with_field(key.as_str(), v);
599    }
600
601    match mgr
602        .process_event_with_backpressure(&tenant_id, &pipeline_id, event)
603        .await
604    {
605        Ok(output_events) => {
606            let events_json: Vec<serde_json::Value> = output_events
607                .iter()
608                .map(|e| {
609                    let mut fields = serde_json::Map::new();
610                    for (k, v) in &e.data {
611                        fields.insert(k.to_string(), crate::websocket::value_to_json(v));
612                    }
613                    serde_json::json!({
614                        "event_type": e.event_type.to_string(),
615                        "fields": serde_json::Value::Object(fields),
616                    })
617                })
618                .collect();
619            let response = serde_json::json!({
620                "accepted": true,
621                "output_events": events_json,
622            });
623            axum::Json(response).into_response()
624        }
625        Err(e) => tenant_error_response(e),
626    }
627}
628
629async fn handle_inject_batch(
630    State(state): State<ApiState>,
631    Path(pipeline_id): Path<String>,
632    ApiKey(api_key): ApiKey,
633    Json(body): Json<InjectBatchRequest>,
634) -> Response {
635    let manager = &state.manager;
636    let billing_state = &state.billing_state;
637    let event_count = body.events.len() as i64;
638
639    // Check usage limit for the entire batch (SaaS mode only)
640    #[cfg(feature = "saas")]
641    if let Some(ref bs) = billing_state {
642        if let Some(org_id) = bs.org_id_for_api_key(&api_key).await {
643            if let Err(err) = bs.check_usage_limit(org_id, event_count).await {
644                return crate::billing::usage_limit_response(&err);
645            }
646            // Record the batch for usage tracking
647            bs.usage.write().await.record_events(org_id, event_count);
648        }
649    }
650    #[cfg(not(feature = "saas"))]
651    let _ = (&billing_state, event_count);
652
653    let mut mgr = manager.write().await;
654
655    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
656        Some(id) => id.clone(),
657        None => {
658            return error_response(
659                StatusCode::UNAUTHORIZED,
660                "invalid_api_key",
661                "Invalid API key",
662            )
663        }
664    };
665
666    // Check backpressure before processing the batch
667    if let Err(e) = mgr.check_backpressure() {
668        return tenant_error_response(e);
669    }
670
671    let start = std::time::Instant::now();
672    let mut accepted = 0usize;
673    let mut output_events = Vec::new();
674
675    for req in body.events {
676        let mut event = Event::new(req.event_type.clone());
677        for (key, value) in &req.fields {
678            let v = json_to_runtime_value(value);
679            event = event.with_field(key.as_str(), v);
680        }
681
682        match mgr
683            .process_event_with_backpressure(&tenant_id, &pipeline_id, event)
684            .await
685        {
686            Ok(outputs) => {
687                accepted += 1;
688                for e in &outputs {
689                    let mut flat = serde_json::Map::new();
690                    flat.insert(
691                        "event_type".to_string(),
692                        serde_json::Value::String(e.event_type.to_string()),
693                    );
694                    for (k, v) in &e.data {
695                        flat.insert(k.to_string(), crate::websocket::value_to_json(v));
696                    }
697                    output_events.push(serde_json::Value::Object(flat));
698                }
699            }
700            Err(TenantError::BackpressureExceeded { .. }) => {
701                // Stop processing the rest of the batch on backpressure
702                break;
703            }
704            Err(_) => {
705                // Skip other failed events silently in batch mode
706            }
707        }
708    }
709
710    let processing_time_us = start.elapsed().as_micros() as u64;
711
712    let resp = InjectBatchResponse {
713        accepted,
714        output_events,
715        processing_time_us,
716    };
717    axum::Json(&resp).into_response()
718}
719
720async fn handle_checkpoint(
721    State(state): State<ApiState>,
722    Path(pipeline_id): Path<String>,
723    ApiKey(api_key): ApiKey,
724) -> Response {
725    let manager = &state.manager;
726    let mgr = manager.read().await;
727
728    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
729        Some(id) => id.clone(),
730        None => {
731            return error_response(
732                StatusCode::UNAUTHORIZED,
733                "invalid_api_key",
734                "Invalid API key",
735            )
736        }
737    };
738
739    let tenant = match mgr.get_tenant(&tenant_id) {
740        Some(t) => t,
741        None => {
742            return error_response(
743                StatusCode::NOT_FOUND,
744                "tenant_not_found",
745                "Tenant not found",
746            )
747        }
748    };
749
750    match tenant.checkpoint_pipeline(&pipeline_id).await {
751        Ok(checkpoint) => {
752            let resp = CheckpointResponse {
753                pipeline_id,
754                events_processed: checkpoint.events_processed,
755                checkpoint,
756            };
757            axum::Json(&resp).into_response()
758        }
759        Err(e) => tenant_error_response(e),
760    }
761}
762
763async fn handle_restore(
764    State(state): State<ApiState>,
765    Path(pipeline_id): Path<String>,
766    ApiKey(api_key): ApiKey,
767    Json(body): Json<RestoreRequest>,
768) -> Response {
769    let manager = &state.manager;
770    let mut mgr = manager.write().await;
771
772    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
773        Some(id) => id.clone(),
774        None => {
775            return error_response(
776                StatusCode::UNAUTHORIZED,
777                "invalid_api_key",
778                "Invalid API key",
779            )
780        }
781    };
782
783    let tenant = match mgr.get_tenant_mut(&tenant_id) {
784        Some(t) => t,
785        None => {
786            return error_response(
787                StatusCode::NOT_FOUND,
788                "tenant_not_found",
789                "Tenant not found",
790            )
791        }
792    };
793
794    match tenant
795        .restore_pipeline(&pipeline_id, &body.checkpoint)
796        .await
797    {
798        Ok(()) => {
799            let resp = RestoreResponse {
800                pipeline_id,
801                restored: true,
802                events_restored: body.checkpoint.events_processed,
803            };
804            axum::Json(&resp).into_response()
805        }
806        Err(e) => tenant_error_response(e),
807    }
808}
809
810async fn handle_metrics(
811    State(state): State<ApiState>,
812    Path(pipeline_id): Path<String>,
813    ApiKey(api_key): ApiKey,
814) -> Response {
815    let manager = &state.manager;
816    let mgr = manager.read().await;
817
818    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
819        Some(id) => id.clone(),
820        None => {
821            return error_response(
822                StatusCode::UNAUTHORIZED,
823                "invalid_api_key",
824                "Invalid API key",
825            )
826        }
827    };
828
829    let tenant = match mgr.get_tenant(&tenant_id) {
830        Some(t) => t,
831        None => {
832            return error_response(
833                StatusCode::NOT_FOUND,
834                "tenant_not_found",
835                "Tenant not found",
836            )
837        }
838    };
839
840    if !tenant.pipelines.contains_key(&pipeline_id) {
841        return error_response(
842            StatusCode::NOT_FOUND,
843            "pipeline_not_found",
844            "Pipeline not found",
845        );
846    }
847
848    let resp = PipelineMetricsResponse {
849        pipeline_id,
850        events_processed: tenant.usage.events_processed,
851        output_events_emitted: tenant.usage.output_events_emitted,
852    };
853    axum::Json(&resp).into_response()
854}
855
856async fn handle_reload(
857    State(state): State<ApiState>,
858    Path(pipeline_id): Path<String>,
859    ApiKey(api_key): ApiKey,
860    Json(body): Json<ReloadPipelineRequest>,
861) -> Response {
862    let manager = &state.manager;
863    let mut mgr = manager.write().await;
864
865    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
866        Some(id) => id.clone(),
867        None => {
868            return error_response(
869                StatusCode::UNAUTHORIZED,
870                "invalid_api_key",
871                "Invalid API key",
872            )
873        }
874    };
875
876    let result = {
877        let tenant = match mgr.get_tenant_mut(&tenant_id) {
878            Some(t) => t,
879            None => {
880                return error_response(
881                    StatusCode::NOT_FOUND,
882                    "tenant_not_found",
883                    "Tenant not found",
884                )
885            }
886        };
887        tenant.reload_pipeline(&pipeline_id, body.source).await
888    };
889
890    match result {
891        Ok(()) => {
892            mgr.persist_if_needed(&tenant_id);
893            axum::Json(serde_json::json!({"reloaded": true})).into_response()
894        }
895        Err(e) => tenant_error_response(e),
896    }
897}
898
899async fn handle_usage(State(state): State<ApiState>, ApiKey(api_key): ApiKey) -> Response {
900    let manager = &state.manager;
901    let mgr = manager.read().await;
902
903    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
904        Some(id) => id.clone(),
905        None => {
906            return error_response(
907                StatusCode::UNAUTHORIZED,
908                "invalid_api_key",
909                "Invalid API key",
910            )
911        }
912    };
913
914    let tenant = match mgr.get_tenant(&tenant_id) {
915        Some(t) => t,
916        None => {
917            return error_response(
918                StatusCode::NOT_FOUND,
919                "tenant_not_found",
920                "Tenant not found",
921            )
922        }
923    };
924
925    let resp = UsageResponse {
926        tenant_id: tenant.id.to_string(),
927        events_processed: tenant.usage.events_processed,
928        output_events_emitted: tenant.usage.output_events_emitted,
929        active_pipelines: tenant.usage.active_pipelines,
930        quota: QuotaInfo {
931            max_pipelines: tenant.quota.max_pipelines,
932            max_events_per_second: tenant.quota.max_events_per_second,
933            max_streams_per_pipeline: tenant.quota.max_streams_per_pipeline,
934        },
935    };
936    axum::Json(&resp).into_response()
937}
938
939/// Handle SSE log streaming for a pipeline
940async fn handle_logs(
941    State(state): State<ApiState>,
942    Path(pipeline_id): Path<String>,
943    ApiKey(api_key): ApiKey,
944) -> Response {
945    let manager = &state.manager;
946    let mgr = manager.read().await;
947
948    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
949        Some(id) => id.clone(),
950        None => return error_response(StatusCode::UNAUTHORIZED, "invalid_key", "Invalid API key"),
951    };
952
953    // Verify tenant owns this pipeline
954    let tenant = match mgr.get_tenant(&tenant_id) {
955        Some(t) => t,
956        None => {
957            return error_response(
958                StatusCode::NOT_FOUND,
959                "tenant_not_found",
960                "Tenant not found",
961            )
962        }
963    };
964
965    let rx: tokio::sync::broadcast::Receiver<Event> =
966        match tenant.subscribe_pipeline_logs(&pipeline_id) {
967            Ok(rx) => rx,
968            Err(_) => {
969                return error_response(
970                    StatusCode::NOT_FOUND,
971                    "pipeline_not_found",
972                    &format!("Pipeline {pipeline_id} not found"),
973                )
974            }
975        };
976
977    drop(mgr); // Release the read lock before streaming
978
979    // Create SSE stream from broadcast receiver using futures unfold
980    let stream = stream::unfold(rx, |mut rx| async move {
981        match rx.recv().await {
982            Ok(event) => {
983                let data: serde_json::Map<String, serde_json::Value> = event
984                    .data
985                    .iter()
986                    .map(|(k, v): (&std::sync::Arc<str>, &varpulis_core::Value)| {
987                        (k.to_string(), json_from_value(v))
988                    })
989                    .collect();
990                let json = serde_json::to_string(&LogEvent {
991                    event_type: event.event_type.to_string(),
992                    timestamp: event.timestamp.to_rfc3339(),
993                    data,
994                })
995                .unwrap_or_default();
996                let sse = axum::response::sse::Event::default().data(json);
997                Some((Ok::<_, Infallible>(sse), rx))
998            }
999            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1000                let msg = format!("{{\"warning\":\"skipped {n} events\"}}");
1001                let sse = axum::response::sse::Event::default()
1002                    .event("warning")
1003                    .data(msg);
1004                Some((Ok(sse), rx))
1005            }
1006            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
1007        }
1008    });
1009
1010    axum::response::sse::Sse::new(stream)
1011        .keep_alive(axum::response::sse::KeepAlive::default())
1012        .into_response()
1013}
1014
/// JSON payload carried in the `data` field of each server-sent log event.
#[derive(Serialize)]
struct LogEvent {
    /// Name of the event type, copied from the runtime event.
    event_type: String,
    /// Event timestamp rendered with `to_rfc3339()`.
    timestamp: String,
    /// Event fields, each converted to JSON with `json_from_value`.
    data: serde_json::Map<String, serde_json::Value>,
}
1021
1022fn json_from_value(v: &varpulis_core::Value) -> serde_json::Value {
1023    match v {
1024        varpulis_core::Value::Null => serde_json::Value::Null,
1025        varpulis_core::Value::Bool(b) => serde_json::Value::Bool(*b),
1026        varpulis_core::Value::Int(i) => serde_json::json!(*i),
1027        varpulis_core::Value::Float(f) => serde_json::json!(*f),
1028        varpulis_core::Value::Str(s) => serde_json::Value::String(s.to_string()),
1029        varpulis_core::Value::Timestamp(ns) => serde_json::json!(*ns),
1030        varpulis_core::Value::Duration(ns) => serde_json::json!(*ns),
1031        varpulis_core::Value::Array(arr) => {
1032            serde_json::Value::Array(arr.iter().map(json_from_value).collect())
1033        }
1034        varpulis_core::Value::Map(map) => {
1035            let obj: serde_json::Map<String, serde_json::Value> = map
1036                .iter()
1037                .map(|(k, v)| (k.to_string(), json_from_value(v)))
1038                .collect();
1039            serde_json::Value::Object(obj)
1040        }
1041    }
1042}
1043
1044// =============================================================================
1045// DLQ Handlers
1046// =============================================================================
1047
1048async fn handle_dlq_get(
1049    State(state): State<ApiState>,
1050    Path(pipeline_id): Path<String>,
1051    ApiKey(api_key): ApiKey,
1052    Query(params): Query<DlqQueryParams>,
1053) -> Response {
1054    let manager = &state.manager;
1055    let mgr = manager.read().await;
1056
1057    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1058        Some(id) => id.clone(),
1059        None => {
1060            return error_response(
1061                StatusCode::UNAUTHORIZED,
1062                "invalid_api_key",
1063                "Invalid API key",
1064            )
1065        }
1066    };
1067
1068    let tenant = match mgr.get_tenant(&tenant_id) {
1069        Some(t) => t,
1070        None => {
1071            return error_response(
1072                StatusCode::NOT_FOUND,
1073                "tenant_not_found",
1074                "Tenant not found",
1075            )
1076        }
1077    };
1078
1079    let pipeline = match tenant.pipelines.get(&pipeline_id) {
1080        Some(p) => p,
1081        None => {
1082            return error_response(
1083                StatusCode::NOT_FOUND,
1084                "pipeline_not_found",
1085                "Pipeline not found",
1086            )
1087        }
1088    };
1089
1090    let engine = pipeline.engine.lock().await;
1091    let dlq = match engine.dlq() {
1092        Some(d) => d,
1093        None => {
1094            let resp = DlqEntriesResponse {
1095                entries: Vec::new(),
1096                total: 0,
1097            };
1098            return axum::Json(&resp).into_response();
1099        }
1100    };
1101
1102    let offset = params.offset.unwrap_or(0);
1103    let limit = params.limit.unwrap_or(100).min(1000);
1104
1105    match dlq.read_entries(offset, limit) {
1106        Ok(entries) => {
1107            let resp = DlqEntriesResponse {
1108                total: dlq.line_count(),
1109                entries,
1110            };
1111            axum::Json(&resp).into_response()
1112        }
1113        Err(e) => error_response(
1114            StatusCode::INTERNAL_SERVER_ERROR,
1115            "dlq_read_error",
1116            &format!("Failed to read DLQ: {e}"),
1117        ),
1118    }
1119}
1120
1121async fn handle_dlq_replay(
1122    State(state): State<ApiState>,
1123    Path(pipeline_id): Path<String>,
1124    ApiKey(api_key): ApiKey,
1125) -> Response {
1126    let manager = &state.manager;
1127    let mgr = manager.read().await;
1128
1129    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1130        Some(id) => id.clone(),
1131        None => {
1132            return error_response(
1133                StatusCode::UNAUTHORIZED,
1134                "invalid_api_key",
1135                "Invalid API key",
1136            )
1137        }
1138    };
1139
1140    let tenant = match mgr.get_tenant(&tenant_id) {
1141        Some(t) => t,
1142        None => {
1143            return error_response(
1144                StatusCode::NOT_FOUND,
1145                "tenant_not_found",
1146                "Tenant not found",
1147            )
1148        }
1149    };
1150
1151    let pipeline = match tenant.pipelines.get(&pipeline_id) {
1152        Some(p) => p,
1153        None => {
1154            return error_response(
1155                StatusCode::NOT_FOUND,
1156                "pipeline_not_found",
1157                "Pipeline not found",
1158            )
1159        }
1160    };
1161
1162    // Read all DLQ entries
1163    let entries = {
1164        let engine = pipeline.engine.lock().await;
1165        let dlq = match engine.dlq() {
1166            Some(d) => d,
1167            None => {
1168                let resp = DlqReplayResponse { replayed: 0 };
1169                return axum::Json(&resp).into_response();
1170            }
1171        };
1172        // Read all entries (up to a reasonable limit)
1173        match dlq.read_entries(0, 100_000) {
1174            Ok(entries) => entries,
1175            Err(e) => {
1176                return error_response(
1177                    StatusCode::INTERNAL_SERVER_ERROR,
1178                    "dlq_read_error",
1179                    &format!("Failed to read DLQ: {e}"),
1180                )
1181            }
1182        }
1183    };
1184
1185    // Replay each entry as an event into the pipeline engine
1186    let mut replayed = 0usize;
1187    {
1188        let mut engine = pipeline.engine.lock().await;
1189        for entry in &entries {
1190            // Reconstruct event from the DLQ entry
1191            let event_type = entry
1192                .event
1193                .get("event_type")
1194                .and_then(|v| v.as_str())
1195                .unwrap_or("unknown");
1196            let mut event = Event::new(event_type);
1197            if let Some(data) = entry.event.get("data").and_then(|v| v.as_object()) {
1198                for (k, v) in data {
1199                    let rv = json_to_runtime_value(v);
1200                    event = event.with_field(k.as_str(), rv);
1201                }
1202            }
1203            if engine.process(event).await.is_ok() {
1204                replayed += 1;
1205            }
1206        }
1207    }
1208
1209    let resp = DlqReplayResponse { replayed };
1210    axum::Json(&resp).into_response()
1211}
1212
1213async fn handle_dlq_clear(
1214    State(state): State<ApiState>,
1215    Path(pipeline_id): Path<String>,
1216    ApiKey(api_key): ApiKey,
1217) -> Response {
1218    let manager = &state.manager;
1219    let mgr = manager.read().await;
1220
1221    let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1222        Some(id) => id.clone(),
1223        None => {
1224            return error_response(
1225                StatusCode::UNAUTHORIZED,
1226                "invalid_api_key",
1227                "Invalid API key",
1228            )
1229        }
1230    };
1231
1232    let tenant = match mgr.get_tenant(&tenant_id) {
1233        Some(t) => t,
1234        None => {
1235            return error_response(
1236                StatusCode::NOT_FOUND,
1237                "tenant_not_found",
1238                "Tenant not found",
1239            )
1240        }
1241    };
1242
1243    let pipeline = match tenant.pipelines.get(&pipeline_id) {
1244        Some(p) => p,
1245        None => {
1246            return error_response(
1247                StatusCode::NOT_FOUND,
1248                "pipeline_not_found",
1249                "Pipeline not found",
1250            )
1251        }
1252    };
1253
1254    let engine = pipeline.engine.lock().await;
1255    match engine.dlq() {
1256        Some(dlq) => match dlq.clear() {
1257            Ok(()) => {
1258                let resp = DlqClearResponse { cleared: true };
1259                axum::Json(&resp).into_response()
1260            }
1261            Err(e) => error_response(
1262                StatusCode::INTERNAL_SERVER_ERROR,
1263                "dlq_clear_error",
1264                &format!("Failed to clear DLQ: {e}"),
1265            ),
1266        },
1267        None => {
1268            let resp = DlqClearResponse { cleared: true };
1269            axum::Json(&resp).into_response()
1270        }
1271    }
1272}
1273
1274// =============================================================================
1275// Tenant Admin Routes
1276// =============================================================================
1277
1278// Tenant admin routes are now part of the main Router in api_routes()
1279
1280#[allow(clippy::result_large_err)]
1281fn validate_admin_key(provided: &str, configured: &Option<String>) -> Result<(), Response> {
1282    match configured {
1283        None => Err(error_response(
1284            StatusCode::FORBIDDEN,
1285            "admin_disabled",
1286            "Admin API is disabled (no --api-key configured)",
1287        )),
1288        Some(key) => {
1289            if constant_time_compare(key, provided) {
1290                Ok(())
1291            } else {
1292                Err(error_response(
1293                    StatusCode::UNAUTHORIZED,
1294                    "invalid_admin_key",
1295                    "Invalid admin key",
1296                ))
1297            }
1298        }
1299    }
1300}
1301
1302fn quota_from_tier(tier: Option<&str>) -> TenantQuota {
1303    match tier {
1304        Some("free") => TenantQuota::free(),
1305        Some("pro") => TenantQuota::pro(),
1306        Some("enterprise") => TenantQuota::enterprise(),
1307        _ => TenantQuota::default(),
1308    }
1309}
1310
1311async fn handle_create_tenant(
1312    State(state): State<ApiState>,
1313    AdminKey(admin_key): AdminKey,
1314    Json(body): Json<CreateTenantRequest>,
1315) -> Response {
1316    let manager = &state.manager;
1317    let configured_key = &state.admin_key;
1318    if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1319        return resp;
1320    }
1321
1322    let api_key = uuid::Uuid::new_v4().to_string();
1323    let quota = quota_from_tier(body.quota_tier.as_deref());
1324
1325    let mut mgr = manager.write().await;
1326    match mgr.create_tenant(body.name.clone(), api_key.clone(), quota.clone()) {
1327        Ok(tenant_id) => {
1328            let resp = TenantResponse {
1329                id: tenant_id.as_str().to_string(),
1330                name: body.name,
1331                api_key,
1332                quota: QuotaInfo {
1333                    max_pipelines: quota.max_pipelines,
1334                    max_events_per_second: quota.max_events_per_second,
1335                    max_streams_per_pipeline: quota.max_streams_per_pipeline,
1336                },
1337            };
1338            (StatusCode::CREATED, axum::Json(&resp)).into_response()
1339        }
1340        Err(e) => tenant_error_response(e),
1341    }
1342}
1343
1344async fn handle_list_tenants(
1345    State(state): State<ApiState>,
1346    AdminKey(admin_key): AdminKey,
1347    Query(pagination): Query<PaginationParams>,
1348) -> Response {
1349    let manager = &state.manager;
1350    let configured_key = &state.admin_key;
1351    if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1352        return resp;
1353    }
1354
1355    if pagination.exceeds_max() {
1356        return error_response(
1357            StatusCode::BAD_REQUEST,
1358            "invalid_limit",
1359            &format!("limit must not exceed {MAX_LIMIT}"),
1360        );
1361    }
1362
1363    let mgr = manager.read().await;
1364    let all_tenants: Vec<TenantResponse> = mgr
1365        .list_tenants()
1366        .iter()
1367        .map(|t| TenantResponse {
1368            id: t.id.as_str().to_string(),
1369            name: t.name.clone(),
1370            api_key: t.api_key.clone(),
1371            quota: QuotaInfo {
1372                max_pipelines: t.quota.max_pipelines,
1373                max_events_per_second: t.quota.max_events_per_second,
1374                max_streams_per_pipeline: t.quota.max_streams_per_pipeline,
1375            },
1376        })
1377        .collect();
1378    let (tenants, meta) = pagination.paginate(all_tenants);
1379    let total = meta.total;
1380    let resp = TenantListResponse {
1381        tenants,
1382        total,
1383        pagination: Some(meta),
1384    };
1385    axum::Json(&resp).into_response()
1386}
1387
1388async fn handle_get_tenant(
1389    State(state): State<ApiState>,
1390    Path(tenant_id_str): Path<String>,
1391    AdminKey(admin_key): AdminKey,
1392) -> Response {
1393    let manager = &state.manager;
1394    let configured_key = &state.admin_key;
1395    if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1396        return resp;
1397    }
1398
1399    let mgr = manager.read().await;
1400    let tenant_id = varpulis_runtime::TenantId::new(&tenant_id_str);
1401    match mgr.get_tenant(&tenant_id) {
1402        Some(t) => {
1403            let resp = TenantDetailResponse {
1404                id: t.id.as_str().to_string(),
1405                name: t.name.clone(),
1406                api_key: t.api_key.clone(),
1407                quota: QuotaInfo {
1408                    max_pipelines: t.quota.max_pipelines,
1409                    max_events_per_second: t.quota.max_events_per_second,
1410                    max_streams_per_pipeline: t.quota.max_streams_per_pipeline,
1411                },
1412                usage: TenantUsageInfo {
1413                    events_processed: t.usage.events_processed,
1414                    output_events_emitted: t.usage.output_events_emitted,
1415                    active_pipelines: t.usage.active_pipelines,
1416                },
1417                pipeline_count: t.pipelines.len(),
1418            };
1419            axum::Json(&resp).into_response()
1420        }
1421        None => error_response(
1422            StatusCode::NOT_FOUND,
1423            "tenant_not_found",
1424            "Tenant not found",
1425        ),
1426    }
1427}
1428
1429async fn handle_delete_tenant(
1430    State(state): State<ApiState>,
1431    Path(tenant_id_str): Path<String>,
1432    AdminKey(admin_key): AdminKey,
1433) -> Response {
1434    let manager = &state.manager;
1435    let configured_key = &state.admin_key;
1436    if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1437        return resp;
1438    }
1439
1440    let mut mgr = manager.write().await;
1441    let tenant_id = varpulis_runtime::TenantId::new(&tenant_id_str);
1442    match mgr.remove_tenant(&tenant_id) {
1443        Ok(()) => axum::Json(serde_json::json!({"deleted": true})).into_response(),
1444        Err(e) => tenant_error_response(e),
1445    }
1446}
1447
1448// =============================================================================
1449// Helpers
1450// =============================================================================
1451
1452fn error_response(status: StatusCode, code: &str, message: &str) -> Response {
1453    let body = ApiError {
1454        error: message.to_string(),
1455        code: code.to_string(),
1456    };
1457    (status, axum::Json(body)).into_response()
1458}
1459
1460fn tenant_error_response(err: TenantError) -> Response {
1461    // BackpressureExceeded needs a Retry-After header, handle it specially
1462    if let TenantError::BackpressureExceeded { current, max } = &err {
1463        let body = serde_json::json!({
1464            "error": format!("queue depth {current} exceeds maximum {max}"),
1465            "code": "queue_depth_exceeded",
1466            "retry_after": 1,
1467        });
1468        return (
1469            StatusCode::TOO_MANY_REQUESTS,
1470            [("Retry-After", "1"), ("Content-Type", "application/json")],
1471            serde_json::to_string(&body).unwrap_or_default(),
1472        )
1473            .into_response();
1474    }
1475
1476    let (status, code) = match &err {
1477        TenantError::NotFound(_) => (StatusCode::NOT_FOUND, "not_found"),
1478        TenantError::PipelineNotFound(_) => (StatusCode::NOT_FOUND, "pipeline_not_found"),
1479        TenantError::QuotaExceeded(_) => (StatusCode::TOO_MANY_REQUESTS, "quota_exceeded"),
1480        TenantError::RateLimitExceeded => (StatusCode::TOO_MANY_REQUESTS, "rate_limited"),
1481        TenantError::BackpressureExceeded { .. } => unreachable!(),
1482        TenantError::ParseError(_) => (StatusCode::BAD_REQUEST, "parse_error"),
1483        TenantError::EngineError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "engine_error"),
1484        TenantError::AlreadyExists(_) => (StatusCode::CONFLICT, "already_exists"),
1485    };
1486    error_response(status, code, &err.to_string())
1487}
1488
1489fn json_to_runtime_value(v: &serde_json::Value) -> varpulis_core::Value {
1490    match v {
1491        serde_json::Value::Null => varpulis_core::Value::Null,
1492        serde_json::Value::Bool(b) => varpulis_core::Value::Bool(*b),
1493        serde_json::Value::Number(n) => {
1494            if let Some(i) = n.as_i64() {
1495                varpulis_core::Value::Int(i)
1496            } else if let Some(f) = n.as_f64() {
1497                varpulis_core::Value::Float(f)
1498            } else {
1499                varpulis_core::Value::Null
1500            }
1501        }
1502        serde_json::Value::String(s) => varpulis_core::Value::Str(s.clone().into()),
1503        serde_json::Value::Array(arr) => {
1504            varpulis_core::Value::array(arr.iter().map(json_to_runtime_value).collect())
1505        }
1506        serde_json::Value::Object(map) => {
1507            let mut m: IndexMap<std::sync::Arc<str>, varpulis_core::Value, FxBuildHasher> =
1508                IndexMap::with_hasher(FxBuildHasher);
1509            for (k, v) in map {
1510                m.insert(k.as_str().into(), json_to_runtime_value(v));
1511            }
1512            varpulis_core::Value::map(m)
1513        }
1514    }
1515}
1516
1517#[cfg(test)]
1518mod tests {
1519    use super::*;
1520    use axum::body::Body;
1521    use axum::http::Request;
1522    use std::sync::Arc;
1523    use tokio::sync::RwLock;
1524    use tower::ServiceExt;
1525    use varpulis_runtime::tenant::{TenantManager, TenantQuota};
1526
1527    /// Test response wrapper for axum integration tests.
1528    struct TestResponse {
1529        status: StatusCode,
1530        body: bytes::Bytes,
1531        headers: axum::http::HeaderMap,
1532    }
1533
1534    impl TestResponse {
1535        fn status(&self) -> StatusCode {
1536            self.status
1537        }
1538        fn body(&self) -> &[u8] {
1539            &self.body
1540        }
1541        fn headers(&self) -> &axum::http::HeaderMap {
1542            &self.headers
1543        }
1544    }
1545
1546    /// Test request builder for axum integration tests.
1547    struct TestRequestBuilder {
1548        method: String,
1549        path: String,
1550        headers: Vec<(String, String)>,
1551        body: Option<String>,
1552    }
1553
1554    impl TestRequestBuilder {
1555        fn new() -> Self {
1556            Self {
1557                method: "GET".to_string(),
1558                path: "/".to_string(),
1559                headers: Vec::new(),
1560                body: None,
1561            }
1562        }
1563        fn method(mut self, m: &str) -> Self {
1564            self.method = m.to_string();
1565            self
1566        }
1567        fn path(mut self, p: &str) -> Self {
1568            self.path = p.to_string();
1569            self
1570        }
1571        fn header(mut self, k: &str, v: &str) -> Self {
1572            self.headers.push((k.to_string(), v.to_string()));
1573            self
1574        }
1575        fn json<T: serde::Serialize>(mut self, body: &T) -> Self {
1576            self.body = Some(serde_json::to_string(body).unwrap());
1577            self.headers
1578                .push(("content-type".to_string(), "application/json".to_string()));
1579            self
1580        }
1581        async fn reply(self, app: &Router) -> TestResponse {
1582            let mut builder = Request::builder()
1583                .method(self.method.as_str())
1584                .uri(&self.path);
1585            for (k, v) in &self.headers {
1586                builder = builder.header(k.as_str(), v.as_str());
1587            }
1588            let body = match self.body {
1589                Some(b) => Body::from(b),
1590                None => Body::empty(),
1591            };
1592            let req = builder.body(body).unwrap();
1593            let resp = app.clone().oneshot(req).await.unwrap();
1594            let status = resp.status();
1595            let headers = resp.headers().clone();
1596            let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1597                .await
1598                .unwrap();
1599            TestResponse {
1600                status,
1601                body,
1602                headers,
1603            }
1604        }
1605    }
1606
    /// Entry point for building a test request; returns a fresh
    /// `TestRequestBuilder::new()`.
    fn test_request() -> TestRequestBuilder {
        TestRequestBuilder::new()
    }
1611
1612    async fn setup_test_manager() -> SharedTenantManager {
1613        let mut mgr = TenantManager::new();
1614        let id = mgr
1615            .create_tenant(
1616                "Test Corp".into(),
1617                "test-key-123".into(),
1618                TenantQuota::default(),
1619            )
1620            .unwrap();
1621
1622        // Deploy a pipeline
1623        let tenant = mgr.get_tenant_mut(&id).unwrap();
1624        tenant
1625            .deploy_pipeline(
1626                "Test Pipeline".into(),
1627                "stream A = SensorReading .where(x > 1)".into(),
1628            )
1629            .await
1630            .unwrap();
1631
1632        Arc::new(RwLock::new(mgr))
1633    }
1634
1635    #[tokio::test]
1636    async fn test_deploy_pipeline() {
1637        let mgr = setup_test_manager().await;
1638        let routes = api_routes(mgr, None, None, None);
1639
1640        let resp = test_request()
1641            .method("POST")
1642            .path("/api/v1/pipelines")
1643            .header("x-api-key", "test-key-123")
1644            .json(&DeployPipelineRequest {
1645                name: "New Pipeline".into(),
1646                source: "stream B = Events .where(y > 10)".into(),
1647            })
1648            .reply(&routes)
1649            .await;
1650
1651        assert_eq!(resp.status(), StatusCode::CREATED);
1652        let body: DeployPipelineResponse = serde_json::from_slice(resp.body()).unwrap();
1653        assert_eq!(body.name, "New Pipeline");
1654        assert_eq!(body.status, "running");
1655    }
1656
1657    #[tokio::test]
1658    async fn test_deploy_invalid_api_key() {
1659        let mgr = setup_test_manager().await;
1660        let routes = api_routes(mgr, None, None, None);
1661
1662        let resp = test_request()
1663            .method("POST")
1664            .path("/api/v1/pipelines")
1665            .header("x-api-key", "wrong-key")
1666            .json(&DeployPipelineRequest {
1667                name: "Bad".into(),
1668                source: "stream X = Y .where(z > 1)".into(),
1669            })
1670            .reply(&routes)
1671            .await;
1672
1673        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1674    }
1675
1676    #[tokio::test]
1677    async fn test_deploy_invalid_vpl() {
1678        let mgr = setup_test_manager().await;
1679        let routes = api_routes(mgr, None, None, None);
1680
1681        let resp = test_request()
1682            .method("POST")
1683            .path("/api/v1/pipelines")
1684            .header("x-api-key", "test-key-123")
1685            .json(&DeployPipelineRequest {
1686                name: "Bad VPL".into(),
1687                source: "this is not valid {{{".into(),
1688            })
1689            .reply(&routes)
1690            .await;
1691
1692        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1693    }
1694
1695    #[tokio::test]
1696    async fn test_list_pipelines() {
1697        let mgr = setup_test_manager().await;
1698        let routes = api_routes(mgr, None, None, None);
1699
1700        let resp = test_request()
1701            .method("GET")
1702            .path("/api/v1/pipelines")
1703            .header("x-api-key", "test-key-123")
1704            .reply(&routes)
1705            .await;
1706
1707        assert_eq!(resp.status(), StatusCode::OK);
1708        let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
1709        assert_eq!(body.total, 1);
1710        assert_eq!(body.pipelines[0].name, "Test Pipeline");
1711    }
1712
1713    #[tokio::test]
1714    async fn test_usage_endpoint() {
1715        let mgr = setup_test_manager().await;
1716        let routes = api_routes(mgr, None, None, None);
1717
1718        let resp = test_request()
1719            .method("GET")
1720            .path("/api/v1/usage")
1721            .header("x-api-key", "test-key-123")
1722            .reply(&routes)
1723            .await;
1724
1725        assert_eq!(resp.status(), StatusCode::OK);
1726        let body: UsageResponse = serde_json::from_slice(resp.body()).unwrap();
1727        assert_eq!(body.active_pipelines, 1);
1728    }
1729
1730    #[tokio::test]
1731    async fn test_inject_event() {
1732        let mgr = setup_test_manager().await;
1733
1734        // Get pipeline ID
1735        let pipeline_id = {
1736            let m = mgr.read().await;
1737            let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
1738            let tenant = m.get_tenant(&tid).unwrap();
1739            tenant.pipelines.keys().next().unwrap().clone()
1740        };
1741
1742        let routes = api_routes(mgr, None, None, None);
1743
1744        let resp = test_request()
1745            .method("POST")
1746            .path(&format!("/api/v1/pipelines/{pipeline_id}/events"))
1747            .header("x-api-key", "test-key-123")
1748            .json(&InjectEventRequest {
1749                event_type: "SensorReading".into(),
1750                fields: {
1751                    let mut m = serde_json::Map::new();
1752                    m.insert(
1753                        "x".into(),
1754                        serde_json::Value::Number(serde_json::Number::from(42)),
1755                    );
1756                    m
1757                },
1758            })
1759            .reply(&routes)
1760            .await;
1761
1762        assert_eq!(resp.status(), StatusCode::OK);
1763    }
1764
1765    #[test]
1766    fn test_json_to_runtime_value() {
1767        assert_eq!(
1768            json_to_runtime_value(&serde_json::json!(null)),
1769            varpulis_core::Value::Null
1770        );
1771        assert_eq!(
1772            json_to_runtime_value(&serde_json::json!(true)),
1773            varpulis_core::Value::Bool(true)
1774        );
1775        assert_eq!(
1776            json_to_runtime_value(&serde_json::json!(42)),
1777            varpulis_core::Value::Int(42)
1778        );
1779        assert_eq!(
1780            json_to_runtime_value(&serde_json::json!(1.23)),
1781            varpulis_core::Value::Float(1.23)
1782        );
1783        assert_eq!(
1784            json_to_runtime_value(&serde_json::json!("hello")),
1785            varpulis_core::Value::Str("hello".into())
1786        );
1787    }
1788
1789    #[test]
1790    fn test_error_response_format() {
1791        let resp = error_response(StatusCode::BAD_REQUEST, "test_error", "Something failed");
1792        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1793    }
1794
1795    #[test]
1796    fn test_tenant_error_mapping() {
1797        let resp = tenant_error_response(TenantError::NotFound("t1".into()));
1798        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1799
1800        let resp = tenant_error_response(TenantError::RateLimitExceeded);
1801        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
1802
1803        let parse_err = varpulis_parser::parse("INVALID{{{").unwrap_err();
1804        let resp = tenant_error_response(TenantError::ParseError(parse_err));
1805        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1806    }
1807
1808    // =========================================================================
1809    // Tenant Admin API tests
1810    // =========================================================================
1811
1812    fn setup_admin_routes(admin_key: Option<&str>) -> (SharedTenantManager, Router) {
1813        let mgr = Arc::new(RwLock::new(TenantManager::new()));
1814        let key = admin_key.map(|k| k.to_string());
1815        let routes = api_routes(mgr.clone(), key, None, None);
1816        (mgr, routes)
1817    }
1818
1819    #[tokio::test]
1820    async fn test_create_tenant() {
1821        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
1822
1823        let resp = test_request()
1824            .method("POST")
1825            .path("/api/v1/tenants")
1826            .header("x-admin-key", "admin-secret")
1827            .json(&CreateTenantRequest {
1828                name: "Acme Corp".into(),
1829                quota_tier: None,
1830            })
1831            .reply(&routes)
1832            .await;
1833
1834        assert_eq!(resp.status(), StatusCode::CREATED);
1835        let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
1836        assert_eq!(body.name, "Acme Corp");
1837        assert!(!body.api_key.is_empty());
1838        assert!(!body.id.is_empty());
1839    }
1840
1841    #[tokio::test]
1842    async fn test_list_tenants_admin() {
1843        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
1844
1845        // Create two tenants
1846        for name in &["Tenant A", "Tenant B"] {
1847            test_request()
1848                .method("POST")
1849                .path("/api/v1/tenants")
1850                .header("x-admin-key", "admin-secret")
1851                .json(&CreateTenantRequest {
1852                    name: name.to_string(),
1853                    quota_tier: None,
1854                })
1855                .reply(&routes)
1856                .await;
1857        }
1858
1859        let resp = test_request()
1860            .method("GET")
1861            .path("/api/v1/tenants")
1862            .header("x-admin-key", "admin-secret")
1863            .reply(&routes)
1864            .await;
1865
1866        assert_eq!(resp.status(), StatusCode::OK);
1867        let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
1868        assert_eq!(body.total, 2);
1869    }
1870
1871    #[tokio::test]
1872    async fn test_get_tenant_admin() {
1873        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
1874
1875        // Create a tenant
1876        let create_resp = test_request()
1877            .method("POST")
1878            .path("/api/v1/tenants")
1879            .header("x-admin-key", "admin-secret")
1880            .json(&CreateTenantRequest {
1881                name: "Detail Corp".into(),
1882                quota_tier: Some("pro".into()),
1883            })
1884            .reply(&routes)
1885            .await;
1886
1887        let created: TenantResponse = serde_json::from_slice(create_resp.body()).unwrap();
1888
1889        let resp = test_request()
1890            .method("GET")
1891            .path(&format!("/api/v1/tenants/{}", created.id))
1892            .header("x-admin-key", "admin-secret")
1893            .reply(&routes)
1894            .await;
1895
1896        assert_eq!(resp.status(), StatusCode::OK);
1897        let body: TenantDetailResponse = serde_json::from_slice(resp.body()).unwrap();
1898        assert_eq!(body.name, "Detail Corp");
1899        assert_eq!(body.pipeline_count, 0);
1900        // Pro tier quotas
1901        assert_eq!(body.quota.max_pipelines, 20);
1902    }
1903
1904    #[tokio::test]
1905    async fn test_delete_tenant_admin() {
1906        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
1907
1908        // Create then delete
1909        let create_resp = test_request()
1910            .method("POST")
1911            .path("/api/v1/tenants")
1912            .header("x-admin-key", "admin-secret")
1913            .json(&CreateTenantRequest {
1914                name: "Doomed".into(),
1915                quota_tier: None,
1916            })
1917            .reply(&routes)
1918            .await;
1919        let created: TenantResponse = serde_json::from_slice(create_resp.body()).unwrap();
1920
1921        let resp = test_request()
1922            .method("DELETE")
1923            .path(&format!("/api/v1/tenants/{}", created.id))
1924            .header("x-admin-key", "admin-secret")
1925            .reply(&routes)
1926            .await;
1927
1928        assert_eq!(resp.status(), StatusCode::OK);
1929
1930        // Verify tenant is gone
1931        let list_resp = test_request()
1932            .method("GET")
1933            .path("/api/v1/tenants")
1934            .header("x-admin-key", "admin-secret")
1935            .reply(&routes)
1936            .await;
1937        let body: TenantListResponse = serde_json::from_slice(list_resp.body()).unwrap();
1938        assert_eq!(body.total, 0);
1939    }
1940
1941    #[tokio::test]
1942    async fn test_invalid_admin_key() {
1943        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
1944
1945        let resp = test_request()
1946            .method("GET")
1947            .path("/api/v1/tenants")
1948            .header("x-admin-key", "wrong-key")
1949            .reply(&routes)
1950            .await;
1951
1952        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1953    }
1954
1955    #[tokio::test]
1956    async fn test_no_admin_key_configured() {
1957        let (_mgr, routes) = setup_admin_routes(None);
1958
1959        let resp = test_request()
1960            .method("GET")
1961            .path("/api/v1/tenants")
1962            .header("x-admin-key", "anything")
1963            .reply(&routes)
1964            .await;
1965
1966        assert_eq!(resp.status(), StatusCode::FORBIDDEN);
1967    }
1968
1969    #[tokio::test]
1970    async fn test_create_tenant_tier_selection() {
1971        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
1972
1973        // Free tier
1974        let resp = test_request()
1975            .method("POST")
1976            .path("/api/v1/tenants")
1977            .header("x-admin-key", "admin-secret")
1978            .json(&CreateTenantRequest {
1979                name: "Free User".into(),
1980                quota_tier: Some("free".into()),
1981            })
1982            .reply(&routes)
1983            .await;
1984        let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
1985        assert_eq!(body.quota.max_pipelines, 2); // free tier
1986
1987        // Enterprise tier
1988        let resp = test_request()
1989            .method("POST")
1990            .path("/api/v1/tenants")
1991            .header("x-admin-key", "admin-secret")
1992            .json(&CreateTenantRequest {
1993                name: "Enterprise User".into(),
1994                quota_tier: Some("enterprise".into()),
1995            })
1996            .reply(&routes)
1997            .await;
1998        let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
1999        assert_eq!(body.quota.max_pipelines, 1000); // enterprise tier
2000    }
2001
2002    // =========================================================================
2003    // Pipeline CRUD handler tests
2004    // =========================================================================
2005
2006    /// Helper: get the first pipeline ID from the test manager
2007    async fn get_first_pipeline_id(mgr: &SharedTenantManager) -> String {
2008        let m = mgr.read().await;
2009        let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
2010        let tenant = m.get_tenant(&tid).unwrap();
2011        tenant.pipelines.keys().next().unwrap().clone()
2012    }
2013
2014    #[tokio::test]
2015    async fn test_get_single_pipeline() {
2016        let mgr = setup_test_manager().await;
2017        let pipeline_id = get_first_pipeline_id(&mgr).await;
2018        let routes = api_routes(mgr, None, None, None);
2019
2020        let resp = test_request()
2021            .method("GET")
2022            .path(&format!("/api/v1/pipelines/{pipeline_id}"))
2023            .header("x-api-key", "test-key-123")
2024            .reply(&routes)
2025            .await;
2026
2027        assert_eq!(resp.status(), StatusCode::OK);
2028        let body: PipelineInfo = serde_json::from_slice(resp.body()).unwrap();
2029        assert_eq!(body.id, pipeline_id);
2030        assert_eq!(body.name, "Test Pipeline");
2031        assert_eq!(body.status, "running");
2032        assert!(body.source.contains("SensorReading"));
2033    }
2034
2035    #[tokio::test]
2036    async fn test_get_pipeline_not_found() {
2037        let mgr = setup_test_manager().await;
2038        let routes = api_routes(mgr, None, None, None);
2039
2040        let resp = test_request()
2041            .method("GET")
2042            .path("/api/v1/pipelines/nonexistent-id")
2043            .header("x-api-key", "test-key-123")
2044            .reply(&routes)
2045            .await;
2046
2047        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2048    }
2049
2050    #[tokio::test]
2051    async fn test_delete_pipeline_api() {
2052        let mgr = setup_test_manager().await;
2053        let pipeline_id = get_first_pipeline_id(&mgr).await;
2054        let routes = api_routes(mgr.clone(), None, None, None);
2055
2056        let resp = test_request()
2057            .method("DELETE")
2058            .path(&format!("/api/v1/pipelines/{pipeline_id}"))
2059            .header("x-api-key", "test-key-123")
2060            .reply(&routes)
2061            .await;
2062
2063        assert_eq!(resp.status(), StatusCode::OK);
2064        let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2065        assert_eq!(body["deleted"], true);
2066
2067        // Verify it's gone
2068        let list_resp = test_request()
2069            .method("GET")
2070            .path("/api/v1/pipelines")
2071            .header("x-api-key", "test-key-123")
2072            .reply(&routes)
2073            .await;
2074        let list: PipelineListResponse = serde_json::from_slice(list_resp.body()).unwrap();
2075        assert_eq!(list.total, 0);
2076    }
2077
2078    #[tokio::test]
2079    async fn test_delete_pipeline_not_found() {
2080        let mgr = setup_test_manager().await;
2081        let routes = api_routes(mgr, None, None, None);
2082
2083        let resp = test_request()
2084            .method("DELETE")
2085            .path("/api/v1/pipelines/nonexistent-id")
2086            .header("x-api-key", "test-key-123")
2087            .reply(&routes)
2088            .await;
2089
2090        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2091    }
2092
2093    // =========================================================================
2094    // Batch inject handler tests
2095    // =========================================================================
2096
2097    #[tokio::test]
2098    async fn test_inject_batch() {
2099        let mgr = setup_test_manager().await;
2100        let pipeline_id = get_first_pipeline_id(&mgr).await;
2101        let routes = api_routes(mgr, None, None, None);
2102
2103        let resp = test_request()
2104            .method("POST")
2105            .path(&format!("/api/v1/pipelines/{pipeline_id}/events-batch"))
2106            .header("x-api-key", "test-key-123")
2107            .json(&InjectBatchRequest {
2108                events: vec![
2109                    InjectEventRequest {
2110                        event_type: "SensorReading".into(),
2111                        fields: {
2112                            let mut m = serde_json::Map::new();
2113                            m.insert("x".into(), serde_json::json!(5));
2114                            m
2115                        },
2116                    },
2117                    InjectEventRequest {
2118                        event_type: "SensorReading".into(),
2119                        fields: {
2120                            let mut m = serde_json::Map::new();
2121                            m.insert("x".into(), serde_json::json!(10));
2122                            m
2123                        },
2124                    },
2125                ],
2126            })
2127            .reply(&routes)
2128            .await;
2129
2130        assert_eq!(resp.status(), StatusCode::OK);
2131        let body: InjectBatchResponse = serde_json::from_slice(resp.body()).unwrap();
2132        assert_eq!(body.accepted, 2);
2133        assert!(body.processing_time_us > 0);
2134    }
2135
2136    #[tokio::test]
2137    async fn test_inject_batch_invalid_pipeline() {
2138        let mgr = setup_test_manager().await;
2139        let routes = api_routes(mgr, None, None, None);
2140
2141        // Batch mode silently skips failed events (including nonexistent pipeline)
2142        let resp = test_request()
2143            .method("POST")
2144            .path("/api/v1/pipelines/nonexistent/events-batch")
2145            .header("x-api-key", "test-key-123")
2146            .json(&InjectBatchRequest {
2147                events: vec![InjectEventRequest {
2148                    event_type: "Test".into(),
2149                    fields: serde_json::Map::new(),
2150                }],
2151            })
2152            .reply(&routes)
2153            .await;
2154
2155        // Returns 200 but accepted=0 since pipeline doesn't exist
2156        assert_eq!(resp.status(), StatusCode::OK);
2157        let body: InjectBatchResponse = serde_json::from_slice(resp.body()).unwrap();
2158        assert_eq!(body.accepted, 0);
2159    }
2160
2161    // =========================================================================
2162    // Checkpoint/Restore handler tests
2163    // =========================================================================
2164
2165    #[tokio::test]
2166    async fn test_checkpoint_pipeline() {
2167        let mgr = setup_test_manager().await;
2168        let pipeline_id = get_first_pipeline_id(&mgr).await;
2169        let routes = api_routes(mgr, None, None, None);
2170
2171        let resp = test_request()
2172            .method("POST")
2173            .path(&format!("/api/v1/pipelines/{pipeline_id}/checkpoint"))
2174            .header("x-api-key", "test-key-123")
2175            .reply(&routes)
2176            .await;
2177
2178        assert_eq!(resp.status(), StatusCode::OK);
2179        let body: CheckpointResponse = serde_json::from_slice(resp.body()).unwrap();
2180        assert_eq!(body.pipeline_id, pipeline_id);
2181    }
2182
2183    #[tokio::test]
2184    async fn test_checkpoint_not_found() {
2185        let mgr = setup_test_manager().await;
2186        let routes = api_routes(mgr, None, None, None);
2187
2188        let resp = test_request()
2189            .method("POST")
2190            .path("/api/v1/pipelines/nonexistent/checkpoint")
2191            .header("x-api-key", "test-key-123")
2192            .reply(&routes)
2193            .await;
2194
2195        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2196    }
2197
2198    #[tokio::test]
2199    async fn test_restore_pipeline() {
2200        let mgr = setup_test_manager().await;
2201        let pipeline_id = get_first_pipeline_id(&mgr).await;
2202        let routes = api_routes(mgr, None, None, None);
2203
2204        // First checkpoint
2205        let cp_resp = test_request()
2206            .method("POST")
2207            .path(&format!("/api/v1/pipelines/{pipeline_id}/checkpoint"))
2208            .header("x-api-key", "test-key-123")
2209            .reply(&routes)
2210            .await;
2211        let cp: CheckpointResponse = serde_json::from_slice(cp_resp.body()).unwrap();
2212
2213        // Then restore
2214        let resp = test_request()
2215            .method("POST")
2216            .path(&format!("/api/v1/pipelines/{pipeline_id}/restore"))
2217            .header("x-api-key", "test-key-123")
2218            .json(&RestoreRequest {
2219                checkpoint: cp.checkpoint,
2220            })
2221            .reply(&routes)
2222            .await;
2223
2224        assert_eq!(resp.status(), StatusCode::OK);
2225        let body: RestoreResponse = serde_json::from_slice(resp.body()).unwrap();
2226        assert_eq!(body.pipeline_id, pipeline_id);
2227        assert!(body.restored);
2228    }
2229
2230    #[tokio::test]
2231    async fn test_restore_not_found() {
2232        let mgr = setup_test_manager().await;
2233        let routes = api_routes(mgr, None, None, None);
2234
2235        let checkpoint = varpulis_runtime::persistence::EngineCheckpoint {
2236            version: varpulis_runtime::persistence::CHECKPOINT_VERSION,
2237            window_states: std::collections::HashMap::new(),
2238            sase_states: std::collections::HashMap::new(),
2239            join_states: std::collections::HashMap::new(),
2240            variables: std::collections::HashMap::new(),
2241            events_processed: 0,
2242            output_events_emitted: 0,
2243            watermark_state: None,
2244            distinct_states: std::collections::HashMap::new(),
2245            limit_states: std::collections::HashMap::new(),
2246        };
2247
2248        let resp = test_request()
2249            .method("POST")
2250            .path("/api/v1/pipelines/nonexistent/restore")
2251            .header("x-api-key", "test-key-123")
2252            .json(&RestoreRequest { checkpoint })
2253            .reply(&routes)
2254            .await;
2255
2256        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2257    }
2258
2259    // =========================================================================
2260    // Metrics handler tests
2261    // =========================================================================
2262
2263    #[tokio::test]
2264    async fn test_metrics_endpoint() {
2265        let mgr = setup_test_manager().await;
2266        let pipeline_id = get_first_pipeline_id(&mgr).await;
2267        let routes = api_routes(mgr, None, None, None);
2268
2269        let resp = test_request()
2270            .method("GET")
2271            .path(&format!("/api/v1/pipelines/{pipeline_id}/metrics"))
2272            .header("x-api-key", "test-key-123")
2273            .reply(&routes)
2274            .await;
2275
2276        assert_eq!(resp.status(), StatusCode::OK);
2277        let body: PipelineMetricsResponse = serde_json::from_slice(resp.body()).unwrap();
2278        assert_eq!(body.pipeline_id, pipeline_id);
2279    }
2280
2281    #[tokio::test]
2282    async fn test_metrics_not_found() {
2283        let mgr = setup_test_manager().await;
2284        let routes = api_routes(mgr, None, None, None);
2285
2286        let resp = test_request()
2287            .method("GET")
2288            .path("/api/v1/pipelines/nonexistent/metrics")
2289            .header("x-api-key", "test-key-123")
2290            .reply(&routes)
2291            .await;
2292
2293        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2294    }
2295
2296    // =========================================================================
2297    // Reload handler tests
2298    // =========================================================================
2299
2300    #[tokio::test]
2301    async fn test_reload_pipeline() {
2302        let mgr = setup_test_manager().await;
2303        let pipeline_id = get_first_pipeline_id(&mgr).await;
2304        let routes = api_routes(mgr, None, None, None);
2305
2306        let resp = test_request()
2307            .method("POST")
2308            .path(&format!("/api/v1/pipelines/{pipeline_id}/reload"))
2309            .header("x-api-key", "test-key-123")
2310            .json(&ReloadPipelineRequest {
2311                source: "stream B = Events .where(y > 10)".into(),
2312            })
2313            .reply(&routes)
2314            .await;
2315
2316        assert_eq!(resp.status(), StatusCode::OK);
2317        let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2318        assert_eq!(body["reloaded"], true);
2319    }
2320
2321    #[tokio::test]
2322    async fn test_reload_invalid_vpl() {
2323        let mgr = setup_test_manager().await;
2324        let pipeline_id = get_first_pipeline_id(&mgr).await;
2325        let routes = api_routes(mgr, None, None, None);
2326
2327        let resp = test_request()
2328            .method("POST")
2329            .path(&format!("/api/v1/pipelines/{pipeline_id}/reload"))
2330            .header("x-api-key", "test-key-123")
2331            .json(&ReloadPipelineRequest {
2332                source: "not valid {{{".into(),
2333            })
2334            .reply(&routes)
2335            .await;
2336
2337        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2338    }
2339
2340    #[tokio::test]
2341    async fn test_reload_not_found() {
2342        let mgr = setup_test_manager().await;
2343        let routes = api_routes(mgr, None, None, None);
2344
2345        let resp = test_request()
2346            .method("POST")
2347            .path("/api/v1/pipelines/nonexistent/reload")
2348            .header("x-api-key", "test-key-123")
2349            .json(&ReloadPipelineRequest {
2350                source: "stream B = Events .where(y > 10)".into(),
2351            })
2352            .reply(&routes)
2353            .await;
2354
2355        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2356    }
2357
2358    // =========================================================================
2359    // Logs (SSE) handler tests
2360    // =========================================================================
2361
2362    #[tokio::test]
2363    async fn test_logs_invalid_pipeline() {
2364        let mgr = setup_test_manager().await;
2365        let routes = api_routes(mgr, None, None, None);
2366
2367        let resp = test_request()
2368            .method("GET")
2369            .path("/api/v1/pipelines/nonexistent/logs")
2370            .header("x-api-key", "test-key-123")
2371            .reply(&routes)
2372            .await;
2373
2374        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2375    }
2376
2377    #[tokio::test]
2378    async fn test_logs_invalid_api_key() {
2379        let mgr = setup_test_manager().await;
2380        let pipeline_id = get_first_pipeline_id(&mgr).await;
2381        let routes = api_routes(mgr, None, None, None);
2382
2383        let resp = test_request()
2384            .method("GET")
2385            .path(&format!("/api/v1/pipelines/{pipeline_id}/logs"))
2386            .header("x-api-key", "wrong-key")
2387            .reply(&routes)
2388            .await;
2389
2390        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2391    }
2392
2393    // =========================================================================
2394    // json_to_runtime_value extended tests
2395    // =========================================================================
2396
2397    #[test]
2398    fn test_json_to_runtime_value_array() {
2399        let arr = serde_json::json!([1, "hello", true]);
2400        let val = json_to_runtime_value(&arr);
2401        match val {
2402            varpulis_core::Value::Array(a) => {
2403                assert_eq!(a.len(), 3);
2404                assert_eq!(a[0], varpulis_core::Value::Int(1));
2405                assert_eq!(a[1], varpulis_core::Value::Str("hello".into()));
2406                assert_eq!(a[2], varpulis_core::Value::Bool(true));
2407            }
2408            _ => panic!("Expected Array"),
2409        }
2410    }
2411
2412    #[test]
2413    fn test_json_to_runtime_value_object() {
2414        let obj = serde_json::json!({"key": "val", "num": 42});
2415        let val = json_to_runtime_value(&obj);
2416        match val {
2417            varpulis_core::Value::Map(m) => {
2418                assert_eq!(m.len(), 2);
2419            }
2420            _ => panic!("Expected Map"),
2421        }
2422    }
2423
2424    #[test]
2425    fn test_json_from_value_roundtrip() {
2426        use varpulis_core::Value;
2427        assert_eq!(json_from_value(&Value::Null), serde_json::json!(null));
2428        assert_eq!(json_from_value(&Value::Bool(true)), serde_json::json!(true));
2429        assert_eq!(json_from_value(&Value::Int(42)), serde_json::json!(42));
2430        assert_eq!(
2431            json_from_value(&Value::Float(2.71)),
2432            serde_json::json!(2.71)
2433        );
2434        assert_eq!(
2435            json_from_value(&Value::Str("hi".into())),
2436            serde_json::json!("hi")
2437        );
2438        assert_eq!(
2439            json_from_value(&Value::Timestamp(1000000)),
2440            serde_json::json!(1000000)
2441        );
2442        assert_eq!(
2443            json_from_value(&Value::Duration(5000)),
2444            serde_json::json!(5000)
2445        );
2446    }
2447
2448    // =========================================================================
2449    // Additional tenant_error_response coverage
2450    // =========================================================================
2451
2452    #[test]
2453    fn test_tenant_error_all_variants() {
2454        let resp = tenant_error_response(TenantError::PipelineNotFound("p1".into()));
2455        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2456
2457        let resp = tenant_error_response(TenantError::QuotaExceeded("max pipelines".into()));
2458        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2459
2460        let resp = tenant_error_response(TenantError::EngineError(
2461            varpulis_runtime::EngineError::Pipeline("boom".into()),
2462        ));
2463        assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
2464
2465        let resp = tenant_error_response(TenantError::AlreadyExists("t1".into()));
2466        assert_eq!(resp.status(), StatusCode::CONFLICT);
2467
2468        let resp = tenant_error_response(TenantError::BackpressureExceeded {
2469            current: 50000,
2470            max: 50000,
2471        });
2472        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2473        assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2474    }
2475
2476    // =========================================================================
2477    // Pagination tests
2478    // =========================================================================
2479
2480    #[tokio::test]
2481    async fn test_list_pipelines_default_pagination() {
2482        let mgr = setup_test_manager().await;
2483        let routes = api_routes(mgr, None, None, None);
2484
2485        let resp = test_request()
2486            .method("GET")
2487            .path("/api/v1/pipelines")
2488            .header("x-api-key", "test-key-123")
2489            .reply(&routes)
2490            .await;
2491
2492        assert_eq!(resp.status(), StatusCode::OK);
2493        let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2494        assert_eq!(body.total, 1);
2495        let pagination = body.pagination.unwrap();
2496        assert_eq!(pagination.total, 1);
2497        assert_eq!(pagination.offset, 0);
2498        assert_eq!(pagination.limit, 50);
2499        assert!(!pagination.has_more);
2500    }
2501
2502    #[tokio::test]
2503    async fn test_list_pipelines_with_pagination_params() {
2504        let mgr = setup_test_manager().await;
2505
2506        // Deploy two more pipelines
2507        {
2508            let mut m = mgr.write().await;
2509            let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
2510            let tenant = m.get_tenant_mut(&tid).unwrap();
2511            tenant
2512                .deploy_pipeline(
2513                    "Pipeline B".into(),
2514                    "stream B = Events .where(y > 2)".into(),
2515                )
2516                .await
2517                .unwrap();
2518            tenant
2519                .deploy_pipeline(
2520                    "Pipeline C".into(),
2521                    "stream C = Events .where(z > 3)".into(),
2522                )
2523                .await
2524                .unwrap();
2525        }
2526
2527        let routes = api_routes(mgr, None, None, None);
2528
2529        // First page: limit=1, offset=0
2530        let resp = test_request()
2531            .method("GET")
2532            .path("/api/v1/pipelines?limit=1&offset=0")
2533            .header("x-api-key", "test-key-123")
2534            .reply(&routes)
2535            .await;
2536
2537        assert_eq!(resp.status(), StatusCode::OK);
2538        let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2539        assert_eq!(body.pipelines.len(), 1);
2540        assert_eq!(body.total, 3);
2541        let pagination = body.pagination.unwrap();
2542        assert!(pagination.has_more);
2543        assert_eq!(pagination.limit, 1);
2544
2545        // Second page: limit=1, offset=2
2546        let resp = test_request()
2547            .method("GET")
2548            .path("/api/v1/pipelines?limit=1&offset=2")
2549            .header("x-api-key", "test-key-123")
2550            .reply(&routes)
2551            .await;
2552
2553        let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2554        assert_eq!(body.pipelines.len(), 1);
2555        assert_eq!(body.total, 3);
2556        assert!(!body.pagination.unwrap().has_more);
2557    }
2558
2559    #[tokio::test]
2560    async fn test_list_pipelines_limit_exceeds_max() {
2561        let mgr = setup_test_manager().await;
2562        let routes = api_routes(mgr, None, None, None);
2563
2564        let resp = test_request()
2565            .method("GET")
2566            .path("/api/v1/pipelines?limit=1001")
2567            .header("x-api-key", "test-key-123")
2568            .reply(&routes)
2569            .await;
2570
2571        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2572    }
2573
2574    #[tokio::test]
2575    async fn test_list_tenants_with_pagination() {
2576        let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2577
2578        // Create 3 tenants
2579        for name in &["T1", "T2", "T3"] {
2580            test_request()
2581                .method("POST")
2582                .path("/api/v1/tenants")
2583                .header("x-admin-key", "admin-secret")
2584                .json(&CreateTenantRequest {
2585                    name: name.to_string(),
2586                    quota_tier: None,
2587                })
2588                .reply(&routes)
2589                .await;
2590        }
2591
2592        // Page through with limit=2
2593        let resp = test_request()
2594            .method("GET")
2595            .path("/api/v1/tenants?limit=2&offset=0")
2596            .header("x-admin-key", "admin-secret")
2597            .reply(&routes)
2598            .await;
2599
2600        assert_eq!(resp.status(), StatusCode::OK);
2601        let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2602        assert_eq!(body.tenants.len(), 2);
2603        assert_eq!(body.total, 3);
2604        assert!(body.pagination.unwrap().has_more);
2605
2606        // Last page
2607        let resp = test_request()
2608            .method("GET")
2609            .path("/api/v1/tenants?limit=2&offset=2")
2610            .header("x-admin-key", "admin-secret")
2611            .reply(&routes)
2612            .await;
2613
2614        let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2615        assert_eq!(body.tenants.len(), 1);
2616        assert!(!body.pagination.unwrap().has_more);
2617    }
2618
2619    #[tokio::test]
2620    async fn test_inject_backpressure_429() {
2621        use std::sync::atomic::Ordering;
2622
2623        let mut mgr = TenantManager::new();
2624        mgr.set_max_queue_depth(5);
2625        let id = mgr
2626            .create_tenant(
2627                "BP Corp".into(),
2628                "bp-key-123".into(),
2629                TenantQuota::default(),
2630            )
2631            .unwrap();
2632
2633        let tenant = mgr.get_tenant_mut(&id).unwrap();
2634        let pid = tenant
2635            .deploy_pipeline(
2636                "BP Pipeline".into(),
2637                "stream A = SensorReading .where(x > 1)".into(),
2638            )
2639            .await
2640            .unwrap();
2641
2642        // Simulate queue being full
2643        mgr.pending_events_counter().store(5, Ordering::Relaxed);
2644
2645        let shared = Arc::new(RwLock::new(mgr));
2646        let routes = api_routes(shared, None, None, None);
2647
2648        let resp = test_request()
2649            .method("POST")
2650            .path(&format!("/api/v1/pipelines/{pid}/events"))
2651            .header("x-api-key", "bp-key-123")
2652            .json(&InjectEventRequest {
2653                event_type: "SensorReading".into(),
2654                fields: serde_json::Map::new(),
2655            })
2656            .reply(&routes)
2657            .await;
2658
2659        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2660        // Check Retry-After header
2661        assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2662        // Check response body
2663        let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2664        assert_eq!(body["code"], "queue_depth_exceeded");
2665    }
2666
2667    #[tokio::test]
2668    async fn test_inject_batch_backpressure_429() {
2669        use std::sync::atomic::Ordering;
2670
2671        let mut mgr = TenantManager::new();
2672        mgr.set_max_queue_depth(5);
2673        let id = mgr
2674            .create_tenant(
2675                "BP Batch Corp".into(),
2676                "bp-batch-key".into(),
2677                TenantQuota::default(),
2678            )
2679            .unwrap();
2680
2681        let tenant = mgr.get_tenant_mut(&id).unwrap();
2682        let pid = tenant
2683            .deploy_pipeline(
2684                "BP Batch Pipeline".into(),
2685                "stream A = SensorReading .where(x > 1)".into(),
2686            )
2687            .await
2688            .unwrap();
2689
2690        // Simulate queue being full
2691        mgr.pending_events_counter().store(5, Ordering::Relaxed);
2692
2693        let shared = Arc::new(RwLock::new(mgr));
2694        let routes = api_routes(shared, None, None, None);
2695
2696        let resp = test_request()
2697            .method("POST")
2698            .path(&format!("/api/v1/pipelines/{pid}/events-batch"))
2699            .header("x-api-key", "bp-batch-key")
2700            .json(&InjectBatchRequest {
2701                events: vec![InjectEventRequest {
2702                    event_type: "SensorReading".into(),
2703                    fields: serde_json::Map::new(),
2704                }],
2705            })
2706            .reply(&routes)
2707            .await;
2708
2709        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2710        assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2711    }
2712}