1use std::convert::Infallible;
7
8use axum::extract::{Json, Path, Query, State};
9use axum::http::StatusCode;
10use axum::response::{IntoResponse, Response};
11use axum::routing::{get, post};
12use axum::Router;
13use futures_util::stream;
14use indexmap::IndexMap;
15use rustc_hash::FxBuildHasher;
16use serde::{Deserialize, Serialize};
17use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
18use varpulis_core::pagination::{PaginationMeta, PaginationParams, MAX_LIMIT};
19use varpulis_runtime::tenant::{SharedTenantManager, TenantError, TenantQuota};
20use varpulis_runtime::Event;
21
22use crate::auth::constant_time_compare;
23use crate::billing::SharedBillingState;
24
25#[derive(Debug, Deserialize, Serialize)]
30pub struct DeployPipelineRequest {
31 pub name: String,
32 pub source: String,
33}
34
35#[derive(Debug, Serialize, Deserialize)]
36pub struct DeployPipelineResponse {
37 pub id: String,
38 pub name: String,
39 pub status: String,
40}
41
42#[derive(Debug, Serialize, Deserialize)]
43pub struct PipelineInfo {
44 pub id: String,
45 pub name: String,
46 pub status: String,
47 pub source: String,
48 pub uptime_secs: u64,
49 #[serde(skip_serializing_if = "Option::is_none")]
50 pub global_template_id: Option<String>,
51 #[serde(default = "default_scope")]
53 pub scope_level: String,
54 #[serde(skip_serializing_if = "Option::is_none")]
56 pub inherited_from_org_id: Option<String>,
57 #[serde(default)]
59 pub read_only: bool,
60}
61
/// Serde default for `PipelineInfo::scope_level`: the `"own"` scope.
fn default_scope() -> String {
    String::from("own")
}
65
66#[derive(Debug, Serialize, Deserialize)]
67pub struct PipelineListResponse {
68 pub pipelines: Vec<PipelineInfo>,
69 pub total: usize,
70 #[serde(skip_serializing_if = "Option::is_none")]
71 pub pagination: Option<PaginationMeta>,
72}
73
74#[derive(Debug, Serialize, Deserialize)]
75pub struct PipelineMetricsResponse {
76 pub pipeline_id: String,
77 pub events_processed: u64,
78 pub output_events_emitted: u64,
79}
80
81#[derive(Debug, Deserialize, Serialize)]
82pub struct InjectEventRequest {
83 pub event_type: String,
84 pub fields: serde_json::Map<String, serde_json::Value>,
85}
86
87#[derive(Debug, Deserialize, Serialize)]
88pub struct InjectBatchRequest {
89 pub events: Vec<InjectEventRequest>,
90}
91
92#[derive(Debug, Serialize, Deserialize)]
93pub struct InjectBatchResponse {
94 pub accepted: usize,
95 pub output_events: Vec<serde_json::Value>,
96 pub processing_time_us: u64,
97}
98
99#[derive(Debug, Deserialize, Serialize)]
100pub struct ReloadPipelineRequest {
101 pub source: String,
102}
103
104#[derive(Debug, Serialize, Deserialize)]
105pub struct CheckpointResponse {
106 pub pipeline_id: String,
107 pub checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
108 pub events_processed: u64,
109}
110
111#[derive(Debug, Deserialize, Serialize)]
112pub struct RestoreRequest {
113 pub checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
114}
115
116#[derive(Debug, Serialize, Deserialize)]
117pub struct RestoreResponse {
118 pub pipeline_id: String,
119 pub restored: bool,
120 pub events_restored: u64,
121}
122
123#[derive(Debug, Serialize)]
124pub struct ApiError {
125 pub error: String,
126 pub code: String,
127}
128
129#[derive(Debug, Deserialize)]
130pub struct DlqQueryParams {
131 #[serde(default)]
132 pub offset: Option<usize>,
133 #[serde(default)]
134 pub limit: Option<usize>,
135}
136
137#[derive(Debug, Serialize)]
138pub struct DlqEntriesResponse {
139 pub entries: Vec<varpulis_runtime::dead_letter::DlqEntryOwned>,
140 pub total: u64,
141}
142
143#[derive(Debug, Serialize)]
144pub struct DlqReplayResponse {
145 pub replayed: usize,
146}
147
148#[derive(Debug, Serialize)]
149pub struct DlqClearResponse {
150 pub cleared: bool,
151}
152
153#[derive(Debug, Serialize, Deserialize)]
154pub struct UsageResponse {
155 pub tenant_id: String,
156 pub events_processed: u64,
157 pub output_events_emitted: u64,
158 pub active_pipelines: usize,
159 pub quota: QuotaInfo,
160}
161
162#[derive(Debug, Serialize, Deserialize)]
163pub struct QuotaInfo {
164 pub max_pipelines: usize,
165 pub max_events_per_second: u64,
166 pub max_streams_per_pipeline: usize,
167}
168
169#[derive(Debug, Deserialize)]
174pub struct PipelineGraphRequest {
175 pub vpl: String,
176}
177
178#[derive(Debug, Serialize)]
179pub struct GenerateResponse {
180 pub vpl: String,
181}
182
183#[derive(Debug, Deserialize, Serialize)]
188pub struct CreateTenantRequest {
189 pub name: String,
190 #[serde(default)]
191 pub quota_tier: Option<String>,
192}
193
194#[derive(Debug, Serialize, Deserialize)]
195pub struct TenantResponse {
196 pub id: String,
197 pub name: String,
198 pub api_key: String,
199 pub quota: QuotaInfo,
200}
201
202#[derive(Debug, Serialize, Deserialize)]
203pub struct TenantListResponse {
204 pub tenants: Vec<TenantResponse>,
205 pub total: usize,
206 #[serde(skip_serializing_if = "Option::is_none")]
207 pub pagination: Option<PaginationMeta>,
208}
209
210#[derive(Debug, Serialize, Deserialize)]
211pub struct TenantDetailResponse {
212 pub id: String,
213 pub name: String,
214 pub api_key: String,
215 pub quota: QuotaInfo,
216 pub usage: TenantUsageInfo,
217 pub pipeline_count: usize,
218}
219
220#[derive(Debug, Serialize, Deserialize)]
221pub struct TenantUsageInfo {
222 pub events_processed: u64,
223 pub output_events_emitted: u64,
224 pub active_pipelines: usize,
225}
226
227fn build_cors(origins: Option<Vec<String>>) -> CorsLayer {
237 let methods = AllowMethods::list([
238 axum::http::Method::GET,
239 axum::http::Method::POST,
240 axum::http::Method::PUT,
241 axum::http::Method::DELETE,
242 axum::http::Method::OPTIONS,
243 ]);
244
245 let headers = AllowHeaders::list([
246 "content-type".parse().unwrap(),
247 "x-api-key".parse().unwrap(),
248 "authorization".parse().unwrap(),
249 "x-request-id".parse().unwrap(),
250 "traceparent".parse().unwrap(),
251 ]);
252
253 let origin = match origins {
254 Some(ref list) if list.iter().any(|o| o == "*") => {
255 tracing::warn!(
256 "CORS configured with allow_any_origin — this is unsafe for production. \
257 Set --cors-origins to restrict to specific origins."
258 );
259 AllowOrigin::any()
260 }
261 Some(ref list) if !list.is_empty() => {
262 let origins: Vec<axum::http::HeaderValue> =
263 list.iter().filter_map(|s| s.parse().ok()).collect();
264 AllowOrigin::list(origins)
265 }
266 _ => AllowOrigin::list([
267 "http://localhost:5173".parse().unwrap(),
268 "http://localhost:8080".parse().unwrap(),
269 "http://127.0.0.1:5173".parse().unwrap(),
270 "http://127.0.0.1:8080".parse().unwrap(),
271 ]),
272 };
273
274 CorsLayer::new()
275 .allow_methods(methods)
276 .allow_headers(headers)
277 .allow_origin(origin)
278}
279
280#[derive(Debug, Clone)]
282pub struct ApiState {
283 pub manager: SharedTenantManager,
284 pub admin_key: Option<String>,
285 pub billing_state: Option<SharedBillingState>,
286 #[cfg(feature = "saas")]
287 pub db_pool: Option<varpulis_db::PgPool>,
288}
289
290#[derive(Debug)]
292pub struct ApiKey(pub String);
293
294impl<S> axum::extract::FromRequestParts<S> for ApiKey
295where
296 S: Send + Sync,
297{
298 type Rejection = Response;
299
300 async fn from_request_parts(
301 parts: &mut axum::http::request::Parts,
302 _state: &S,
303 ) -> Result<Self, Self::Rejection> {
304 parts
305 .headers
306 .get("x-api-key")
307 .and_then(|v| v.to_str().ok())
308 .map(|s| Self(s.to_string()))
309 .ok_or_else(|| {
310 (
311 StatusCode::UNAUTHORIZED,
312 axum::Json(serde_json::json!({"error": "Missing X-API-Key header"})),
313 )
314 .into_response()
315 })
316 }
317}
318
319#[derive(Debug)]
321pub struct AdminKey(pub String);
322
323impl<S> axum::extract::FromRequestParts<S> for AdminKey
324where
325 S: Send + Sync,
326{
327 type Rejection = Response;
328
329 async fn from_request_parts(
330 parts: &mut axum::http::request::Parts,
331 _state: &S,
332 ) -> Result<Self, Self::Rejection> {
333 parts
334 .headers
335 .get("x-admin-key")
336 .and_then(|v| v.to_str().ok())
337 .map(|s| Self(s.to_string()))
338 .ok_or_else(|| {
339 (
340 StatusCode::UNAUTHORIZED,
341 axum::Json(serde_json::json!({"error": "Missing X-Admin-Key header"})),
342 )
343 .into_response()
344 })
345 }
346}
347
348pub fn api_routes(
350 manager: SharedTenantManager,
351 admin_key: Option<String>,
352 cors_origins: Option<Vec<String>>,
353 billing_state: Option<SharedBillingState>,
354 #[cfg(feature = "saas")] db_pool: Option<varpulis_db::PgPool>,
355) -> Router {
356 let state = ApiState {
357 manager,
358 admin_key,
359 billing_state,
360 #[cfg(feature = "saas")]
361 db_pool,
362 };
363
364 let cors = build_cors(cors_origins);
365
366 Router::new()
367 .route("/api/v1/pipelines", post(handle_deploy).get(handle_list))
369 .route(
370 "/api/v1/pipelines/{pipeline_id}",
371 get(handle_get).delete(handle_delete),
372 )
373 .route(
375 "/api/v1/pipelines/{pipeline_id}/events",
376 post(handle_inject),
377 )
378 .route(
379 "/api/v1/pipelines/{pipeline_id}/events-batch",
380 post(handle_inject_batch),
381 )
382 .route(
383 "/api/v1/pipelines/{pipeline_id}/checkpoint",
384 post(handle_checkpoint),
385 )
386 .route(
387 "/api/v1/pipelines/{pipeline_id}/restore",
388 post(handle_restore),
389 )
390 .route(
391 "/api/v1/pipelines/{pipeline_id}/metrics",
392 get(handle_metrics),
393 )
394 .route(
395 "/api/v1/pipelines/{pipeline_id}/topology",
396 get(handle_topology),
397 )
398 .route(
399 "/api/v1/pipelines/{pipeline_id}/reload",
400 post(handle_reload),
401 )
402 .route("/api/v1/usage", get(handle_usage))
403 .route("/api/v1/pipelines/{pipeline_id}/logs", get(handle_logs))
404 .route(
406 "/api/v1/pipelines/{pipeline_id}/dlq",
407 get(handle_dlq_get).delete(handle_dlq_clear),
408 )
409 .route(
410 "/api/v1/pipelines/{pipeline_id}/dlq/replay",
411 post(handle_dlq_replay),
412 )
413 .route("/api/v1/pipeline/graph", post(handle_pipeline_to_graph))
415 .route("/api/v1/pipeline/generate", post(handle_graph_to_pipeline))
416 .route(
418 "/api/v1/tenants",
419 post(handle_create_tenant).get(handle_list_tenants),
420 )
421 .route(
422 "/api/v1/tenants/{tenant_id}",
423 get(handle_get_tenant).delete(handle_delete_tenant),
424 )
425 .layer(cors)
426 .with_state(state)
427}
428
429async fn handle_deploy(
434 State(state): State<ApiState>,
435 ApiKey(api_key): ApiKey,
436 Json(body): Json<DeployPipelineRequest>,
437) -> Response {
438 let manager = &state.manager;
439 let mut mgr = manager.write().await;
440
441 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
442 Some(id) => id.clone(),
443 None => {
444 return error_response(
445 StatusCode::UNAUTHORIZED,
446 "invalid_api_key",
447 "Invalid API key",
448 )
449 }
450 };
451
452 let pipeline_name = body.name.clone();
453 #[cfg(feature = "saas")]
454 let vpl_source = body.source.clone();
455
456 let result = mgr
457 .deploy_pipeline_on_tenant(&tenant_id, body.name, body.source)
458 .await;
459
460 match result {
461 Ok(id) => {
462 mgr.persist_if_needed(&tenant_id);
463
464 #[cfg(feature = "saas")]
466 if let Some(ref pool) = state.db_pool {
467 if let Ok(org_uuid) = tenant_id.0.parse::<uuid::Uuid>() {
468 if let Err(e) = varpulis_db::repo::create_scoped_pipeline(
469 pool,
470 org_uuid,
471 &pipeline_name,
472 &vpl_source,
473 "own",
474 )
475 .await
476 {
477 tracing::warn!("Failed to sync pipeline to DB: {}", e);
478 }
479 }
480 }
481
482 let resp = DeployPipelineResponse {
483 id,
484 name: pipeline_name,
485 status: "running".to_string(),
486 };
487 (StatusCode::CREATED, axum::Json(&resp)).into_response()
488 }
489 Err(e) => tenant_error_response(e),
490 }
491}
492
493async fn handle_list(
494 State(state): State<ApiState>,
495 ApiKey(api_key): ApiKey,
496 Query(pagination): Query<PaginationParams>,
497) -> Response {
498 let manager = &state.manager;
499 if pagination.exceeds_max() {
500 return error_response(
501 StatusCode::BAD_REQUEST,
502 "invalid_limit",
503 &format!("limit must not exceed {MAX_LIMIT}"),
504 );
505 }
506
507 let mgr = manager.read().await;
508
509 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
510 Some(id) => id.clone(),
511 None => {
512 return error_response(
513 StatusCode::UNAUTHORIZED,
514 "invalid_api_key",
515 "Invalid API key",
516 )
517 }
518 };
519
520 let tenant = match mgr.get_tenant(&tenant_id) {
521 Some(t) => t,
522 None => {
523 return error_response(
524 StatusCode::NOT_FOUND,
525 "tenant_not_found",
526 "Tenant not found",
527 )
528 }
529 };
530
531 let all_pipelines: Vec<PipelineInfo> = tenant
532 .pipelines
533 .values()
534 .map(|p| {
535 let is_global = p.global_template_id.is_some();
536 PipelineInfo {
537 id: p.id.clone(),
538 name: p.name.clone(),
539 status: p.status.to_string(),
540 source: p.source.clone(),
541 uptime_secs: p.created_at.elapsed().as_secs(),
542 global_template_id: p.global_template_id.clone(),
543 scope_level: if is_global {
544 "global".to_string()
545 } else {
546 "own".to_string()
547 },
548 inherited_from_org_id: None,
549 read_only: is_global,
550 }
551 })
552 .collect();
553
554 let (pipelines, meta) = pagination.paginate(all_pipelines);
555 let total = meta.total;
556 let resp = PipelineListResponse {
557 pipelines,
558 total,
559 pagination: Some(meta),
560 };
561 axum::Json(&resp).into_response()
562}
563
564async fn handle_get(
565 State(state): State<ApiState>,
566 Path(pipeline_id): Path<String>,
567 ApiKey(api_key): ApiKey,
568) -> Response {
569 let manager = &state.manager;
570 let mgr = manager.read().await;
571
572 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
573 Some(id) => id.clone(),
574 None => {
575 return error_response(
576 StatusCode::UNAUTHORIZED,
577 "invalid_api_key",
578 "Invalid API key",
579 )
580 }
581 };
582
583 let tenant = match mgr.get_tenant(&tenant_id) {
584 Some(t) => t,
585 None => {
586 return error_response(
587 StatusCode::NOT_FOUND,
588 "tenant_not_found",
589 "Tenant not found",
590 )
591 }
592 };
593
594 match tenant.pipelines.get(&pipeline_id) {
595 Some(p) => {
596 let is_global = p.global_template_id.is_some();
597 let info = PipelineInfo {
598 id: p.id.clone(),
599 name: p.name.clone(),
600 status: p.status.to_string(),
601 source: p.source.clone(),
602 uptime_secs: p.created_at.elapsed().as_secs(),
603 global_template_id: p.global_template_id.clone(),
604 scope_level: if is_global {
605 "global".to_string()
606 } else {
607 "own".to_string()
608 },
609 inherited_from_org_id: None,
610 read_only: is_global,
611 };
612 axum::Json(&info).into_response()
613 }
614 None => error_response(
615 StatusCode::NOT_FOUND,
616 "pipeline_not_found",
617 "Pipeline not found",
618 ),
619 }
620}
621
622async fn handle_delete(
623 State(state): State<ApiState>,
624 Path(pipeline_id): Path<String>,
625 ApiKey(api_key): ApiKey,
626) -> Response {
627 let manager = &state.manager;
628 let mut mgr = manager.write().await;
629
630 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
631 Some(id) => id.clone(),
632 None => {
633 return error_response(
634 StatusCode::UNAUTHORIZED,
635 "invalid_api_key",
636 "Invalid API key",
637 )
638 }
639 };
640
641 #[cfg(feature = "saas")]
642 let mut pipeline_name_for_db = None;
643
644 let result = {
645 let tenant = match mgr.get_tenant_mut(&tenant_id) {
646 Some(t) => t,
647 None => {
648 return error_response(
649 StatusCode::NOT_FOUND,
650 "tenant_not_found",
651 "Tenant not found",
652 )
653 }
654 };
655
656 if let Some(pipeline) = tenant.pipelines.get(&pipeline_id) {
658 if pipeline.global_template_id.is_some() {
659 return error_response(
660 StatusCode::FORBIDDEN,
661 "global_pipeline_protected",
662 "Global pipelines can only be managed by admin",
663 );
664 }
665 #[cfg(feature = "saas")]
666 {
667 pipeline_name_for_db = Some(pipeline.name.clone());
668 }
669 }
670
671 tenant.remove_pipeline(&pipeline_id)
672 };
673
674 match result {
675 Ok(()) => {
676 mgr.persist_if_needed(&tenant_id);
677
678 #[cfg(feature = "saas")]
680 if let Some(ref pool) = state.db_pool {
681 if let (Ok(org_uuid), Some(name)) =
682 (tenant_id.0.parse::<uuid::Uuid>(), &pipeline_name_for_db)
683 {
684 let _ = varpulis_db::repo::delete_pipeline_by_name(pool, org_uuid, name).await;
685 }
686 }
687
688 axum::Json(serde_json::json!({"deleted": true})).into_response()
689 }
690 Err(e) => tenant_error_response(e),
691 }
692}
693
694async fn handle_inject(
695 State(state): State<ApiState>,
696 Path(pipeline_id): Path<String>,
697 ApiKey(api_key): ApiKey,
698 Json(body): Json<InjectEventRequest>,
699) -> Response {
700 let manager = &state.manager;
701 let billing_state = &state.billing_state;
702 #[cfg(feature = "saas")]
704 let mut usage_warning: Option<f64> = None;
705 #[cfg(feature = "saas")]
706 if let Some(ref bs) = billing_state {
707 if let Some(org_id) = bs.org_id_for_api_key(&api_key).await {
708 match bs.check_usage_limit(org_id, 1).await {
709 crate::billing::UsageCheckResult::Exceeded(err) => {
710 return crate::billing::usage_limit_response(&err);
711 }
712 crate::billing::UsageCheckResult::ApproachingLimit { usage_percent } => {
713 usage_warning = Some(usage_percent);
714 }
715 crate::billing::UsageCheckResult::Ok => {}
716 }
717 bs.usage.write().await.record_events(org_id, 1);
719 }
720 }
721 #[cfg(not(feature = "saas"))]
722 let _ = &billing_state;
723
724 let mut mgr = manager.write().await;
725
726 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
727 Some(id) => id.clone(),
728 None => {
729 return error_response(
730 StatusCode::UNAUTHORIZED,
731 "invalid_api_key",
732 "Invalid API key",
733 )
734 }
735 };
736
737 if let Err(e) = mgr.check_backpressure() {
739 return tenant_error_response(e);
740 }
741
742 let mut event = Event::new(body.event_type.clone());
743 for (key, value) in &body.fields {
744 let v = json_to_runtime_value(value);
745 event = event.with_field(key.as_str(), v);
746 }
747
748 match mgr
749 .process_event_with_backpressure(&tenant_id, &pipeline_id, event)
750 .await
751 {
752 Ok(output_events) => {
753 let events_json: Vec<serde_json::Value> = output_events
754 .iter()
755 .map(|e| {
756 let mut fields = serde_json::Map::new();
757 for (k, v) in &e.data {
758 fields.insert(k.to_string(), crate::websocket::value_to_json(v));
759 }
760 serde_json::json!({
761 "event_type": e.event_type.to_string(),
762 "fields": serde_json::Value::Object(fields),
763 })
764 })
765 .collect();
766 let response = serde_json::json!({
767 "accepted": true,
768 "output_events": events_json,
769 });
770 #[cfg(feature = "saas")]
771 if let Some(pct) = usage_warning {
772 return (
773 StatusCode::OK,
774 [("X-Usage-Warning", format!("approaching_limit ({pct:.0}%)"))],
775 axum::Json(response),
776 )
777 .into_response();
778 }
779 axum::Json(response).into_response()
780 }
781 Err(e) => tenant_error_response(e),
782 }
783}
784
785async fn handle_inject_batch(
786 State(state): State<ApiState>,
787 Path(pipeline_id): Path<String>,
788 ApiKey(api_key): ApiKey,
789 Json(body): Json<InjectBatchRequest>,
790) -> Response {
791 let manager = &state.manager;
792 let billing_state = &state.billing_state;
793 let event_count = body.events.len() as i64;
794
795 #[cfg(feature = "saas")]
797 if let Some(ref bs) = billing_state {
798 if let Some(org_id) = bs.org_id_for_api_key(&api_key).await {
799 match bs.check_usage_limit(org_id, event_count).await {
800 crate::billing::UsageCheckResult::Exceeded(err) => {
801 return crate::billing::usage_limit_response(&err);
802 }
803 crate::billing::UsageCheckResult::ApproachingLimit { .. }
804 | crate::billing::UsageCheckResult::Ok => {}
805 }
806 bs.usage.write().await.record_events(org_id, event_count);
808 }
809 }
810 #[cfg(not(feature = "saas"))]
811 let _ = (&billing_state, event_count);
812
813 let mut mgr = manager.write().await;
814
815 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
816 Some(id) => id.clone(),
817 None => {
818 return error_response(
819 StatusCode::UNAUTHORIZED,
820 "invalid_api_key",
821 "Invalid API key",
822 )
823 }
824 };
825
826 if let Err(e) = mgr.check_backpressure() {
828 return tenant_error_response(e);
829 }
830
831 let start = std::time::Instant::now();
832 let mut accepted = 0usize;
833 let mut output_events = Vec::new();
834
835 for req in body.events {
836 let mut event = Event::new(req.event_type.clone());
837 for (key, value) in &req.fields {
838 let v = json_to_runtime_value(value);
839 event = event.with_field(key.as_str(), v);
840 }
841
842 match mgr
843 .process_event_with_backpressure(&tenant_id, &pipeline_id, event)
844 .await
845 {
846 Ok(outputs) => {
847 accepted += 1;
848 for e in &outputs {
849 let mut flat = serde_json::Map::new();
850 flat.insert(
851 "event_type".to_string(),
852 serde_json::Value::String(e.event_type.to_string()),
853 );
854 for (k, v) in &e.data {
855 flat.insert(k.to_string(), crate::websocket::value_to_json(v));
856 }
857 output_events.push(serde_json::Value::Object(flat));
858 }
859 }
860 Err(TenantError::BackpressureExceeded { .. }) => {
861 break;
863 }
864 Err(_) => {
865 }
867 }
868 }
869
870 let processing_time_us = start.elapsed().as_micros() as u64;
871
872 let resp = InjectBatchResponse {
873 accepted,
874 output_events,
875 processing_time_us,
876 };
877 axum::Json(&resp).into_response()
878}
879
880async fn handle_checkpoint(
881 State(state): State<ApiState>,
882 Path(pipeline_id): Path<String>,
883 ApiKey(api_key): ApiKey,
884) -> Response {
885 let manager = &state.manager;
886 let mgr = manager.read().await;
887
888 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
889 Some(id) => id.clone(),
890 None => {
891 return error_response(
892 StatusCode::UNAUTHORIZED,
893 "invalid_api_key",
894 "Invalid API key",
895 )
896 }
897 };
898
899 let tenant = match mgr.get_tenant(&tenant_id) {
900 Some(t) => t,
901 None => {
902 return error_response(
903 StatusCode::NOT_FOUND,
904 "tenant_not_found",
905 "Tenant not found",
906 )
907 }
908 };
909
910 match tenant.checkpoint_pipeline(&pipeline_id).await {
911 Ok(checkpoint) => {
912 let resp = CheckpointResponse {
913 pipeline_id,
914 events_processed: checkpoint.events_processed,
915 checkpoint,
916 };
917 axum::Json(&resp).into_response()
918 }
919 Err(e) => tenant_error_response(e),
920 }
921}
922
923async fn handle_restore(
924 State(state): State<ApiState>,
925 Path(pipeline_id): Path<String>,
926 ApiKey(api_key): ApiKey,
927 Json(body): Json<RestoreRequest>,
928) -> Response {
929 let manager = &state.manager;
930 let mut mgr = manager.write().await;
931
932 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
933 Some(id) => id.clone(),
934 None => {
935 return error_response(
936 StatusCode::UNAUTHORIZED,
937 "invalid_api_key",
938 "Invalid API key",
939 )
940 }
941 };
942
943 let tenant = match mgr.get_tenant_mut(&tenant_id) {
944 Some(t) => t,
945 None => {
946 return error_response(
947 StatusCode::NOT_FOUND,
948 "tenant_not_found",
949 "Tenant not found",
950 )
951 }
952 };
953
954 match tenant
955 .restore_pipeline(&pipeline_id, &body.checkpoint)
956 .await
957 {
958 Ok(()) => {
959 let resp = RestoreResponse {
960 pipeline_id,
961 restored: true,
962 events_restored: body.checkpoint.events_processed,
963 };
964 axum::Json(&resp).into_response()
965 }
966 Err(e) => tenant_error_response(e),
967 }
968}
969
970async fn handle_metrics(
971 State(state): State<ApiState>,
972 Path(pipeline_id): Path<String>,
973 ApiKey(api_key): ApiKey,
974) -> Response {
975 let manager = &state.manager;
976 let mgr = manager.read().await;
977
978 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
979 Some(id) => id.clone(),
980 None => {
981 return error_response(
982 StatusCode::UNAUTHORIZED,
983 "invalid_api_key",
984 "Invalid API key",
985 )
986 }
987 };
988
989 let tenant = match mgr.get_tenant(&tenant_id) {
990 Some(t) => t,
991 None => {
992 return error_response(
993 StatusCode::NOT_FOUND,
994 "tenant_not_found",
995 "Tenant not found",
996 )
997 }
998 };
999
1000 if !tenant.pipelines.contains_key(&pipeline_id) {
1001 return error_response(
1002 StatusCode::NOT_FOUND,
1003 "pipeline_not_found",
1004 "Pipeline not found",
1005 );
1006 }
1007
1008 let resp = PipelineMetricsResponse {
1009 pipeline_id,
1010 events_processed: tenant.usage.events_processed,
1011 output_events_emitted: tenant.usage.output_events_emitted,
1012 };
1013 axum::Json(&resp).into_response()
1014}
1015
1016async fn handle_topology(
1017 State(state): State<ApiState>,
1018 Path(pipeline_id): Path<String>,
1019 ApiKey(api_key): ApiKey,
1020) -> Response {
1021 let manager = &state.manager;
1022 let mgr = manager.read().await;
1023
1024 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1025 Some(id) => id.clone(),
1026 None => {
1027 return error_response(
1028 StatusCode::UNAUTHORIZED,
1029 "invalid_api_key",
1030 "Invalid API key",
1031 )
1032 }
1033 };
1034
1035 let tenant = match mgr.get_tenant(&tenant_id) {
1036 Some(t) => t,
1037 None => {
1038 return error_response(
1039 StatusCode::NOT_FOUND,
1040 "tenant_not_found",
1041 "Tenant not found",
1042 )
1043 }
1044 };
1045
1046 let pipeline = match tenant.pipelines.get(&pipeline_id) {
1047 Some(p) => p,
1048 None => {
1049 return error_response(
1050 StatusCode::NOT_FOUND,
1051 "pipeline_not_found",
1052 "Pipeline not found",
1053 )
1054 }
1055 };
1056
1057 let engine = pipeline.engine.lock().await;
1058 let topology = engine.topology();
1059 axum::Json(&topology).into_response()
1060}
1061
1062async fn handle_reload(
1063 State(state): State<ApiState>,
1064 Path(pipeline_id): Path<String>,
1065 ApiKey(api_key): ApiKey,
1066 Json(body): Json<ReloadPipelineRequest>,
1067) -> Response {
1068 let manager = &state.manager;
1069 let mut mgr = manager.write().await;
1070
1071 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1072 Some(id) => id.clone(),
1073 None => {
1074 return error_response(
1075 StatusCode::UNAUTHORIZED,
1076 "invalid_api_key",
1077 "Invalid API key",
1078 )
1079 }
1080 };
1081
1082 let result = {
1083 let tenant = match mgr.get_tenant_mut(&tenant_id) {
1084 Some(t) => t,
1085 None => {
1086 return error_response(
1087 StatusCode::NOT_FOUND,
1088 "tenant_not_found",
1089 "Tenant not found",
1090 )
1091 }
1092 };
1093
1094 if let Some(pipeline) = tenant.pipelines.get(&pipeline_id) {
1096 if pipeline.global_template_id.is_some() {
1097 return error_response(
1098 StatusCode::FORBIDDEN,
1099 "global_pipeline_protected",
1100 "Global pipelines can only be managed by admin",
1101 );
1102 }
1103 }
1104
1105 tenant.reload_pipeline(&pipeline_id, body.source).await
1106 };
1107
1108 match result {
1109 Ok(()) => {
1110 mgr.persist_if_needed(&tenant_id);
1111 axum::Json(serde_json::json!({"reloaded": true})).into_response()
1112 }
1113 Err(e) => tenant_error_response(e),
1114 }
1115}
1116
1117async fn handle_usage(State(state): State<ApiState>, ApiKey(api_key): ApiKey) -> Response {
1118 let manager = &state.manager;
1119 let mgr = manager.read().await;
1120
1121 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1122 Some(id) => id.clone(),
1123 None => {
1124 return error_response(
1125 StatusCode::UNAUTHORIZED,
1126 "invalid_api_key",
1127 "Invalid API key",
1128 )
1129 }
1130 };
1131
1132 let tenant = match mgr.get_tenant(&tenant_id) {
1133 Some(t) => t,
1134 None => {
1135 return error_response(
1136 StatusCode::NOT_FOUND,
1137 "tenant_not_found",
1138 "Tenant not found",
1139 )
1140 }
1141 };
1142
1143 let resp = UsageResponse {
1144 tenant_id: tenant.id.to_string(),
1145 events_processed: tenant.usage.events_processed,
1146 output_events_emitted: tenant.usage.output_events_emitted,
1147 active_pipelines: tenant.usage.active_pipelines,
1148 quota: QuotaInfo {
1149 max_pipelines: tenant.quota.max_pipelines,
1150 max_events_per_second: tenant.quota.max_events_per_second,
1151 max_streams_per_pipeline: tenant.quota.max_streams_per_pipeline,
1152 },
1153 };
1154 axum::Json(&resp).into_response()
1155}
1156
1157async fn handle_logs(
1159 State(state): State<ApiState>,
1160 Path(pipeline_id): Path<String>,
1161 ApiKey(api_key): ApiKey,
1162) -> Response {
1163 let manager = &state.manager;
1164 let mgr = manager.read().await;
1165
1166 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1167 Some(id) => id.clone(),
1168 None => return error_response(StatusCode::UNAUTHORIZED, "invalid_key", "Invalid API key"),
1169 };
1170
1171 let tenant = match mgr.get_tenant(&tenant_id) {
1173 Some(t) => t,
1174 None => {
1175 return error_response(
1176 StatusCode::NOT_FOUND,
1177 "tenant_not_found",
1178 "Tenant not found",
1179 )
1180 }
1181 };
1182
1183 let rx: tokio::sync::broadcast::Receiver<Event> =
1184 match tenant.subscribe_pipeline_logs(&pipeline_id) {
1185 Ok(rx) => rx,
1186 Err(_) => {
1187 return error_response(
1188 StatusCode::NOT_FOUND,
1189 "pipeline_not_found",
1190 &format!("Pipeline {pipeline_id} not found"),
1191 )
1192 }
1193 };
1194
1195 drop(mgr); let stream = stream::unfold(rx, |mut rx| async move {
1199 match rx.recv().await {
1200 Ok(event) => {
1201 let data: serde_json::Map<String, serde_json::Value> = event
1202 .data
1203 .iter()
1204 .map(|(k, v): (&std::sync::Arc<str>, &varpulis_core::Value)| {
1205 (k.to_string(), json_from_value(v))
1206 })
1207 .collect();
1208 let json = serde_json::to_string(&LogEvent {
1209 event_type: event.event_type.to_string(),
1210 timestamp: event.timestamp.to_rfc3339(),
1211 data,
1212 })
1213 .unwrap_or_default();
1214 let sse = axum::response::sse::Event::default().data(json);
1215 Some((Ok::<_, Infallible>(sse), rx))
1216 }
1217 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1218 let msg = format!("{{\"warning\":\"skipped {n} events\"}}");
1219 let sse = axum::response::sse::Event::default()
1220 .event("warning")
1221 .data(msg);
1222 Some((Ok(sse), rx))
1223 }
1224 Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
1225 }
1226 });
1227
1228 axum::response::sse::Sse::new(stream)
1229 .keep_alive(axum::response::sse::KeepAlive::default())
1230 .into_response()
1231}
1232
1233#[derive(Serialize)]
1234struct LogEvent {
1235 event_type: String,
1236 timestamp: String,
1237 data: serde_json::Map<String, serde_json::Value>,
1238}
1239
1240fn json_from_value(v: &varpulis_core::Value) -> serde_json::Value {
1241 match v {
1242 varpulis_core::Value::Null => serde_json::Value::Null,
1243 varpulis_core::Value::Bool(b) => serde_json::Value::Bool(*b),
1244 varpulis_core::Value::Int(i) => serde_json::json!(*i),
1245 varpulis_core::Value::Float(f) => serde_json::json!(*f),
1246 varpulis_core::Value::Str(s) => serde_json::Value::String(s.to_string()),
1247 varpulis_core::Value::Timestamp(ns) => serde_json::json!(*ns),
1248 varpulis_core::Value::Duration(ns) => serde_json::json!(*ns),
1249 varpulis_core::Value::Array(arr) => {
1250 serde_json::Value::Array(arr.iter().map(json_from_value).collect())
1251 }
1252 varpulis_core::Value::Map(map) => {
1253 let obj: serde_json::Map<String, serde_json::Value> = map
1254 .iter()
1255 .map(|(k, v)| (k.to_string(), json_from_value(v)))
1256 .collect();
1257 serde_json::Value::Object(obj)
1258 }
1259 }
1260}
1261
1262async fn handle_dlq_get(
1267 State(state): State<ApiState>,
1268 Path(pipeline_id): Path<String>,
1269 ApiKey(api_key): ApiKey,
1270 Query(params): Query<DlqQueryParams>,
1271) -> Response {
1272 let manager = &state.manager;
1273 let mgr = manager.read().await;
1274
1275 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1276 Some(id) => id.clone(),
1277 None => {
1278 return error_response(
1279 StatusCode::UNAUTHORIZED,
1280 "invalid_api_key",
1281 "Invalid API key",
1282 )
1283 }
1284 };
1285
1286 let tenant = match mgr.get_tenant(&tenant_id) {
1287 Some(t) => t,
1288 None => {
1289 return error_response(
1290 StatusCode::NOT_FOUND,
1291 "tenant_not_found",
1292 "Tenant not found",
1293 )
1294 }
1295 };
1296
1297 let pipeline = match tenant.pipelines.get(&pipeline_id) {
1298 Some(p) => p,
1299 None => {
1300 return error_response(
1301 StatusCode::NOT_FOUND,
1302 "pipeline_not_found",
1303 "Pipeline not found",
1304 )
1305 }
1306 };
1307
1308 let engine = pipeline.engine.lock().await;
1309 let dlq = match engine.dlq() {
1310 Some(d) => d,
1311 None => {
1312 let resp = DlqEntriesResponse {
1313 entries: Vec::new(),
1314 total: 0,
1315 };
1316 return axum::Json(&resp).into_response();
1317 }
1318 };
1319
1320 let offset = params.offset.unwrap_or(0);
1321 let limit = params.limit.unwrap_or(100).min(1000);
1322
1323 match dlq.read_entries(offset, limit) {
1324 Ok(entries) => {
1325 let resp = DlqEntriesResponse {
1326 total: dlq.line_count(),
1327 entries,
1328 };
1329 axum::Json(&resp).into_response()
1330 }
1331 Err(e) => error_response(
1332 StatusCode::INTERNAL_SERVER_ERROR,
1333 "dlq_read_error",
1334 &format!("Failed to read DLQ: {e}"),
1335 ),
1336 }
1337}
1338
1339async fn handle_dlq_replay(
1340 State(state): State<ApiState>,
1341 Path(pipeline_id): Path<String>,
1342 ApiKey(api_key): ApiKey,
1343) -> Response {
1344 let manager = &state.manager;
1345 let mgr = manager.read().await;
1346
1347 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1348 Some(id) => id.clone(),
1349 None => {
1350 return error_response(
1351 StatusCode::UNAUTHORIZED,
1352 "invalid_api_key",
1353 "Invalid API key",
1354 )
1355 }
1356 };
1357
1358 let tenant = match mgr.get_tenant(&tenant_id) {
1359 Some(t) => t,
1360 None => {
1361 return error_response(
1362 StatusCode::NOT_FOUND,
1363 "tenant_not_found",
1364 "Tenant not found",
1365 )
1366 }
1367 };
1368
1369 let pipeline = match tenant.pipelines.get(&pipeline_id) {
1370 Some(p) => p,
1371 None => {
1372 return error_response(
1373 StatusCode::NOT_FOUND,
1374 "pipeline_not_found",
1375 "Pipeline not found",
1376 )
1377 }
1378 };
1379
1380 let entries = {
1382 let engine = pipeline.engine.lock().await;
1383 let dlq = match engine.dlq() {
1384 Some(d) => d,
1385 None => {
1386 let resp = DlqReplayResponse { replayed: 0 };
1387 return axum::Json(&resp).into_response();
1388 }
1389 };
1390 match dlq.read_entries(0, 100_000) {
1392 Ok(entries) => entries,
1393 Err(e) => {
1394 return error_response(
1395 StatusCode::INTERNAL_SERVER_ERROR,
1396 "dlq_read_error",
1397 &format!("Failed to read DLQ: {e}"),
1398 )
1399 }
1400 }
1401 };
1402
1403 let mut replayed = 0usize;
1405 {
1406 let mut engine = pipeline.engine.lock().await;
1407 for entry in &entries {
1408 let event_type = entry
1410 .event
1411 .get("event_type")
1412 .and_then(|v| v.as_str())
1413 .unwrap_or("unknown");
1414 let mut event = Event::new(event_type);
1415 if let Some(data) = entry.event.get("data").and_then(|v| v.as_object()) {
1416 for (k, v) in data {
1417 let rv = json_to_runtime_value(v);
1418 event = event.with_field(k.as_str(), rv);
1419 }
1420 }
1421 if engine.process(event).await.is_ok() {
1422 replayed += 1;
1423 }
1424 }
1425 }
1426
1427 let resp = DlqReplayResponse { replayed };
1428 axum::Json(&resp).into_response()
1429}
1430
1431async fn handle_dlq_clear(
1432 State(state): State<ApiState>,
1433 Path(pipeline_id): Path<String>,
1434 ApiKey(api_key): ApiKey,
1435) -> Response {
1436 let manager = &state.manager;
1437 let mgr = manager.read().await;
1438
1439 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1440 Some(id) => id.clone(),
1441 None => {
1442 return error_response(
1443 StatusCode::UNAUTHORIZED,
1444 "invalid_api_key",
1445 "Invalid API key",
1446 )
1447 }
1448 };
1449
1450 let tenant = match mgr.get_tenant(&tenant_id) {
1451 Some(t) => t,
1452 None => {
1453 return error_response(
1454 StatusCode::NOT_FOUND,
1455 "tenant_not_found",
1456 "Tenant not found",
1457 )
1458 }
1459 };
1460
1461 let pipeline = match tenant.pipelines.get(&pipeline_id) {
1462 Some(p) => p,
1463 None => {
1464 return error_response(
1465 StatusCode::NOT_FOUND,
1466 "pipeline_not_found",
1467 "Pipeline not found",
1468 )
1469 }
1470 };
1471
1472 let engine = pipeline.engine.lock().await;
1473 match engine.dlq() {
1474 Some(dlq) => match dlq.clear() {
1475 Ok(()) => {
1476 let resp = DlqClearResponse { cleared: true };
1477 axum::Json(&resp).into_response()
1478 }
1479 Err(e) => error_response(
1480 StatusCode::INTERNAL_SERVER_ERROR,
1481 "dlq_clear_error",
1482 &format!("Failed to clear DLQ: {e}"),
1483 ),
1484 },
1485 None => {
1486 let resp = DlqClearResponse { cleared: true };
1487 axum::Json(&resp).into_response()
1488 }
1489 }
1490}
1491
1492#[allow(clippy::result_large_err)]
1499fn validate_admin_key(provided: &str, configured: &Option<String>) -> Result<(), Response> {
1500 match configured {
1501 None => Err(error_response(
1502 StatusCode::FORBIDDEN,
1503 "admin_disabled",
1504 "Admin API is disabled (no --api-key configured)",
1505 )),
1506 Some(key) => {
1507 if constant_time_compare(key, provided) {
1508 Ok(())
1509 } else {
1510 Err(error_response(
1511 StatusCode::UNAUTHORIZED,
1512 "invalid_admin_key",
1513 "Invalid admin key",
1514 ))
1515 }
1516 }
1517 }
1518}
1519
1520fn quota_from_tier(tier: Option<&str>) -> TenantQuota {
1521 match tier {
1522 Some("free") => TenantQuota::free(),
1523 Some("pro") => TenantQuota::pro(),
1524 Some("enterprise") => TenantQuota::enterprise(),
1525 _ => TenantQuota::default(),
1526 }
1527}
1528
1529async fn handle_create_tenant(
1530 State(state): State<ApiState>,
1531 AdminKey(admin_key): AdminKey,
1532 Json(body): Json<CreateTenantRequest>,
1533) -> Response {
1534 let manager = &state.manager;
1535 let configured_key = &state.admin_key;
1536 if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1537 return resp;
1538 }
1539
1540 let api_key = uuid::Uuid::new_v4().to_string();
1541 let quota = quota_from_tier(body.quota_tier.as_deref());
1542
1543 let mut mgr = manager.write().await;
1544 match mgr.create_tenant(body.name.clone(), api_key.clone(), quota.clone()) {
1545 Ok(tenant_id) => {
1546 let resp = TenantResponse {
1547 id: tenant_id.as_str().to_string(),
1548 name: body.name,
1549 api_key,
1550 quota: QuotaInfo {
1551 max_pipelines: quota.max_pipelines,
1552 max_events_per_second: quota.max_events_per_second,
1553 max_streams_per_pipeline: quota.max_streams_per_pipeline,
1554 },
1555 };
1556 (StatusCode::CREATED, axum::Json(&resp)).into_response()
1557 }
1558 Err(e) => tenant_error_response(e),
1559 }
1560}
1561
1562async fn handle_list_tenants(
1563 State(state): State<ApiState>,
1564 AdminKey(admin_key): AdminKey,
1565 Query(pagination): Query<PaginationParams>,
1566) -> Response {
1567 let manager = &state.manager;
1568 let configured_key = &state.admin_key;
1569 if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1570 return resp;
1571 }
1572
1573 if pagination.exceeds_max() {
1574 return error_response(
1575 StatusCode::BAD_REQUEST,
1576 "invalid_limit",
1577 &format!("limit must not exceed {MAX_LIMIT}"),
1578 );
1579 }
1580
1581 let mgr = manager.read().await;
1582 let all_tenants: Vec<TenantResponse> = mgr
1583 .list_tenants()
1584 .iter()
1585 .map(|t| TenantResponse {
1586 id: t.id.as_str().to_string(),
1587 name: t.name.clone(),
1588 api_key: format!("{}...", &t.api_key_hash[..8]),
1589 quota: QuotaInfo {
1590 max_pipelines: t.quota.max_pipelines,
1591 max_events_per_second: t.quota.max_events_per_second,
1592 max_streams_per_pipeline: t.quota.max_streams_per_pipeline,
1593 },
1594 })
1595 .collect();
1596 let (tenants, meta) = pagination.paginate(all_tenants);
1597 let total = meta.total;
1598 let resp = TenantListResponse {
1599 tenants,
1600 total,
1601 pagination: Some(meta),
1602 };
1603 axum::Json(&resp).into_response()
1604}
1605
1606async fn handle_get_tenant(
1607 State(state): State<ApiState>,
1608 Path(tenant_id_str): Path<String>,
1609 AdminKey(admin_key): AdminKey,
1610) -> Response {
1611 let manager = &state.manager;
1612 let configured_key = &state.admin_key;
1613 if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1614 return resp;
1615 }
1616
1617 let mgr = manager.read().await;
1618 let tenant_id = varpulis_runtime::TenantId::new(&tenant_id_str);
1619 match mgr.get_tenant(&tenant_id) {
1620 Some(t) => {
1621 let resp = TenantDetailResponse {
1622 id: t.id.as_str().to_string(),
1623 name: t.name.clone(),
1624 api_key: format!("{}...", &t.api_key_hash[..8]),
1625 quota: QuotaInfo {
1626 max_pipelines: t.quota.max_pipelines,
1627 max_events_per_second: t.quota.max_events_per_second,
1628 max_streams_per_pipeline: t.quota.max_streams_per_pipeline,
1629 },
1630 usage: TenantUsageInfo {
1631 events_processed: t.usage.events_processed,
1632 output_events_emitted: t.usage.output_events_emitted,
1633 active_pipelines: t.usage.active_pipelines,
1634 },
1635 pipeline_count: t.pipelines.len(),
1636 };
1637 axum::Json(&resp).into_response()
1638 }
1639 None => error_response(
1640 StatusCode::NOT_FOUND,
1641 "tenant_not_found",
1642 "Tenant not found",
1643 ),
1644 }
1645}
1646
1647async fn handle_delete_tenant(
1648 State(state): State<ApiState>,
1649 Path(tenant_id_str): Path<String>,
1650 AdminKey(admin_key): AdminKey,
1651) -> Response {
1652 let manager = &state.manager;
1653 let configured_key = &state.admin_key;
1654 if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1655 return resp;
1656 }
1657
1658 let mut mgr = manager.write().await;
1659 let tenant_id = varpulis_runtime::TenantId::new(&tenant_id_str);
1660 match mgr.remove_tenant(&tenant_id) {
1661 Ok(()) => axum::Json(serde_json::json!({"deleted": true})).into_response(),
1662 Err(e) => tenant_error_response(e),
1663 }
1664}
1665
1666async fn handle_pipeline_to_graph(Json(body): Json<PipelineGraphRequest>) -> Response {
1671 match varpulis_parser::parse(&body.vpl) {
1672 Ok(program) => {
1673 let graph = varpulis_runtime::engine::graph::program_to_graph(&program);
1674 (StatusCode::OK, axum::Json(graph)).into_response()
1675 }
1676 Err(e) => error_response(
1677 StatusCode::BAD_REQUEST,
1678 "parse_error",
1679 &format!("Failed to parse VPL: {e}"),
1680 ),
1681 }
1682}
1683
1684async fn handle_graph_to_pipeline(
1685 Json(graph): Json<varpulis_runtime::engine::graph::PipelineGraph>,
1686) -> Response {
1687 let vpl = varpulis_runtime::engine::graph::graph_to_vpl(&graph);
1688 (StatusCode::OK, axum::Json(GenerateResponse { vpl })).into_response()
1689}
1690
1691fn error_response(status: StatusCode, code: &str, message: &str) -> Response {
1696 let body = ApiError {
1697 error: message.to_string(),
1698 code: code.to_string(),
1699 };
1700 (status, axum::Json(body)).into_response()
1701}
1702
1703fn tenant_error_response(err: TenantError) -> Response {
1704 if let TenantError::BackpressureExceeded { current, max } = &err {
1706 let body = serde_json::json!({
1707 "error": format!("queue depth {current} exceeds maximum {max}"),
1708 "code": "queue_depth_exceeded",
1709 "retry_after": 1,
1710 });
1711 return (
1712 StatusCode::TOO_MANY_REQUESTS,
1713 [("Retry-After", "1"), ("Content-Type", "application/json")],
1714 serde_json::to_string(&body).unwrap_or_default(),
1715 )
1716 .into_response();
1717 }
1718
1719 let (status, code) = match &err {
1720 TenantError::NotFound(_) => (StatusCode::NOT_FOUND, "not_found"),
1721 TenantError::PipelineNotFound(_) => (StatusCode::NOT_FOUND, "pipeline_not_found"),
1722 TenantError::QuotaExceeded(_) => (StatusCode::TOO_MANY_REQUESTS, "quota_exceeded"),
1723 TenantError::RateLimitExceeded => (StatusCode::TOO_MANY_REQUESTS, "rate_limited"),
1724 TenantError::BackpressureExceeded { .. } => unreachable!(),
1725 TenantError::ParseError(_) => (StatusCode::BAD_REQUEST, "parse_error"),
1726 TenantError::EngineError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "engine_error"),
1727 TenantError::AlreadyExists(_) => (StatusCode::CONFLICT, "already_exists"),
1728 };
1729 error_response(status, code, &err.to_string())
1730}
1731
1732fn json_to_runtime_value(v: &serde_json::Value) -> varpulis_core::Value {
1733 match v {
1734 serde_json::Value::Null => varpulis_core::Value::Null,
1735 serde_json::Value::Bool(b) => varpulis_core::Value::Bool(*b),
1736 serde_json::Value::Number(n) => {
1737 if let Some(i) = n.as_i64() {
1738 varpulis_core::Value::Int(i)
1739 } else if let Some(f) = n.as_f64() {
1740 varpulis_core::Value::Float(f)
1741 } else {
1742 varpulis_core::Value::Null
1743 }
1744 }
1745 serde_json::Value::String(s) => varpulis_core::Value::Str(s.clone().into()),
1746 serde_json::Value::Array(arr) => {
1747 varpulis_core::Value::array(arr.iter().map(json_to_runtime_value).collect())
1748 }
1749 serde_json::Value::Object(map) => {
1750 let mut m: IndexMap<std::sync::Arc<str>, varpulis_core::Value, FxBuildHasher> =
1751 IndexMap::with_hasher(FxBuildHasher);
1752 for (k, v) in map {
1753 m.insert(k.as_str().into(), json_to_runtime_value(v));
1754 }
1755 varpulis_core::Value::map(m)
1756 }
1757 }
1758}
1759
1760#[cfg(test)]
1761mod tests {
1762 use std::sync::Arc;
1763
1764 use axum::body::Body;
1765 use axum::http::Request;
1766 use tokio::sync::RwLock;
1767 use tower::ServiceExt;
1768 use varpulis_runtime::tenant::{TenantManager, TenantQuota};
1769
1770 use super::*;
1771
1772 struct TestResponse {
1774 status: StatusCode,
1775 body: bytes::Bytes,
1776 headers: axum::http::HeaderMap,
1777 }
1778
1779 impl TestResponse {
1780 fn status(&self) -> StatusCode {
1781 self.status
1782 }
1783 fn body(&self) -> &[u8] {
1784 &self.body
1785 }
1786 fn headers(&self) -> &axum::http::HeaderMap {
1787 &self.headers
1788 }
1789 }
1790
1791 struct TestRequestBuilder {
1793 method: String,
1794 path: String,
1795 headers: Vec<(String, String)>,
1796 body: Option<String>,
1797 }
1798
1799 impl TestRequestBuilder {
1800 fn new() -> Self {
1801 Self {
1802 method: "GET".to_string(),
1803 path: "/".to_string(),
1804 headers: Vec::new(),
1805 body: None,
1806 }
1807 }
1808 fn method(mut self, m: &str) -> Self {
1809 self.method = m.to_string();
1810 self
1811 }
1812 fn path(mut self, p: &str) -> Self {
1813 self.path = p.to_string();
1814 self
1815 }
1816 fn header(mut self, k: &str, v: &str) -> Self {
1817 self.headers.push((k.to_string(), v.to_string()));
1818 self
1819 }
1820 fn json<T: serde::Serialize>(mut self, body: &T) -> Self {
1821 self.body = Some(serde_json::to_string(body).unwrap());
1822 self.headers
1823 .push(("content-type".to_string(), "application/json".to_string()));
1824 self
1825 }
1826 async fn reply(self, app: &Router) -> TestResponse {
1827 let mut builder = Request::builder()
1828 .method(self.method.as_str())
1829 .uri(&self.path);
1830 for (k, v) in &self.headers {
1831 builder = builder.header(k.as_str(), v.as_str());
1832 }
1833 let body = match self.body {
1834 Some(b) => Body::from(b),
1835 None => Body::empty(),
1836 };
1837 let req = builder.body(body).unwrap();
1838 let resp = app.clone().oneshot(req).await.unwrap();
1839 let status = resp.status();
1840 let headers = resp.headers().clone();
1841 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1842 .await
1843 .unwrap();
1844 TestResponse {
1845 status,
1846 body,
1847 headers,
1848 }
1849 }
1850 }
1851
1852 fn test_request() -> TestRequestBuilder {
1854 TestRequestBuilder::new()
1855 }
1856
1857 async fn setup_test_manager() -> SharedTenantManager {
1858 let mut mgr = TenantManager::new();
1859 let id = mgr
1860 .create_tenant(
1861 "Test Corp".into(),
1862 "test-key-123".into(),
1863 TenantQuota::default(),
1864 )
1865 .unwrap();
1866
1867 let tenant = mgr.get_tenant_mut(&id).unwrap();
1869 tenant
1870 .deploy_pipeline(
1871 "Test Pipeline".into(),
1872 "stream A = SensorReading .where(x > 1)".into(),
1873 )
1874 .await
1875 .unwrap();
1876
1877 Arc::new(RwLock::new(mgr))
1878 }
1879
1880 #[tokio::test]
1881 async fn test_deploy_pipeline() {
1882 let mgr = setup_test_manager().await;
1883 let routes = api_routes(mgr, None, None, None);
1884
1885 let resp = test_request()
1886 .method("POST")
1887 .path("/api/v1/pipelines")
1888 .header("x-api-key", "test-key-123")
1889 .json(&DeployPipelineRequest {
1890 name: "New Pipeline".into(),
1891 source: "stream B = Events .where(y > 10)".into(),
1892 })
1893 .reply(&routes)
1894 .await;
1895
1896 assert_eq!(resp.status(), StatusCode::CREATED);
1897 let body: DeployPipelineResponse = serde_json::from_slice(resp.body()).unwrap();
1898 assert_eq!(body.name, "New Pipeline");
1899 assert_eq!(body.status, "running");
1900 }
1901
1902 #[tokio::test]
1903 async fn test_deploy_invalid_api_key() {
1904 let mgr = setup_test_manager().await;
1905 let routes = api_routes(mgr, None, None, None);
1906
1907 let resp = test_request()
1908 .method("POST")
1909 .path("/api/v1/pipelines")
1910 .header("x-api-key", "wrong-key")
1911 .json(&DeployPipelineRequest {
1912 name: "Bad".into(),
1913 source: "stream X = Y .where(z > 1)".into(),
1914 })
1915 .reply(&routes)
1916 .await;
1917
1918 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1919 }
1920
1921 #[tokio::test]
1922 async fn test_deploy_invalid_vpl() {
1923 let mgr = setup_test_manager().await;
1924 let routes = api_routes(mgr, None, None, None);
1925
1926 let resp = test_request()
1927 .method("POST")
1928 .path("/api/v1/pipelines")
1929 .header("x-api-key", "test-key-123")
1930 .json(&DeployPipelineRequest {
1931 name: "Bad VPL".into(),
1932 source: "this is not valid {{{".into(),
1933 })
1934 .reply(&routes)
1935 .await;
1936
1937 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1938 }
1939
1940 #[tokio::test]
1941 async fn test_list_pipelines() {
1942 let mgr = setup_test_manager().await;
1943 let routes = api_routes(mgr, None, None, None);
1944
1945 let resp = test_request()
1946 .method("GET")
1947 .path("/api/v1/pipelines")
1948 .header("x-api-key", "test-key-123")
1949 .reply(&routes)
1950 .await;
1951
1952 assert_eq!(resp.status(), StatusCode::OK);
1953 let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
1954 assert_eq!(body.total, 1);
1955 assert_eq!(body.pipelines[0].name, "Test Pipeline");
1956 }
1957
1958 #[tokio::test]
1959 async fn test_usage_endpoint() {
1960 let mgr = setup_test_manager().await;
1961 let routes = api_routes(mgr, None, None, None);
1962
1963 let resp = test_request()
1964 .method("GET")
1965 .path("/api/v1/usage")
1966 .header("x-api-key", "test-key-123")
1967 .reply(&routes)
1968 .await;
1969
1970 assert_eq!(resp.status(), StatusCode::OK);
1971 let body: UsageResponse = serde_json::from_slice(resp.body()).unwrap();
1972 assert_eq!(body.active_pipelines, 1);
1973 }
1974
1975 #[tokio::test]
1976 async fn test_inject_event() {
1977 let mgr = setup_test_manager().await;
1978
1979 let pipeline_id = {
1981 let m = mgr.read().await;
1982 let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
1983 let tenant = m.get_tenant(&tid).unwrap();
1984 tenant.pipelines.keys().next().unwrap().clone()
1985 };
1986
1987 let routes = api_routes(mgr, None, None, None);
1988
1989 let resp = test_request()
1990 .method("POST")
1991 .path(&format!("/api/v1/pipelines/{pipeline_id}/events"))
1992 .header("x-api-key", "test-key-123")
1993 .json(&InjectEventRequest {
1994 event_type: "SensorReading".into(),
1995 fields: {
1996 let mut m = serde_json::Map::new();
1997 m.insert(
1998 "x".into(),
1999 serde_json::Value::Number(serde_json::Number::from(42)),
2000 );
2001 m
2002 },
2003 })
2004 .reply(&routes)
2005 .await;
2006
2007 assert_eq!(resp.status(), StatusCode::OK);
2008 }
2009
2010 #[test]
2011 fn test_json_to_runtime_value() {
2012 assert_eq!(
2013 json_to_runtime_value(&serde_json::json!(null)),
2014 varpulis_core::Value::Null
2015 );
2016 assert_eq!(
2017 json_to_runtime_value(&serde_json::json!(true)),
2018 varpulis_core::Value::Bool(true)
2019 );
2020 assert_eq!(
2021 json_to_runtime_value(&serde_json::json!(42)),
2022 varpulis_core::Value::Int(42)
2023 );
2024 assert_eq!(
2025 json_to_runtime_value(&serde_json::json!(1.23)),
2026 varpulis_core::Value::Float(1.23)
2027 );
2028 assert_eq!(
2029 json_to_runtime_value(&serde_json::json!("hello")),
2030 varpulis_core::Value::Str("hello".into())
2031 );
2032 }
2033
2034 #[test]
2035 fn test_error_response_format() {
2036 let resp = error_response(StatusCode::BAD_REQUEST, "test_error", "Something failed");
2037 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2038 }
2039
2040 #[test]
2041 fn test_tenant_error_mapping() {
2042 let resp = tenant_error_response(TenantError::NotFound("t1".into()));
2043 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2044
2045 let resp = tenant_error_response(TenantError::RateLimitExceeded);
2046 assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2047
2048 let parse_err = varpulis_parser::parse("INVALID{{{").unwrap_err();
2049 let resp = tenant_error_response(TenantError::ParseError(parse_err));
2050 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2051 }
2052
2053 fn setup_admin_routes(admin_key: Option<&str>) -> (SharedTenantManager, Router) {
2058 let mgr = Arc::new(RwLock::new(TenantManager::new()));
2059 let key = admin_key.map(|k| k.to_string());
2060 let routes = api_routes(mgr.clone(), key, None, None);
2061 (mgr, routes)
2062 }
2063
2064 #[tokio::test]
2065 async fn test_create_tenant() {
2066 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2067
2068 let resp = test_request()
2069 .method("POST")
2070 .path("/api/v1/tenants")
2071 .header("x-admin-key", "admin-secret")
2072 .json(&CreateTenantRequest {
2073 name: "Acme Corp".into(),
2074 quota_tier: None,
2075 })
2076 .reply(&routes)
2077 .await;
2078
2079 assert_eq!(resp.status(), StatusCode::CREATED);
2080 let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
2081 assert_eq!(body.name, "Acme Corp");
2082 assert!(!body.api_key.is_empty());
2083 assert!(!body.id.is_empty());
2084 }
2085
2086 #[tokio::test]
2087 async fn test_list_tenants_admin() {
2088 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2089
2090 for name in &["Tenant A", "Tenant B"] {
2092 test_request()
2093 .method("POST")
2094 .path("/api/v1/tenants")
2095 .header("x-admin-key", "admin-secret")
2096 .json(&CreateTenantRequest {
2097 name: name.to_string(),
2098 quota_tier: None,
2099 })
2100 .reply(&routes)
2101 .await;
2102 }
2103
2104 let resp = test_request()
2105 .method("GET")
2106 .path("/api/v1/tenants")
2107 .header("x-admin-key", "admin-secret")
2108 .reply(&routes)
2109 .await;
2110
2111 assert_eq!(resp.status(), StatusCode::OK);
2112 let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2113 assert_eq!(body.total, 2);
2114 }
2115
2116 #[tokio::test]
2117 async fn test_get_tenant_admin() {
2118 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2119
2120 let create_resp = test_request()
2122 .method("POST")
2123 .path("/api/v1/tenants")
2124 .header("x-admin-key", "admin-secret")
2125 .json(&CreateTenantRequest {
2126 name: "Detail Corp".into(),
2127 quota_tier: Some("pro".into()),
2128 })
2129 .reply(&routes)
2130 .await;
2131
2132 let created: TenantResponse = serde_json::from_slice(create_resp.body()).unwrap();
2133
2134 let resp = test_request()
2135 .method("GET")
2136 .path(&format!("/api/v1/tenants/{}", created.id))
2137 .header("x-admin-key", "admin-secret")
2138 .reply(&routes)
2139 .await;
2140
2141 assert_eq!(resp.status(), StatusCode::OK);
2142 let body: TenantDetailResponse = serde_json::from_slice(resp.body()).unwrap();
2143 assert_eq!(body.name, "Detail Corp");
2144 assert_eq!(body.pipeline_count, 0);
2145 assert_eq!(body.quota.max_pipelines, 20);
2147 }
2148
2149 #[tokio::test]
2150 async fn test_delete_tenant_admin() {
2151 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2152
2153 let create_resp = test_request()
2155 .method("POST")
2156 .path("/api/v1/tenants")
2157 .header("x-admin-key", "admin-secret")
2158 .json(&CreateTenantRequest {
2159 name: "Doomed".into(),
2160 quota_tier: None,
2161 })
2162 .reply(&routes)
2163 .await;
2164 let created: TenantResponse = serde_json::from_slice(create_resp.body()).unwrap();
2165
2166 let resp = test_request()
2167 .method("DELETE")
2168 .path(&format!("/api/v1/tenants/{}", created.id))
2169 .header("x-admin-key", "admin-secret")
2170 .reply(&routes)
2171 .await;
2172
2173 assert_eq!(resp.status(), StatusCode::OK);
2174
2175 let list_resp = test_request()
2177 .method("GET")
2178 .path("/api/v1/tenants")
2179 .header("x-admin-key", "admin-secret")
2180 .reply(&routes)
2181 .await;
2182 let body: TenantListResponse = serde_json::from_slice(list_resp.body()).unwrap();
2183 assert_eq!(body.total, 0);
2184 }
2185
2186 #[tokio::test]
2187 async fn test_invalid_admin_key() {
2188 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2189
2190 let resp = test_request()
2191 .method("GET")
2192 .path("/api/v1/tenants")
2193 .header("x-admin-key", "wrong-key")
2194 .reply(&routes)
2195 .await;
2196
2197 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2198 }
2199
2200 #[tokio::test]
2201 async fn test_no_admin_key_configured() {
2202 let (_mgr, routes) = setup_admin_routes(None);
2203
2204 let resp = test_request()
2205 .method("GET")
2206 .path("/api/v1/tenants")
2207 .header("x-admin-key", "anything")
2208 .reply(&routes)
2209 .await;
2210
2211 assert_eq!(resp.status(), StatusCode::FORBIDDEN);
2212 }
2213
2214 #[tokio::test]
2215 async fn test_create_tenant_tier_selection() {
2216 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2217
2218 let resp = test_request()
2220 .method("POST")
2221 .path("/api/v1/tenants")
2222 .header("x-admin-key", "admin-secret")
2223 .json(&CreateTenantRequest {
2224 name: "Free User".into(),
2225 quota_tier: Some("free".into()),
2226 })
2227 .reply(&routes)
2228 .await;
2229 let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
2230 assert_eq!(body.quota.max_pipelines, 5); let resp = test_request()
2234 .method("POST")
2235 .path("/api/v1/tenants")
2236 .header("x-admin-key", "admin-secret")
2237 .json(&CreateTenantRequest {
2238 name: "Enterprise User".into(),
2239 quota_tier: Some("enterprise".into()),
2240 })
2241 .reply(&routes)
2242 .await;
2243 let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
2244 assert_eq!(body.quota.max_pipelines, 1000); }
2246
2247 async fn get_first_pipeline_id(mgr: &SharedTenantManager) -> String {
2253 let m = mgr.read().await;
2254 let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
2255 let tenant = m.get_tenant(&tid).unwrap();
2256 tenant.pipelines.keys().next().unwrap().clone()
2257 }
2258
2259 #[tokio::test]
2260 async fn test_get_single_pipeline() {
2261 let mgr = setup_test_manager().await;
2262 let pipeline_id = get_first_pipeline_id(&mgr).await;
2263 let routes = api_routes(mgr, None, None, None);
2264
2265 let resp = test_request()
2266 .method("GET")
2267 .path(&format!("/api/v1/pipelines/{pipeline_id}"))
2268 .header("x-api-key", "test-key-123")
2269 .reply(&routes)
2270 .await;
2271
2272 assert_eq!(resp.status(), StatusCode::OK);
2273 let body: PipelineInfo = serde_json::from_slice(resp.body()).unwrap();
2274 assert_eq!(body.id, pipeline_id);
2275 assert_eq!(body.name, "Test Pipeline");
2276 assert_eq!(body.status, "running");
2277 assert!(body.source.contains("SensorReading"));
2278 }
2279
2280 #[tokio::test]
2281 async fn test_get_pipeline_not_found() {
2282 let mgr = setup_test_manager().await;
2283 let routes = api_routes(mgr, None, None, None);
2284
2285 let resp = test_request()
2286 .method("GET")
2287 .path("/api/v1/pipelines/nonexistent-id")
2288 .header("x-api-key", "test-key-123")
2289 .reply(&routes)
2290 .await;
2291
2292 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2293 }
2294
2295 #[tokio::test]
2296 async fn test_delete_pipeline_api() {
2297 let mgr = setup_test_manager().await;
2298 let pipeline_id = get_first_pipeline_id(&mgr).await;
2299 let routes = api_routes(mgr.clone(), None, None, None);
2300
2301 let resp = test_request()
2302 .method("DELETE")
2303 .path(&format!("/api/v1/pipelines/{pipeline_id}"))
2304 .header("x-api-key", "test-key-123")
2305 .reply(&routes)
2306 .await;
2307
2308 assert_eq!(resp.status(), StatusCode::OK);
2309 let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2310 assert_eq!(body["deleted"], true);
2311
2312 let list_resp = test_request()
2314 .method("GET")
2315 .path("/api/v1/pipelines")
2316 .header("x-api-key", "test-key-123")
2317 .reply(&routes)
2318 .await;
2319 let list: PipelineListResponse = serde_json::from_slice(list_resp.body()).unwrap();
2320 assert_eq!(list.total, 0);
2321 }
2322
2323 #[tokio::test]
2324 async fn test_delete_pipeline_not_found() {
2325 let mgr = setup_test_manager().await;
2326 let routes = api_routes(mgr, None, None, None);
2327
2328 let resp = test_request()
2329 .method("DELETE")
2330 .path("/api/v1/pipelines/nonexistent-id")
2331 .header("x-api-key", "test-key-123")
2332 .reply(&routes)
2333 .await;
2334
2335 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2336 }
2337
2338 #[tokio::test]
2343 async fn test_inject_batch() {
2344 let mgr = setup_test_manager().await;
2345 let pipeline_id = get_first_pipeline_id(&mgr).await;
2346 let routes = api_routes(mgr, None, None, None);
2347
2348 let resp = test_request()
2349 .method("POST")
2350 .path(&format!("/api/v1/pipelines/{pipeline_id}/events-batch"))
2351 .header("x-api-key", "test-key-123")
2352 .json(&InjectBatchRequest {
2353 events: vec![
2354 InjectEventRequest {
2355 event_type: "SensorReading".into(),
2356 fields: {
2357 let mut m = serde_json::Map::new();
2358 m.insert("x".into(), serde_json::json!(5));
2359 m
2360 },
2361 },
2362 InjectEventRequest {
2363 event_type: "SensorReading".into(),
2364 fields: {
2365 let mut m = serde_json::Map::new();
2366 m.insert("x".into(), serde_json::json!(10));
2367 m
2368 },
2369 },
2370 ],
2371 })
2372 .reply(&routes)
2373 .await;
2374
2375 assert_eq!(resp.status(), StatusCode::OK);
2376 let body: InjectBatchResponse = serde_json::from_slice(resp.body()).unwrap();
2377 assert_eq!(body.accepted, 2);
2378 assert!(body.processing_time_us > 0);
2379 }
2380
2381 #[tokio::test]
2382 async fn test_inject_batch_invalid_pipeline() {
2383 let mgr = setup_test_manager().await;
2384 let routes = api_routes(mgr, None, None, None);
2385
2386 let resp = test_request()
2388 .method("POST")
2389 .path("/api/v1/pipelines/nonexistent/events-batch")
2390 .header("x-api-key", "test-key-123")
2391 .json(&InjectBatchRequest {
2392 events: vec![InjectEventRequest {
2393 event_type: "Test".into(),
2394 fields: serde_json::Map::new(),
2395 }],
2396 })
2397 .reply(&routes)
2398 .await;
2399
2400 assert_eq!(resp.status(), StatusCode::OK);
2402 let body: InjectBatchResponse = serde_json::from_slice(resp.body()).unwrap();
2403 assert_eq!(body.accepted, 0);
2404 }
2405
2406 #[tokio::test]
2411 async fn test_checkpoint_pipeline() {
2412 let mgr = setup_test_manager().await;
2413 let pipeline_id = get_first_pipeline_id(&mgr).await;
2414 let routes = api_routes(mgr, None, None, None);
2415
2416 let resp = test_request()
2417 .method("POST")
2418 .path(&format!("/api/v1/pipelines/{pipeline_id}/checkpoint"))
2419 .header("x-api-key", "test-key-123")
2420 .reply(&routes)
2421 .await;
2422
2423 assert_eq!(resp.status(), StatusCode::OK);
2424 let body: CheckpointResponse = serde_json::from_slice(resp.body()).unwrap();
2425 assert_eq!(body.pipeline_id, pipeline_id);
2426 }
2427
2428 #[tokio::test]
2429 async fn test_checkpoint_not_found() {
2430 let mgr = setup_test_manager().await;
2431 let routes = api_routes(mgr, None, None, None);
2432
2433 let resp = test_request()
2434 .method("POST")
2435 .path("/api/v1/pipelines/nonexistent/checkpoint")
2436 .header("x-api-key", "test-key-123")
2437 .reply(&routes)
2438 .await;
2439
2440 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2441 }
2442
2443 #[tokio::test]
2444 async fn test_restore_pipeline() {
2445 let mgr = setup_test_manager().await;
2446 let pipeline_id = get_first_pipeline_id(&mgr).await;
2447 let routes = api_routes(mgr, None, None, None);
2448
2449 let cp_resp = test_request()
2451 .method("POST")
2452 .path(&format!("/api/v1/pipelines/{pipeline_id}/checkpoint"))
2453 .header("x-api-key", "test-key-123")
2454 .reply(&routes)
2455 .await;
2456 let cp: CheckpointResponse = serde_json::from_slice(cp_resp.body()).unwrap();
2457
2458 let resp = test_request()
2460 .method("POST")
2461 .path(&format!("/api/v1/pipelines/{pipeline_id}/restore"))
2462 .header("x-api-key", "test-key-123")
2463 .json(&RestoreRequest {
2464 checkpoint: cp.checkpoint,
2465 })
2466 .reply(&routes)
2467 .await;
2468
2469 assert_eq!(resp.status(), StatusCode::OK);
2470 let body: RestoreResponse = serde_json::from_slice(resp.body()).unwrap();
2471 assert_eq!(body.pipeline_id, pipeline_id);
2472 assert!(body.restored);
2473 }
2474
2475 #[tokio::test]
2476 async fn test_restore_not_found() {
2477 let mgr = setup_test_manager().await;
2478 let routes = api_routes(mgr, None, None, None);
2479
2480 let checkpoint = varpulis_runtime::persistence::EngineCheckpoint {
2481 version: varpulis_runtime::persistence::CHECKPOINT_VERSION,
2482 window_states: std::collections::HashMap::new(),
2483 sase_states: std::collections::HashMap::new(),
2484 join_states: std::collections::HashMap::new(),
2485 variables: std::collections::HashMap::new(),
2486 events_processed: 0,
2487 output_events_emitted: 0,
2488 watermark_state: None,
2489 distinct_states: std::collections::HashMap::new(),
2490 limit_states: std::collections::HashMap::new(),
2491 };
2492
2493 let resp = test_request()
2494 .method("POST")
2495 .path("/api/v1/pipelines/nonexistent/restore")
2496 .header("x-api-key", "test-key-123")
2497 .json(&RestoreRequest { checkpoint })
2498 .reply(&routes)
2499 .await;
2500
2501 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2502 }
2503
2504 #[tokio::test]
2509 async fn test_metrics_endpoint() {
2510 let mgr = setup_test_manager().await;
2511 let pipeline_id = get_first_pipeline_id(&mgr).await;
2512 let routes = api_routes(mgr, None, None, None);
2513
2514 let resp = test_request()
2515 .method("GET")
2516 .path(&format!("/api/v1/pipelines/{pipeline_id}/metrics"))
2517 .header("x-api-key", "test-key-123")
2518 .reply(&routes)
2519 .await;
2520
2521 assert_eq!(resp.status(), StatusCode::OK);
2522 let body: PipelineMetricsResponse = serde_json::from_slice(resp.body()).unwrap();
2523 assert_eq!(body.pipeline_id, pipeline_id);
2524 }
2525
2526 #[tokio::test]
2527 async fn test_metrics_not_found() {
2528 let mgr = setup_test_manager().await;
2529 let routes = api_routes(mgr, None, None, None);
2530
2531 let resp = test_request()
2532 .method("GET")
2533 .path("/api/v1/pipelines/nonexistent/metrics")
2534 .header("x-api-key", "test-key-123")
2535 .reply(&routes)
2536 .await;
2537
2538 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2539 }
2540
2541 #[tokio::test]
2546 async fn test_reload_pipeline() {
2547 let mgr = setup_test_manager().await;
2548 let pipeline_id = get_first_pipeline_id(&mgr).await;
2549 let routes = api_routes(mgr, None, None, None);
2550
2551 let resp = test_request()
2552 .method("POST")
2553 .path(&format!("/api/v1/pipelines/{pipeline_id}/reload"))
2554 .header("x-api-key", "test-key-123")
2555 .json(&ReloadPipelineRequest {
2556 source: "stream B = Events .where(y > 10)".into(),
2557 })
2558 .reply(&routes)
2559 .await;
2560
2561 assert_eq!(resp.status(), StatusCode::OK);
2562 let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2563 assert_eq!(body["reloaded"], true);
2564 }
2565
2566 #[tokio::test]
2567 async fn test_reload_invalid_vpl() {
2568 let mgr = setup_test_manager().await;
2569 let pipeline_id = get_first_pipeline_id(&mgr).await;
2570 let routes = api_routes(mgr, None, None, None);
2571
2572 let resp = test_request()
2573 .method("POST")
2574 .path(&format!("/api/v1/pipelines/{pipeline_id}/reload"))
2575 .header("x-api-key", "test-key-123")
2576 .json(&ReloadPipelineRequest {
2577 source: "not valid {{{".into(),
2578 })
2579 .reply(&routes)
2580 .await;
2581
2582 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2583 }
2584
2585 #[tokio::test]
2586 async fn test_reload_not_found() {
2587 let mgr = setup_test_manager().await;
2588 let routes = api_routes(mgr, None, None, None);
2589
2590 let resp = test_request()
2591 .method("POST")
2592 .path("/api/v1/pipelines/nonexistent/reload")
2593 .header("x-api-key", "test-key-123")
2594 .json(&ReloadPipelineRequest {
2595 source: "stream B = Events .where(y > 10)".into(),
2596 })
2597 .reply(&routes)
2598 .await;
2599
2600 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2601 }
2602
2603 #[tokio::test]
2608 async fn test_logs_invalid_pipeline() {
2609 let mgr = setup_test_manager().await;
2610 let routes = api_routes(mgr, None, None, None);
2611
2612 let resp = test_request()
2613 .method("GET")
2614 .path("/api/v1/pipelines/nonexistent/logs")
2615 .header("x-api-key", "test-key-123")
2616 .reply(&routes)
2617 .await;
2618
2619 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2620 }
2621
2622 #[tokio::test]
2623 async fn test_logs_invalid_api_key() {
2624 let mgr = setup_test_manager().await;
2625 let pipeline_id = get_first_pipeline_id(&mgr).await;
2626 let routes = api_routes(mgr, None, None, None);
2627
2628 let resp = test_request()
2629 .method("GET")
2630 .path(&format!("/api/v1/pipelines/{pipeline_id}/logs"))
2631 .header("x-api-key", "wrong-key")
2632 .reply(&routes)
2633 .await;
2634
2635 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2636 }
2637
/// Converts the JSON array `[1, "hello", true]` and expects a runtime
/// `Array` holding `Int(1)`, `Str("hello")` and `Bool(true)` in that order.
#[test]
fn test_json_to_runtime_value_array() {
    use varpulis_core::Value;

    let input = serde_json::json!([1, "hello", true]);

    if let Value::Array(items) = json_to_runtime_value(&input) {
        assert_eq!(items.len(), 3);
        assert_eq!(items[0], Value::Int(1));
        assert_eq!(items[1], Value::Str("hello".into()));
        assert_eq!(items[2], Value::Bool(true));
    } else {
        panic!("Expected Array");
    }
}
2656
/// Converts a two-key JSON object and expects a runtime `Map` with two
/// entries; the entries themselves are not inspected.
#[test]
fn test_json_to_runtime_value_object() {
    let input = serde_json::json!({"key": "val", "num": 42});

    let entry_count = match json_to_runtime_value(&input) {
        varpulis_core::Value::Map(entries) => entries.len(),
        _ => panic!("Expected Map"),
    };

    assert_eq!(entry_count, 2);
}
2668
/// Table-driven check of `json_from_value`: each runtime value listed below
/// must convert to the JSON value paired with it. `Timestamp` and `Duration`
/// are expected to come out as plain JSON numbers.
#[test]
fn test_json_from_value_roundtrip() {
    use varpulis_core::Value;

    let cases = [
        (Value::Null, serde_json::json!(null)),
        (Value::Bool(true), serde_json::json!(true)),
        (Value::Int(42), serde_json::json!(42)),
        (Value::Float(2.71), serde_json::json!(2.71)),
        (Value::Str("hi".into()), serde_json::json!("hi")),
        (Value::Timestamp(1000000), serde_json::json!(1000000)),
        (Value::Duration(5000), serde_json::json!(5000)),
    ];

    for (input, expected) in &cases {
        assert_eq!(json_from_value(input), *expected);
    }
}
2692
/// Maps several `TenantError` variants through `tenant_error_response` and
/// checks the resulting HTTP status; the backpressure variant must also
/// carry a `Retry-After: 1` header.
#[test]
fn test_tenant_error_all_variants() {
    // Variants for which only the status code is checked.
    let status_only = vec![
        (
            TenantError::PipelineNotFound("p1".into()),
            StatusCode::NOT_FOUND,
        ),
        (
            TenantError::QuotaExceeded("max pipelines".into()),
            StatusCode::TOO_MANY_REQUESTS,
        ),
        (
            TenantError::EngineError(varpulis_runtime::EngineError::Pipeline("boom".into())),
            StatusCode::INTERNAL_SERVER_ERROR,
        ),
        (
            TenantError::AlreadyExists("t1".into()),
            StatusCode::CONFLICT,
        ),
    ];

    for (err, expected_status) in status_only {
        let resp = tenant_error_response(err);
        assert_eq!(resp.status(), expected_status);
    }

    // Backpressure is checked separately because of the extra header.
    let backpressure = TenantError::BackpressureExceeded {
        current: 50000,
        max: 50000,
    };
    let resp = tenant_error_response(backpressure);
    assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
    assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
}
2720
2721 #[tokio::test]
2726 async fn test_list_pipelines_default_pagination() {
2727 let mgr = setup_test_manager().await;
2728 let routes = api_routes(mgr, None, None, None);
2729
2730 let resp = test_request()
2731 .method("GET")
2732 .path("/api/v1/pipelines")
2733 .header("x-api-key", "test-key-123")
2734 .reply(&routes)
2735 .await;
2736
2737 assert_eq!(resp.status(), StatusCode::OK);
2738 let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2739 assert_eq!(body.total, 1);
2740 let pagination = body.pagination.unwrap();
2741 assert_eq!(pagination.total, 1);
2742 assert_eq!(pagination.offset, 0);
2743 assert_eq!(pagination.limit, 50);
2744 assert!(!pagination.has_more);
2745 }
2746
2747 #[tokio::test]
2748 async fn test_list_pipelines_with_pagination_params() {
2749 let mgr = setup_test_manager().await;
2750
2751 {
2753 let mut m = mgr.write().await;
2754 let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
2755 let tenant = m.get_tenant_mut(&tid).unwrap();
2756 tenant
2757 .deploy_pipeline(
2758 "Pipeline B".into(),
2759 "stream B = Events .where(y > 2)".into(),
2760 )
2761 .await
2762 .unwrap();
2763 tenant
2764 .deploy_pipeline(
2765 "Pipeline C".into(),
2766 "stream C = Events .where(z > 3)".into(),
2767 )
2768 .await
2769 .unwrap();
2770 }
2771
2772 let routes = api_routes(mgr, None, None, None);
2773
2774 let resp = test_request()
2776 .method("GET")
2777 .path("/api/v1/pipelines?limit=1&offset=0")
2778 .header("x-api-key", "test-key-123")
2779 .reply(&routes)
2780 .await;
2781
2782 assert_eq!(resp.status(), StatusCode::OK);
2783 let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2784 assert_eq!(body.pipelines.len(), 1);
2785 assert_eq!(body.total, 3);
2786 let pagination = body.pagination.unwrap();
2787 assert!(pagination.has_more);
2788 assert_eq!(pagination.limit, 1);
2789
2790 let resp = test_request()
2792 .method("GET")
2793 .path("/api/v1/pipelines?limit=1&offset=2")
2794 .header("x-api-key", "test-key-123")
2795 .reply(&routes)
2796 .await;
2797
2798 let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2799 assert_eq!(body.pipelines.len(), 1);
2800 assert_eq!(body.total, 3);
2801 assert!(!body.pagination.unwrap().has_more);
2802 }
2803
2804 #[tokio::test]
2805 async fn test_list_pipelines_limit_exceeds_max() {
2806 let mgr = setup_test_manager().await;
2807 let routes = api_routes(mgr, None, None, None);
2808
2809 let resp = test_request()
2810 .method("GET")
2811 .path("/api/v1/pipelines?limit=1001")
2812 .header("x-api-key", "test-key-123")
2813 .reply(&routes)
2814 .await;
2815
2816 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2817 }
2818
2819 #[tokio::test]
2820 async fn test_list_tenants_with_pagination() {
2821 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2822
2823 for name in &["T1", "T2", "T3"] {
2825 test_request()
2826 .method("POST")
2827 .path("/api/v1/tenants")
2828 .header("x-admin-key", "admin-secret")
2829 .json(&CreateTenantRequest {
2830 name: name.to_string(),
2831 quota_tier: None,
2832 })
2833 .reply(&routes)
2834 .await;
2835 }
2836
2837 let resp = test_request()
2839 .method("GET")
2840 .path("/api/v1/tenants?limit=2&offset=0")
2841 .header("x-admin-key", "admin-secret")
2842 .reply(&routes)
2843 .await;
2844
2845 assert_eq!(resp.status(), StatusCode::OK);
2846 let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2847 assert_eq!(body.tenants.len(), 2);
2848 assert_eq!(body.total, 3);
2849 assert!(body.pagination.unwrap().has_more);
2850
2851 let resp = test_request()
2853 .method("GET")
2854 .path("/api/v1/tenants?limit=2&offset=2")
2855 .header("x-admin-key", "admin-secret")
2856 .reply(&routes)
2857 .await;
2858
2859 let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2860 assert_eq!(body.tenants.len(), 1);
2861 assert!(!body.pagination.unwrap().has_more);
2862 }
2863
2864 #[tokio::test]
2865 async fn test_inject_backpressure_429() {
2866 use std::sync::atomic::Ordering;
2867
2868 let mut mgr = TenantManager::new();
2869 mgr.set_max_queue_depth(5);
2870 let id = mgr
2871 .create_tenant(
2872 "BP Corp".into(),
2873 "bp-key-123".into(),
2874 TenantQuota::default(),
2875 )
2876 .unwrap();
2877
2878 let tenant = mgr.get_tenant_mut(&id).unwrap();
2879 let pid = tenant
2880 .deploy_pipeline(
2881 "BP Pipeline".into(),
2882 "stream A = SensorReading .where(x > 1)".into(),
2883 )
2884 .await
2885 .unwrap();
2886
2887 mgr.pending_events_counter().store(5, Ordering::Relaxed);
2889
2890 let shared = Arc::new(RwLock::new(mgr));
2891 let routes = api_routes(shared, None, None, None);
2892
2893 let resp = test_request()
2894 .method("POST")
2895 .path(&format!("/api/v1/pipelines/{pid}/events"))
2896 .header("x-api-key", "bp-key-123")
2897 .json(&InjectEventRequest {
2898 event_type: "SensorReading".into(),
2899 fields: serde_json::Map::new(),
2900 })
2901 .reply(&routes)
2902 .await;
2903
2904 assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2905 assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2907 let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2909 assert_eq!(body["code"], "queue_depth_exceeded");
2910 }
2911
2912 #[tokio::test]
2913 async fn test_inject_batch_backpressure_429() {
2914 use std::sync::atomic::Ordering;
2915
2916 let mut mgr = TenantManager::new();
2917 mgr.set_max_queue_depth(5);
2918 let id = mgr
2919 .create_tenant(
2920 "BP Batch Corp".into(),
2921 "bp-batch-key".into(),
2922 TenantQuota::default(),
2923 )
2924 .unwrap();
2925
2926 let tenant = mgr.get_tenant_mut(&id).unwrap();
2927 let pid = tenant
2928 .deploy_pipeline(
2929 "BP Batch Pipeline".into(),
2930 "stream A = SensorReading .where(x > 1)".into(),
2931 )
2932 .await
2933 .unwrap();
2934
2935 mgr.pending_events_counter().store(5, Ordering::Relaxed);
2937
2938 let shared = Arc::new(RwLock::new(mgr));
2939 let routes = api_routes(shared, None, None, None);
2940
2941 let resp = test_request()
2942 .method("POST")
2943 .path(&format!("/api/v1/pipelines/{pid}/events-batch"))
2944 .header("x-api-key", "bp-batch-key")
2945 .json(&InjectBatchRequest {
2946 events: vec![InjectEventRequest {
2947 event_type: "SensorReading".into(),
2948 fields: serde_json::Map::new(),
2949 }],
2950 })
2951 .reply(&routes)
2952 .await;
2953
2954 assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2955 assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2956 }
2957}