1use std::convert::Infallible;
7
8use axum::extract::{Json, Path, Query, State};
9use axum::http::StatusCode;
10use axum::response::{IntoResponse, Response};
11use axum::routing::{get, post};
12use axum::Router;
13use futures_util::stream;
14use indexmap::IndexMap;
15use rustc_hash::FxBuildHasher;
16use serde::{Deserialize, Serialize};
17use tower_http::cors::{Any, CorsLayer};
18use varpulis_core::pagination::{PaginationMeta, PaginationParams, MAX_LIMIT};
19use varpulis_runtime::tenant::{SharedTenantManager, TenantError, TenantQuota};
20use varpulis_runtime::Event;
21
22use crate::auth::constant_time_compare;
23use crate::billing::SharedBillingState;
24
25#[derive(Debug, Deserialize, Serialize)]
30pub struct DeployPipelineRequest {
31 pub name: String,
32 pub source: String,
33}
34
35#[derive(Debug, Serialize, Deserialize)]
36pub struct DeployPipelineResponse {
37 pub id: String,
38 pub name: String,
39 pub status: String,
40}
41
42#[derive(Debug, Serialize, Deserialize)]
43pub struct PipelineInfo {
44 pub id: String,
45 pub name: String,
46 pub status: String,
47 pub source: String,
48 pub uptime_secs: u64,
49 #[serde(skip_serializing_if = "Option::is_none")]
50 pub global_template_id: Option<String>,
51 #[serde(default = "default_scope")]
53 pub scope_level: String,
54 #[serde(skip_serializing_if = "Option::is_none")]
56 pub inherited_from_org_id: Option<String>,
57 #[serde(default)]
59 pub read_only: bool,
60}
61
/// Serde default for `PipelineInfo::scope_level`: the `"own"` scope.
fn default_scope() -> String {
    String::from("own")
}
65
66#[derive(Debug, Serialize, Deserialize)]
67pub struct PipelineListResponse {
68 pub pipelines: Vec<PipelineInfo>,
69 pub total: usize,
70 #[serde(skip_serializing_if = "Option::is_none")]
71 pub pagination: Option<PaginationMeta>,
72}
73
74#[derive(Debug, Serialize, Deserialize)]
75pub struct PipelineMetricsResponse {
76 pub pipeline_id: String,
77 pub events_processed: u64,
78 pub output_events_emitted: u64,
79}
80
81#[derive(Debug, Deserialize, Serialize)]
82pub struct InjectEventRequest {
83 pub event_type: String,
84 pub fields: serde_json::Map<String, serde_json::Value>,
85}
86
87#[derive(Debug, Deserialize, Serialize)]
88pub struct InjectBatchRequest {
89 pub events: Vec<InjectEventRequest>,
90}
91
92#[derive(Debug, Serialize, Deserialize)]
93pub struct InjectBatchResponse {
94 pub accepted: usize,
95 pub output_events: Vec<serde_json::Value>,
96 pub processing_time_us: u64,
97}
98
99#[derive(Debug, Deserialize, Serialize)]
100pub struct ReloadPipelineRequest {
101 pub source: String,
102}
103
104#[derive(Debug, Serialize, Deserialize)]
105pub struct CheckpointResponse {
106 pub pipeline_id: String,
107 pub checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
108 pub events_processed: u64,
109}
110
111#[derive(Debug, Deserialize, Serialize)]
112pub struct RestoreRequest {
113 pub checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
114}
115
116#[derive(Debug, Serialize, Deserialize)]
117pub struct RestoreResponse {
118 pub pipeline_id: String,
119 pub restored: bool,
120 pub events_restored: u64,
121}
122
123#[derive(Debug, Serialize)]
124pub struct ApiError {
125 pub error: String,
126 pub code: String,
127}
128
129#[derive(Debug, Deserialize)]
130pub struct DlqQueryParams {
131 #[serde(default)]
132 pub offset: Option<usize>,
133 #[serde(default)]
134 pub limit: Option<usize>,
135}
136
137#[derive(Debug, Serialize)]
138pub struct DlqEntriesResponse {
139 pub entries: Vec<varpulis_runtime::dead_letter::DlqEntryOwned>,
140 pub total: u64,
141}
142
143#[derive(Debug, Serialize)]
144pub struct DlqReplayResponse {
145 pub replayed: usize,
146}
147
148#[derive(Debug, Serialize)]
149pub struct DlqClearResponse {
150 pub cleared: bool,
151}
152
153#[derive(Debug, Serialize, Deserialize)]
154pub struct UsageResponse {
155 pub tenant_id: String,
156 pub events_processed: u64,
157 pub output_events_emitted: u64,
158 pub active_pipelines: usize,
159 pub quota: QuotaInfo,
160}
161
162#[derive(Debug, Serialize, Deserialize)]
163pub struct QuotaInfo {
164 pub max_pipelines: usize,
165 pub max_events_per_second: u64,
166 pub max_streams_per_pipeline: usize,
167}
168
169#[derive(Debug, Deserialize, Serialize)]
174pub struct CreateTenantRequest {
175 pub name: String,
176 #[serde(default)]
177 pub quota_tier: Option<String>,
178}
179
180#[derive(Debug, Serialize, Deserialize)]
181pub struct TenantResponse {
182 pub id: String,
183 pub name: String,
184 pub api_key: String,
185 pub quota: QuotaInfo,
186}
187
188#[derive(Debug, Serialize, Deserialize)]
189pub struct TenantListResponse {
190 pub tenants: Vec<TenantResponse>,
191 pub total: usize,
192 #[serde(skip_serializing_if = "Option::is_none")]
193 pub pagination: Option<PaginationMeta>,
194}
195
196#[derive(Debug, Serialize, Deserialize)]
197pub struct TenantDetailResponse {
198 pub id: String,
199 pub name: String,
200 pub api_key: String,
201 pub quota: QuotaInfo,
202 pub usage: TenantUsageInfo,
203 pub pipeline_count: usize,
204}
205
206#[derive(Debug, Serialize, Deserialize)]
207pub struct TenantUsageInfo {
208 pub events_processed: u64,
209 pub output_events_emitted: u64,
210 pub active_pipelines: usize,
211}
212
213fn build_cors(origins: Option<Vec<String>>) -> CorsLayer {
222 use axum::http::{HeaderValue, Method};
223
224 let base = CorsLayer::new()
225 .allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
226 .allow_headers([
227 "content-type".parse().unwrap(),
228 "x-api-key".parse().unwrap(),
229 "authorization".parse().unwrap(),
230 ]);
231
232 match origins {
233 Some(ref list) if !list.is_empty() && !list.iter().any(|o| o == "*") => {
234 let origins: Vec<HeaderValue> = list.iter().filter_map(|s| s.parse().ok()).collect();
235 base.allow_origin(origins)
236 }
237 _ => base.allow_origin(Any),
238 }
239}
240
241#[derive(Debug, Clone)]
243pub struct ApiState {
244 pub manager: SharedTenantManager,
245 pub admin_key: Option<String>,
246 pub billing_state: Option<SharedBillingState>,
247 #[cfg(feature = "saas")]
248 pub db_pool: Option<varpulis_db::PgPool>,
249}
250
/// Value of the `x-api-key` request header, extracted per request.
#[derive(Debug)]
pub struct ApiKey(
    /// The raw header value.
    pub String,
);
254
255impl<S> axum::extract::FromRequestParts<S> for ApiKey
256where
257 S: Send + Sync,
258{
259 type Rejection = Response;
260
261 async fn from_request_parts(
262 parts: &mut axum::http::request::Parts,
263 _state: &S,
264 ) -> Result<Self, Self::Rejection> {
265 parts
266 .headers
267 .get("x-api-key")
268 .and_then(|v| v.to_str().ok())
269 .map(|s| Self(s.to_string()))
270 .ok_or_else(|| {
271 (
272 StatusCode::UNAUTHORIZED,
273 axum::Json(serde_json::json!({"error": "Missing X-API-Key header"})),
274 )
275 .into_response()
276 })
277 }
278}
279
/// Value of the `x-admin-key` request header, extracted per request.
#[derive(Debug)]
pub struct AdminKey(
    /// The raw header value.
    pub String,
);
283
284impl<S> axum::extract::FromRequestParts<S> for AdminKey
285where
286 S: Send + Sync,
287{
288 type Rejection = Response;
289
290 async fn from_request_parts(
291 parts: &mut axum::http::request::Parts,
292 _state: &S,
293 ) -> Result<Self, Self::Rejection> {
294 parts
295 .headers
296 .get("x-admin-key")
297 .and_then(|v| v.to_str().ok())
298 .map(|s| Self(s.to_string()))
299 .ok_or_else(|| {
300 (
301 StatusCode::UNAUTHORIZED,
302 axum::Json(serde_json::json!({"error": "Missing X-Admin-Key header"})),
303 )
304 .into_response()
305 })
306 }
307}
308
309pub fn api_routes(
311 manager: SharedTenantManager,
312 admin_key: Option<String>,
313 cors_origins: Option<Vec<String>>,
314 billing_state: Option<SharedBillingState>,
315 #[cfg(feature = "saas")] db_pool: Option<varpulis_db::PgPool>,
316) -> Router {
317 let state = ApiState {
318 manager,
319 admin_key,
320 billing_state,
321 #[cfg(feature = "saas")]
322 db_pool,
323 };
324
325 let cors = build_cors(cors_origins);
326
327 Router::new()
328 .route("/api/v1/pipelines", post(handle_deploy).get(handle_list))
330 .route(
331 "/api/v1/pipelines/{pipeline_id}",
332 get(handle_get).delete(handle_delete),
333 )
334 .route(
336 "/api/v1/pipelines/{pipeline_id}/events",
337 post(handle_inject),
338 )
339 .route(
340 "/api/v1/pipelines/{pipeline_id}/events-batch",
341 post(handle_inject_batch),
342 )
343 .route(
344 "/api/v1/pipelines/{pipeline_id}/checkpoint",
345 post(handle_checkpoint),
346 )
347 .route(
348 "/api/v1/pipelines/{pipeline_id}/restore",
349 post(handle_restore),
350 )
351 .route(
352 "/api/v1/pipelines/{pipeline_id}/metrics",
353 get(handle_metrics),
354 )
355 .route(
356 "/api/v1/pipelines/{pipeline_id}/topology",
357 get(handle_topology),
358 )
359 .route(
360 "/api/v1/pipelines/{pipeline_id}/reload",
361 post(handle_reload),
362 )
363 .route("/api/v1/usage", get(handle_usage))
364 .route("/api/v1/pipelines/{pipeline_id}/logs", get(handle_logs))
365 .route(
367 "/api/v1/pipelines/{pipeline_id}/dlq",
368 get(handle_dlq_get).delete(handle_dlq_clear),
369 )
370 .route(
371 "/api/v1/pipelines/{pipeline_id}/dlq/replay",
372 post(handle_dlq_replay),
373 )
374 .route(
376 "/api/v1/tenants",
377 post(handle_create_tenant).get(handle_list_tenants),
378 )
379 .route(
380 "/api/v1/tenants/{tenant_id}",
381 get(handle_get_tenant).delete(handle_delete_tenant),
382 )
383 .layer(cors)
384 .with_state(state)
385}
386
387async fn handle_deploy(
392 State(state): State<ApiState>,
393 ApiKey(api_key): ApiKey,
394 Json(body): Json<DeployPipelineRequest>,
395) -> Response {
396 let manager = &state.manager;
397 let mut mgr = manager.write().await;
398
399 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
400 Some(id) => id.clone(),
401 None => {
402 return error_response(
403 StatusCode::UNAUTHORIZED,
404 "invalid_api_key",
405 "Invalid API key",
406 )
407 }
408 };
409
410 let pipeline_name = body.name.clone();
411 #[cfg(feature = "saas")]
412 let vpl_source = body.source.clone();
413
414 let result = mgr
415 .deploy_pipeline_on_tenant(&tenant_id, body.name, body.source)
416 .await;
417
418 match result {
419 Ok(id) => {
420 mgr.persist_if_needed(&tenant_id);
421
422 #[cfg(feature = "saas")]
424 if let Some(ref pool) = state.db_pool {
425 if let Ok(org_uuid) = tenant_id.0.parse::<uuid::Uuid>() {
426 if let Err(e) = varpulis_db::repo::create_scoped_pipeline(
427 pool,
428 org_uuid,
429 &pipeline_name,
430 &vpl_source,
431 "own",
432 )
433 .await
434 {
435 tracing::warn!("Failed to sync pipeline to DB: {}", e);
436 }
437 }
438 }
439
440 let resp = DeployPipelineResponse {
441 id,
442 name: pipeline_name,
443 status: "running".to_string(),
444 };
445 (StatusCode::CREATED, axum::Json(&resp)).into_response()
446 }
447 Err(e) => tenant_error_response(e),
448 }
449}
450
451async fn handle_list(
452 State(state): State<ApiState>,
453 ApiKey(api_key): ApiKey,
454 Query(pagination): Query<PaginationParams>,
455) -> Response {
456 let manager = &state.manager;
457 if pagination.exceeds_max() {
458 return error_response(
459 StatusCode::BAD_REQUEST,
460 "invalid_limit",
461 &format!("limit must not exceed {MAX_LIMIT}"),
462 );
463 }
464
465 let mgr = manager.read().await;
466
467 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
468 Some(id) => id.clone(),
469 None => {
470 return error_response(
471 StatusCode::UNAUTHORIZED,
472 "invalid_api_key",
473 "Invalid API key",
474 )
475 }
476 };
477
478 let tenant = match mgr.get_tenant(&tenant_id) {
479 Some(t) => t,
480 None => {
481 return error_response(
482 StatusCode::NOT_FOUND,
483 "tenant_not_found",
484 "Tenant not found",
485 )
486 }
487 };
488
489 let all_pipelines: Vec<PipelineInfo> = tenant
490 .pipelines
491 .values()
492 .map(|p| {
493 let is_global = p.global_template_id.is_some();
494 PipelineInfo {
495 id: p.id.clone(),
496 name: p.name.clone(),
497 status: p.status.to_string(),
498 source: p.source.clone(),
499 uptime_secs: p.created_at.elapsed().as_secs(),
500 global_template_id: p.global_template_id.clone(),
501 scope_level: if is_global {
502 "global".to_string()
503 } else {
504 "own".to_string()
505 },
506 inherited_from_org_id: None,
507 read_only: is_global,
508 }
509 })
510 .collect();
511
512 let (pipelines, meta) = pagination.paginate(all_pipelines);
513 let total = meta.total;
514 let resp = PipelineListResponse {
515 pipelines,
516 total,
517 pagination: Some(meta),
518 };
519 axum::Json(&resp).into_response()
520}
521
522async fn handle_get(
523 State(state): State<ApiState>,
524 Path(pipeline_id): Path<String>,
525 ApiKey(api_key): ApiKey,
526) -> Response {
527 let manager = &state.manager;
528 let mgr = manager.read().await;
529
530 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
531 Some(id) => id.clone(),
532 None => {
533 return error_response(
534 StatusCode::UNAUTHORIZED,
535 "invalid_api_key",
536 "Invalid API key",
537 )
538 }
539 };
540
541 let tenant = match mgr.get_tenant(&tenant_id) {
542 Some(t) => t,
543 None => {
544 return error_response(
545 StatusCode::NOT_FOUND,
546 "tenant_not_found",
547 "Tenant not found",
548 )
549 }
550 };
551
552 match tenant.pipelines.get(&pipeline_id) {
553 Some(p) => {
554 let is_global = p.global_template_id.is_some();
555 let info = PipelineInfo {
556 id: p.id.clone(),
557 name: p.name.clone(),
558 status: p.status.to_string(),
559 source: p.source.clone(),
560 uptime_secs: p.created_at.elapsed().as_secs(),
561 global_template_id: p.global_template_id.clone(),
562 scope_level: if is_global {
563 "global".to_string()
564 } else {
565 "own".to_string()
566 },
567 inherited_from_org_id: None,
568 read_only: is_global,
569 };
570 axum::Json(&info).into_response()
571 }
572 None => error_response(
573 StatusCode::NOT_FOUND,
574 "pipeline_not_found",
575 "Pipeline not found",
576 ),
577 }
578}
579
580async fn handle_delete(
581 State(state): State<ApiState>,
582 Path(pipeline_id): Path<String>,
583 ApiKey(api_key): ApiKey,
584) -> Response {
585 let manager = &state.manager;
586 let mut mgr = manager.write().await;
587
588 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
589 Some(id) => id.clone(),
590 None => {
591 return error_response(
592 StatusCode::UNAUTHORIZED,
593 "invalid_api_key",
594 "Invalid API key",
595 )
596 }
597 };
598
599 #[cfg(feature = "saas")]
600 let mut pipeline_name_for_db = None;
601
602 let result = {
603 let tenant = match mgr.get_tenant_mut(&tenant_id) {
604 Some(t) => t,
605 None => {
606 return error_response(
607 StatusCode::NOT_FOUND,
608 "tenant_not_found",
609 "Tenant not found",
610 )
611 }
612 };
613
614 if let Some(pipeline) = tenant.pipelines.get(&pipeline_id) {
616 if pipeline.global_template_id.is_some() {
617 return error_response(
618 StatusCode::FORBIDDEN,
619 "global_pipeline_protected",
620 "Global pipelines can only be managed by admin",
621 );
622 }
623 #[cfg(feature = "saas")]
624 {
625 pipeline_name_for_db = Some(pipeline.name.clone());
626 }
627 }
628
629 tenant.remove_pipeline(&pipeline_id)
630 };
631
632 match result {
633 Ok(()) => {
634 mgr.persist_if_needed(&tenant_id);
635
636 #[cfg(feature = "saas")]
638 if let Some(ref pool) = state.db_pool {
639 if let (Ok(org_uuid), Some(name)) =
640 (tenant_id.0.parse::<uuid::Uuid>(), &pipeline_name_for_db)
641 {
642 let _ = varpulis_db::repo::delete_pipeline_by_name(pool, org_uuid, name).await;
643 }
644 }
645
646 axum::Json(serde_json::json!({"deleted": true})).into_response()
647 }
648 Err(e) => tenant_error_response(e),
649 }
650}
651
652async fn handle_inject(
653 State(state): State<ApiState>,
654 Path(pipeline_id): Path<String>,
655 ApiKey(api_key): ApiKey,
656 Json(body): Json<InjectEventRequest>,
657) -> Response {
658 let manager = &state.manager;
659 let billing_state = &state.billing_state;
660 #[cfg(feature = "saas")]
662 let mut usage_warning: Option<f64> = None;
663 #[cfg(feature = "saas")]
664 if let Some(ref bs) = billing_state {
665 if let Some(org_id) = bs.org_id_for_api_key(&api_key).await {
666 match bs.check_usage_limit(org_id, 1).await {
667 crate::billing::UsageCheckResult::Exceeded(err) => {
668 return crate::billing::usage_limit_response(&err);
669 }
670 crate::billing::UsageCheckResult::ApproachingLimit { usage_percent } => {
671 usage_warning = Some(usage_percent);
672 }
673 crate::billing::UsageCheckResult::Ok => {}
674 }
675 bs.usage.write().await.record_events(org_id, 1);
677 }
678 }
679 #[cfg(not(feature = "saas"))]
680 let _ = &billing_state;
681
682 let mut mgr = manager.write().await;
683
684 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
685 Some(id) => id.clone(),
686 None => {
687 return error_response(
688 StatusCode::UNAUTHORIZED,
689 "invalid_api_key",
690 "Invalid API key",
691 )
692 }
693 };
694
695 if let Err(e) = mgr.check_backpressure() {
697 return tenant_error_response(e);
698 }
699
700 let mut event = Event::new(body.event_type.clone());
701 for (key, value) in &body.fields {
702 let v = json_to_runtime_value(value);
703 event = event.with_field(key.as_str(), v);
704 }
705
706 match mgr
707 .process_event_with_backpressure(&tenant_id, &pipeline_id, event)
708 .await
709 {
710 Ok(output_events) => {
711 let events_json: Vec<serde_json::Value> = output_events
712 .iter()
713 .map(|e| {
714 let mut fields = serde_json::Map::new();
715 for (k, v) in &e.data {
716 fields.insert(k.to_string(), crate::websocket::value_to_json(v));
717 }
718 serde_json::json!({
719 "event_type": e.event_type.to_string(),
720 "fields": serde_json::Value::Object(fields),
721 })
722 })
723 .collect();
724 let response = serde_json::json!({
725 "accepted": true,
726 "output_events": events_json,
727 });
728 #[cfg(feature = "saas")]
729 if let Some(pct) = usage_warning {
730 return (
731 StatusCode::OK,
732 [("X-Usage-Warning", format!("approaching_limit ({pct:.0}%)"))],
733 axum::Json(response),
734 )
735 .into_response();
736 }
737 axum::Json(response).into_response()
738 }
739 Err(e) => tenant_error_response(e),
740 }
741}
742
743async fn handle_inject_batch(
744 State(state): State<ApiState>,
745 Path(pipeline_id): Path<String>,
746 ApiKey(api_key): ApiKey,
747 Json(body): Json<InjectBatchRequest>,
748) -> Response {
749 let manager = &state.manager;
750 let billing_state = &state.billing_state;
751 let event_count = body.events.len() as i64;
752
753 #[cfg(feature = "saas")]
755 if let Some(ref bs) = billing_state {
756 if let Some(org_id) = bs.org_id_for_api_key(&api_key).await {
757 match bs.check_usage_limit(org_id, event_count).await {
758 crate::billing::UsageCheckResult::Exceeded(err) => {
759 return crate::billing::usage_limit_response(&err);
760 }
761 crate::billing::UsageCheckResult::ApproachingLimit { .. }
762 | crate::billing::UsageCheckResult::Ok => {}
763 }
764 bs.usage.write().await.record_events(org_id, event_count);
766 }
767 }
768 #[cfg(not(feature = "saas"))]
769 let _ = (&billing_state, event_count);
770
771 let mut mgr = manager.write().await;
772
773 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
774 Some(id) => id.clone(),
775 None => {
776 return error_response(
777 StatusCode::UNAUTHORIZED,
778 "invalid_api_key",
779 "Invalid API key",
780 )
781 }
782 };
783
784 if let Err(e) = mgr.check_backpressure() {
786 return tenant_error_response(e);
787 }
788
789 let start = std::time::Instant::now();
790 let mut accepted = 0usize;
791 let mut output_events = Vec::new();
792
793 for req in body.events {
794 let mut event = Event::new(req.event_type.clone());
795 for (key, value) in &req.fields {
796 let v = json_to_runtime_value(value);
797 event = event.with_field(key.as_str(), v);
798 }
799
800 match mgr
801 .process_event_with_backpressure(&tenant_id, &pipeline_id, event)
802 .await
803 {
804 Ok(outputs) => {
805 accepted += 1;
806 for e in &outputs {
807 let mut flat = serde_json::Map::new();
808 flat.insert(
809 "event_type".to_string(),
810 serde_json::Value::String(e.event_type.to_string()),
811 );
812 for (k, v) in &e.data {
813 flat.insert(k.to_string(), crate::websocket::value_to_json(v));
814 }
815 output_events.push(serde_json::Value::Object(flat));
816 }
817 }
818 Err(TenantError::BackpressureExceeded { .. }) => {
819 break;
821 }
822 Err(_) => {
823 }
825 }
826 }
827
828 let processing_time_us = start.elapsed().as_micros() as u64;
829
830 let resp = InjectBatchResponse {
831 accepted,
832 output_events,
833 processing_time_us,
834 };
835 axum::Json(&resp).into_response()
836}
837
838async fn handle_checkpoint(
839 State(state): State<ApiState>,
840 Path(pipeline_id): Path<String>,
841 ApiKey(api_key): ApiKey,
842) -> Response {
843 let manager = &state.manager;
844 let mgr = manager.read().await;
845
846 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
847 Some(id) => id.clone(),
848 None => {
849 return error_response(
850 StatusCode::UNAUTHORIZED,
851 "invalid_api_key",
852 "Invalid API key",
853 )
854 }
855 };
856
857 let tenant = match mgr.get_tenant(&tenant_id) {
858 Some(t) => t,
859 None => {
860 return error_response(
861 StatusCode::NOT_FOUND,
862 "tenant_not_found",
863 "Tenant not found",
864 )
865 }
866 };
867
868 match tenant.checkpoint_pipeline(&pipeline_id).await {
869 Ok(checkpoint) => {
870 let resp = CheckpointResponse {
871 pipeline_id,
872 events_processed: checkpoint.events_processed,
873 checkpoint,
874 };
875 axum::Json(&resp).into_response()
876 }
877 Err(e) => tenant_error_response(e),
878 }
879}
880
881async fn handle_restore(
882 State(state): State<ApiState>,
883 Path(pipeline_id): Path<String>,
884 ApiKey(api_key): ApiKey,
885 Json(body): Json<RestoreRequest>,
886) -> Response {
887 let manager = &state.manager;
888 let mut mgr = manager.write().await;
889
890 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
891 Some(id) => id.clone(),
892 None => {
893 return error_response(
894 StatusCode::UNAUTHORIZED,
895 "invalid_api_key",
896 "Invalid API key",
897 )
898 }
899 };
900
901 let tenant = match mgr.get_tenant_mut(&tenant_id) {
902 Some(t) => t,
903 None => {
904 return error_response(
905 StatusCode::NOT_FOUND,
906 "tenant_not_found",
907 "Tenant not found",
908 )
909 }
910 };
911
912 match tenant
913 .restore_pipeline(&pipeline_id, &body.checkpoint)
914 .await
915 {
916 Ok(()) => {
917 let resp = RestoreResponse {
918 pipeline_id,
919 restored: true,
920 events_restored: body.checkpoint.events_processed,
921 };
922 axum::Json(&resp).into_response()
923 }
924 Err(e) => tenant_error_response(e),
925 }
926}
927
928async fn handle_metrics(
929 State(state): State<ApiState>,
930 Path(pipeline_id): Path<String>,
931 ApiKey(api_key): ApiKey,
932) -> Response {
933 let manager = &state.manager;
934 let mgr = manager.read().await;
935
936 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
937 Some(id) => id.clone(),
938 None => {
939 return error_response(
940 StatusCode::UNAUTHORIZED,
941 "invalid_api_key",
942 "Invalid API key",
943 )
944 }
945 };
946
947 let tenant = match mgr.get_tenant(&tenant_id) {
948 Some(t) => t,
949 None => {
950 return error_response(
951 StatusCode::NOT_FOUND,
952 "tenant_not_found",
953 "Tenant not found",
954 )
955 }
956 };
957
958 if !tenant.pipelines.contains_key(&pipeline_id) {
959 return error_response(
960 StatusCode::NOT_FOUND,
961 "pipeline_not_found",
962 "Pipeline not found",
963 );
964 }
965
966 let resp = PipelineMetricsResponse {
967 pipeline_id,
968 events_processed: tenant.usage.events_processed,
969 output_events_emitted: tenant.usage.output_events_emitted,
970 };
971 axum::Json(&resp).into_response()
972}
973
974async fn handle_topology(
975 State(state): State<ApiState>,
976 Path(pipeline_id): Path<String>,
977 ApiKey(api_key): ApiKey,
978) -> Response {
979 let manager = &state.manager;
980 let mgr = manager.read().await;
981
982 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
983 Some(id) => id.clone(),
984 None => {
985 return error_response(
986 StatusCode::UNAUTHORIZED,
987 "invalid_api_key",
988 "Invalid API key",
989 )
990 }
991 };
992
993 let tenant = match mgr.get_tenant(&tenant_id) {
994 Some(t) => t,
995 None => {
996 return error_response(
997 StatusCode::NOT_FOUND,
998 "tenant_not_found",
999 "Tenant not found",
1000 )
1001 }
1002 };
1003
1004 let pipeline = match tenant.pipelines.get(&pipeline_id) {
1005 Some(p) => p,
1006 None => {
1007 return error_response(
1008 StatusCode::NOT_FOUND,
1009 "pipeline_not_found",
1010 "Pipeline not found",
1011 )
1012 }
1013 };
1014
1015 let engine = pipeline.engine.lock().await;
1016 let topology = engine.topology();
1017 axum::Json(&topology).into_response()
1018}
1019
1020async fn handle_reload(
1021 State(state): State<ApiState>,
1022 Path(pipeline_id): Path<String>,
1023 ApiKey(api_key): ApiKey,
1024 Json(body): Json<ReloadPipelineRequest>,
1025) -> Response {
1026 let manager = &state.manager;
1027 let mut mgr = manager.write().await;
1028
1029 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1030 Some(id) => id.clone(),
1031 None => {
1032 return error_response(
1033 StatusCode::UNAUTHORIZED,
1034 "invalid_api_key",
1035 "Invalid API key",
1036 )
1037 }
1038 };
1039
1040 let result = {
1041 let tenant = match mgr.get_tenant_mut(&tenant_id) {
1042 Some(t) => t,
1043 None => {
1044 return error_response(
1045 StatusCode::NOT_FOUND,
1046 "tenant_not_found",
1047 "Tenant not found",
1048 )
1049 }
1050 };
1051
1052 if let Some(pipeline) = tenant.pipelines.get(&pipeline_id) {
1054 if pipeline.global_template_id.is_some() {
1055 return error_response(
1056 StatusCode::FORBIDDEN,
1057 "global_pipeline_protected",
1058 "Global pipelines can only be managed by admin",
1059 );
1060 }
1061 }
1062
1063 tenant.reload_pipeline(&pipeline_id, body.source).await
1064 };
1065
1066 match result {
1067 Ok(()) => {
1068 mgr.persist_if_needed(&tenant_id);
1069 axum::Json(serde_json::json!({"reloaded": true})).into_response()
1070 }
1071 Err(e) => tenant_error_response(e),
1072 }
1073}
1074
1075async fn handle_usage(State(state): State<ApiState>, ApiKey(api_key): ApiKey) -> Response {
1076 let manager = &state.manager;
1077 let mgr = manager.read().await;
1078
1079 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1080 Some(id) => id.clone(),
1081 None => {
1082 return error_response(
1083 StatusCode::UNAUTHORIZED,
1084 "invalid_api_key",
1085 "Invalid API key",
1086 )
1087 }
1088 };
1089
1090 let tenant = match mgr.get_tenant(&tenant_id) {
1091 Some(t) => t,
1092 None => {
1093 return error_response(
1094 StatusCode::NOT_FOUND,
1095 "tenant_not_found",
1096 "Tenant not found",
1097 )
1098 }
1099 };
1100
1101 let resp = UsageResponse {
1102 tenant_id: tenant.id.to_string(),
1103 events_processed: tenant.usage.events_processed,
1104 output_events_emitted: tenant.usage.output_events_emitted,
1105 active_pipelines: tenant.usage.active_pipelines,
1106 quota: QuotaInfo {
1107 max_pipelines: tenant.quota.max_pipelines,
1108 max_events_per_second: tenant.quota.max_events_per_second,
1109 max_streams_per_pipeline: tenant.quota.max_streams_per_pipeline,
1110 },
1111 };
1112 axum::Json(&resp).into_response()
1113}
1114
1115async fn handle_logs(
1117 State(state): State<ApiState>,
1118 Path(pipeline_id): Path<String>,
1119 ApiKey(api_key): ApiKey,
1120) -> Response {
1121 let manager = &state.manager;
1122 let mgr = manager.read().await;
1123
1124 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1125 Some(id) => id.clone(),
1126 None => return error_response(StatusCode::UNAUTHORIZED, "invalid_key", "Invalid API key"),
1127 };
1128
1129 let tenant = match mgr.get_tenant(&tenant_id) {
1131 Some(t) => t,
1132 None => {
1133 return error_response(
1134 StatusCode::NOT_FOUND,
1135 "tenant_not_found",
1136 "Tenant not found",
1137 )
1138 }
1139 };
1140
1141 let rx: tokio::sync::broadcast::Receiver<Event> =
1142 match tenant.subscribe_pipeline_logs(&pipeline_id) {
1143 Ok(rx) => rx,
1144 Err(_) => {
1145 return error_response(
1146 StatusCode::NOT_FOUND,
1147 "pipeline_not_found",
1148 &format!("Pipeline {pipeline_id} not found"),
1149 )
1150 }
1151 };
1152
1153 drop(mgr); let stream = stream::unfold(rx, |mut rx| async move {
1157 match rx.recv().await {
1158 Ok(event) => {
1159 let data: serde_json::Map<String, serde_json::Value> = event
1160 .data
1161 .iter()
1162 .map(|(k, v): (&std::sync::Arc<str>, &varpulis_core::Value)| {
1163 (k.to_string(), json_from_value(v))
1164 })
1165 .collect();
1166 let json = serde_json::to_string(&LogEvent {
1167 event_type: event.event_type.to_string(),
1168 timestamp: event.timestamp.to_rfc3339(),
1169 data,
1170 })
1171 .unwrap_or_default();
1172 let sse = axum::response::sse::Event::default().data(json);
1173 Some((Ok::<_, Infallible>(sse), rx))
1174 }
1175 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1176 let msg = format!("{{\"warning\":\"skipped {n} events\"}}");
1177 let sse = axum::response::sse::Event::default()
1178 .event("warning")
1179 .data(msg);
1180 Some((Ok(sse), rx))
1181 }
1182 Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
1183 }
1184 });
1185
1186 axum::response::sse::Sse::new(stream)
1187 .keep_alive(axum::response::sse::KeepAlive::default())
1188 .into_response()
1189}
1190
1191#[derive(Serialize)]
1192struct LogEvent {
1193 event_type: String,
1194 timestamp: String,
1195 data: serde_json::Map<String, serde_json::Value>,
1196}
1197
1198fn json_from_value(v: &varpulis_core::Value) -> serde_json::Value {
1199 match v {
1200 varpulis_core::Value::Null => serde_json::Value::Null,
1201 varpulis_core::Value::Bool(b) => serde_json::Value::Bool(*b),
1202 varpulis_core::Value::Int(i) => serde_json::json!(*i),
1203 varpulis_core::Value::Float(f) => serde_json::json!(*f),
1204 varpulis_core::Value::Str(s) => serde_json::Value::String(s.to_string()),
1205 varpulis_core::Value::Timestamp(ns) => serde_json::json!(*ns),
1206 varpulis_core::Value::Duration(ns) => serde_json::json!(*ns),
1207 varpulis_core::Value::Array(arr) => {
1208 serde_json::Value::Array(arr.iter().map(json_from_value).collect())
1209 }
1210 varpulis_core::Value::Map(map) => {
1211 let obj: serde_json::Map<String, serde_json::Value> = map
1212 .iter()
1213 .map(|(k, v)| (k.to_string(), json_from_value(v)))
1214 .collect();
1215 serde_json::Value::Object(obj)
1216 }
1217 }
1218}
1219
1220async fn handle_dlq_get(
1225 State(state): State<ApiState>,
1226 Path(pipeline_id): Path<String>,
1227 ApiKey(api_key): ApiKey,
1228 Query(params): Query<DlqQueryParams>,
1229) -> Response {
1230 let manager = &state.manager;
1231 let mgr = manager.read().await;
1232
1233 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1234 Some(id) => id.clone(),
1235 None => {
1236 return error_response(
1237 StatusCode::UNAUTHORIZED,
1238 "invalid_api_key",
1239 "Invalid API key",
1240 )
1241 }
1242 };
1243
1244 let tenant = match mgr.get_tenant(&tenant_id) {
1245 Some(t) => t,
1246 None => {
1247 return error_response(
1248 StatusCode::NOT_FOUND,
1249 "tenant_not_found",
1250 "Tenant not found",
1251 )
1252 }
1253 };
1254
1255 let pipeline = match tenant.pipelines.get(&pipeline_id) {
1256 Some(p) => p,
1257 None => {
1258 return error_response(
1259 StatusCode::NOT_FOUND,
1260 "pipeline_not_found",
1261 "Pipeline not found",
1262 )
1263 }
1264 };
1265
1266 let engine = pipeline.engine.lock().await;
1267 let dlq = match engine.dlq() {
1268 Some(d) => d,
1269 None => {
1270 let resp = DlqEntriesResponse {
1271 entries: Vec::new(),
1272 total: 0,
1273 };
1274 return axum::Json(&resp).into_response();
1275 }
1276 };
1277
1278 let offset = params.offset.unwrap_or(0);
1279 let limit = params.limit.unwrap_or(100).min(1000);
1280
1281 match dlq.read_entries(offset, limit) {
1282 Ok(entries) => {
1283 let resp = DlqEntriesResponse {
1284 total: dlq.line_count(),
1285 entries,
1286 };
1287 axum::Json(&resp).into_response()
1288 }
1289 Err(e) => error_response(
1290 StatusCode::INTERNAL_SERVER_ERROR,
1291 "dlq_read_error",
1292 &format!("Failed to read DLQ: {e}"),
1293 ),
1294 }
1295}
1296
1297async fn handle_dlq_replay(
1298 State(state): State<ApiState>,
1299 Path(pipeline_id): Path<String>,
1300 ApiKey(api_key): ApiKey,
1301) -> Response {
1302 let manager = &state.manager;
1303 let mgr = manager.read().await;
1304
1305 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1306 Some(id) => id.clone(),
1307 None => {
1308 return error_response(
1309 StatusCode::UNAUTHORIZED,
1310 "invalid_api_key",
1311 "Invalid API key",
1312 )
1313 }
1314 };
1315
1316 let tenant = match mgr.get_tenant(&tenant_id) {
1317 Some(t) => t,
1318 None => {
1319 return error_response(
1320 StatusCode::NOT_FOUND,
1321 "tenant_not_found",
1322 "Tenant not found",
1323 )
1324 }
1325 };
1326
1327 let pipeline = match tenant.pipelines.get(&pipeline_id) {
1328 Some(p) => p,
1329 None => {
1330 return error_response(
1331 StatusCode::NOT_FOUND,
1332 "pipeline_not_found",
1333 "Pipeline not found",
1334 )
1335 }
1336 };
1337
1338 let entries = {
1340 let engine = pipeline.engine.lock().await;
1341 let dlq = match engine.dlq() {
1342 Some(d) => d,
1343 None => {
1344 let resp = DlqReplayResponse { replayed: 0 };
1345 return axum::Json(&resp).into_response();
1346 }
1347 };
1348 match dlq.read_entries(0, 100_000) {
1350 Ok(entries) => entries,
1351 Err(e) => {
1352 return error_response(
1353 StatusCode::INTERNAL_SERVER_ERROR,
1354 "dlq_read_error",
1355 &format!("Failed to read DLQ: {e}"),
1356 )
1357 }
1358 }
1359 };
1360
1361 let mut replayed = 0usize;
1363 {
1364 let mut engine = pipeline.engine.lock().await;
1365 for entry in &entries {
1366 let event_type = entry
1368 .event
1369 .get("event_type")
1370 .and_then(|v| v.as_str())
1371 .unwrap_or("unknown");
1372 let mut event = Event::new(event_type);
1373 if let Some(data) = entry.event.get("data").and_then(|v| v.as_object()) {
1374 for (k, v) in data {
1375 let rv = json_to_runtime_value(v);
1376 event = event.with_field(k.as_str(), rv);
1377 }
1378 }
1379 if engine.process(event).await.is_ok() {
1380 replayed += 1;
1381 }
1382 }
1383 }
1384
1385 let resp = DlqReplayResponse { replayed };
1386 axum::Json(&resp).into_response()
1387}
1388
1389async fn handle_dlq_clear(
1390 State(state): State<ApiState>,
1391 Path(pipeline_id): Path<String>,
1392 ApiKey(api_key): ApiKey,
1393) -> Response {
1394 let manager = &state.manager;
1395 let mgr = manager.read().await;
1396
1397 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1398 Some(id) => id.clone(),
1399 None => {
1400 return error_response(
1401 StatusCode::UNAUTHORIZED,
1402 "invalid_api_key",
1403 "Invalid API key",
1404 )
1405 }
1406 };
1407
1408 let tenant = match mgr.get_tenant(&tenant_id) {
1409 Some(t) => t,
1410 None => {
1411 return error_response(
1412 StatusCode::NOT_FOUND,
1413 "tenant_not_found",
1414 "Tenant not found",
1415 )
1416 }
1417 };
1418
1419 let pipeline = match tenant.pipelines.get(&pipeline_id) {
1420 Some(p) => p,
1421 None => {
1422 return error_response(
1423 StatusCode::NOT_FOUND,
1424 "pipeline_not_found",
1425 "Pipeline not found",
1426 )
1427 }
1428 };
1429
1430 let engine = pipeline.engine.lock().await;
1431 match engine.dlq() {
1432 Some(dlq) => match dlq.clear() {
1433 Ok(()) => {
1434 let resp = DlqClearResponse { cleared: true };
1435 axum::Json(&resp).into_response()
1436 }
1437 Err(e) => error_response(
1438 StatusCode::INTERNAL_SERVER_ERROR,
1439 "dlq_clear_error",
1440 &format!("Failed to clear DLQ: {e}"),
1441 ),
1442 },
1443 None => {
1444 let resp = DlqClearResponse { cleared: true };
1445 axum::Json(&resp).into_response()
1446 }
1447 }
1448}
1449
1450#[allow(clippy::result_large_err)]
1457fn validate_admin_key(provided: &str, configured: &Option<String>) -> Result<(), Response> {
1458 match configured {
1459 None => Err(error_response(
1460 StatusCode::FORBIDDEN,
1461 "admin_disabled",
1462 "Admin API is disabled (no --api-key configured)",
1463 )),
1464 Some(key) => {
1465 if constant_time_compare(key, provided) {
1466 Ok(())
1467 } else {
1468 Err(error_response(
1469 StatusCode::UNAUTHORIZED,
1470 "invalid_admin_key",
1471 "Invalid admin key",
1472 ))
1473 }
1474 }
1475 }
1476}
1477
1478fn quota_from_tier(tier: Option<&str>) -> TenantQuota {
1479 match tier {
1480 Some("free") => TenantQuota::free(),
1481 Some("pro") => TenantQuota::pro(),
1482 Some("enterprise") => TenantQuota::enterprise(),
1483 _ => TenantQuota::default(),
1484 }
1485}
1486
1487async fn handle_create_tenant(
1488 State(state): State<ApiState>,
1489 AdminKey(admin_key): AdminKey,
1490 Json(body): Json<CreateTenantRequest>,
1491) -> Response {
1492 let manager = &state.manager;
1493 let configured_key = &state.admin_key;
1494 if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1495 return resp;
1496 }
1497
1498 let api_key = uuid::Uuid::new_v4().to_string();
1499 let quota = quota_from_tier(body.quota_tier.as_deref());
1500
1501 let mut mgr = manager.write().await;
1502 match mgr.create_tenant(body.name.clone(), api_key.clone(), quota.clone()) {
1503 Ok(tenant_id) => {
1504 let resp = TenantResponse {
1505 id: tenant_id.as_str().to_string(),
1506 name: body.name,
1507 api_key,
1508 quota: QuotaInfo {
1509 max_pipelines: quota.max_pipelines,
1510 max_events_per_second: quota.max_events_per_second,
1511 max_streams_per_pipeline: quota.max_streams_per_pipeline,
1512 },
1513 };
1514 (StatusCode::CREATED, axum::Json(&resp)).into_response()
1515 }
1516 Err(e) => tenant_error_response(e),
1517 }
1518}
1519
1520async fn handle_list_tenants(
1521 State(state): State<ApiState>,
1522 AdminKey(admin_key): AdminKey,
1523 Query(pagination): Query<PaginationParams>,
1524) -> Response {
1525 let manager = &state.manager;
1526 let configured_key = &state.admin_key;
1527 if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1528 return resp;
1529 }
1530
1531 if pagination.exceeds_max() {
1532 return error_response(
1533 StatusCode::BAD_REQUEST,
1534 "invalid_limit",
1535 &format!("limit must not exceed {MAX_LIMIT}"),
1536 );
1537 }
1538
1539 let mgr = manager.read().await;
1540 let all_tenants: Vec<TenantResponse> = mgr
1541 .list_tenants()
1542 .iter()
1543 .map(|t| TenantResponse {
1544 id: t.id.as_str().to_string(),
1545 name: t.name.clone(),
1546 api_key: format!("{}...", &t.api_key_hash[..8]),
1547 quota: QuotaInfo {
1548 max_pipelines: t.quota.max_pipelines,
1549 max_events_per_second: t.quota.max_events_per_second,
1550 max_streams_per_pipeline: t.quota.max_streams_per_pipeline,
1551 },
1552 })
1553 .collect();
1554 let (tenants, meta) = pagination.paginate(all_tenants);
1555 let total = meta.total;
1556 let resp = TenantListResponse {
1557 tenants,
1558 total,
1559 pagination: Some(meta),
1560 };
1561 axum::Json(&resp).into_response()
1562}
1563
1564async fn handle_get_tenant(
1565 State(state): State<ApiState>,
1566 Path(tenant_id_str): Path<String>,
1567 AdminKey(admin_key): AdminKey,
1568) -> Response {
1569 let manager = &state.manager;
1570 let configured_key = &state.admin_key;
1571 if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1572 return resp;
1573 }
1574
1575 let mgr = manager.read().await;
1576 let tenant_id = varpulis_runtime::TenantId::new(&tenant_id_str);
1577 match mgr.get_tenant(&tenant_id) {
1578 Some(t) => {
1579 let resp = TenantDetailResponse {
1580 id: t.id.as_str().to_string(),
1581 name: t.name.clone(),
1582 api_key: format!("{}...", &t.api_key_hash[..8]),
1583 quota: QuotaInfo {
1584 max_pipelines: t.quota.max_pipelines,
1585 max_events_per_second: t.quota.max_events_per_second,
1586 max_streams_per_pipeline: t.quota.max_streams_per_pipeline,
1587 },
1588 usage: TenantUsageInfo {
1589 events_processed: t.usage.events_processed,
1590 output_events_emitted: t.usage.output_events_emitted,
1591 active_pipelines: t.usage.active_pipelines,
1592 },
1593 pipeline_count: t.pipelines.len(),
1594 };
1595 axum::Json(&resp).into_response()
1596 }
1597 None => error_response(
1598 StatusCode::NOT_FOUND,
1599 "tenant_not_found",
1600 "Tenant not found",
1601 ),
1602 }
1603}
1604
1605async fn handle_delete_tenant(
1606 State(state): State<ApiState>,
1607 Path(tenant_id_str): Path<String>,
1608 AdminKey(admin_key): AdminKey,
1609) -> Response {
1610 let manager = &state.manager;
1611 let configured_key = &state.admin_key;
1612 if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1613 return resp;
1614 }
1615
1616 let mut mgr = manager.write().await;
1617 let tenant_id = varpulis_runtime::TenantId::new(&tenant_id_str);
1618 match mgr.remove_tenant(&tenant_id) {
1619 Ok(()) => axum::Json(serde_json::json!({"deleted": true})).into_response(),
1620 Err(e) => tenant_error_response(e),
1621 }
1622}
1623
1624fn error_response(status: StatusCode, code: &str, message: &str) -> Response {
1629 let body = ApiError {
1630 error: message.to_string(),
1631 code: code.to_string(),
1632 };
1633 (status, axum::Json(body)).into_response()
1634}
1635
1636fn tenant_error_response(err: TenantError) -> Response {
1637 if let TenantError::BackpressureExceeded { current, max } = &err {
1639 let body = serde_json::json!({
1640 "error": format!("queue depth {current} exceeds maximum {max}"),
1641 "code": "queue_depth_exceeded",
1642 "retry_after": 1,
1643 });
1644 return (
1645 StatusCode::TOO_MANY_REQUESTS,
1646 [("Retry-After", "1"), ("Content-Type", "application/json")],
1647 serde_json::to_string(&body).unwrap_or_default(),
1648 )
1649 .into_response();
1650 }
1651
1652 let (status, code) = match &err {
1653 TenantError::NotFound(_) => (StatusCode::NOT_FOUND, "not_found"),
1654 TenantError::PipelineNotFound(_) => (StatusCode::NOT_FOUND, "pipeline_not_found"),
1655 TenantError::QuotaExceeded(_) => (StatusCode::TOO_MANY_REQUESTS, "quota_exceeded"),
1656 TenantError::RateLimitExceeded => (StatusCode::TOO_MANY_REQUESTS, "rate_limited"),
1657 TenantError::BackpressureExceeded { .. } => unreachable!(),
1658 TenantError::ParseError(_) => (StatusCode::BAD_REQUEST, "parse_error"),
1659 TenantError::EngineError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "engine_error"),
1660 TenantError::AlreadyExists(_) => (StatusCode::CONFLICT, "already_exists"),
1661 };
1662 error_response(status, code, &err.to_string())
1663}
1664
1665fn json_to_runtime_value(v: &serde_json::Value) -> varpulis_core::Value {
1666 match v {
1667 serde_json::Value::Null => varpulis_core::Value::Null,
1668 serde_json::Value::Bool(b) => varpulis_core::Value::Bool(*b),
1669 serde_json::Value::Number(n) => {
1670 if let Some(i) = n.as_i64() {
1671 varpulis_core::Value::Int(i)
1672 } else if let Some(f) = n.as_f64() {
1673 varpulis_core::Value::Float(f)
1674 } else {
1675 varpulis_core::Value::Null
1676 }
1677 }
1678 serde_json::Value::String(s) => varpulis_core::Value::Str(s.clone().into()),
1679 serde_json::Value::Array(arr) => {
1680 varpulis_core::Value::array(arr.iter().map(json_to_runtime_value).collect())
1681 }
1682 serde_json::Value::Object(map) => {
1683 let mut m: IndexMap<std::sync::Arc<str>, varpulis_core::Value, FxBuildHasher> =
1684 IndexMap::with_hasher(FxBuildHasher);
1685 for (k, v) in map {
1686 m.insert(k.as_str().into(), json_to_runtime_value(v));
1687 }
1688 varpulis_core::Value::map(m)
1689 }
1690 }
1691}
1692
1693#[cfg(test)]
1694mod tests {
1695 use std::sync::Arc;
1696
1697 use axum::body::Body;
1698 use axum::http::Request;
1699 use tokio::sync::RwLock;
1700 use tower::ServiceExt;
1701 use varpulis_runtime::tenant::{TenantManager, TenantQuota};
1702
1703 use super::*;
1704
    /// Fully-buffered HTTP response captured by `TestRequestBuilder::reply`.
    struct TestResponse {
        // Status code of the response.
        status: StatusCode,
        // Entire response body, collected into memory.
        body: bytes::Bytes,
        // Copy of the response headers.
        headers: axum::http::HeaderMap,
    }
1711
1712 impl TestResponse {
1713 fn status(&self) -> StatusCode {
1714 self.status
1715 }
1716 fn body(&self) -> &[u8] {
1717 &self.body
1718 }
1719 fn headers(&self) -> &axum::http::HeaderMap {
1720 &self.headers
1721 }
1722 }
1723
    /// Chainable description of an HTTP request to send to a `Router` in tests.
    struct TestRequestBuilder {
        // HTTP method name, e.g. "GET" or "POST".
        method: String,
        // Request URI (path plus optional query string).
        path: String,
        // Header name/value pairs, applied in insertion order.
        headers: Vec<(String, String)>,
        // Optional request body; `None` sends an empty body.
        body: Option<String>,
    }
1731
1732 impl TestRequestBuilder {
1733 fn new() -> Self {
1734 Self {
1735 method: "GET".to_string(),
1736 path: "/".to_string(),
1737 headers: Vec::new(),
1738 body: None,
1739 }
1740 }
1741 fn method(mut self, m: &str) -> Self {
1742 self.method = m.to_string();
1743 self
1744 }
1745 fn path(mut self, p: &str) -> Self {
1746 self.path = p.to_string();
1747 self
1748 }
1749 fn header(mut self, k: &str, v: &str) -> Self {
1750 self.headers.push((k.to_string(), v.to_string()));
1751 self
1752 }
1753 fn json<T: serde::Serialize>(mut self, body: &T) -> Self {
1754 self.body = Some(serde_json::to_string(body).unwrap());
1755 self.headers
1756 .push(("content-type".to_string(), "application/json".to_string()));
1757 self
1758 }
1759 async fn reply(self, app: &Router) -> TestResponse {
1760 let mut builder = Request::builder()
1761 .method(self.method.as_str())
1762 .uri(&self.path);
1763 for (k, v) in &self.headers {
1764 builder = builder.header(k.as_str(), v.as_str());
1765 }
1766 let body = match self.body {
1767 Some(b) => Body::from(b),
1768 None => Body::empty(),
1769 };
1770 let req = builder.body(body).unwrap();
1771 let resp = app.clone().oneshot(req).await.unwrap();
1772 let status = resp.status();
1773 let headers = resp.headers().clone();
1774 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1775 .await
1776 .unwrap();
1777 TestResponse {
1778 status,
1779 body,
1780 headers,
1781 }
1782 }
1783 }
1784
    /// Shorthand for `TestRequestBuilder::new()`, the starting point of every
    /// request chain in these tests.
    fn test_request() -> TestRequestBuilder {
        TestRequestBuilder::new()
    }
1789
1790 async fn setup_test_manager() -> SharedTenantManager {
1791 let mut mgr = TenantManager::new();
1792 let id = mgr
1793 .create_tenant(
1794 "Test Corp".into(),
1795 "test-key-123".into(),
1796 TenantQuota::default(),
1797 )
1798 .unwrap();
1799
1800 let tenant = mgr.get_tenant_mut(&id).unwrap();
1802 tenant
1803 .deploy_pipeline(
1804 "Test Pipeline".into(),
1805 "stream A = SensorReading .where(x > 1)".into(),
1806 )
1807 .await
1808 .unwrap();
1809
1810 Arc::new(RwLock::new(mgr))
1811 }
1812
1813 #[tokio::test]
1814 async fn test_deploy_pipeline() {
1815 let mgr = setup_test_manager().await;
1816 let routes = api_routes(mgr, None, None, None);
1817
1818 let resp = test_request()
1819 .method("POST")
1820 .path("/api/v1/pipelines")
1821 .header("x-api-key", "test-key-123")
1822 .json(&DeployPipelineRequest {
1823 name: "New Pipeline".into(),
1824 source: "stream B = Events .where(y > 10)".into(),
1825 })
1826 .reply(&routes)
1827 .await;
1828
1829 assert_eq!(resp.status(), StatusCode::CREATED);
1830 let body: DeployPipelineResponse = serde_json::from_slice(resp.body()).unwrap();
1831 assert_eq!(body.name, "New Pipeline");
1832 assert_eq!(body.status, "running");
1833 }
1834
1835 #[tokio::test]
1836 async fn test_deploy_invalid_api_key() {
1837 let mgr = setup_test_manager().await;
1838 let routes = api_routes(mgr, None, None, None);
1839
1840 let resp = test_request()
1841 .method("POST")
1842 .path("/api/v1/pipelines")
1843 .header("x-api-key", "wrong-key")
1844 .json(&DeployPipelineRequest {
1845 name: "Bad".into(),
1846 source: "stream X = Y .where(z > 1)".into(),
1847 })
1848 .reply(&routes)
1849 .await;
1850
1851 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1852 }
1853
1854 #[tokio::test]
1855 async fn test_deploy_invalid_vpl() {
1856 let mgr = setup_test_manager().await;
1857 let routes = api_routes(mgr, None, None, None);
1858
1859 let resp = test_request()
1860 .method("POST")
1861 .path("/api/v1/pipelines")
1862 .header("x-api-key", "test-key-123")
1863 .json(&DeployPipelineRequest {
1864 name: "Bad VPL".into(),
1865 source: "this is not valid {{{".into(),
1866 })
1867 .reply(&routes)
1868 .await;
1869
1870 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1871 }
1872
1873 #[tokio::test]
1874 async fn test_list_pipelines() {
1875 let mgr = setup_test_manager().await;
1876 let routes = api_routes(mgr, None, None, None);
1877
1878 let resp = test_request()
1879 .method("GET")
1880 .path("/api/v1/pipelines")
1881 .header("x-api-key", "test-key-123")
1882 .reply(&routes)
1883 .await;
1884
1885 assert_eq!(resp.status(), StatusCode::OK);
1886 let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
1887 assert_eq!(body.total, 1);
1888 assert_eq!(body.pipelines[0].name, "Test Pipeline");
1889 }
1890
1891 #[tokio::test]
1892 async fn test_usage_endpoint() {
1893 let mgr = setup_test_manager().await;
1894 let routes = api_routes(mgr, None, None, None);
1895
1896 let resp = test_request()
1897 .method("GET")
1898 .path("/api/v1/usage")
1899 .header("x-api-key", "test-key-123")
1900 .reply(&routes)
1901 .await;
1902
1903 assert_eq!(resp.status(), StatusCode::OK);
1904 let body: UsageResponse = serde_json::from_slice(resp.body()).unwrap();
1905 assert_eq!(body.active_pipelines, 1);
1906 }
1907
1908 #[tokio::test]
1909 async fn test_inject_event() {
1910 let mgr = setup_test_manager().await;
1911
1912 let pipeline_id = {
1914 let m = mgr.read().await;
1915 let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
1916 let tenant = m.get_tenant(&tid).unwrap();
1917 tenant.pipelines.keys().next().unwrap().clone()
1918 };
1919
1920 let routes = api_routes(mgr, None, None, None);
1921
1922 let resp = test_request()
1923 .method("POST")
1924 .path(&format!("/api/v1/pipelines/{pipeline_id}/events"))
1925 .header("x-api-key", "test-key-123")
1926 .json(&InjectEventRequest {
1927 event_type: "SensorReading".into(),
1928 fields: {
1929 let mut m = serde_json::Map::new();
1930 m.insert(
1931 "x".into(),
1932 serde_json::Value::Number(serde_json::Number::from(42)),
1933 );
1934 m
1935 },
1936 })
1937 .reply(&routes)
1938 .await;
1939
1940 assert_eq!(resp.status(), StatusCode::OK);
1941 }
1942
1943 #[test]
1944 fn test_json_to_runtime_value() {
1945 assert_eq!(
1946 json_to_runtime_value(&serde_json::json!(null)),
1947 varpulis_core::Value::Null
1948 );
1949 assert_eq!(
1950 json_to_runtime_value(&serde_json::json!(true)),
1951 varpulis_core::Value::Bool(true)
1952 );
1953 assert_eq!(
1954 json_to_runtime_value(&serde_json::json!(42)),
1955 varpulis_core::Value::Int(42)
1956 );
1957 assert_eq!(
1958 json_to_runtime_value(&serde_json::json!(1.23)),
1959 varpulis_core::Value::Float(1.23)
1960 );
1961 assert_eq!(
1962 json_to_runtime_value(&serde_json::json!("hello")),
1963 varpulis_core::Value::Str("hello".into())
1964 );
1965 }
1966
1967 #[test]
1968 fn test_error_response_format() {
1969 let resp = error_response(StatusCode::BAD_REQUEST, "test_error", "Something failed");
1970 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1971 }
1972
1973 #[test]
1974 fn test_tenant_error_mapping() {
1975 let resp = tenant_error_response(TenantError::NotFound("t1".into()));
1976 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1977
1978 let resp = tenant_error_response(TenantError::RateLimitExceeded);
1979 assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
1980
1981 let parse_err = varpulis_parser::parse("INVALID{{{").unwrap_err();
1982 let resp = tenant_error_response(TenantError::ParseError(parse_err));
1983 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1984 }
1985
1986 fn setup_admin_routes(admin_key: Option<&str>) -> (SharedTenantManager, Router) {
1991 let mgr = Arc::new(RwLock::new(TenantManager::new()));
1992 let key = admin_key.map(|k| k.to_string());
1993 let routes = api_routes(mgr.clone(), key, None, None);
1994 (mgr, routes)
1995 }
1996
1997 #[tokio::test]
1998 async fn test_create_tenant() {
1999 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2000
2001 let resp = test_request()
2002 .method("POST")
2003 .path("/api/v1/tenants")
2004 .header("x-admin-key", "admin-secret")
2005 .json(&CreateTenantRequest {
2006 name: "Acme Corp".into(),
2007 quota_tier: None,
2008 })
2009 .reply(&routes)
2010 .await;
2011
2012 assert_eq!(resp.status(), StatusCode::CREATED);
2013 let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
2014 assert_eq!(body.name, "Acme Corp");
2015 assert!(!body.api_key.is_empty());
2016 assert!(!body.id.is_empty());
2017 }
2018
2019 #[tokio::test]
2020 async fn test_list_tenants_admin() {
2021 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2022
2023 for name in &["Tenant A", "Tenant B"] {
2025 test_request()
2026 .method("POST")
2027 .path("/api/v1/tenants")
2028 .header("x-admin-key", "admin-secret")
2029 .json(&CreateTenantRequest {
2030 name: name.to_string(),
2031 quota_tier: None,
2032 })
2033 .reply(&routes)
2034 .await;
2035 }
2036
2037 let resp = test_request()
2038 .method("GET")
2039 .path("/api/v1/tenants")
2040 .header("x-admin-key", "admin-secret")
2041 .reply(&routes)
2042 .await;
2043
2044 assert_eq!(resp.status(), StatusCode::OK);
2045 let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2046 assert_eq!(body.total, 2);
2047 }
2048
2049 #[tokio::test]
2050 async fn test_get_tenant_admin() {
2051 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2052
2053 let create_resp = test_request()
2055 .method("POST")
2056 .path("/api/v1/tenants")
2057 .header("x-admin-key", "admin-secret")
2058 .json(&CreateTenantRequest {
2059 name: "Detail Corp".into(),
2060 quota_tier: Some("pro".into()),
2061 })
2062 .reply(&routes)
2063 .await;
2064
2065 let created: TenantResponse = serde_json::from_slice(create_resp.body()).unwrap();
2066
2067 let resp = test_request()
2068 .method("GET")
2069 .path(&format!("/api/v1/tenants/{}", created.id))
2070 .header("x-admin-key", "admin-secret")
2071 .reply(&routes)
2072 .await;
2073
2074 assert_eq!(resp.status(), StatusCode::OK);
2075 let body: TenantDetailResponse = serde_json::from_slice(resp.body()).unwrap();
2076 assert_eq!(body.name, "Detail Corp");
2077 assert_eq!(body.pipeline_count, 0);
2078 assert_eq!(body.quota.max_pipelines, 20);
2080 }
2081
2082 #[tokio::test]
2083 async fn test_delete_tenant_admin() {
2084 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2085
2086 let create_resp = test_request()
2088 .method("POST")
2089 .path("/api/v1/tenants")
2090 .header("x-admin-key", "admin-secret")
2091 .json(&CreateTenantRequest {
2092 name: "Doomed".into(),
2093 quota_tier: None,
2094 })
2095 .reply(&routes)
2096 .await;
2097 let created: TenantResponse = serde_json::from_slice(create_resp.body()).unwrap();
2098
2099 let resp = test_request()
2100 .method("DELETE")
2101 .path(&format!("/api/v1/tenants/{}", created.id))
2102 .header("x-admin-key", "admin-secret")
2103 .reply(&routes)
2104 .await;
2105
2106 assert_eq!(resp.status(), StatusCode::OK);
2107
2108 let list_resp = test_request()
2110 .method("GET")
2111 .path("/api/v1/tenants")
2112 .header("x-admin-key", "admin-secret")
2113 .reply(&routes)
2114 .await;
2115 let body: TenantListResponse = serde_json::from_slice(list_resp.body()).unwrap();
2116 assert_eq!(body.total, 0);
2117 }
2118
2119 #[tokio::test]
2120 async fn test_invalid_admin_key() {
2121 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2122
2123 let resp = test_request()
2124 .method("GET")
2125 .path("/api/v1/tenants")
2126 .header("x-admin-key", "wrong-key")
2127 .reply(&routes)
2128 .await;
2129
2130 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2131 }
2132
2133 #[tokio::test]
2134 async fn test_no_admin_key_configured() {
2135 let (_mgr, routes) = setup_admin_routes(None);
2136
2137 let resp = test_request()
2138 .method("GET")
2139 .path("/api/v1/tenants")
2140 .header("x-admin-key", "anything")
2141 .reply(&routes)
2142 .await;
2143
2144 assert_eq!(resp.status(), StatusCode::FORBIDDEN);
2145 }
2146
2147 #[tokio::test]
2148 async fn test_create_tenant_tier_selection() {
2149 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2150
2151 let resp = test_request()
2153 .method("POST")
2154 .path("/api/v1/tenants")
2155 .header("x-admin-key", "admin-secret")
2156 .json(&CreateTenantRequest {
2157 name: "Free User".into(),
2158 quota_tier: Some("free".into()),
2159 })
2160 .reply(&routes)
2161 .await;
2162 let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
2163 assert_eq!(body.quota.max_pipelines, 5); let resp = test_request()
2167 .method("POST")
2168 .path("/api/v1/tenants")
2169 .header("x-admin-key", "admin-secret")
2170 .json(&CreateTenantRequest {
2171 name: "Enterprise User".into(),
2172 quota_tier: Some("enterprise".into()),
2173 })
2174 .reply(&routes)
2175 .await;
2176 let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
2177 assert_eq!(body.quota.max_pipelines, 1000); }
2179
2180 async fn get_first_pipeline_id(mgr: &SharedTenantManager) -> String {
2186 let m = mgr.read().await;
2187 let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
2188 let tenant = m.get_tenant(&tid).unwrap();
2189 tenant.pipelines.keys().next().unwrap().clone()
2190 }
2191
2192 #[tokio::test]
2193 async fn test_get_single_pipeline() {
2194 let mgr = setup_test_manager().await;
2195 let pipeline_id = get_first_pipeline_id(&mgr).await;
2196 let routes = api_routes(mgr, None, None, None);
2197
2198 let resp = test_request()
2199 .method("GET")
2200 .path(&format!("/api/v1/pipelines/{pipeline_id}"))
2201 .header("x-api-key", "test-key-123")
2202 .reply(&routes)
2203 .await;
2204
2205 assert_eq!(resp.status(), StatusCode::OK);
2206 let body: PipelineInfo = serde_json::from_slice(resp.body()).unwrap();
2207 assert_eq!(body.id, pipeline_id);
2208 assert_eq!(body.name, "Test Pipeline");
2209 assert_eq!(body.status, "running");
2210 assert!(body.source.contains("SensorReading"));
2211 }
2212
2213 #[tokio::test]
2214 async fn test_get_pipeline_not_found() {
2215 let mgr = setup_test_manager().await;
2216 let routes = api_routes(mgr, None, None, None);
2217
2218 let resp = test_request()
2219 .method("GET")
2220 .path("/api/v1/pipelines/nonexistent-id")
2221 .header("x-api-key", "test-key-123")
2222 .reply(&routes)
2223 .await;
2224
2225 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2226 }
2227
2228 #[tokio::test]
2229 async fn test_delete_pipeline_api() {
2230 let mgr = setup_test_manager().await;
2231 let pipeline_id = get_first_pipeline_id(&mgr).await;
2232 let routes = api_routes(mgr.clone(), None, None, None);
2233
2234 let resp = test_request()
2235 .method("DELETE")
2236 .path(&format!("/api/v1/pipelines/{pipeline_id}"))
2237 .header("x-api-key", "test-key-123")
2238 .reply(&routes)
2239 .await;
2240
2241 assert_eq!(resp.status(), StatusCode::OK);
2242 let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2243 assert_eq!(body["deleted"], true);
2244
2245 let list_resp = test_request()
2247 .method("GET")
2248 .path("/api/v1/pipelines")
2249 .header("x-api-key", "test-key-123")
2250 .reply(&routes)
2251 .await;
2252 let list: PipelineListResponse = serde_json::from_slice(list_resp.body()).unwrap();
2253 assert_eq!(list.total, 0);
2254 }
2255
2256 #[tokio::test]
2257 async fn test_delete_pipeline_not_found() {
2258 let mgr = setup_test_manager().await;
2259 let routes = api_routes(mgr, None, None, None);
2260
2261 let resp = test_request()
2262 .method("DELETE")
2263 .path("/api/v1/pipelines/nonexistent-id")
2264 .header("x-api-key", "test-key-123")
2265 .reply(&routes)
2266 .await;
2267
2268 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2269 }
2270
2271 #[tokio::test]
2276 async fn test_inject_batch() {
2277 let mgr = setup_test_manager().await;
2278 let pipeline_id = get_first_pipeline_id(&mgr).await;
2279 let routes = api_routes(mgr, None, None, None);
2280
2281 let resp = test_request()
2282 .method("POST")
2283 .path(&format!("/api/v1/pipelines/{pipeline_id}/events-batch"))
2284 .header("x-api-key", "test-key-123")
2285 .json(&InjectBatchRequest {
2286 events: vec![
2287 InjectEventRequest {
2288 event_type: "SensorReading".into(),
2289 fields: {
2290 let mut m = serde_json::Map::new();
2291 m.insert("x".into(), serde_json::json!(5));
2292 m
2293 },
2294 },
2295 InjectEventRequest {
2296 event_type: "SensorReading".into(),
2297 fields: {
2298 let mut m = serde_json::Map::new();
2299 m.insert("x".into(), serde_json::json!(10));
2300 m
2301 },
2302 },
2303 ],
2304 })
2305 .reply(&routes)
2306 .await;
2307
2308 assert_eq!(resp.status(), StatusCode::OK);
2309 let body: InjectBatchResponse = serde_json::from_slice(resp.body()).unwrap();
2310 assert_eq!(body.accepted, 2);
2311 assert!(body.processing_time_us > 0);
2312 }
2313
2314 #[tokio::test]
2315 async fn test_inject_batch_invalid_pipeline() {
2316 let mgr = setup_test_manager().await;
2317 let routes = api_routes(mgr, None, None, None);
2318
2319 let resp = test_request()
2321 .method("POST")
2322 .path("/api/v1/pipelines/nonexistent/events-batch")
2323 .header("x-api-key", "test-key-123")
2324 .json(&InjectBatchRequest {
2325 events: vec![InjectEventRequest {
2326 event_type: "Test".into(),
2327 fields: serde_json::Map::new(),
2328 }],
2329 })
2330 .reply(&routes)
2331 .await;
2332
2333 assert_eq!(resp.status(), StatusCode::OK);
2335 let body: InjectBatchResponse = serde_json::from_slice(resp.body()).unwrap();
2336 assert_eq!(body.accepted, 0);
2337 }
2338
2339 #[tokio::test]
2344 async fn test_checkpoint_pipeline() {
2345 let mgr = setup_test_manager().await;
2346 let pipeline_id = get_first_pipeline_id(&mgr).await;
2347 let routes = api_routes(mgr, None, None, None);
2348
2349 let resp = test_request()
2350 .method("POST")
2351 .path(&format!("/api/v1/pipelines/{pipeline_id}/checkpoint"))
2352 .header("x-api-key", "test-key-123")
2353 .reply(&routes)
2354 .await;
2355
2356 assert_eq!(resp.status(), StatusCode::OK);
2357 let body: CheckpointResponse = serde_json::from_slice(resp.body()).unwrap();
2358 assert_eq!(body.pipeline_id, pipeline_id);
2359 }
2360
2361 #[tokio::test]
2362 async fn test_checkpoint_not_found() {
2363 let mgr = setup_test_manager().await;
2364 let routes = api_routes(mgr, None, None, None);
2365
2366 let resp = test_request()
2367 .method("POST")
2368 .path("/api/v1/pipelines/nonexistent/checkpoint")
2369 .header("x-api-key", "test-key-123")
2370 .reply(&routes)
2371 .await;
2372
2373 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2374 }
2375
2376 #[tokio::test]
2377 async fn test_restore_pipeline() {
2378 let mgr = setup_test_manager().await;
2379 let pipeline_id = get_first_pipeline_id(&mgr).await;
2380 let routes = api_routes(mgr, None, None, None);
2381
2382 let cp_resp = test_request()
2384 .method("POST")
2385 .path(&format!("/api/v1/pipelines/{pipeline_id}/checkpoint"))
2386 .header("x-api-key", "test-key-123")
2387 .reply(&routes)
2388 .await;
2389 let cp: CheckpointResponse = serde_json::from_slice(cp_resp.body()).unwrap();
2390
2391 let resp = test_request()
2393 .method("POST")
2394 .path(&format!("/api/v1/pipelines/{pipeline_id}/restore"))
2395 .header("x-api-key", "test-key-123")
2396 .json(&RestoreRequest {
2397 checkpoint: cp.checkpoint,
2398 })
2399 .reply(&routes)
2400 .await;
2401
2402 assert_eq!(resp.status(), StatusCode::OK);
2403 let body: RestoreResponse = serde_json::from_slice(resp.body()).unwrap();
2404 assert_eq!(body.pipeline_id, pipeline_id);
2405 assert!(body.restored);
2406 }
2407
2408 #[tokio::test]
2409 async fn test_restore_not_found() {
2410 let mgr = setup_test_manager().await;
2411 let routes = api_routes(mgr, None, None, None);
2412
2413 let checkpoint = varpulis_runtime::persistence::EngineCheckpoint {
2414 version: varpulis_runtime::persistence::CHECKPOINT_VERSION,
2415 window_states: std::collections::HashMap::new(),
2416 sase_states: std::collections::HashMap::new(),
2417 join_states: std::collections::HashMap::new(),
2418 variables: std::collections::HashMap::new(),
2419 events_processed: 0,
2420 output_events_emitted: 0,
2421 watermark_state: None,
2422 distinct_states: std::collections::HashMap::new(),
2423 limit_states: std::collections::HashMap::new(),
2424 };
2425
2426 let resp = test_request()
2427 .method("POST")
2428 .path("/api/v1/pipelines/nonexistent/restore")
2429 .header("x-api-key", "test-key-123")
2430 .json(&RestoreRequest { checkpoint })
2431 .reply(&routes)
2432 .await;
2433
2434 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2435 }
2436
2437 #[tokio::test]
2442 async fn test_metrics_endpoint() {
2443 let mgr = setup_test_manager().await;
2444 let pipeline_id = get_first_pipeline_id(&mgr).await;
2445 let routes = api_routes(mgr, None, None, None);
2446
2447 let resp = test_request()
2448 .method("GET")
2449 .path(&format!("/api/v1/pipelines/{pipeline_id}/metrics"))
2450 .header("x-api-key", "test-key-123")
2451 .reply(&routes)
2452 .await;
2453
2454 assert_eq!(resp.status(), StatusCode::OK);
2455 let body: PipelineMetricsResponse = serde_json::from_slice(resp.body()).unwrap();
2456 assert_eq!(body.pipeline_id, pipeline_id);
2457 }
2458
2459 #[tokio::test]
2460 async fn test_metrics_not_found() {
2461 let mgr = setup_test_manager().await;
2462 let routes = api_routes(mgr, None, None, None);
2463
2464 let resp = test_request()
2465 .method("GET")
2466 .path("/api/v1/pipelines/nonexistent/metrics")
2467 .header("x-api-key", "test-key-123")
2468 .reply(&routes)
2469 .await;
2470
2471 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2472 }
2473
2474 #[tokio::test]
2479 async fn test_reload_pipeline() {
2480 let mgr = setup_test_manager().await;
2481 let pipeline_id = get_first_pipeline_id(&mgr).await;
2482 let routes = api_routes(mgr, None, None, None);
2483
2484 let resp = test_request()
2485 .method("POST")
2486 .path(&format!("/api/v1/pipelines/{pipeline_id}/reload"))
2487 .header("x-api-key", "test-key-123")
2488 .json(&ReloadPipelineRequest {
2489 source: "stream B = Events .where(y > 10)".into(),
2490 })
2491 .reply(&routes)
2492 .await;
2493
2494 assert_eq!(resp.status(), StatusCode::OK);
2495 let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2496 assert_eq!(body["reloaded"], true);
2497 }
2498
2499 #[tokio::test]
2500 async fn test_reload_invalid_vpl() {
2501 let mgr = setup_test_manager().await;
2502 let pipeline_id = get_first_pipeline_id(&mgr).await;
2503 let routes = api_routes(mgr, None, None, None);
2504
2505 let resp = test_request()
2506 .method("POST")
2507 .path(&format!("/api/v1/pipelines/{pipeline_id}/reload"))
2508 .header("x-api-key", "test-key-123")
2509 .json(&ReloadPipelineRequest {
2510 source: "not valid {{{".into(),
2511 })
2512 .reply(&routes)
2513 .await;
2514
2515 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2516 }
2517
2518 #[tokio::test]
2519 async fn test_reload_not_found() {
2520 let mgr = setup_test_manager().await;
2521 let routes = api_routes(mgr, None, None, None);
2522
2523 let resp = test_request()
2524 .method("POST")
2525 .path("/api/v1/pipelines/nonexistent/reload")
2526 .header("x-api-key", "test-key-123")
2527 .json(&ReloadPipelineRequest {
2528 source: "stream B = Events .where(y > 10)".into(),
2529 })
2530 .reply(&routes)
2531 .await;
2532
2533 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2534 }
2535
2536 #[tokio::test]
2541 async fn test_logs_invalid_pipeline() {
2542 let mgr = setup_test_manager().await;
2543 let routes = api_routes(mgr, None, None, None);
2544
2545 let resp = test_request()
2546 .method("GET")
2547 .path("/api/v1/pipelines/nonexistent/logs")
2548 .header("x-api-key", "test-key-123")
2549 .reply(&routes)
2550 .await;
2551
2552 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2553 }
2554
2555 #[tokio::test]
2556 async fn test_logs_invalid_api_key() {
2557 let mgr = setup_test_manager().await;
2558 let pipeline_id = get_first_pipeline_id(&mgr).await;
2559 let routes = api_routes(mgr, None, None, None);
2560
2561 let resp = test_request()
2562 .method("GET")
2563 .path(&format!("/api/v1/pipelines/{pipeline_id}/logs"))
2564 .header("x-api-key", "wrong-key")
2565 .reply(&routes)
2566 .await;
2567
2568 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2569 }
2570
/// A JSON array of mixed scalars converts to `Value::Array` with each element
/// mapped to the matching runtime variant, in order.
#[test]
fn test_json_to_runtime_value_array() {
    use varpulis_core::Value;

    let input = serde_json::json!([1, "hello", true]);

    // Anything other than an array variant is a conversion failure.
    let elements = match json_to_runtime_value(&input) {
        Value::Array(elements) => elements,
        _ => panic!("Expected Array"),
    };

    assert_eq!(elements.len(), 3);
    assert_eq!(elements[0], Value::Int(1));
    assert_eq!(elements[1], Value::Str("hello".into()));
    assert_eq!(elements[2], Value::Bool(true));
}
2589
/// A JSON object with two keys converts to `Value::Map` holding two entries.
#[test]
fn test_json_to_runtime_value_object() {
    let input = serde_json::json!({"key": "val", "num": 42});

    // Only the entry count is checked; any non-map variant fails the test.
    let entry_count = match json_to_runtime_value(&input) {
        varpulis_core::Value::Map(entries) => entries.len(),
        _ => panic!("Expected Map"),
    };

    assert_eq!(entry_count, 2);
}
2601
/// Each scalar runtime value serialises to the expected JSON literal;
/// timestamps and durations are emitted as plain numbers.
#[test]
fn test_json_from_value_roundtrip() {
    use varpulis_core::Value;

    // (runtime value, JSON it must serialise to)
    let cases = [
        (Value::Null, serde_json::json!(null)),
        (Value::Bool(true), serde_json::json!(true)),
        (Value::Int(42), serde_json::json!(42)),
        (Value::Float(2.71), serde_json::json!(2.71)),
        (Value::Str("hi".into()), serde_json::json!("hi")),
        (Value::Timestamp(1000000), serde_json::json!(1000000)),
        (Value::Duration(5000), serde_json::json!(5000)),
    ];

    for (runtime_value, expected_json) in cases {
        assert_eq!(json_from_value(&runtime_value), expected_json);
    }
}
2625
/// Checks the HTTP status that `tenant_error_response` produces for each
/// `TenantError` variant exercised here, plus the `Retry-After` header that
/// accompanies the backpressure response.
#[test]
fn test_tenant_error_all_variants() {
    // (error, status the response must carry), checked in this order.
    let status_expectations = [
        (
            TenantError::PipelineNotFound("p1".into()),
            StatusCode::NOT_FOUND,
        ),
        (
            TenantError::QuotaExceeded("max pipelines".into()),
            StatusCode::TOO_MANY_REQUESTS,
        ),
        (
            TenantError::EngineError(varpulis_runtime::EngineError::Pipeline("boom".into())),
            StatusCode::INTERNAL_SERVER_ERROR,
        ),
        (
            TenantError::AlreadyExists("t1".into()),
            StatusCode::CONFLICT,
        ),
    ];
    for (error, expected_status) in status_expectations {
        let response = tenant_error_response(error);
        assert_eq!(response.status(), expected_status);
    }

    // Backpressure is checked separately because the header matters too.
    let throttled = tenant_error_response(TenantError::BackpressureExceeded {
        current: 50000,
        max: 50000,
    });
    assert_eq!(throttled.status(), StatusCode::TOO_MANY_REQUESTS);
    assert_eq!(throttled.headers().get("Retry-After").unwrap(), "1");
}
2653
2654 #[tokio::test]
2659 async fn test_list_pipelines_default_pagination() {
2660 let mgr = setup_test_manager().await;
2661 let routes = api_routes(mgr, None, None, None);
2662
2663 let resp = test_request()
2664 .method("GET")
2665 .path("/api/v1/pipelines")
2666 .header("x-api-key", "test-key-123")
2667 .reply(&routes)
2668 .await;
2669
2670 assert_eq!(resp.status(), StatusCode::OK);
2671 let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2672 assert_eq!(body.total, 1);
2673 let pagination = body.pagination.unwrap();
2674 assert_eq!(pagination.total, 1);
2675 assert_eq!(pagination.offset, 0);
2676 assert_eq!(pagination.limit, 50);
2677 assert!(!pagination.has_more);
2678 }
2679
2680 #[tokio::test]
2681 async fn test_list_pipelines_with_pagination_params() {
2682 let mgr = setup_test_manager().await;
2683
2684 {
2686 let mut m = mgr.write().await;
2687 let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
2688 let tenant = m.get_tenant_mut(&tid).unwrap();
2689 tenant
2690 .deploy_pipeline(
2691 "Pipeline B".into(),
2692 "stream B = Events .where(y > 2)".into(),
2693 )
2694 .await
2695 .unwrap();
2696 tenant
2697 .deploy_pipeline(
2698 "Pipeline C".into(),
2699 "stream C = Events .where(z > 3)".into(),
2700 )
2701 .await
2702 .unwrap();
2703 }
2704
2705 let routes = api_routes(mgr, None, None, None);
2706
2707 let resp = test_request()
2709 .method("GET")
2710 .path("/api/v1/pipelines?limit=1&offset=0")
2711 .header("x-api-key", "test-key-123")
2712 .reply(&routes)
2713 .await;
2714
2715 assert_eq!(resp.status(), StatusCode::OK);
2716 let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2717 assert_eq!(body.pipelines.len(), 1);
2718 assert_eq!(body.total, 3);
2719 let pagination = body.pagination.unwrap();
2720 assert!(pagination.has_more);
2721 assert_eq!(pagination.limit, 1);
2722
2723 let resp = test_request()
2725 .method("GET")
2726 .path("/api/v1/pipelines?limit=1&offset=2")
2727 .header("x-api-key", "test-key-123")
2728 .reply(&routes)
2729 .await;
2730
2731 let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2732 assert_eq!(body.pipelines.len(), 1);
2733 assert_eq!(body.total, 3);
2734 assert!(!body.pagination.unwrap().has_more);
2735 }
2736
2737 #[tokio::test]
2738 async fn test_list_pipelines_limit_exceeds_max() {
2739 let mgr = setup_test_manager().await;
2740 let routes = api_routes(mgr, None, None, None);
2741
2742 let resp = test_request()
2743 .method("GET")
2744 .path("/api/v1/pipelines?limit=1001")
2745 .header("x-api-key", "test-key-123")
2746 .reply(&routes)
2747 .await;
2748
2749 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2750 }
2751
2752 #[tokio::test]
2753 async fn test_list_tenants_with_pagination() {
2754 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2755
2756 for name in &["T1", "T2", "T3"] {
2758 test_request()
2759 .method("POST")
2760 .path("/api/v1/tenants")
2761 .header("x-admin-key", "admin-secret")
2762 .json(&CreateTenantRequest {
2763 name: name.to_string(),
2764 quota_tier: None,
2765 })
2766 .reply(&routes)
2767 .await;
2768 }
2769
2770 let resp = test_request()
2772 .method("GET")
2773 .path("/api/v1/tenants?limit=2&offset=0")
2774 .header("x-admin-key", "admin-secret")
2775 .reply(&routes)
2776 .await;
2777
2778 assert_eq!(resp.status(), StatusCode::OK);
2779 let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2780 assert_eq!(body.tenants.len(), 2);
2781 assert_eq!(body.total, 3);
2782 assert!(body.pagination.unwrap().has_more);
2783
2784 let resp = test_request()
2786 .method("GET")
2787 .path("/api/v1/tenants?limit=2&offset=2")
2788 .header("x-admin-key", "admin-secret")
2789 .reply(&routes)
2790 .await;
2791
2792 let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2793 assert_eq!(body.tenants.len(), 1);
2794 assert!(!body.pagination.unwrap().has_more);
2795 }
2796
2797 #[tokio::test]
2798 async fn test_inject_backpressure_429() {
2799 use std::sync::atomic::Ordering;
2800
2801 let mut mgr = TenantManager::new();
2802 mgr.set_max_queue_depth(5);
2803 let id = mgr
2804 .create_tenant(
2805 "BP Corp".into(),
2806 "bp-key-123".into(),
2807 TenantQuota::default(),
2808 )
2809 .unwrap();
2810
2811 let tenant = mgr.get_tenant_mut(&id).unwrap();
2812 let pid = tenant
2813 .deploy_pipeline(
2814 "BP Pipeline".into(),
2815 "stream A = SensorReading .where(x > 1)".into(),
2816 )
2817 .await
2818 .unwrap();
2819
2820 mgr.pending_events_counter().store(5, Ordering::Relaxed);
2822
2823 let shared = Arc::new(RwLock::new(mgr));
2824 let routes = api_routes(shared, None, None, None);
2825
2826 let resp = test_request()
2827 .method("POST")
2828 .path(&format!("/api/v1/pipelines/{pid}/events"))
2829 .header("x-api-key", "bp-key-123")
2830 .json(&InjectEventRequest {
2831 event_type: "SensorReading".into(),
2832 fields: serde_json::Map::new(),
2833 })
2834 .reply(&routes)
2835 .await;
2836
2837 assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2838 assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2840 let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2842 assert_eq!(body["code"], "queue_depth_exceeded");
2843 }
2844
2845 #[tokio::test]
2846 async fn test_inject_batch_backpressure_429() {
2847 use std::sync::atomic::Ordering;
2848
2849 let mut mgr = TenantManager::new();
2850 mgr.set_max_queue_depth(5);
2851 let id = mgr
2852 .create_tenant(
2853 "BP Batch Corp".into(),
2854 "bp-batch-key".into(),
2855 TenantQuota::default(),
2856 )
2857 .unwrap();
2858
2859 let tenant = mgr.get_tenant_mut(&id).unwrap();
2860 let pid = tenant
2861 .deploy_pipeline(
2862 "BP Batch Pipeline".into(),
2863 "stream A = SensorReading .where(x > 1)".into(),
2864 )
2865 .await
2866 .unwrap();
2867
2868 mgr.pending_events_counter().store(5, Ordering::Relaxed);
2870
2871 let shared = Arc::new(RwLock::new(mgr));
2872 let routes = api_routes(shared, None, None, None);
2873
2874 let resp = test_request()
2875 .method("POST")
2876 .path(&format!("/api/v1/pipelines/{pid}/events-batch"))
2877 .header("x-api-key", "bp-batch-key")
2878 .json(&InjectBatchRequest {
2879 events: vec![InjectEventRequest {
2880 event_type: "SensorReading".into(),
2881 fields: serde_json::Map::new(),
2882 }],
2883 })
2884 .reply(&routes)
2885 .await;
2886
2887 assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2888 assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2889 }
2890}