1use crate::auth::constant_time_compare;
7use axum::extract::{Json, Path, Query, State};
8use axum::http::StatusCode;
9use axum::response::{IntoResponse, Response};
10use axum::routing::{get, post};
11use axum::Router;
12use futures_util::stream;
13use indexmap::IndexMap;
14use rustc_hash::FxBuildHasher;
15use serde::{Deserialize, Serialize};
16use std::convert::Infallible;
17use tower_http::cors::{Any, CorsLayer};
18use varpulis_runtime::tenant::{SharedTenantManager, TenantError, TenantQuota};
19use varpulis_runtime::Event;
20
21use varpulis_core::pagination::{PaginationMeta, PaginationParams, MAX_LIMIT};
22
23use crate::billing::SharedBillingState;
24
/// JSON request body accepted by `handle_deploy` (`POST /api/v1/pipelines`).
#[derive(Debug, Deserialize, Serialize)]
pub struct DeployPipelineRequest {
    /// Name for the new pipeline; echoed back in the deploy response.
    pub name: String,
    /// Pipeline source text handed to the tenant manager for deployment.
    pub source: String,
}
34
/// JSON response returned by `handle_deploy` with status `201 Created`.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployPipelineResponse {
    /// Identifier returned by the tenant manager for the deployed pipeline.
    pub id: String,
    /// Pipeline name, copied from the request.
    pub name: String,
    /// Pipeline status; `handle_deploy` always sets this to `"running"`.
    pub status: String,
}
41
/// Description of a single pipeline as returned by `handle_get` and `handle_list`.
#[derive(Debug, Serialize, Deserialize)]
pub struct PipelineInfo {
    /// Pipeline identifier.
    pub id: String,
    /// Pipeline name.
    pub name: String,
    /// String form of the pipeline's status.
    pub status: String,
    /// Source text the pipeline was deployed with.
    pub source: String,
    /// Whole seconds elapsed since the pipeline's `created_at` instant.
    pub uptime_secs: u64,
}
50
/// JSON response returned by `handle_list` (`GET /api/v1/pipelines`).
#[derive(Debug, Serialize, Deserialize)]
pub struct PipelineListResponse {
    /// The page of pipelines selected by the pagination parameters.
    pub pipelines: Vec<PipelineInfo>,
    /// Copied from `PaginationMeta::total` by `handle_list`.
    pub total: usize,
    /// Pagination metadata; omitted from the JSON output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pagination: Option<PaginationMeta>,
}
58
/// JSON response returned by `handle_metrics`.
///
/// Note: `handle_metrics` fills both counters from the owning tenant's
/// `usage` record, not from a per-pipeline counter.
#[derive(Debug, Serialize, Deserialize)]
pub struct PipelineMetricsResponse {
    /// Pipeline identifier taken from the request path.
    pub pipeline_id: String,
    /// Number of events processed.
    pub events_processed: u64,
    /// Number of output events emitted.
    pub output_events_emitted: u64,
}
65
/// A single event to inject into a pipeline (body of `handle_inject`, and
/// element type of [`InjectBatchRequest::events`]).
#[derive(Debug, Deserialize, Serialize)]
pub struct InjectEventRequest {
    /// Event type name used to construct the runtime `Event`.
    pub event_type: String,
    /// Event fields; each JSON value is converted to a runtime value before
    /// being attached to the event.
    pub fields: serde_json::Map<String, serde_json::Value>,
}
71
/// JSON request body accepted by `handle_inject_batch`.
#[derive(Debug, Deserialize, Serialize)]
pub struct InjectBatchRequest {
    /// Events to inject, processed in order.
    pub events: Vec<InjectEventRequest>,
}
76
/// JSON response returned by `handle_inject_batch`.
#[derive(Debug, Serialize, Deserialize)]
pub struct InjectBatchResponse {
    /// Number of input events that were processed successfully.
    pub accepted: usize,
    /// Output events as flat JSON objects: an `event_type` key plus one key
    /// per data field.
    pub output_events: Vec<serde_json::Value>,
    /// Wall-clock time spent in the processing loop, in microseconds.
    pub processing_time_us: u64,
}
83
/// JSON request body accepted by `handle_reload`.
#[derive(Debug, Deserialize, Serialize)]
pub struct ReloadPipelineRequest {
    /// New pipeline source text passed to `reload_pipeline`.
    pub source: String,
}
88
/// JSON response returned by `handle_checkpoint`.
#[derive(Debug, Serialize, Deserialize)]
pub struct CheckpointResponse {
    /// Pipeline identifier taken from the request path.
    pub pipeline_id: String,
    /// The engine checkpoint produced for the pipeline.
    pub checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
    /// Copy of `checkpoint.events_processed`, surfaced at the top level.
    pub events_processed: u64,
}
95
/// JSON request body accepted by `handle_restore`.
#[derive(Debug, Deserialize, Serialize)]
pub struct RestoreRequest {
    /// Checkpoint to restore the pipeline from.
    pub checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
}
100
/// JSON response returned by `handle_restore` on success.
#[derive(Debug, Serialize, Deserialize)]
pub struct RestoreResponse {
    /// Pipeline identifier taken from the request path.
    pub pipeline_id: String,
    /// Always `true` in the success response built by `handle_restore`.
    pub restored: bool,
    /// Copy of the submitted checkpoint's `events_processed` counter.
    pub events_restored: u64,
}
107
/// Serializable error payload with a human-readable message and a code.
///
/// NOTE(review): presumably this is the body produced by `error_response`,
/// which is defined outside this section; confirm there.
#[derive(Debug, Serialize)]
pub struct ApiError {
    /// Human-readable error message.
    pub error: String,
    /// Error code string (handlers in this file pass values such as
    /// `"invalid_api_key"` or `"pipeline_not_found"` to `error_response`).
    pub code: String,
}
113
/// Query-string parameters accepted by `handle_dlq_get`.
#[derive(Debug, Deserialize)]
pub struct DlqQueryParams {
    /// Index of the first entry to return; `handle_dlq_get` uses 0 when absent.
    #[serde(default)]
    pub offset: Option<usize>,
    /// Maximum number of entries to return; `handle_dlq_get` uses 100 when
    /// absent and caps the value at 1000.
    #[serde(default)]
    pub limit: Option<usize>,
}
121
/// JSON response returned by `handle_dlq_get`.
#[derive(Debug, Serialize)]
pub struct DlqEntriesResponse {
    /// The requested window of dead-letter entries.
    pub entries: Vec<varpulis_runtime::dead_letter::DlqEntryOwned>,
    /// Value of the DLQ's `line_count()`, or 0 when the engine has no DLQ.
    pub total: u64,
}
127
/// JSON response returned by `handle_dlq_replay`.
#[derive(Debug, Serialize)]
pub struct DlqReplayResponse {
    /// Number of DLQ entries whose re-processing returned `Ok`.
    pub replayed: usize,
}
132
/// JSON response returned by `handle_dlq_clear`.
#[derive(Debug, Serialize)]
pub struct DlqClearResponse {
    /// `true` when the DLQ was cleared; `handle_dlq_clear` also reports
    /// `true` when the engine has no DLQ at all.
    pub cleared: bool,
}
137
/// JSON response returned by `handle_usage` (`GET /api/v1/usage`).
#[derive(Debug, Serialize, Deserialize)]
pub struct UsageResponse {
    /// String form of the tenant's id.
    pub tenant_id: String,
    /// Tenant usage counter: events processed.
    pub events_processed: u64,
    /// Tenant usage counter: output events emitted.
    pub output_events_emitted: u64,
    /// Tenant usage counter: active pipelines.
    pub active_pipelines: usize,
    /// The tenant's quota limits.
    pub quota: QuotaInfo,
}
146
/// Serializable view of a tenant's quota limits, copied field-by-field from
/// `TenantQuota` by the handlers in this file.
#[derive(Debug, Serialize, Deserialize)]
pub struct QuotaInfo {
    /// Mirrors `TenantQuota::max_pipelines`.
    pub max_pipelines: usize,
    /// Mirrors `TenantQuota::max_events_per_second`.
    pub max_events_per_second: u64,
    /// Mirrors `TenantQuota::max_streams_per_pipeline`.
    pub max_streams_per_pipeline: usize,
}
153
/// JSON request body accepted by `handle_create_tenant`.
#[derive(Debug, Deserialize, Serialize)]
pub struct CreateTenantRequest {
    /// Name for the new tenant.
    pub name: String,
    /// Optional quota tier. `quota_from_tier` recognizes `"free"`, `"pro"`
    /// and `"enterprise"`; any other value (or none) selects the default quota.
    #[serde(default)]
    pub quota_tier: Option<String>,
}
164
/// Tenant summary returned by `handle_create_tenant` and, per tenant, by
/// `handle_list_tenants`. Includes the tenant's API key in clear text.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantResponse {
    /// Tenant identifier as a string.
    pub id: String,
    /// Tenant name.
    pub name: String,
    /// The tenant's API key.
    pub api_key: String,
    /// The tenant's quota limits.
    pub quota: QuotaInfo,
}
172
/// JSON response returned by `handle_list_tenants` (`GET /api/v1/tenants`).
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantListResponse {
    /// The page of tenants selected by the pagination parameters.
    pub tenants: Vec<TenantResponse>,
    /// Copied from `PaginationMeta::total` by `handle_list_tenants`.
    pub total: usize,
    /// Pagination metadata; omitted from the JSON output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pagination: Option<PaginationMeta>,
}
180
/// Detailed view of a single tenant.
///
/// NOTE(review): presumably the response of `handle_get_tenant`; that
/// handler's body is outside this section, so confirm there.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantDetailResponse {
    /// Tenant identifier as a string.
    pub id: String,
    /// Tenant name.
    pub name: String,
    /// The tenant's API key.
    pub api_key: String,
    /// The tenant's quota limits.
    pub quota: QuotaInfo,
    /// The tenant's usage counters.
    pub usage: TenantUsageInfo,
    /// Number of pipelines belonging to the tenant.
    pub pipeline_count: usize,
}
190
/// Usage counters embedded in [`TenantDetailResponse`].
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantUsageInfo {
    /// Events processed.
    pub events_processed: u64,
    /// Output events emitted.
    pub output_events_emitted: u64,
    /// Active pipelines.
    pub active_pipelines: usize,
}
197
198fn build_cors(origins: Option<Vec<String>>) -> CorsLayer {
207 use axum::http::{HeaderValue, Method};
208
209 let base = CorsLayer::new()
210 .allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
211 .allow_headers([
212 "content-type".parse().unwrap(),
213 "x-api-key".parse().unwrap(),
214 "authorization".parse().unwrap(),
215 ]);
216
217 match origins {
218 Some(ref list) if !list.is_empty() && !list.iter().any(|o| o == "*") => {
219 let origins: Vec<HeaderValue> = list.iter().filter_map(|s| s.parse().ok()).collect();
220 base.allow_origin(origins)
221 }
222 _ => base.allow_origin(Any),
223 }
224}
225
/// Shared state handed to every handler through axum's `State` extractor.
#[derive(Debug, Clone)]
pub struct ApiState {
    /// Shared, lockable tenant manager used by all handlers.
    pub manager: SharedTenantManager,
    /// Configured admin key; when `None`, `validate_admin_key` rejects every
    /// admin request with `403 Forbidden`.
    pub admin_key: Option<String>,
    /// Optional billing state; consulted by the inject handlers only when the
    /// `saas` feature is enabled.
    pub billing_state: Option<SharedBillingState>,
}
233
234#[derive(Debug)]
236pub struct ApiKey(pub String);
237
238impl<S> axum::extract::FromRequestParts<S> for ApiKey
239where
240 S: Send + Sync,
241{
242 type Rejection = Response;
243
244 async fn from_request_parts(
245 parts: &mut axum::http::request::Parts,
246 _state: &S,
247 ) -> Result<Self, Self::Rejection> {
248 parts
249 .headers
250 .get("x-api-key")
251 .and_then(|v| v.to_str().ok())
252 .map(|s| Self(s.to_string()))
253 .ok_or_else(|| {
254 (
255 StatusCode::UNAUTHORIZED,
256 axum::Json(serde_json::json!({"error": "Missing X-API-Key header"})),
257 )
258 .into_response()
259 })
260 }
261}
262
263#[derive(Debug)]
265pub struct AdminKey(pub String);
266
267impl<S> axum::extract::FromRequestParts<S> for AdminKey
268where
269 S: Send + Sync,
270{
271 type Rejection = Response;
272
273 async fn from_request_parts(
274 parts: &mut axum::http::request::Parts,
275 _state: &S,
276 ) -> Result<Self, Self::Rejection> {
277 parts
278 .headers
279 .get("x-admin-key")
280 .and_then(|v| v.to_str().ok())
281 .map(|s| Self(s.to_string()))
282 .ok_or_else(|| {
283 (
284 StatusCode::UNAUTHORIZED,
285 axum::Json(serde_json::json!({"error": "Missing X-Admin-Key header"})),
286 )
287 .into_response()
288 })
289 }
290}
291
292pub fn api_routes(
294 manager: SharedTenantManager,
295 admin_key: Option<String>,
296 cors_origins: Option<Vec<String>>,
297 billing_state: Option<SharedBillingState>,
298) -> Router {
299 let state = ApiState {
300 manager,
301 admin_key,
302 billing_state,
303 };
304
305 let cors = build_cors(cors_origins);
306
307 Router::new()
308 .route("/api/v1/pipelines", post(handle_deploy).get(handle_list))
310 .route(
311 "/api/v1/pipelines/{pipeline_id}",
312 get(handle_get).delete(handle_delete),
313 )
314 .route(
316 "/api/v1/pipelines/{pipeline_id}/events",
317 post(handle_inject),
318 )
319 .route(
320 "/api/v1/pipelines/{pipeline_id}/events-batch",
321 post(handle_inject_batch),
322 )
323 .route(
324 "/api/v1/pipelines/{pipeline_id}/checkpoint",
325 post(handle_checkpoint),
326 )
327 .route(
328 "/api/v1/pipelines/{pipeline_id}/restore",
329 post(handle_restore),
330 )
331 .route(
332 "/api/v1/pipelines/{pipeline_id}/metrics",
333 get(handle_metrics),
334 )
335 .route(
336 "/api/v1/pipelines/{pipeline_id}/reload",
337 post(handle_reload),
338 )
339 .route("/api/v1/usage", get(handle_usage))
340 .route("/api/v1/pipelines/{pipeline_id}/logs", get(handle_logs))
341 .route(
343 "/api/v1/pipelines/{pipeline_id}/dlq",
344 get(handle_dlq_get).delete(handle_dlq_clear),
345 )
346 .route(
347 "/api/v1/pipelines/{pipeline_id}/dlq/replay",
348 post(handle_dlq_replay),
349 )
350 .route(
352 "/api/v1/tenants",
353 post(handle_create_tenant).get(handle_list_tenants),
354 )
355 .route(
356 "/api/v1/tenants/{tenant_id}",
357 get(handle_get_tenant).delete(handle_delete_tenant),
358 )
359 .layer(cors)
360 .with_state(state)
361}
362
363async fn handle_deploy(
368 State(state): State<ApiState>,
369 ApiKey(api_key): ApiKey,
370 Json(body): Json<DeployPipelineRequest>,
371) -> Response {
372 let manager = &state.manager;
373 let mut mgr = manager.write().await;
374
375 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
376 Some(id) => id.clone(),
377 None => {
378 return error_response(
379 StatusCode::UNAUTHORIZED,
380 "invalid_api_key",
381 "Invalid API key",
382 )
383 }
384 };
385
386 let result = mgr
387 .deploy_pipeline_on_tenant(&tenant_id, body.name.clone(), body.source)
388 .await;
389
390 match result {
391 Ok(id) => {
392 mgr.persist_if_needed(&tenant_id);
393 let resp = DeployPipelineResponse {
394 id,
395 name: body.name,
396 status: "running".to_string(),
397 };
398 (StatusCode::CREATED, axum::Json(&resp)).into_response()
399 }
400 Err(e) => tenant_error_response(e),
401 }
402}
403
404async fn handle_list(
405 State(state): State<ApiState>,
406 ApiKey(api_key): ApiKey,
407 Query(pagination): Query<PaginationParams>,
408) -> Response {
409 let manager = &state.manager;
410 if pagination.exceeds_max() {
411 return error_response(
412 StatusCode::BAD_REQUEST,
413 "invalid_limit",
414 &format!("limit must not exceed {MAX_LIMIT}"),
415 );
416 }
417
418 let mgr = manager.read().await;
419
420 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
421 Some(id) => id.clone(),
422 None => {
423 return error_response(
424 StatusCode::UNAUTHORIZED,
425 "invalid_api_key",
426 "Invalid API key",
427 )
428 }
429 };
430
431 let tenant = match mgr.get_tenant(&tenant_id) {
432 Some(t) => t,
433 None => {
434 return error_response(
435 StatusCode::NOT_FOUND,
436 "tenant_not_found",
437 "Tenant not found",
438 )
439 }
440 };
441
442 let all_pipelines: Vec<PipelineInfo> = tenant
443 .pipelines
444 .values()
445 .map(|p| PipelineInfo {
446 id: p.id.clone(),
447 name: p.name.clone(),
448 status: p.status.to_string(),
449 source: p.source.clone(),
450 uptime_secs: p.created_at.elapsed().as_secs(),
451 })
452 .collect();
453
454 let (pipelines, meta) = pagination.paginate(all_pipelines);
455 let total = meta.total;
456 let resp = PipelineListResponse {
457 pipelines,
458 total,
459 pagination: Some(meta),
460 };
461 axum::Json(&resp).into_response()
462}
463
464async fn handle_get(
465 State(state): State<ApiState>,
466 Path(pipeline_id): Path<String>,
467 ApiKey(api_key): ApiKey,
468) -> Response {
469 let manager = &state.manager;
470 let mgr = manager.read().await;
471
472 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
473 Some(id) => id.clone(),
474 None => {
475 return error_response(
476 StatusCode::UNAUTHORIZED,
477 "invalid_api_key",
478 "Invalid API key",
479 )
480 }
481 };
482
483 let tenant = match mgr.get_tenant(&tenant_id) {
484 Some(t) => t,
485 None => {
486 return error_response(
487 StatusCode::NOT_FOUND,
488 "tenant_not_found",
489 "Tenant not found",
490 )
491 }
492 };
493
494 match tenant.pipelines.get(&pipeline_id) {
495 Some(p) => {
496 let info = PipelineInfo {
497 id: p.id.clone(),
498 name: p.name.clone(),
499 status: p.status.to_string(),
500 source: p.source.clone(),
501 uptime_secs: p.created_at.elapsed().as_secs(),
502 };
503 axum::Json(&info).into_response()
504 }
505 None => error_response(
506 StatusCode::NOT_FOUND,
507 "pipeline_not_found",
508 "Pipeline not found",
509 ),
510 }
511}
512
513async fn handle_delete(
514 State(state): State<ApiState>,
515 Path(pipeline_id): Path<String>,
516 ApiKey(api_key): ApiKey,
517) -> Response {
518 let manager = &state.manager;
519 let mut mgr = manager.write().await;
520
521 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
522 Some(id) => id.clone(),
523 None => {
524 return error_response(
525 StatusCode::UNAUTHORIZED,
526 "invalid_api_key",
527 "Invalid API key",
528 )
529 }
530 };
531
532 let result = {
533 let tenant = match mgr.get_tenant_mut(&tenant_id) {
534 Some(t) => t,
535 None => {
536 return error_response(
537 StatusCode::NOT_FOUND,
538 "tenant_not_found",
539 "Tenant not found",
540 )
541 }
542 };
543 tenant.remove_pipeline(&pipeline_id)
544 };
545
546 match result {
547 Ok(()) => {
548 mgr.persist_if_needed(&tenant_id);
549 axum::Json(serde_json::json!({"deleted": true})).into_response()
550 }
551 Err(e) => tenant_error_response(e),
552 }
553}
554
555async fn handle_inject(
556 State(state): State<ApiState>,
557 Path(pipeline_id): Path<String>,
558 ApiKey(api_key): ApiKey,
559 Json(body): Json<InjectEventRequest>,
560) -> Response {
561 let manager = &state.manager;
562 let billing_state = &state.billing_state;
563 #[cfg(feature = "saas")]
565 if let Some(ref bs) = billing_state {
566 if let Some(org_id) = bs.org_id_for_api_key(&api_key).await {
567 if let Err(err) = bs.check_usage_limit(org_id, 1).await {
568 return crate::billing::usage_limit_response(&err);
569 }
570 bs.usage.write().await.record_events(org_id, 1);
572 }
573 }
574 #[cfg(not(feature = "saas"))]
575 let _ = &billing_state;
576
577 let mut mgr = manager.write().await;
578
579 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
580 Some(id) => id.clone(),
581 None => {
582 return error_response(
583 StatusCode::UNAUTHORIZED,
584 "invalid_api_key",
585 "Invalid API key",
586 )
587 }
588 };
589
590 if let Err(e) = mgr.check_backpressure() {
592 return tenant_error_response(e);
593 }
594
595 let mut event = Event::new(body.event_type.clone());
596 for (key, value) in &body.fields {
597 let v = json_to_runtime_value(value);
598 event = event.with_field(key.as_str(), v);
599 }
600
601 match mgr
602 .process_event_with_backpressure(&tenant_id, &pipeline_id, event)
603 .await
604 {
605 Ok(output_events) => {
606 let events_json: Vec<serde_json::Value> = output_events
607 .iter()
608 .map(|e| {
609 let mut fields = serde_json::Map::new();
610 for (k, v) in &e.data {
611 fields.insert(k.to_string(), crate::websocket::value_to_json(v));
612 }
613 serde_json::json!({
614 "event_type": e.event_type.to_string(),
615 "fields": serde_json::Value::Object(fields),
616 })
617 })
618 .collect();
619 let response = serde_json::json!({
620 "accepted": true,
621 "output_events": events_json,
622 });
623 axum::Json(response).into_response()
624 }
625 Err(e) => tenant_error_response(e),
626 }
627}
628
629async fn handle_inject_batch(
630 State(state): State<ApiState>,
631 Path(pipeline_id): Path<String>,
632 ApiKey(api_key): ApiKey,
633 Json(body): Json<InjectBatchRequest>,
634) -> Response {
635 let manager = &state.manager;
636 let billing_state = &state.billing_state;
637 let event_count = body.events.len() as i64;
638
639 #[cfg(feature = "saas")]
641 if let Some(ref bs) = billing_state {
642 if let Some(org_id) = bs.org_id_for_api_key(&api_key).await {
643 if let Err(err) = bs.check_usage_limit(org_id, event_count).await {
644 return crate::billing::usage_limit_response(&err);
645 }
646 bs.usage.write().await.record_events(org_id, event_count);
648 }
649 }
650 #[cfg(not(feature = "saas"))]
651 let _ = (&billing_state, event_count);
652
653 let mut mgr = manager.write().await;
654
655 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
656 Some(id) => id.clone(),
657 None => {
658 return error_response(
659 StatusCode::UNAUTHORIZED,
660 "invalid_api_key",
661 "Invalid API key",
662 )
663 }
664 };
665
666 if let Err(e) = mgr.check_backpressure() {
668 return tenant_error_response(e);
669 }
670
671 let start = std::time::Instant::now();
672 let mut accepted = 0usize;
673 let mut output_events = Vec::new();
674
675 for req in body.events {
676 let mut event = Event::new(req.event_type.clone());
677 for (key, value) in &req.fields {
678 let v = json_to_runtime_value(value);
679 event = event.with_field(key.as_str(), v);
680 }
681
682 match mgr
683 .process_event_with_backpressure(&tenant_id, &pipeline_id, event)
684 .await
685 {
686 Ok(outputs) => {
687 accepted += 1;
688 for e in &outputs {
689 let mut flat = serde_json::Map::new();
690 flat.insert(
691 "event_type".to_string(),
692 serde_json::Value::String(e.event_type.to_string()),
693 );
694 for (k, v) in &e.data {
695 flat.insert(k.to_string(), crate::websocket::value_to_json(v));
696 }
697 output_events.push(serde_json::Value::Object(flat));
698 }
699 }
700 Err(TenantError::BackpressureExceeded { .. }) => {
701 break;
703 }
704 Err(_) => {
705 }
707 }
708 }
709
710 let processing_time_us = start.elapsed().as_micros() as u64;
711
712 let resp = InjectBatchResponse {
713 accepted,
714 output_events,
715 processing_time_us,
716 };
717 axum::Json(&resp).into_response()
718}
719
720async fn handle_checkpoint(
721 State(state): State<ApiState>,
722 Path(pipeline_id): Path<String>,
723 ApiKey(api_key): ApiKey,
724) -> Response {
725 let manager = &state.manager;
726 let mgr = manager.read().await;
727
728 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
729 Some(id) => id.clone(),
730 None => {
731 return error_response(
732 StatusCode::UNAUTHORIZED,
733 "invalid_api_key",
734 "Invalid API key",
735 )
736 }
737 };
738
739 let tenant = match mgr.get_tenant(&tenant_id) {
740 Some(t) => t,
741 None => {
742 return error_response(
743 StatusCode::NOT_FOUND,
744 "tenant_not_found",
745 "Tenant not found",
746 )
747 }
748 };
749
750 match tenant.checkpoint_pipeline(&pipeline_id).await {
751 Ok(checkpoint) => {
752 let resp = CheckpointResponse {
753 pipeline_id,
754 events_processed: checkpoint.events_processed,
755 checkpoint,
756 };
757 axum::Json(&resp).into_response()
758 }
759 Err(e) => tenant_error_response(e),
760 }
761}
762
763async fn handle_restore(
764 State(state): State<ApiState>,
765 Path(pipeline_id): Path<String>,
766 ApiKey(api_key): ApiKey,
767 Json(body): Json<RestoreRequest>,
768) -> Response {
769 let manager = &state.manager;
770 let mut mgr = manager.write().await;
771
772 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
773 Some(id) => id.clone(),
774 None => {
775 return error_response(
776 StatusCode::UNAUTHORIZED,
777 "invalid_api_key",
778 "Invalid API key",
779 )
780 }
781 };
782
783 let tenant = match mgr.get_tenant_mut(&tenant_id) {
784 Some(t) => t,
785 None => {
786 return error_response(
787 StatusCode::NOT_FOUND,
788 "tenant_not_found",
789 "Tenant not found",
790 )
791 }
792 };
793
794 match tenant
795 .restore_pipeline(&pipeline_id, &body.checkpoint)
796 .await
797 {
798 Ok(()) => {
799 let resp = RestoreResponse {
800 pipeline_id,
801 restored: true,
802 events_restored: body.checkpoint.events_processed,
803 };
804 axum::Json(&resp).into_response()
805 }
806 Err(e) => tenant_error_response(e),
807 }
808}
809
810async fn handle_metrics(
811 State(state): State<ApiState>,
812 Path(pipeline_id): Path<String>,
813 ApiKey(api_key): ApiKey,
814) -> Response {
815 let manager = &state.manager;
816 let mgr = manager.read().await;
817
818 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
819 Some(id) => id.clone(),
820 None => {
821 return error_response(
822 StatusCode::UNAUTHORIZED,
823 "invalid_api_key",
824 "Invalid API key",
825 )
826 }
827 };
828
829 let tenant = match mgr.get_tenant(&tenant_id) {
830 Some(t) => t,
831 None => {
832 return error_response(
833 StatusCode::NOT_FOUND,
834 "tenant_not_found",
835 "Tenant not found",
836 )
837 }
838 };
839
840 if !tenant.pipelines.contains_key(&pipeline_id) {
841 return error_response(
842 StatusCode::NOT_FOUND,
843 "pipeline_not_found",
844 "Pipeline not found",
845 );
846 }
847
848 let resp = PipelineMetricsResponse {
849 pipeline_id,
850 events_processed: tenant.usage.events_processed,
851 output_events_emitted: tenant.usage.output_events_emitted,
852 };
853 axum::Json(&resp).into_response()
854}
855
856async fn handle_reload(
857 State(state): State<ApiState>,
858 Path(pipeline_id): Path<String>,
859 ApiKey(api_key): ApiKey,
860 Json(body): Json<ReloadPipelineRequest>,
861) -> Response {
862 let manager = &state.manager;
863 let mut mgr = manager.write().await;
864
865 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
866 Some(id) => id.clone(),
867 None => {
868 return error_response(
869 StatusCode::UNAUTHORIZED,
870 "invalid_api_key",
871 "Invalid API key",
872 )
873 }
874 };
875
876 let result = {
877 let tenant = match mgr.get_tenant_mut(&tenant_id) {
878 Some(t) => t,
879 None => {
880 return error_response(
881 StatusCode::NOT_FOUND,
882 "tenant_not_found",
883 "Tenant not found",
884 )
885 }
886 };
887 tenant.reload_pipeline(&pipeline_id, body.source).await
888 };
889
890 match result {
891 Ok(()) => {
892 mgr.persist_if_needed(&tenant_id);
893 axum::Json(serde_json::json!({"reloaded": true})).into_response()
894 }
895 Err(e) => tenant_error_response(e),
896 }
897}
898
899async fn handle_usage(State(state): State<ApiState>, ApiKey(api_key): ApiKey) -> Response {
900 let manager = &state.manager;
901 let mgr = manager.read().await;
902
903 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
904 Some(id) => id.clone(),
905 None => {
906 return error_response(
907 StatusCode::UNAUTHORIZED,
908 "invalid_api_key",
909 "Invalid API key",
910 )
911 }
912 };
913
914 let tenant = match mgr.get_tenant(&tenant_id) {
915 Some(t) => t,
916 None => {
917 return error_response(
918 StatusCode::NOT_FOUND,
919 "tenant_not_found",
920 "Tenant not found",
921 )
922 }
923 };
924
925 let resp = UsageResponse {
926 tenant_id: tenant.id.to_string(),
927 events_processed: tenant.usage.events_processed,
928 output_events_emitted: tenant.usage.output_events_emitted,
929 active_pipelines: tenant.usage.active_pipelines,
930 quota: QuotaInfo {
931 max_pipelines: tenant.quota.max_pipelines,
932 max_events_per_second: tenant.quota.max_events_per_second,
933 max_streams_per_pipeline: tenant.quota.max_streams_per_pipeline,
934 },
935 };
936 axum::Json(&resp).into_response()
937}
938
939async fn handle_logs(
941 State(state): State<ApiState>,
942 Path(pipeline_id): Path<String>,
943 ApiKey(api_key): ApiKey,
944) -> Response {
945 let manager = &state.manager;
946 let mgr = manager.read().await;
947
948 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
949 Some(id) => id.clone(),
950 None => return error_response(StatusCode::UNAUTHORIZED, "invalid_key", "Invalid API key"),
951 };
952
953 let tenant = match mgr.get_tenant(&tenant_id) {
955 Some(t) => t,
956 None => {
957 return error_response(
958 StatusCode::NOT_FOUND,
959 "tenant_not_found",
960 "Tenant not found",
961 )
962 }
963 };
964
965 let rx: tokio::sync::broadcast::Receiver<Event> =
966 match tenant.subscribe_pipeline_logs(&pipeline_id) {
967 Ok(rx) => rx,
968 Err(_) => {
969 return error_response(
970 StatusCode::NOT_FOUND,
971 "pipeline_not_found",
972 &format!("Pipeline {pipeline_id} not found"),
973 )
974 }
975 };
976
977 drop(mgr); let stream = stream::unfold(rx, |mut rx| async move {
981 match rx.recv().await {
982 Ok(event) => {
983 let data: serde_json::Map<String, serde_json::Value> = event
984 .data
985 .iter()
986 .map(|(k, v): (&std::sync::Arc<str>, &varpulis_core::Value)| {
987 (k.to_string(), json_from_value(v))
988 })
989 .collect();
990 let json = serde_json::to_string(&LogEvent {
991 event_type: event.event_type.to_string(),
992 timestamp: event.timestamp.to_rfc3339(),
993 data,
994 })
995 .unwrap_or_default();
996 let sse = axum::response::sse::Event::default().data(json);
997 Some((Ok::<_, Infallible>(sse), rx))
998 }
999 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1000 let msg = format!("{{\"warning\":\"skipped {n} events\"}}");
1001 let sse = axum::response::sse::Event::default()
1002 .event("warning")
1003 .data(msg);
1004 Some((Ok(sse), rx))
1005 }
1006 Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
1007 }
1008 });
1009
1010 axum::response::sse::Sse::new(stream)
1011 .keep_alive(axum::response::sse::KeepAlive::default())
1012 .into_response()
1013}
1014
/// JSON payload of one SSE message emitted by `handle_logs`.
#[derive(Serialize)]
struct LogEvent {
    /// String form of the event's type.
    event_type: String,
    /// Event timestamp formatted with `to_rfc3339()`.
    timestamp: String,
    /// Event data fields converted to JSON.
    data: serde_json::Map<String, serde_json::Value>,
}
1021
1022fn json_from_value(v: &varpulis_core::Value) -> serde_json::Value {
1023 match v {
1024 varpulis_core::Value::Null => serde_json::Value::Null,
1025 varpulis_core::Value::Bool(b) => serde_json::Value::Bool(*b),
1026 varpulis_core::Value::Int(i) => serde_json::json!(*i),
1027 varpulis_core::Value::Float(f) => serde_json::json!(*f),
1028 varpulis_core::Value::Str(s) => serde_json::Value::String(s.to_string()),
1029 varpulis_core::Value::Timestamp(ns) => serde_json::json!(*ns),
1030 varpulis_core::Value::Duration(ns) => serde_json::json!(*ns),
1031 varpulis_core::Value::Array(arr) => {
1032 serde_json::Value::Array(arr.iter().map(json_from_value).collect())
1033 }
1034 varpulis_core::Value::Map(map) => {
1035 let obj: serde_json::Map<String, serde_json::Value> = map
1036 .iter()
1037 .map(|(k, v)| (k.to_string(), json_from_value(v)))
1038 .collect();
1039 serde_json::Value::Object(obj)
1040 }
1041 }
1042}
1043
1044async fn handle_dlq_get(
1049 State(state): State<ApiState>,
1050 Path(pipeline_id): Path<String>,
1051 ApiKey(api_key): ApiKey,
1052 Query(params): Query<DlqQueryParams>,
1053) -> Response {
1054 let manager = &state.manager;
1055 let mgr = manager.read().await;
1056
1057 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1058 Some(id) => id.clone(),
1059 None => {
1060 return error_response(
1061 StatusCode::UNAUTHORIZED,
1062 "invalid_api_key",
1063 "Invalid API key",
1064 )
1065 }
1066 };
1067
1068 let tenant = match mgr.get_tenant(&tenant_id) {
1069 Some(t) => t,
1070 None => {
1071 return error_response(
1072 StatusCode::NOT_FOUND,
1073 "tenant_not_found",
1074 "Tenant not found",
1075 )
1076 }
1077 };
1078
1079 let pipeline = match tenant.pipelines.get(&pipeline_id) {
1080 Some(p) => p,
1081 None => {
1082 return error_response(
1083 StatusCode::NOT_FOUND,
1084 "pipeline_not_found",
1085 "Pipeline not found",
1086 )
1087 }
1088 };
1089
1090 let engine = pipeline.engine.lock().await;
1091 let dlq = match engine.dlq() {
1092 Some(d) => d,
1093 None => {
1094 let resp = DlqEntriesResponse {
1095 entries: Vec::new(),
1096 total: 0,
1097 };
1098 return axum::Json(&resp).into_response();
1099 }
1100 };
1101
1102 let offset = params.offset.unwrap_or(0);
1103 let limit = params.limit.unwrap_or(100).min(1000);
1104
1105 match dlq.read_entries(offset, limit) {
1106 Ok(entries) => {
1107 let resp = DlqEntriesResponse {
1108 total: dlq.line_count(),
1109 entries,
1110 };
1111 axum::Json(&resp).into_response()
1112 }
1113 Err(e) => error_response(
1114 StatusCode::INTERNAL_SERVER_ERROR,
1115 "dlq_read_error",
1116 &format!("Failed to read DLQ: {e}"),
1117 ),
1118 }
1119}
1120
1121async fn handle_dlq_replay(
1122 State(state): State<ApiState>,
1123 Path(pipeline_id): Path<String>,
1124 ApiKey(api_key): ApiKey,
1125) -> Response {
1126 let manager = &state.manager;
1127 let mgr = manager.read().await;
1128
1129 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1130 Some(id) => id.clone(),
1131 None => {
1132 return error_response(
1133 StatusCode::UNAUTHORIZED,
1134 "invalid_api_key",
1135 "Invalid API key",
1136 )
1137 }
1138 };
1139
1140 let tenant = match mgr.get_tenant(&tenant_id) {
1141 Some(t) => t,
1142 None => {
1143 return error_response(
1144 StatusCode::NOT_FOUND,
1145 "tenant_not_found",
1146 "Tenant not found",
1147 )
1148 }
1149 };
1150
1151 let pipeline = match tenant.pipelines.get(&pipeline_id) {
1152 Some(p) => p,
1153 None => {
1154 return error_response(
1155 StatusCode::NOT_FOUND,
1156 "pipeline_not_found",
1157 "Pipeline not found",
1158 )
1159 }
1160 };
1161
1162 let entries = {
1164 let engine = pipeline.engine.lock().await;
1165 let dlq = match engine.dlq() {
1166 Some(d) => d,
1167 None => {
1168 let resp = DlqReplayResponse { replayed: 0 };
1169 return axum::Json(&resp).into_response();
1170 }
1171 };
1172 match dlq.read_entries(0, 100_000) {
1174 Ok(entries) => entries,
1175 Err(e) => {
1176 return error_response(
1177 StatusCode::INTERNAL_SERVER_ERROR,
1178 "dlq_read_error",
1179 &format!("Failed to read DLQ: {e}"),
1180 )
1181 }
1182 }
1183 };
1184
1185 let mut replayed = 0usize;
1187 {
1188 let mut engine = pipeline.engine.lock().await;
1189 for entry in &entries {
1190 let event_type = entry
1192 .event
1193 .get("event_type")
1194 .and_then(|v| v.as_str())
1195 .unwrap_or("unknown");
1196 let mut event = Event::new(event_type);
1197 if let Some(data) = entry.event.get("data").and_then(|v| v.as_object()) {
1198 for (k, v) in data {
1199 let rv = json_to_runtime_value(v);
1200 event = event.with_field(k.as_str(), rv);
1201 }
1202 }
1203 if engine.process(event).await.is_ok() {
1204 replayed += 1;
1205 }
1206 }
1207 }
1208
1209 let resp = DlqReplayResponse { replayed };
1210 axum::Json(&resp).into_response()
1211}
1212
1213async fn handle_dlq_clear(
1214 State(state): State<ApiState>,
1215 Path(pipeline_id): Path<String>,
1216 ApiKey(api_key): ApiKey,
1217) -> Response {
1218 let manager = &state.manager;
1219 let mgr = manager.read().await;
1220
1221 let tenant_id = match mgr.get_tenant_by_api_key(&api_key) {
1222 Some(id) => id.clone(),
1223 None => {
1224 return error_response(
1225 StatusCode::UNAUTHORIZED,
1226 "invalid_api_key",
1227 "Invalid API key",
1228 )
1229 }
1230 };
1231
1232 let tenant = match mgr.get_tenant(&tenant_id) {
1233 Some(t) => t,
1234 None => {
1235 return error_response(
1236 StatusCode::NOT_FOUND,
1237 "tenant_not_found",
1238 "Tenant not found",
1239 )
1240 }
1241 };
1242
1243 let pipeline = match tenant.pipelines.get(&pipeline_id) {
1244 Some(p) => p,
1245 None => {
1246 return error_response(
1247 StatusCode::NOT_FOUND,
1248 "pipeline_not_found",
1249 "Pipeline not found",
1250 )
1251 }
1252 };
1253
1254 let engine = pipeline.engine.lock().await;
1255 match engine.dlq() {
1256 Some(dlq) => match dlq.clear() {
1257 Ok(()) => {
1258 let resp = DlqClearResponse { cleared: true };
1259 axum::Json(&resp).into_response()
1260 }
1261 Err(e) => error_response(
1262 StatusCode::INTERNAL_SERVER_ERROR,
1263 "dlq_clear_error",
1264 &format!("Failed to clear DLQ: {e}"),
1265 ),
1266 },
1267 None => {
1268 let resp = DlqClearResponse { cleared: true };
1269 axum::Json(&resp).into_response()
1270 }
1271 }
1272}
1273
1274#[allow(clippy::result_large_err)]
1281fn validate_admin_key(provided: &str, configured: &Option<String>) -> Result<(), Response> {
1282 match configured {
1283 None => Err(error_response(
1284 StatusCode::FORBIDDEN,
1285 "admin_disabled",
1286 "Admin API is disabled (no --api-key configured)",
1287 )),
1288 Some(key) => {
1289 if constant_time_compare(key, provided) {
1290 Ok(())
1291 } else {
1292 Err(error_response(
1293 StatusCode::UNAUTHORIZED,
1294 "invalid_admin_key",
1295 "Invalid admin key",
1296 ))
1297 }
1298 }
1299 }
1300}
1301
1302fn quota_from_tier(tier: Option<&str>) -> TenantQuota {
1303 match tier {
1304 Some("free") => TenantQuota::free(),
1305 Some("pro") => TenantQuota::pro(),
1306 Some("enterprise") => TenantQuota::enterprise(),
1307 _ => TenantQuota::default(),
1308 }
1309}
1310
1311async fn handle_create_tenant(
1312 State(state): State<ApiState>,
1313 AdminKey(admin_key): AdminKey,
1314 Json(body): Json<CreateTenantRequest>,
1315) -> Response {
1316 let manager = &state.manager;
1317 let configured_key = &state.admin_key;
1318 if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1319 return resp;
1320 }
1321
1322 let api_key = uuid::Uuid::new_v4().to_string();
1323 let quota = quota_from_tier(body.quota_tier.as_deref());
1324
1325 let mut mgr = manager.write().await;
1326 match mgr.create_tenant(body.name.clone(), api_key.clone(), quota.clone()) {
1327 Ok(tenant_id) => {
1328 let resp = TenantResponse {
1329 id: tenant_id.as_str().to_string(),
1330 name: body.name,
1331 api_key,
1332 quota: QuotaInfo {
1333 max_pipelines: quota.max_pipelines,
1334 max_events_per_second: quota.max_events_per_second,
1335 max_streams_per_pipeline: quota.max_streams_per_pipeline,
1336 },
1337 };
1338 (StatusCode::CREATED, axum::Json(&resp)).into_response()
1339 }
1340 Err(e) => tenant_error_response(e),
1341 }
1342}
1343
1344async fn handle_list_tenants(
1345 State(state): State<ApiState>,
1346 AdminKey(admin_key): AdminKey,
1347 Query(pagination): Query<PaginationParams>,
1348) -> Response {
1349 let manager = &state.manager;
1350 let configured_key = &state.admin_key;
1351 if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1352 return resp;
1353 }
1354
1355 if pagination.exceeds_max() {
1356 return error_response(
1357 StatusCode::BAD_REQUEST,
1358 "invalid_limit",
1359 &format!("limit must not exceed {MAX_LIMIT}"),
1360 );
1361 }
1362
1363 let mgr = manager.read().await;
1364 let all_tenants: Vec<TenantResponse> = mgr
1365 .list_tenants()
1366 .iter()
1367 .map(|t| TenantResponse {
1368 id: t.id.as_str().to_string(),
1369 name: t.name.clone(),
1370 api_key: t.api_key.clone(),
1371 quota: QuotaInfo {
1372 max_pipelines: t.quota.max_pipelines,
1373 max_events_per_second: t.quota.max_events_per_second,
1374 max_streams_per_pipeline: t.quota.max_streams_per_pipeline,
1375 },
1376 })
1377 .collect();
1378 let (tenants, meta) = pagination.paginate(all_tenants);
1379 let total = meta.total;
1380 let resp = TenantListResponse {
1381 tenants,
1382 total,
1383 pagination: Some(meta),
1384 };
1385 axum::Json(&resp).into_response()
1386}
1387
1388async fn handle_get_tenant(
1389 State(state): State<ApiState>,
1390 Path(tenant_id_str): Path<String>,
1391 AdminKey(admin_key): AdminKey,
1392) -> Response {
1393 let manager = &state.manager;
1394 let configured_key = &state.admin_key;
1395 if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1396 return resp;
1397 }
1398
1399 let mgr = manager.read().await;
1400 let tenant_id = varpulis_runtime::TenantId::new(&tenant_id_str);
1401 match mgr.get_tenant(&tenant_id) {
1402 Some(t) => {
1403 let resp = TenantDetailResponse {
1404 id: t.id.as_str().to_string(),
1405 name: t.name.clone(),
1406 api_key: t.api_key.clone(),
1407 quota: QuotaInfo {
1408 max_pipelines: t.quota.max_pipelines,
1409 max_events_per_second: t.quota.max_events_per_second,
1410 max_streams_per_pipeline: t.quota.max_streams_per_pipeline,
1411 },
1412 usage: TenantUsageInfo {
1413 events_processed: t.usage.events_processed,
1414 output_events_emitted: t.usage.output_events_emitted,
1415 active_pipelines: t.usage.active_pipelines,
1416 },
1417 pipeline_count: t.pipelines.len(),
1418 };
1419 axum::Json(&resp).into_response()
1420 }
1421 None => error_response(
1422 StatusCode::NOT_FOUND,
1423 "tenant_not_found",
1424 "Tenant not found",
1425 ),
1426 }
1427}
1428
1429async fn handle_delete_tenant(
1430 State(state): State<ApiState>,
1431 Path(tenant_id_str): Path<String>,
1432 AdminKey(admin_key): AdminKey,
1433) -> Response {
1434 let manager = &state.manager;
1435 let configured_key = &state.admin_key;
1436 if let Err(resp) = validate_admin_key(&admin_key, configured_key) {
1437 return resp;
1438 }
1439
1440 let mut mgr = manager.write().await;
1441 let tenant_id = varpulis_runtime::TenantId::new(&tenant_id_str);
1442 match mgr.remove_tenant(&tenant_id) {
1443 Ok(()) => axum::Json(serde_json::json!({"deleted": true})).into_response(),
1444 Err(e) => tenant_error_response(e),
1445 }
1446}
1447
1448fn error_response(status: StatusCode, code: &str, message: &str) -> Response {
1453 let body = ApiError {
1454 error: message.to_string(),
1455 code: code.to_string(),
1456 };
1457 (status, axum::Json(body)).into_response()
1458}
1459
1460fn tenant_error_response(err: TenantError) -> Response {
1461 if let TenantError::BackpressureExceeded { current, max } = &err {
1463 let body = serde_json::json!({
1464 "error": format!("queue depth {current} exceeds maximum {max}"),
1465 "code": "queue_depth_exceeded",
1466 "retry_after": 1,
1467 });
1468 return (
1469 StatusCode::TOO_MANY_REQUESTS,
1470 [("Retry-After", "1"), ("Content-Type", "application/json")],
1471 serde_json::to_string(&body).unwrap_or_default(),
1472 )
1473 .into_response();
1474 }
1475
1476 let (status, code) = match &err {
1477 TenantError::NotFound(_) => (StatusCode::NOT_FOUND, "not_found"),
1478 TenantError::PipelineNotFound(_) => (StatusCode::NOT_FOUND, "pipeline_not_found"),
1479 TenantError::QuotaExceeded(_) => (StatusCode::TOO_MANY_REQUESTS, "quota_exceeded"),
1480 TenantError::RateLimitExceeded => (StatusCode::TOO_MANY_REQUESTS, "rate_limited"),
1481 TenantError::BackpressureExceeded { .. } => unreachable!(),
1482 TenantError::ParseError(_) => (StatusCode::BAD_REQUEST, "parse_error"),
1483 TenantError::EngineError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "engine_error"),
1484 TenantError::AlreadyExists(_) => (StatusCode::CONFLICT, "already_exists"),
1485 };
1486 error_response(status, code, &err.to_string())
1487}
1488
1489fn json_to_runtime_value(v: &serde_json::Value) -> varpulis_core::Value {
1490 match v {
1491 serde_json::Value::Null => varpulis_core::Value::Null,
1492 serde_json::Value::Bool(b) => varpulis_core::Value::Bool(*b),
1493 serde_json::Value::Number(n) => {
1494 if let Some(i) = n.as_i64() {
1495 varpulis_core::Value::Int(i)
1496 } else if let Some(f) = n.as_f64() {
1497 varpulis_core::Value::Float(f)
1498 } else {
1499 varpulis_core::Value::Null
1500 }
1501 }
1502 serde_json::Value::String(s) => varpulis_core::Value::Str(s.clone().into()),
1503 serde_json::Value::Array(arr) => {
1504 varpulis_core::Value::array(arr.iter().map(json_to_runtime_value).collect())
1505 }
1506 serde_json::Value::Object(map) => {
1507 let mut m: IndexMap<std::sync::Arc<str>, varpulis_core::Value, FxBuildHasher> =
1508 IndexMap::with_hasher(FxBuildHasher);
1509 for (k, v) in map {
1510 m.insert(k.as_str().into(), json_to_runtime_value(v));
1511 }
1512 varpulis_core::Value::map(m)
1513 }
1514 }
1515}
1516
1517#[cfg(test)]
1518mod tests {
1519 use super::*;
1520 use axum::body::Body;
1521 use axum::http::Request;
1522 use std::sync::Arc;
1523 use tokio::sync::RwLock;
1524 use tower::ServiceExt;
1525 use varpulis_runtime::tenant::{TenantManager, TenantQuota};
1526
1527 struct TestResponse {
1529 status: StatusCode,
1530 body: bytes::Bytes,
1531 headers: axum::http::HeaderMap,
1532 }
1533
1534 impl TestResponse {
1535 fn status(&self) -> StatusCode {
1536 self.status
1537 }
1538 fn body(&self) -> &[u8] {
1539 &self.body
1540 }
1541 fn headers(&self) -> &axum::http::HeaderMap {
1542 &self.headers
1543 }
1544 }
1545
1546 struct TestRequestBuilder {
1548 method: String,
1549 path: String,
1550 headers: Vec<(String, String)>,
1551 body: Option<String>,
1552 }
1553
1554 impl TestRequestBuilder {
1555 fn new() -> Self {
1556 Self {
1557 method: "GET".to_string(),
1558 path: "/".to_string(),
1559 headers: Vec::new(),
1560 body: None,
1561 }
1562 }
1563 fn method(mut self, m: &str) -> Self {
1564 self.method = m.to_string();
1565 self
1566 }
1567 fn path(mut self, p: &str) -> Self {
1568 self.path = p.to_string();
1569 self
1570 }
1571 fn header(mut self, k: &str, v: &str) -> Self {
1572 self.headers.push((k.to_string(), v.to_string()));
1573 self
1574 }
1575 fn json<T: serde::Serialize>(mut self, body: &T) -> Self {
1576 self.body = Some(serde_json::to_string(body).unwrap());
1577 self.headers
1578 .push(("content-type".to_string(), "application/json".to_string()));
1579 self
1580 }
1581 async fn reply(self, app: &Router) -> TestResponse {
1582 let mut builder = Request::builder()
1583 .method(self.method.as_str())
1584 .uri(&self.path);
1585 for (k, v) in &self.headers {
1586 builder = builder.header(k.as_str(), v.as_str());
1587 }
1588 let body = match self.body {
1589 Some(b) => Body::from(b),
1590 None => Body::empty(),
1591 };
1592 let req = builder.body(body).unwrap();
1593 let resp = app.clone().oneshot(req).await.unwrap();
1594 let status = resp.status();
1595 let headers = resp.headers().clone();
1596 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1597 .await
1598 .unwrap();
1599 TestResponse {
1600 status,
1601 body,
1602 headers,
1603 }
1604 }
1605 }
1606
    /// Starts a new request description; shorthand for
    /// `TestRequestBuilder::new()`.
    fn test_request() -> TestRequestBuilder {
        TestRequestBuilder::new()
    }
1611
1612 async fn setup_test_manager() -> SharedTenantManager {
1613 let mut mgr = TenantManager::new();
1614 let id = mgr
1615 .create_tenant(
1616 "Test Corp".into(),
1617 "test-key-123".into(),
1618 TenantQuota::default(),
1619 )
1620 .unwrap();
1621
1622 let tenant = mgr.get_tenant_mut(&id).unwrap();
1624 tenant
1625 .deploy_pipeline(
1626 "Test Pipeline".into(),
1627 "stream A = SensorReading .where(x > 1)".into(),
1628 )
1629 .await
1630 .unwrap();
1631
1632 Arc::new(RwLock::new(mgr))
1633 }
1634
1635 #[tokio::test]
1636 async fn test_deploy_pipeline() {
1637 let mgr = setup_test_manager().await;
1638 let routes = api_routes(mgr, None, None, None);
1639
1640 let resp = test_request()
1641 .method("POST")
1642 .path("/api/v1/pipelines")
1643 .header("x-api-key", "test-key-123")
1644 .json(&DeployPipelineRequest {
1645 name: "New Pipeline".into(),
1646 source: "stream B = Events .where(y > 10)".into(),
1647 })
1648 .reply(&routes)
1649 .await;
1650
1651 assert_eq!(resp.status(), StatusCode::CREATED);
1652 let body: DeployPipelineResponse = serde_json::from_slice(resp.body()).unwrap();
1653 assert_eq!(body.name, "New Pipeline");
1654 assert_eq!(body.status, "running");
1655 }
1656
1657 #[tokio::test]
1658 async fn test_deploy_invalid_api_key() {
1659 let mgr = setup_test_manager().await;
1660 let routes = api_routes(mgr, None, None, None);
1661
1662 let resp = test_request()
1663 .method("POST")
1664 .path("/api/v1/pipelines")
1665 .header("x-api-key", "wrong-key")
1666 .json(&DeployPipelineRequest {
1667 name: "Bad".into(),
1668 source: "stream X = Y .where(z > 1)".into(),
1669 })
1670 .reply(&routes)
1671 .await;
1672
1673 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1674 }
1675
1676 #[tokio::test]
1677 async fn test_deploy_invalid_vpl() {
1678 let mgr = setup_test_manager().await;
1679 let routes = api_routes(mgr, None, None, None);
1680
1681 let resp = test_request()
1682 .method("POST")
1683 .path("/api/v1/pipelines")
1684 .header("x-api-key", "test-key-123")
1685 .json(&DeployPipelineRequest {
1686 name: "Bad VPL".into(),
1687 source: "this is not valid {{{".into(),
1688 })
1689 .reply(&routes)
1690 .await;
1691
1692 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1693 }
1694
1695 #[tokio::test]
1696 async fn test_list_pipelines() {
1697 let mgr = setup_test_manager().await;
1698 let routes = api_routes(mgr, None, None, None);
1699
1700 let resp = test_request()
1701 .method("GET")
1702 .path("/api/v1/pipelines")
1703 .header("x-api-key", "test-key-123")
1704 .reply(&routes)
1705 .await;
1706
1707 assert_eq!(resp.status(), StatusCode::OK);
1708 let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
1709 assert_eq!(body.total, 1);
1710 assert_eq!(body.pipelines[0].name, "Test Pipeline");
1711 }
1712
1713 #[tokio::test]
1714 async fn test_usage_endpoint() {
1715 let mgr = setup_test_manager().await;
1716 let routes = api_routes(mgr, None, None, None);
1717
1718 let resp = test_request()
1719 .method("GET")
1720 .path("/api/v1/usage")
1721 .header("x-api-key", "test-key-123")
1722 .reply(&routes)
1723 .await;
1724
1725 assert_eq!(resp.status(), StatusCode::OK);
1726 let body: UsageResponse = serde_json::from_slice(resp.body()).unwrap();
1727 assert_eq!(body.active_pipelines, 1);
1728 }
1729
1730 #[tokio::test]
1731 async fn test_inject_event() {
1732 let mgr = setup_test_manager().await;
1733
1734 let pipeline_id = {
1736 let m = mgr.read().await;
1737 let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
1738 let tenant = m.get_tenant(&tid).unwrap();
1739 tenant.pipelines.keys().next().unwrap().clone()
1740 };
1741
1742 let routes = api_routes(mgr, None, None, None);
1743
1744 let resp = test_request()
1745 .method("POST")
1746 .path(&format!("/api/v1/pipelines/{pipeline_id}/events"))
1747 .header("x-api-key", "test-key-123")
1748 .json(&InjectEventRequest {
1749 event_type: "SensorReading".into(),
1750 fields: {
1751 let mut m = serde_json::Map::new();
1752 m.insert(
1753 "x".into(),
1754 serde_json::Value::Number(serde_json::Number::from(42)),
1755 );
1756 m
1757 },
1758 })
1759 .reply(&routes)
1760 .await;
1761
1762 assert_eq!(resp.status(), StatusCode::OK);
1763 }
1764
1765 #[test]
1766 fn test_json_to_runtime_value() {
1767 assert_eq!(
1768 json_to_runtime_value(&serde_json::json!(null)),
1769 varpulis_core::Value::Null
1770 );
1771 assert_eq!(
1772 json_to_runtime_value(&serde_json::json!(true)),
1773 varpulis_core::Value::Bool(true)
1774 );
1775 assert_eq!(
1776 json_to_runtime_value(&serde_json::json!(42)),
1777 varpulis_core::Value::Int(42)
1778 );
1779 assert_eq!(
1780 json_to_runtime_value(&serde_json::json!(1.23)),
1781 varpulis_core::Value::Float(1.23)
1782 );
1783 assert_eq!(
1784 json_to_runtime_value(&serde_json::json!("hello")),
1785 varpulis_core::Value::Str("hello".into())
1786 );
1787 }
1788
1789 #[test]
1790 fn test_error_response_format() {
1791 let resp = error_response(StatusCode::BAD_REQUEST, "test_error", "Something failed");
1792 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1793 }
1794
1795 #[test]
1796 fn test_tenant_error_mapping() {
1797 let resp = tenant_error_response(TenantError::NotFound("t1".into()));
1798 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1799
1800 let resp = tenant_error_response(TenantError::RateLimitExceeded);
1801 assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
1802
1803 let parse_err = varpulis_parser::parse("INVALID{{{").unwrap_err();
1804 let resp = tenant_error_response(TenantError::ParseError(parse_err));
1805 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1806 }
1807
1808 fn setup_admin_routes(admin_key: Option<&str>) -> (SharedTenantManager, Router) {
1813 let mgr = Arc::new(RwLock::new(TenantManager::new()));
1814 let key = admin_key.map(|k| k.to_string());
1815 let routes = api_routes(mgr.clone(), key, None, None);
1816 (mgr, routes)
1817 }
1818
1819 #[tokio::test]
1820 async fn test_create_tenant() {
1821 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
1822
1823 let resp = test_request()
1824 .method("POST")
1825 .path("/api/v1/tenants")
1826 .header("x-admin-key", "admin-secret")
1827 .json(&CreateTenantRequest {
1828 name: "Acme Corp".into(),
1829 quota_tier: None,
1830 })
1831 .reply(&routes)
1832 .await;
1833
1834 assert_eq!(resp.status(), StatusCode::CREATED);
1835 let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
1836 assert_eq!(body.name, "Acme Corp");
1837 assert!(!body.api_key.is_empty());
1838 assert!(!body.id.is_empty());
1839 }
1840
1841 #[tokio::test]
1842 async fn test_list_tenants_admin() {
1843 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
1844
1845 for name in &["Tenant A", "Tenant B"] {
1847 test_request()
1848 .method("POST")
1849 .path("/api/v1/tenants")
1850 .header("x-admin-key", "admin-secret")
1851 .json(&CreateTenantRequest {
1852 name: name.to_string(),
1853 quota_tier: None,
1854 })
1855 .reply(&routes)
1856 .await;
1857 }
1858
1859 let resp = test_request()
1860 .method("GET")
1861 .path("/api/v1/tenants")
1862 .header("x-admin-key", "admin-secret")
1863 .reply(&routes)
1864 .await;
1865
1866 assert_eq!(resp.status(), StatusCode::OK);
1867 let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
1868 assert_eq!(body.total, 2);
1869 }
1870
1871 #[tokio::test]
1872 async fn test_get_tenant_admin() {
1873 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
1874
1875 let create_resp = test_request()
1877 .method("POST")
1878 .path("/api/v1/tenants")
1879 .header("x-admin-key", "admin-secret")
1880 .json(&CreateTenantRequest {
1881 name: "Detail Corp".into(),
1882 quota_tier: Some("pro".into()),
1883 })
1884 .reply(&routes)
1885 .await;
1886
1887 let created: TenantResponse = serde_json::from_slice(create_resp.body()).unwrap();
1888
1889 let resp = test_request()
1890 .method("GET")
1891 .path(&format!("/api/v1/tenants/{}", created.id))
1892 .header("x-admin-key", "admin-secret")
1893 .reply(&routes)
1894 .await;
1895
1896 assert_eq!(resp.status(), StatusCode::OK);
1897 let body: TenantDetailResponse = serde_json::from_slice(resp.body()).unwrap();
1898 assert_eq!(body.name, "Detail Corp");
1899 assert_eq!(body.pipeline_count, 0);
1900 assert_eq!(body.quota.max_pipelines, 20);
1902 }
1903
1904 #[tokio::test]
1905 async fn test_delete_tenant_admin() {
1906 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
1907
1908 let create_resp = test_request()
1910 .method("POST")
1911 .path("/api/v1/tenants")
1912 .header("x-admin-key", "admin-secret")
1913 .json(&CreateTenantRequest {
1914 name: "Doomed".into(),
1915 quota_tier: None,
1916 })
1917 .reply(&routes)
1918 .await;
1919 let created: TenantResponse = serde_json::from_slice(create_resp.body()).unwrap();
1920
1921 let resp = test_request()
1922 .method("DELETE")
1923 .path(&format!("/api/v1/tenants/{}", created.id))
1924 .header("x-admin-key", "admin-secret")
1925 .reply(&routes)
1926 .await;
1927
1928 assert_eq!(resp.status(), StatusCode::OK);
1929
1930 let list_resp = test_request()
1932 .method("GET")
1933 .path("/api/v1/tenants")
1934 .header("x-admin-key", "admin-secret")
1935 .reply(&routes)
1936 .await;
1937 let body: TenantListResponse = serde_json::from_slice(list_resp.body()).unwrap();
1938 assert_eq!(body.total, 0);
1939 }
1940
1941 #[tokio::test]
1942 async fn test_invalid_admin_key() {
1943 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
1944
1945 let resp = test_request()
1946 .method("GET")
1947 .path("/api/v1/tenants")
1948 .header("x-admin-key", "wrong-key")
1949 .reply(&routes)
1950 .await;
1951
1952 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1953 }
1954
1955 #[tokio::test]
1956 async fn test_no_admin_key_configured() {
1957 let (_mgr, routes) = setup_admin_routes(None);
1958
1959 let resp = test_request()
1960 .method("GET")
1961 .path("/api/v1/tenants")
1962 .header("x-admin-key", "anything")
1963 .reply(&routes)
1964 .await;
1965
1966 assert_eq!(resp.status(), StatusCode::FORBIDDEN);
1967 }
1968
1969 #[tokio::test]
1970 async fn test_create_tenant_tier_selection() {
1971 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
1972
1973 let resp = test_request()
1975 .method("POST")
1976 .path("/api/v1/tenants")
1977 .header("x-admin-key", "admin-secret")
1978 .json(&CreateTenantRequest {
1979 name: "Free User".into(),
1980 quota_tier: Some("free".into()),
1981 })
1982 .reply(&routes)
1983 .await;
1984 let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
1985 assert_eq!(body.quota.max_pipelines, 2); let resp = test_request()
1989 .method("POST")
1990 .path("/api/v1/tenants")
1991 .header("x-admin-key", "admin-secret")
1992 .json(&CreateTenantRequest {
1993 name: "Enterprise User".into(),
1994 quota_tier: Some("enterprise".into()),
1995 })
1996 .reply(&routes)
1997 .await;
1998 let body: TenantResponse = serde_json::from_slice(resp.body()).unwrap();
1999 assert_eq!(body.quota.max_pipelines, 1000); }
2001
2002 async fn get_first_pipeline_id(mgr: &SharedTenantManager) -> String {
2008 let m = mgr.read().await;
2009 let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
2010 let tenant = m.get_tenant(&tid).unwrap();
2011 tenant.pipelines.keys().next().unwrap().clone()
2012 }
2013
2014 #[tokio::test]
2015 async fn test_get_single_pipeline() {
2016 let mgr = setup_test_manager().await;
2017 let pipeline_id = get_first_pipeline_id(&mgr).await;
2018 let routes = api_routes(mgr, None, None, None);
2019
2020 let resp = test_request()
2021 .method("GET")
2022 .path(&format!("/api/v1/pipelines/{pipeline_id}"))
2023 .header("x-api-key", "test-key-123")
2024 .reply(&routes)
2025 .await;
2026
2027 assert_eq!(resp.status(), StatusCode::OK);
2028 let body: PipelineInfo = serde_json::from_slice(resp.body()).unwrap();
2029 assert_eq!(body.id, pipeline_id);
2030 assert_eq!(body.name, "Test Pipeline");
2031 assert_eq!(body.status, "running");
2032 assert!(body.source.contains("SensorReading"));
2033 }
2034
2035 #[tokio::test]
2036 async fn test_get_pipeline_not_found() {
2037 let mgr = setup_test_manager().await;
2038 let routes = api_routes(mgr, None, None, None);
2039
2040 let resp = test_request()
2041 .method("GET")
2042 .path("/api/v1/pipelines/nonexistent-id")
2043 .header("x-api-key", "test-key-123")
2044 .reply(&routes)
2045 .await;
2046
2047 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2048 }
2049
2050 #[tokio::test]
2051 async fn test_delete_pipeline_api() {
2052 let mgr = setup_test_manager().await;
2053 let pipeline_id = get_first_pipeline_id(&mgr).await;
2054 let routes = api_routes(mgr.clone(), None, None, None);
2055
2056 let resp = test_request()
2057 .method("DELETE")
2058 .path(&format!("/api/v1/pipelines/{pipeline_id}"))
2059 .header("x-api-key", "test-key-123")
2060 .reply(&routes)
2061 .await;
2062
2063 assert_eq!(resp.status(), StatusCode::OK);
2064 let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2065 assert_eq!(body["deleted"], true);
2066
2067 let list_resp = test_request()
2069 .method("GET")
2070 .path("/api/v1/pipelines")
2071 .header("x-api-key", "test-key-123")
2072 .reply(&routes)
2073 .await;
2074 let list: PipelineListResponse = serde_json::from_slice(list_resp.body()).unwrap();
2075 assert_eq!(list.total, 0);
2076 }
2077
2078 #[tokio::test]
2079 async fn test_delete_pipeline_not_found() {
2080 let mgr = setup_test_manager().await;
2081 let routes = api_routes(mgr, None, None, None);
2082
2083 let resp = test_request()
2084 .method("DELETE")
2085 .path("/api/v1/pipelines/nonexistent-id")
2086 .header("x-api-key", "test-key-123")
2087 .reply(&routes)
2088 .await;
2089
2090 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2091 }
2092
2093 #[tokio::test]
2098 async fn test_inject_batch() {
2099 let mgr = setup_test_manager().await;
2100 let pipeline_id = get_first_pipeline_id(&mgr).await;
2101 let routes = api_routes(mgr, None, None, None);
2102
2103 let resp = test_request()
2104 .method("POST")
2105 .path(&format!("/api/v1/pipelines/{pipeline_id}/events-batch"))
2106 .header("x-api-key", "test-key-123")
2107 .json(&InjectBatchRequest {
2108 events: vec![
2109 InjectEventRequest {
2110 event_type: "SensorReading".into(),
2111 fields: {
2112 let mut m = serde_json::Map::new();
2113 m.insert("x".into(), serde_json::json!(5));
2114 m
2115 },
2116 },
2117 InjectEventRequest {
2118 event_type: "SensorReading".into(),
2119 fields: {
2120 let mut m = serde_json::Map::new();
2121 m.insert("x".into(), serde_json::json!(10));
2122 m
2123 },
2124 },
2125 ],
2126 })
2127 .reply(&routes)
2128 .await;
2129
2130 assert_eq!(resp.status(), StatusCode::OK);
2131 let body: InjectBatchResponse = serde_json::from_slice(resp.body()).unwrap();
2132 assert_eq!(body.accepted, 2);
2133 assert!(body.processing_time_us > 0);
2134 }
2135
2136 #[tokio::test]
2137 async fn test_inject_batch_invalid_pipeline() {
2138 let mgr = setup_test_manager().await;
2139 let routes = api_routes(mgr, None, None, None);
2140
2141 let resp = test_request()
2143 .method("POST")
2144 .path("/api/v1/pipelines/nonexistent/events-batch")
2145 .header("x-api-key", "test-key-123")
2146 .json(&InjectBatchRequest {
2147 events: vec![InjectEventRequest {
2148 event_type: "Test".into(),
2149 fields: serde_json::Map::new(),
2150 }],
2151 })
2152 .reply(&routes)
2153 .await;
2154
2155 assert_eq!(resp.status(), StatusCode::OK);
2157 let body: InjectBatchResponse = serde_json::from_slice(resp.body()).unwrap();
2158 assert_eq!(body.accepted, 0);
2159 }
2160
2161 #[tokio::test]
2166 async fn test_checkpoint_pipeline() {
2167 let mgr = setup_test_manager().await;
2168 let pipeline_id = get_first_pipeline_id(&mgr).await;
2169 let routes = api_routes(mgr, None, None, None);
2170
2171 let resp = test_request()
2172 .method("POST")
2173 .path(&format!("/api/v1/pipelines/{pipeline_id}/checkpoint"))
2174 .header("x-api-key", "test-key-123")
2175 .reply(&routes)
2176 .await;
2177
2178 assert_eq!(resp.status(), StatusCode::OK);
2179 let body: CheckpointResponse = serde_json::from_slice(resp.body()).unwrap();
2180 assert_eq!(body.pipeline_id, pipeline_id);
2181 }
2182
2183 #[tokio::test]
2184 async fn test_checkpoint_not_found() {
2185 let mgr = setup_test_manager().await;
2186 let routes = api_routes(mgr, None, None, None);
2187
2188 let resp = test_request()
2189 .method("POST")
2190 .path("/api/v1/pipelines/nonexistent/checkpoint")
2191 .header("x-api-key", "test-key-123")
2192 .reply(&routes)
2193 .await;
2194
2195 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2196 }
2197
2198 #[tokio::test]
2199 async fn test_restore_pipeline() {
2200 let mgr = setup_test_manager().await;
2201 let pipeline_id = get_first_pipeline_id(&mgr).await;
2202 let routes = api_routes(mgr, None, None, None);
2203
2204 let cp_resp = test_request()
2206 .method("POST")
2207 .path(&format!("/api/v1/pipelines/{pipeline_id}/checkpoint"))
2208 .header("x-api-key", "test-key-123")
2209 .reply(&routes)
2210 .await;
2211 let cp: CheckpointResponse = serde_json::from_slice(cp_resp.body()).unwrap();
2212
2213 let resp = test_request()
2215 .method("POST")
2216 .path(&format!("/api/v1/pipelines/{pipeline_id}/restore"))
2217 .header("x-api-key", "test-key-123")
2218 .json(&RestoreRequest {
2219 checkpoint: cp.checkpoint,
2220 })
2221 .reply(&routes)
2222 .await;
2223
2224 assert_eq!(resp.status(), StatusCode::OK);
2225 let body: RestoreResponse = serde_json::from_slice(resp.body()).unwrap();
2226 assert_eq!(body.pipeline_id, pipeline_id);
2227 assert!(body.restored);
2228 }
2229
2230 #[tokio::test]
2231 async fn test_restore_not_found() {
2232 let mgr = setup_test_manager().await;
2233 let routes = api_routes(mgr, None, None, None);
2234
2235 let checkpoint = varpulis_runtime::persistence::EngineCheckpoint {
2236 version: varpulis_runtime::persistence::CHECKPOINT_VERSION,
2237 window_states: std::collections::HashMap::new(),
2238 sase_states: std::collections::HashMap::new(),
2239 join_states: std::collections::HashMap::new(),
2240 variables: std::collections::HashMap::new(),
2241 events_processed: 0,
2242 output_events_emitted: 0,
2243 watermark_state: None,
2244 distinct_states: std::collections::HashMap::new(),
2245 limit_states: std::collections::HashMap::new(),
2246 };
2247
2248 let resp = test_request()
2249 .method("POST")
2250 .path("/api/v1/pipelines/nonexistent/restore")
2251 .header("x-api-key", "test-key-123")
2252 .json(&RestoreRequest { checkpoint })
2253 .reply(&routes)
2254 .await;
2255
2256 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2257 }
2258
2259 #[tokio::test]
2264 async fn test_metrics_endpoint() {
2265 let mgr = setup_test_manager().await;
2266 let pipeline_id = get_first_pipeline_id(&mgr).await;
2267 let routes = api_routes(mgr, None, None, None);
2268
2269 let resp = test_request()
2270 .method("GET")
2271 .path(&format!("/api/v1/pipelines/{pipeline_id}/metrics"))
2272 .header("x-api-key", "test-key-123")
2273 .reply(&routes)
2274 .await;
2275
2276 assert_eq!(resp.status(), StatusCode::OK);
2277 let body: PipelineMetricsResponse = serde_json::from_slice(resp.body()).unwrap();
2278 assert_eq!(body.pipeline_id, pipeline_id);
2279 }
2280
2281 #[tokio::test]
2282 async fn test_metrics_not_found() {
2283 let mgr = setup_test_manager().await;
2284 let routes = api_routes(mgr, None, None, None);
2285
2286 let resp = test_request()
2287 .method("GET")
2288 .path("/api/v1/pipelines/nonexistent/metrics")
2289 .header("x-api-key", "test-key-123")
2290 .reply(&routes)
2291 .await;
2292
2293 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2294 }
2295
2296 #[tokio::test]
2301 async fn test_reload_pipeline() {
2302 let mgr = setup_test_manager().await;
2303 let pipeline_id = get_first_pipeline_id(&mgr).await;
2304 let routes = api_routes(mgr, None, None, None);
2305
2306 let resp = test_request()
2307 .method("POST")
2308 .path(&format!("/api/v1/pipelines/{pipeline_id}/reload"))
2309 .header("x-api-key", "test-key-123")
2310 .json(&ReloadPipelineRequest {
2311 source: "stream B = Events .where(y > 10)".into(),
2312 })
2313 .reply(&routes)
2314 .await;
2315
2316 assert_eq!(resp.status(), StatusCode::OK);
2317 let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2318 assert_eq!(body["reloaded"], true);
2319 }
2320
2321 #[tokio::test]
2322 async fn test_reload_invalid_vpl() {
2323 let mgr = setup_test_manager().await;
2324 let pipeline_id = get_first_pipeline_id(&mgr).await;
2325 let routes = api_routes(mgr, None, None, None);
2326
2327 let resp = test_request()
2328 .method("POST")
2329 .path(&format!("/api/v1/pipelines/{pipeline_id}/reload"))
2330 .header("x-api-key", "test-key-123")
2331 .json(&ReloadPipelineRequest {
2332 source: "not valid {{{".into(),
2333 })
2334 .reply(&routes)
2335 .await;
2336
2337 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2338 }
2339
2340 #[tokio::test]
2341 async fn test_reload_not_found() {
2342 let mgr = setup_test_manager().await;
2343 let routes = api_routes(mgr, None, None, None);
2344
2345 let resp = test_request()
2346 .method("POST")
2347 .path("/api/v1/pipelines/nonexistent/reload")
2348 .header("x-api-key", "test-key-123")
2349 .json(&ReloadPipelineRequest {
2350 source: "stream B = Events .where(y > 10)".into(),
2351 })
2352 .reply(&routes)
2353 .await;
2354
2355 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2356 }
2357
2358 #[tokio::test]
2363 async fn test_logs_invalid_pipeline() {
2364 let mgr = setup_test_manager().await;
2365 let routes = api_routes(mgr, None, None, None);
2366
2367 let resp = test_request()
2368 .method("GET")
2369 .path("/api/v1/pipelines/nonexistent/logs")
2370 .header("x-api-key", "test-key-123")
2371 .reply(&routes)
2372 .await;
2373
2374 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2375 }
2376
2377 #[tokio::test]
2378 async fn test_logs_invalid_api_key() {
2379 let mgr = setup_test_manager().await;
2380 let pipeline_id = get_first_pipeline_id(&mgr).await;
2381 let routes = api_routes(mgr, None, None, None);
2382
2383 let resp = test_request()
2384 .method("GET")
2385 .path(&format!("/api/v1/pipelines/{pipeline_id}/logs"))
2386 .header("x-api-key", "wrong-key")
2387 .reply(&routes)
2388 .await;
2389
2390 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2391 }
2392
2393 #[test]
2398 fn test_json_to_runtime_value_array() {
2399 let arr = serde_json::json!([1, "hello", true]);
2400 let val = json_to_runtime_value(&arr);
2401 match val {
2402 varpulis_core::Value::Array(a) => {
2403 assert_eq!(a.len(), 3);
2404 assert_eq!(a[0], varpulis_core::Value::Int(1));
2405 assert_eq!(a[1], varpulis_core::Value::Str("hello".into()));
2406 assert_eq!(a[2], varpulis_core::Value::Bool(true));
2407 }
2408 _ => panic!("Expected Array"),
2409 }
2410 }
2411
2412 #[test]
2413 fn test_json_to_runtime_value_object() {
2414 let obj = serde_json::json!({"key": "val", "num": 42});
2415 let val = json_to_runtime_value(&obj);
2416 match val {
2417 varpulis_core::Value::Map(m) => {
2418 assert_eq!(m.len(), 2);
2419 }
2420 _ => panic!("Expected Map"),
2421 }
2422 }
2423
/// `json_from_value` maps each scalar runtime value exercised below to the
/// expected JSON literal.
#[test]
fn test_json_from_value_roundtrip() {
    use varpulis_core::Value;

    // (runtime value, expected JSON) pairs, checked in order.
    let cases = [
        (Value::Null, serde_json::json!(null)),
        (Value::Bool(true), serde_json::json!(true)),
        (Value::Int(42), serde_json::json!(42)),
        (Value::Float(2.71), serde_json::json!(2.71)),
        (Value::Str("hi".into()), serde_json::json!("hi")),
        (Value::Timestamp(1000000), serde_json::json!(1000000)),
        (Value::Duration(5000), serde_json::json!(5000)),
    ];

    for (runtime_value, expected_json) in cases {
        assert_eq!(json_from_value(&runtime_value), expected_json);
    }
}
2447
/// Checks the HTTP status that `tenant_error_response` produces for each
/// `TenantError` variant exercised below, plus the `Retry-After` header on the
/// backpressure response.
#[test]
fn test_tenant_error_all_variants() {
    // Converts an error to a response and keeps only its status code.
    let status_for = |err: TenantError| tenant_error_response(err).status();

    assert_eq!(
        status_for(TenantError::PipelineNotFound("p1".into())),
        StatusCode::NOT_FOUND
    );
    assert_eq!(
        status_for(TenantError::QuotaExceeded("max pipelines".into())),
        StatusCode::TOO_MANY_REQUESTS
    );
    assert_eq!(
        status_for(TenantError::EngineError(
            varpulis_runtime::EngineError::Pipeline("boom".into()),
        )),
        StatusCode::INTERNAL_SERVER_ERROR
    );
    assert_eq!(
        status_for(TenantError::AlreadyExists("t1".into())),
        StatusCode::CONFLICT
    );

    // The backpressure response is kept whole because its headers are inspected too.
    let backpressure = tenant_error_response(TenantError::BackpressureExceeded {
        current: 50000,
        max: 50000,
    });
    assert_eq!(backpressure.status(), StatusCode::TOO_MANY_REQUESTS);
    assert_eq!(backpressure.headers().get("Retry-After").unwrap(), "1");
}
2475
2476 #[tokio::test]
2481 async fn test_list_pipelines_default_pagination() {
2482 let mgr = setup_test_manager().await;
2483 let routes = api_routes(mgr, None, None, None);
2484
2485 let resp = test_request()
2486 .method("GET")
2487 .path("/api/v1/pipelines")
2488 .header("x-api-key", "test-key-123")
2489 .reply(&routes)
2490 .await;
2491
2492 assert_eq!(resp.status(), StatusCode::OK);
2493 let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2494 assert_eq!(body.total, 1);
2495 let pagination = body.pagination.unwrap();
2496 assert_eq!(pagination.total, 1);
2497 assert_eq!(pagination.offset, 0);
2498 assert_eq!(pagination.limit, 50);
2499 assert!(!pagination.has_more);
2500 }
2501
2502 #[tokio::test]
2503 async fn test_list_pipelines_with_pagination_params() {
2504 let mgr = setup_test_manager().await;
2505
2506 {
2508 let mut m = mgr.write().await;
2509 let tid = m.get_tenant_by_api_key("test-key-123").unwrap().clone();
2510 let tenant = m.get_tenant_mut(&tid).unwrap();
2511 tenant
2512 .deploy_pipeline(
2513 "Pipeline B".into(),
2514 "stream B = Events .where(y > 2)".into(),
2515 )
2516 .await
2517 .unwrap();
2518 tenant
2519 .deploy_pipeline(
2520 "Pipeline C".into(),
2521 "stream C = Events .where(z > 3)".into(),
2522 )
2523 .await
2524 .unwrap();
2525 }
2526
2527 let routes = api_routes(mgr, None, None, None);
2528
2529 let resp = test_request()
2531 .method("GET")
2532 .path("/api/v1/pipelines?limit=1&offset=0")
2533 .header("x-api-key", "test-key-123")
2534 .reply(&routes)
2535 .await;
2536
2537 assert_eq!(resp.status(), StatusCode::OK);
2538 let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2539 assert_eq!(body.pipelines.len(), 1);
2540 assert_eq!(body.total, 3);
2541 let pagination = body.pagination.unwrap();
2542 assert!(pagination.has_more);
2543 assert_eq!(pagination.limit, 1);
2544
2545 let resp = test_request()
2547 .method("GET")
2548 .path("/api/v1/pipelines?limit=1&offset=2")
2549 .header("x-api-key", "test-key-123")
2550 .reply(&routes)
2551 .await;
2552
2553 let body: PipelineListResponse = serde_json::from_slice(resp.body()).unwrap();
2554 assert_eq!(body.pipelines.len(), 1);
2555 assert_eq!(body.total, 3);
2556 assert!(!body.pagination.unwrap().has_more);
2557 }
2558
2559 #[tokio::test]
2560 async fn test_list_pipelines_limit_exceeds_max() {
2561 let mgr = setup_test_manager().await;
2562 let routes = api_routes(mgr, None, None, None);
2563
2564 let resp = test_request()
2565 .method("GET")
2566 .path("/api/v1/pipelines?limit=1001")
2567 .header("x-api-key", "test-key-123")
2568 .reply(&routes)
2569 .await;
2570
2571 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2572 }
2573
2574 #[tokio::test]
2575 async fn test_list_tenants_with_pagination() {
2576 let (_mgr, routes) = setup_admin_routes(Some("admin-secret"));
2577
2578 for name in &["T1", "T2", "T3"] {
2580 test_request()
2581 .method("POST")
2582 .path("/api/v1/tenants")
2583 .header("x-admin-key", "admin-secret")
2584 .json(&CreateTenantRequest {
2585 name: name.to_string(),
2586 quota_tier: None,
2587 })
2588 .reply(&routes)
2589 .await;
2590 }
2591
2592 let resp = test_request()
2594 .method("GET")
2595 .path("/api/v1/tenants?limit=2&offset=0")
2596 .header("x-admin-key", "admin-secret")
2597 .reply(&routes)
2598 .await;
2599
2600 assert_eq!(resp.status(), StatusCode::OK);
2601 let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2602 assert_eq!(body.tenants.len(), 2);
2603 assert_eq!(body.total, 3);
2604 assert!(body.pagination.unwrap().has_more);
2605
2606 let resp = test_request()
2608 .method("GET")
2609 .path("/api/v1/tenants?limit=2&offset=2")
2610 .header("x-admin-key", "admin-secret")
2611 .reply(&routes)
2612 .await;
2613
2614 let body: TenantListResponse = serde_json::from_slice(resp.body()).unwrap();
2615 assert_eq!(body.tenants.len(), 1);
2616 assert!(!body.pagination.unwrap().has_more);
2617 }
2618
2619 #[tokio::test]
2620 async fn test_inject_backpressure_429() {
2621 use std::sync::atomic::Ordering;
2622
2623 let mut mgr = TenantManager::new();
2624 mgr.set_max_queue_depth(5);
2625 let id = mgr
2626 .create_tenant(
2627 "BP Corp".into(),
2628 "bp-key-123".into(),
2629 TenantQuota::default(),
2630 )
2631 .unwrap();
2632
2633 let tenant = mgr.get_tenant_mut(&id).unwrap();
2634 let pid = tenant
2635 .deploy_pipeline(
2636 "BP Pipeline".into(),
2637 "stream A = SensorReading .where(x > 1)".into(),
2638 )
2639 .await
2640 .unwrap();
2641
2642 mgr.pending_events_counter().store(5, Ordering::Relaxed);
2644
2645 let shared = Arc::new(RwLock::new(mgr));
2646 let routes = api_routes(shared, None, None, None);
2647
2648 let resp = test_request()
2649 .method("POST")
2650 .path(&format!("/api/v1/pipelines/{pid}/events"))
2651 .header("x-api-key", "bp-key-123")
2652 .json(&InjectEventRequest {
2653 event_type: "SensorReading".into(),
2654 fields: serde_json::Map::new(),
2655 })
2656 .reply(&routes)
2657 .await;
2658
2659 assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2660 assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2662 let body: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
2664 assert_eq!(body["code"], "queue_depth_exceeded");
2665 }
2666
2667 #[tokio::test]
2668 async fn test_inject_batch_backpressure_429() {
2669 use std::sync::atomic::Ordering;
2670
2671 let mut mgr = TenantManager::new();
2672 mgr.set_max_queue_depth(5);
2673 let id = mgr
2674 .create_tenant(
2675 "BP Batch Corp".into(),
2676 "bp-batch-key".into(),
2677 TenantQuota::default(),
2678 )
2679 .unwrap();
2680
2681 let tenant = mgr.get_tenant_mut(&id).unwrap();
2682 let pid = tenant
2683 .deploy_pipeline(
2684 "BP Batch Pipeline".into(),
2685 "stream A = SensorReading .where(x > 1)".into(),
2686 )
2687 .await
2688 .unwrap();
2689
2690 mgr.pending_events_counter().store(5, Ordering::Relaxed);
2692
2693 let shared = Arc::new(RwLock::new(mgr));
2694 let routes = api_routes(shared, None, None, None);
2695
2696 let resp = test_request()
2697 .method("POST")
2698 .path(&format!("/api/v1/pipelines/{pid}/events-batch"))
2699 .header("x-api-key", "bp-batch-key")
2700 .json(&InjectBatchRequest {
2701 events: vec![InjectEventRequest {
2702 event_type: "SensorReading".into(),
2703 fields: serde_json::Map::new(),
2704 }],
2705 })
2706 .reply(&routes)
2707 .await;
2708
2709 assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
2710 assert_eq!(resp.headers().get("Retry-After").unwrap(), "1");
2711 }
2712}