1use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::Instant;
9
10use serde::{Deserialize, Serialize};
11use tokio::sync::{mpsc, RwLock};
12use tracing::{error, info, warn};
13use uuid::Uuid;
14
15use crate::context::ContextOrchestrator;
16use crate::engine::Engine;
17use crate::event::Event;
18use crate::metrics::Metrics;
19use crate::persistence::{StateStore, StoreError};
20
21#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
23pub struct TenantId(pub String);
24
25impl TenantId {
26 pub fn new(id: impl Into<String>) -> Self {
27 Self(id.into())
28 }
29
30 pub fn generate() -> Self {
31 Self(Uuid::new_v4().to_string())
32 }
33
34 pub fn as_str(&self) -> &str {
35 &self.0
36 }
37}
38
39impl std::fmt::Display for TenantId {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 write!(f, "{}", self.0)
42 }
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct TenantQuota {
48 pub max_pipelines: usize,
50 pub max_events_per_second: u64,
52 pub max_streams_per_pipeline: usize,
54}
55
56impl Default for TenantQuota {
57 fn default() -> Self {
58 Self {
59 max_pipelines: 10,
60 max_events_per_second: 10_000,
61 max_streams_per_pipeline: 50,
62 }
63 }
64}
65
66impl TenantQuota {
67 pub const fn free() -> Self {
69 Self {
70 max_pipelines: 5,
71 max_events_per_second: 500,
72 max_streams_per_pipeline: 10,
73 }
74 }
75
76 pub const fn pro() -> Self {
78 Self {
79 max_pipelines: 20,
80 max_events_per_second: 50_000,
81 max_streams_per_pipeline: 100,
82 }
83 }
84
85 pub const fn business() -> Self {
87 Self {
88 max_pipelines: 100,
89 max_events_per_second: 200_000,
90 max_streams_per_pipeline: 200,
91 }
92 }
93
94 pub const fn enterprise() -> Self {
96 Self {
97 max_pipelines: 1000,
98 max_events_per_second: 500_000,
99 max_streams_per_pipeline: 500,
100 }
101 }
102
103 pub fn for_tier(tier: &str) -> Self {
105 match tier {
106 "pro" => Self::pro(),
107 "business" => Self::business(),
108 "enterprise" => Self::enterprise(),
109 _ => Self::free(),
110 }
111 }
112}
113
114#[derive(Debug, Clone, Default)]
116pub struct TenantUsage {
117 pub events_processed: u64,
119 pub events_in_window: u64,
121 pub window_start: Option<Instant>,
123 pub active_pipelines: usize,
125 pub output_events_emitted: u64,
127}
128
129impl TenantUsage {
130 pub fn record_event(&mut self, max_eps: u64) -> bool {
132 self.events_processed += 1;
133
134 let now = Instant::now();
135 match self.window_start {
136 Some(start) if now.duration_since(start).as_secs() < 1 => {
137 self.events_in_window += 1;
138 if max_eps > 0 && self.events_in_window > max_eps {
139 return false;
140 }
141 }
142 _ => {
143 self.window_start = Some(now);
144 self.events_in_window = 1;
145 }
146 }
147 true
148 }
149
150 pub const fn record_output_event(&mut self) {
151 self.output_events_emitted += 1;
152 }
153}
154
155#[derive(Debug)]
157pub struct Pipeline {
158 pub id: String,
160 pub name: String,
162 pub source: String,
164 pub engine: Arc<tokio::sync::Mutex<Engine>>,
166 pub output_rx: mpsc::Receiver<Event>,
168 pub log_broadcast: tokio::sync::broadcast::Sender<Event>,
171 pub created_at: Instant,
173 pub status: PipelineStatus,
175 pub orchestrator: Option<ContextOrchestrator>,
177 pub connector_registry: Option<crate::connector::ManagedConnectorRegistry>,
179 pub global_template_id: Option<String>,
181}
182
183#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
185pub enum PipelineStatus {
186 Running,
187 Stopped,
188 Error(String),
189}
190
191impl std::fmt::Display for PipelineStatus {
192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 match self {
194 Self::Running => write!(f, "running"),
195 Self::Stopped => write!(f, "stopped"),
196 Self::Error(msg) => write!(f, "error: {msg}"),
197 }
198 }
199}
200
201#[derive(Debug, thiserror::Error)]
203pub enum TenantError {
204 #[error("tenant not found: {0}")]
206 NotFound(String),
207 #[error("pipeline not found: {0}")]
209 PipelineNotFound(String),
210 #[error("quota exceeded: {0}")]
212 QuotaExceeded(String),
213 #[error("rate limit exceeded")]
215 RateLimitExceeded,
216 #[error("queue depth {current} exceeds maximum {max}")]
218 BackpressureExceeded {
219 current: u64,
221 max: u64,
223 },
224 #[error("parse error: {0}")]
226 ParseError(#[from] varpulis_parser::ParseError),
227 #[error("engine error: {0}")]
229 EngineError(#[from] crate::engine::error::EngineError),
230 #[error("tenant already exists: {0}")]
232 AlreadyExists(String),
233}
234
235pub fn hash_api_key(raw: &str) -> String {
237 use sha2::Digest;
238 hex::encode(sha2::Sha256::digest(raw.as_bytes()))
239}
240
241#[derive(Debug)]
243pub struct Tenant {
244 pub id: TenantId,
246 pub name: String,
248 pub api_key_hash: String,
250 pub quota: TenantQuota,
252 pub usage: TenantUsage,
254 pub pipelines: HashMap<String, Pipeline>,
256 pub created_at: Instant,
258 pub(crate) ws_broadcast: Option<Arc<tokio::sync::broadcast::Sender<String>>>,
260 pub topic_prefix: Option<String>,
262}
263
264impl Tenant {
265 pub fn new(id: TenantId, name: String, api_key: String, quota: TenantQuota) -> Self {
266 Self {
267 id,
268 name,
269 api_key_hash: hash_api_key(&api_key),
270 quota,
271 usage: TenantUsage::default(),
272 pipelines: HashMap::new(),
273 created_at: Instant::now(),
274 ws_broadcast: None,
275 topic_prefix: None,
276 }
277 }
278
279 pub fn new_with_hash(
281 id: TenantId,
282 name: String,
283 api_key_hash: String,
284 quota: TenantQuota,
285 ) -> Self {
286 Self {
287 id,
288 name,
289 api_key_hash,
290 quota,
291 usage: TenantUsage::default(),
292 pipelines: HashMap::new(),
293 created_at: Instant::now(),
294 ws_broadcast: None,
295 topic_prefix: None,
296 }
297 }
298
299 pub async fn deploy_pipeline(
301 &mut self,
302 name: String,
303 source: String,
304 ) -> Result<String, TenantError> {
305 self.deploy_pipeline_with_metrics(name, source, None).await
306 }
307
308 pub async fn deploy_pipeline_with_metrics(
309 &mut self,
310 name: String,
311 source: String,
312 prometheus_metrics: Option<Metrics>,
313 ) -> Result<String, TenantError> {
314 if self.pipelines.len() >= self.quota.max_pipelines {
316 return Err(TenantError::QuotaExceeded(format!(
317 "max pipelines ({}) reached",
318 self.quota.max_pipelines
319 )));
320 }
321
322 let program = varpulis_parser::parse(&source)?;
324
325 let stream_count = program
327 .statements
328 .iter()
329 .filter(|s| matches!(&s.node, varpulis_core::ast::Stmt::StreamDecl { .. }))
330 .count();
331 if stream_count > self.quota.max_streams_per_pipeline {
332 return Err(TenantError::QuotaExceeded(format!(
333 "pipeline has {} streams, max is {}",
334 stream_count, self.quota.max_streams_per_pipeline
335 )));
336 }
337
338 let (output_tx, output_rx) = mpsc::channel(1000);
340 let output_tx_for_ctx = output_tx.clone();
341 let mut engine = Engine::new(output_tx);
342 if let Some(m) = prometheus_metrics {
343 engine = engine.with_metrics(m);
344 }
345 if let Some(ref prefix) = self.topic_prefix {
346 engine.set_topic_prefix(prefix);
347 }
348 engine.load(&program)?;
349
350 let orchestrator = if engine.has_contexts() {
352 match ContextOrchestrator::build(
354 engine.context_map(),
355 &program,
356 output_tx_for_ctx,
357 1000,
358 ) {
359 Ok(orch) => Some(orch),
360 Err(e) => {
361 return Err(crate::engine::error::EngineError::Pipeline(format!(
362 "Failed to build context orchestrator: {e}"
363 ))
364 .into());
365 }
366 }
367 } else {
368 engine.connect_sinks().await?;
370 None
371 };
372
373 let bindings = engine.source_bindings().to_vec();
375 let mut connector_registry = None;
376
377 if !bindings.is_empty() {
378 let (event_tx, event_rx) = mpsc::channel::<Event>(10_000);
379
380 let mut registry = crate::connector::ManagedConnectorRegistry::from_configs(
381 engine.connector_configs(),
382 )
383 .map_err(|e| {
384 TenantError::EngineError(crate::engine::error::EngineError::Pipeline(format!(
385 "Registry build error: {e}"
386 )))
387 })?;
388
389 for binding in &bindings {
390 let config = engine.get_connector(&binding.connector_name).cloned();
391 let Some(config) = config else {
392 return Err(crate::engine::error::EngineError::Pipeline(format!(
393 "Connector '{}' referenced in .from() but not declared",
394 binding.connector_name
395 ))
396 .into());
397 };
398
399 let topic = binding
400 .topic_override
401 .as_deref()
402 .or(config.topic.as_deref())
403 .unwrap_or("varpulis/events/#");
404
405 info!(
406 "Starting {} source: {} topic={}",
407 config.connector_type, binding.connector_name, topic
408 );
409
410 registry
411 .start_source(
412 &binding.connector_name,
413 topic,
414 event_tx.clone(),
415 &binding.extra_params,
416 )
417 .await
418 .map_err(|e| -> TenantError {
419 crate::engine::error::EngineError::Pipeline(format!(
420 "Source start error: {e}"
421 ))
422 .into()
423 })?;
424
425 let sink_keys = engine.sink_keys_for_connector(&binding.connector_name);
427 for sink_key in &sink_keys {
428 let sink_topic = if let Some(t) =
429 sink_key.strip_prefix(&format!("{}::", binding.connector_name))
430 {
431 t.to_string()
432 } else {
433 config
434 .topic
435 .clone()
436 .unwrap_or_else(|| format!("{}-output", binding.connector_name))
437 };
438
439 let empty_params = std::collections::HashMap::new();
440 match registry.create_sink(&binding.connector_name, &sink_topic, &empty_params)
441 {
442 Ok(sink) => {
443 engine.inject_sink(sink_key, sink);
444 }
445 Err(e) => {
446 warn!("Failed to create sink for {}: {}", sink_key, e);
447 }
448 }
449 }
450 }
451
452 connector_registry = Some(registry);
453
454 let engine = Arc::new(tokio::sync::Mutex::new(engine));
456 let engine_for_source = Arc::clone(&engine);
457
458 tokio::spawn(async move {
460 let mut event_rx = event_rx;
461 while let Some(event) = event_rx.recv().await {
462 let mut eng = engine_for_source.lock().await;
463 if let Err(e) = eng.process(event).await {
464 warn!("Source event processing error: {}", e);
465 }
466 }
467 info!("Source ingestion loop ended");
468 });
469
470 let id = Uuid::new_v4().to_string();
471 let (log_tx, _) = tokio::sync::broadcast::channel(256);
472
473 let log_tx_for_drain = log_tx.clone();
478 let ws_tx_for_drain = self.ws_broadcast.clone();
479 tokio::spawn(async move {
480 let mut output_rx = output_rx;
481 while let Some(ev) = output_rx.recv().await {
482 let _ = log_tx_for_drain.send(ev.clone());
483 if let Some(ref ws_tx) = ws_tx_for_drain {
485 let msg = serde_json::json!({
486 "type": "output_event",
487 "event_type": ev.event_type.to_string(),
488 "data": ev.data,
489 "timestamp": ev.timestamp.to_rfc3339(),
490 });
491 if let Ok(json) = serde_json::to_string(&msg) {
492 if ws_tx.send(json).is_err() {
493 tracing::debug!("No WS subscribers for drain output event");
494 }
495 }
496 }
497 }
498 });
499
500 let (_drain_tx, placeholder_rx) = mpsc::channel(1);
503
504 let pipeline = Pipeline {
505 id: id.clone(),
506 name,
507 source,
508 engine,
509 output_rx: placeholder_rx,
510 log_broadcast: log_tx,
511 created_at: Instant::now(),
512 status: PipelineStatus::Running,
513 orchestrator,
514 connector_registry,
515 global_template_id: None,
516 };
517
518 self.pipelines.insert(id.clone(), pipeline);
519 self.usage.active_pipelines = self.pipelines.len();
520
521 return Ok(id);
522 }
523
524 let engine = Arc::new(tokio::sync::Mutex::new(engine));
526 let id = Uuid::new_v4().to_string();
527 let (log_tx, _) = tokio::sync::broadcast::channel(256);
528 let pipeline = Pipeline {
529 id: id.clone(),
530 name,
531 source,
532 engine,
533 output_rx,
534 log_broadcast: log_tx,
535 created_at: Instant::now(),
536 status: PipelineStatus::Running,
537 orchestrator,
538 connector_registry,
539 global_template_id: None,
540 };
541
542 self.pipelines.insert(id.clone(), pipeline);
543 self.usage.active_pipelines = self.pipelines.len();
544
545 Ok(id)
546 }
547
548 pub fn remove_pipeline(&mut self, pipeline_id: &str) -> Result<(), TenantError> {
550 self.pipelines
551 .remove(pipeline_id)
552 .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
553 self.usage.active_pipelines = self.pipelines.len();
554 Ok(())
555 }
556
557 pub async fn process_event(
559 &mut self,
560 pipeline_id: &str,
561 event: Event,
562 ) -> Result<Vec<Event>, TenantError> {
563 if !self.usage.record_event(self.quota.max_events_per_second) {
565 return Err(TenantError::RateLimitExceeded);
566 }
567
568 let pipeline = self
569 .pipelines
570 .get_mut(pipeline_id)
571 .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
572
573 if pipeline.status != PipelineStatus::Running {
574 return Err(crate::engine::error::EngineError::Pipeline(format!(
575 "pipeline is {}",
576 pipeline.status
577 ))
578 .into());
579 }
580
581 if let Some(ref orchestrator) = pipeline.orchestrator {
582 let shared_event = std::sync::Arc::new(event);
584 match orchestrator.try_process(shared_event) {
585 Ok(()) => {}
586 Err(crate::context::DispatchError::ChannelFull(msg)) => {
587 if let crate::context::ContextMessage::Event(event) = msg {
589 orchestrator
590 .process(event)
591 .await
592 .map_err(|e| -> TenantError {
593 crate::engine::error::EngineError::Pipeline(e).into()
594 })?;
595 }
596 }
597 Err(crate::context::DispatchError::ChannelClosed(_)) => {
598 return Err(crate::engine::error::EngineError::Pipeline(
599 "Context channel closed".to_string(),
600 )
601 .into());
602 }
603 }
604 } else {
605 pipeline.engine.lock().await.process(event).await?;
607 }
608
609 let mut output_events = Vec::new();
611 while let Ok(ev) = pipeline.output_rx.try_recv() {
612 let _ = pipeline.log_broadcast.send(ev.clone());
614 output_events.push(ev);
615 }
616
617 Ok(output_events)
618 }
619
620 pub fn subscribe_pipeline_logs(
623 &self,
624 pipeline_id: &str,
625 ) -> Result<tokio::sync::broadcast::Receiver<Event>, TenantError> {
626 let pipeline = self
627 .pipelines
628 .get(pipeline_id)
629 .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
630 Ok(pipeline.log_broadcast.subscribe())
631 }
632
633 pub async fn checkpoint_pipeline(
635 &self,
636 pipeline_id: &str,
637 ) -> Result<crate::persistence::EngineCheckpoint, TenantError> {
638 let pipeline = self
639 .pipelines
640 .get(pipeline_id)
641 .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
642
643 let engine = pipeline.engine.lock().await;
644 Ok(engine.create_checkpoint())
645 }
646
647 pub async fn restore_pipeline(
649 &mut self,
650 pipeline_id: &str,
651 checkpoint: &crate::persistence::EngineCheckpoint,
652 ) -> Result<(), TenantError> {
653 let pipeline = self
654 .pipelines
655 .get_mut(pipeline_id)
656 .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
657
658 let mut engine = pipeline.engine.lock().await;
659 engine
660 .restore_checkpoint(checkpoint)
661 .map_err(crate::engine::error::EngineError::Store)?;
662 Ok(())
663 }
664
665 pub async fn reload_pipeline(
667 &mut self,
668 pipeline_id: &str,
669 source: String,
670 ) -> Result<(), TenantError> {
671 let program = varpulis_parser::parse(&source)?;
672
673 let pipeline = self
674 .pipelines
675 .get_mut(pipeline_id)
676 .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
677
678 pipeline.engine.lock().await.reload(&program)?;
679 pipeline.source = source;
680
681 Ok(())
682 }
683}
684
685pub struct TenantManager {
687 tenants: HashMap<TenantId, Tenant>,
688 api_key_index: HashMap<String, TenantId>,
690 store: Option<Arc<dyn StateStore>>,
692 prometheus_metrics: Option<Metrics>,
694 max_queue_depth: u64,
696 pending_events: Arc<AtomicU64>,
698 ws_broadcast: Option<Arc<tokio::sync::broadcast::Sender<String>>>,
701}
702
703impl std::fmt::Debug for TenantManager {
704 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
705 f.debug_struct("TenantManager")
706 .field("tenant_count", &self.tenants.len())
707 .field("max_queue_depth", &self.max_queue_depth)
708 .field("pending_events", &self.pending_events)
709 .field("has_store", &self.store.is_some())
710 .field("has_ws_broadcast", &self.ws_broadcast.is_some())
711 .finish_non_exhaustive()
712 }
713}
714
715impl TenantManager {
716 pub fn new() -> Self {
717 Self {
718 tenants: HashMap::new(),
719 api_key_index: HashMap::new(),
720 store: None,
721 prometheus_metrics: None,
722 max_queue_depth: 0,
723 pending_events: Arc::new(AtomicU64::new(0)),
724 ws_broadcast: None,
725 }
726 }
727
728 pub fn with_store(store: Arc<dyn StateStore>) -> Self {
730 Self {
731 tenants: HashMap::new(),
732 api_key_index: HashMap::new(),
733 store: Some(store),
734 prometheus_metrics: None,
735 max_queue_depth: 0,
736 pending_events: Arc::new(AtomicU64::new(0)),
737 ws_broadcast: None,
738 }
739 }
740
741 pub fn set_prometheus_metrics(&mut self, metrics: Metrics) {
743 self.prometheus_metrics = Some(metrics);
744 }
745
746 pub const fn set_max_queue_depth(&mut self, max_depth: u64) {
748 self.max_queue_depth = max_depth;
749 }
750
751 pub fn set_ws_broadcast(&mut self, tx: Arc<tokio::sync::broadcast::Sender<String>>) {
755 for tenant in self.tenants.values_mut() {
756 tenant.ws_broadcast = Some(Arc::clone(&tx));
757 }
758 self.ws_broadcast = Some(tx);
759 }
760
761 pub const fn max_queue_depth(&self) -> u64 {
763 self.max_queue_depth
764 }
765
766 pub fn pending_event_count(&self) -> u64 {
768 self.pending_events.load(Ordering::Relaxed)
769 }
770
771 pub fn pending_events_counter(&self) -> Arc<AtomicU64> {
773 Arc::clone(&self.pending_events)
774 }
775
776 pub fn check_backpressure(&self) -> Result<(), TenantError> {
778 if self.max_queue_depth == 0 {
779 return Ok(());
780 }
781 let current = self.pending_events.load(Ordering::Relaxed);
782 if current >= self.max_queue_depth {
783 return Err(TenantError::BackpressureExceeded {
784 current,
785 max: self.max_queue_depth,
786 });
787 }
788 Ok(())
789 }
790
791 pub fn queue_pressure_ratio(&self) -> f64 {
793 if self.max_queue_depth == 0 {
794 return 0.0;
795 }
796 self.pending_events.load(Ordering::Relaxed) as f64 / self.max_queue_depth as f64
797 }
798
799 pub fn create_tenant(
801 &mut self,
802 name: String,
803 api_key: String,
804 quota: TenantQuota,
805 ) -> Result<TenantId, TenantError> {
806 let key_hash = hash_api_key(&api_key);
807 if self.api_key_index.contains_key(&key_hash) {
808 return Err(TenantError::AlreadyExists(
809 "API key already in use".to_string(),
810 ));
811 }
812
813 let id = TenantId::generate();
814 let mut tenant = Tenant::new_with_hash(id.clone(), name, key_hash.clone(), quota);
815 tenant.ws_broadcast = self.ws_broadcast.clone();
816 self.tenants.insert(id.clone(), tenant);
817 self.api_key_index.insert(key_hash, id.clone());
818 self.persist_if_needed(&id);
819 Ok(id)
820 }
821
822 pub fn create_tenant_with_id(
824 &mut self,
825 id: TenantId,
826 name: String,
827 api_key_hash: String,
828 quota: TenantQuota,
829 ) -> Result<(), TenantError> {
830 if self.api_key_index.contains_key(&api_key_hash) {
831 return Err(TenantError::AlreadyExists(
832 "API key already in use".to_string(),
833 ));
834 }
835
836 let mut tenant = Tenant::new_with_hash(id.clone(), name, api_key_hash.clone(), quota);
837 tenant.ws_broadcast = self.ws_broadcast.clone();
838 self.tenants.insert(id.clone(), tenant);
839 self.api_key_index.insert(api_key_hash, id);
840 Ok(())
841 }
842
843 pub fn update_tenant_quota(&mut self, id: &TenantId, quota: TenantQuota) -> bool {
845 if let Some(tenant) = self.tenants.get_mut(id) {
846 tenant.quota = quota;
847 true
848 } else {
849 false
850 }
851 }
852
853 pub fn get_tenant_by_api_key(&self, api_key: &str) -> Option<&TenantId> {
855 let key_hash = hash_api_key(api_key);
856 self.api_key_index.get(&key_hash)
857 }
858
859 pub fn get_tenant(&self, id: &TenantId) -> Option<&Tenant> {
861 self.tenants.get(id)
862 }
863
864 pub fn get_tenant_mut(&mut self, id: &TenantId) -> Option<&mut Tenant> {
866 self.tenants.get_mut(id)
867 }
868
869 pub async fn deploy_pipeline_on_tenant(
871 &mut self,
872 tenant_id: &TenantId,
873 name: String,
874 source: String,
875 ) -> Result<String, TenantError> {
876 let metrics = self.prometheus_metrics.clone();
877 let ws_broadcast = self.ws_broadcast.clone();
878 let tenant = self
879 .tenants
880 .get_mut(tenant_id)
881 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
882 if tenant.ws_broadcast.is_none() {
884 tenant.ws_broadcast = ws_broadcast;
885 }
886 tenant
887 .deploy_pipeline_with_metrics(name, source, metrics)
888 .await
889 }
890
891 pub async fn deploy_global_pipeline_on_tenant(
893 &mut self,
894 tenant_id: &TenantId,
895 name: String,
896 source: String,
897 template_id: String,
898 ) -> Result<String, TenantError> {
899 let pipeline_id = self
900 .deploy_pipeline_on_tenant(tenant_id, name, source)
901 .await?;
902 if let Some(tenant) = self.tenants.get_mut(tenant_id) {
904 if let Some(pipeline) = tenant.pipelines.get_mut(&pipeline_id) {
905 pipeline.global_template_id = Some(template_id);
906 }
907 }
908 Ok(pipeline_id)
909 }
910
911 pub async fn process_event_with_backpressure(
917 &mut self,
918 tenant_id: &TenantId,
919 pipeline_id: &str,
920 event: Event,
921 ) -> Result<Vec<Event>, TenantError> {
922 self.check_backpressure()?;
923 self.pending_events.fetch_add(1, Ordering::Relaxed);
924 let tenant = self
925 .tenants
926 .get_mut(tenant_id)
927 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
928 let result = tenant.process_event(pipeline_id, event).await;
929 self.pending_events.fetch_sub(1, Ordering::Relaxed);
930
931 if let Some(ref metrics) = self.prometheus_metrics {
933 metrics
934 .queue_pressure_ratio
935 .with_label_values(&["_all"])
936 .set(self.queue_pressure_ratio());
937 }
938
939 result
940 }
941
942 pub fn remove_tenant(&mut self, id: &TenantId) -> Result<(), TenantError> {
944 let tenant = self
945 .tenants
946 .remove(id)
947 .ok_or_else(|| TenantError::NotFound(id.to_string()))?;
948 self.api_key_index.remove(&tenant.api_key_hash);
949 if let Some(ref store) = self.store {
950 if let Err(e) = Self::delete_tenant_state(store.as_ref(), id) {
951 warn!("Failed to delete persisted state for tenant {}: {}", id, e);
952 }
953 }
954 Ok(())
955 }
956
957 pub fn list_tenants(&self) -> Vec<&Tenant> {
959 self.tenants.values().collect()
960 }
961
962 pub fn tenant_count(&self) -> usize {
964 self.tenants.len()
965 }
966
967 pub async fn collect_pipeline_metrics(&self) -> Vec<(String, u64, u64)> {
969 let mut metrics = Vec::new();
970 for tenant in self.tenants.values() {
971 for pipeline in tenant.pipelines.values() {
972 let engine = pipeline.engine.lock().await;
973 let (events_in, events_out) = engine.event_counters();
974 metrics.push((pipeline.name.clone(), events_in, events_out));
975 }
976 }
977 metrics
978 }
979
980 pub fn collect_connector_health(
984 &self,
985 ) -> Vec<(
986 String,
987 String,
988 String,
989 crate::connector::ConnectorHealthReport,
990 )> {
991 let mut results = Vec::new();
992 for tenant in self.tenants.values() {
993 for pipeline in tenant.pipelines.values() {
994 if let Some(ref registry) = pipeline.connector_registry {
995 for (conn_name, conn_type, report) in registry.health_reports() {
996 results.push((
997 pipeline.name.clone(),
998 conn_name.to_string(),
999 conn_type.to_string(),
1000 report,
1001 ));
1002 }
1003 }
1004 }
1005 }
1006 results
1007 }
1008
1009 pub fn persist_if_needed(&self, tenant_id: &TenantId) {
1011 if let Some(ref store) = self.store {
1012 if let Some(tenant) = self.tenants.get(tenant_id) {
1013 if let Err(e) = Self::persist_tenant_to_store(store.as_ref(), tenant) {
1014 warn!("Failed to persist tenant {}: {}", tenant_id, e);
1015 }
1016 }
1017 }
1018 }
1019
1020 pub fn recover(&mut self) -> Result<usize, StoreError> {
1024 let store = match &self.store {
1025 Some(s) => Arc::clone(s),
1026 None => return Ok(0),
1027 };
1028
1029 let index_data = match store.get("tenants:index")? {
1031 Some(data) => data,
1032 None => return Ok(0),
1033 };
1034
1035 let tenant_ids: Vec<String> = serde_json::from_slice(&index_data)
1036 .map_err(|e| StoreError::SerializationError(e.to_string()))?;
1037
1038 let mut recovered = 0;
1039 let mut failed = 0;
1040
1041 for tid in &tenant_ids {
1042 let key = format!("tenant:{tid}");
1043 let data = match store.get(&key)? {
1044 Some(d) => d,
1045 None => {
1046 warn!("Tenant {} listed in index but not found in store", tid);
1047 failed += 1;
1048 continue;
1049 }
1050 };
1051
1052 let snapshot: TenantSnapshot = match serde_json::from_slice(&data) {
1053 Ok(s) => s,
1054 Err(e) => {
1055 warn!("Failed to deserialize tenant {}: {}", tid, e);
1056 failed += 1;
1057 continue;
1058 }
1059 };
1060
1061 match Self::restore_tenant_from_snapshot(snapshot) {
1062 Ok(tenant) => {
1063 let tenant_id = tenant.id.clone();
1064 self.api_key_index
1065 .insert(tenant.api_key_hash.clone(), tenant_id.clone());
1066 let pipeline_count = tenant.pipelines.len();
1067 self.tenants.insert(tenant_id.clone(), tenant);
1068 info!(
1069 "Recovered tenant {} with {} pipeline(s)",
1070 tenant_id, pipeline_count
1071 );
1072 recovered += 1;
1073 }
1074 Err(e) => {
1075 warn!("Failed to restore tenant {}: {}", tid, e);
1076 failed += 1;
1077 }
1078 }
1079 }
1080
1081 if failed > 0 {
1082 warn!(
1083 "Recovery complete: {} recovered, {} failed",
1084 recovered, failed
1085 );
1086 }
1087
1088 Ok(recovered)
1089 }
1090
1091 fn persist_tenant_to_store(store: &dyn StateStore, tenant: &Tenant) -> Result<(), StoreError> {
1092 let snapshot = tenant.snapshot();
1093 let data = serde_json::to_vec(&snapshot)
1094 .map_err(|e| StoreError::SerializationError(e.to_string()))?;
1095 let key = format!("tenant:{}", snapshot.id);
1096 store.put(&key, &data)?;
1097
1098 Self::update_tenant_index_add(store, &snapshot.id)?;
1100
1101 store.flush()
1102 }
1103
1104 fn delete_tenant_state(store: &dyn StateStore, id: &TenantId) -> Result<(), StoreError> {
1105 let key = format!("tenant:{}", id.0);
1106 store.delete(&key)?;
1107
1108 Self::update_tenant_index_remove(store, &id.0)?;
1109
1110 store.flush()
1111 }
1112
1113 fn update_tenant_index_add(store: &dyn StateStore, tenant_id: &str) -> Result<(), StoreError> {
1114 let mut ids = Self::load_tenant_index(store)?;
1115 let id_str = tenant_id.to_string();
1116 if !ids.contains(&id_str) {
1117 ids.push(id_str);
1118 }
1119 let data =
1120 serde_json::to_vec(&ids).map_err(|e| StoreError::SerializationError(e.to_string()))?;
1121 store.put("tenants:index", &data)
1122 }
1123
1124 fn update_tenant_index_remove(
1125 store: &dyn StateStore,
1126 tenant_id: &str,
1127 ) -> Result<(), StoreError> {
1128 let mut ids = Self::load_tenant_index(store)?;
1129 ids.retain(|id| id != tenant_id);
1130 let data =
1131 serde_json::to_vec(&ids).map_err(|e| StoreError::SerializationError(e.to_string()))?;
1132 store.put("tenants:index", &data)
1133 }
1134
1135 fn load_tenant_index(store: &dyn StateStore) -> Result<Vec<String>, StoreError> {
1136 match store.get("tenants:index")? {
1137 Some(data) => serde_json::from_slice(&data)
1138 .map_err(|e| StoreError::SerializationError(e.to_string())),
1139 None => Ok(Vec::new()),
1140 }
1141 }
1142
1143 fn restore_tenant_from_snapshot(snapshot: TenantSnapshot) -> Result<Tenant, TenantError> {
1144 let tenant_id = TenantId::new(&snapshot.id);
1145
1146 let mut tenant = Tenant::new_with_hash(
1147 tenant_id,
1148 snapshot.name,
1149 snapshot.api_key_hash,
1150 snapshot.quota,
1151 );
1152 tenant.usage.events_processed = snapshot.events_processed;
1153 tenant.usage.output_events_emitted = snapshot.output_events_emitted;
1154 tenant.usage.events_in_window = snapshot.events_in_window;
1155
1156 for ps in snapshot.pipelines {
1157 match Self::restore_pipeline_from_snapshot(ps.clone()) {
1158 Ok(pipeline) => {
1159 tenant.pipelines.insert(pipeline.id.clone(), pipeline);
1160 }
1161 Err(e) => {
1162 warn!(
1163 "Failed to restore pipeline '{}' ({}): {}",
1164 ps.name, ps.id, e
1165 );
1166 }
1167 }
1168 }
1169
1170 tenant.usage.active_pipelines = tenant.pipelines.len();
1171 Ok(tenant)
1172 }
1173
1174 fn restore_pipeline_from_snapshot(snapshot: PipelineSnapshot) -> Result<Pipeline, TenantError> {
1175 let program = varpulis_parser::parse(&snapshot.source)?;
1176
1177 let (output_tx, output_rx) = mpsc::channel(1000);
1178 let mut engine = Engine::new(output_tx);
1179 engine.load(&program)?;
1180
1181 let (log_tx, _) = tokio::sync::broadcast::channel(256);
1182 Ok(Pipeline {
1183 id: snapshot.id,
1184 name: snapshot.name,
1185 source: snapshot.source,
1186 engine: Arc::new(tokio::sync::Mutex::new(engine)),
1187 output_rx,
1188 log_broadcast: log_tx,
1189 created_at: Instant::now(),
1190 status: snapshot.status,
1191 orchestrator: None,
1192 connector_registry: None,
1193 global_template_id: snapshot.global_template_id,
1194 })
1195 }
1196}
1197
1198impl Default for TenantManager {
1199 fn default() -> Self {
1200 Self::new()
1201 }
1202}
1203
1204pub type SharedTenantManager = Arc<RwLock<TenantManager>>;
1206
1207pub fn shared_tenant_manager() -> SharedTenantManager {
1209 Arc::new(RwLock::new(TenantManager::new()))
1210}
1211
1212pub fn shared_tenant_manager_with_store(store: Arc<dyn StateStore>) -> SharedTenantManager {
1214 let mut mgr = TenantManager::with_store(store);
1215 match mgr.recover() {
1216 Ok(count) if count > 0 => {
1217 info!("Recovered {} tenant(s) from persistent state", count);
1218 }
1219 Ok(_) => {
1220 info!("No persisted tenant state found, starting fresh");
1221 }
1222 Err(e) => {
1223 error!("Failed to recover tenant state: {}", e);
1224 }
1225 }
1226 Arc::new(RwLock::new(mgr))
1227}
1228
1229#[derive(Debug, Clone, Serialize, Deserialize)]
1235pub struct TenantSnapshot {
1236 pub id: String,
1237 pub name: String,
1238 #[serde(alias = "api_key")]
1240 pub api_key_hash: String,
1241 pub quota: TenantQuota,
1242 pub events_processed: u64,
1243 #[serde(alias = "alerts_generated")]
1244 pub output_events_emitted: u64,
1245 pub pipelines: Vec<PipelineSnapshot>,
1246 #[serde(default)]
1247 pub created_at_ms: Option<i64>,
1248 #[serde(default)]
1249 pub events_in_window: u64,
1250}
1251
1252#[derive(Debug, Clone, Serialize, Deserialize)]
1254pub struct PipelineSnapshot {
1255 pub id: String,
1256 pub name: String,
1257 pub source: String,
1258 pub status: PipelineStatus,
1259 #[serde(default)]
1260 pub global_template_id: Option<String>,
1261}
1262
1263impl Pipeline {
1264 pub fn snapshot(&self) -> PipelineSnapshot {
1266 PipelineSnapshot {
1267 id: self.id.clone(),
1268 name: self.name.clone(),
1269 source: self.source.clone(),
1270 status: self.status.clone(),
1271 global_template_id: self.global_template_id.clone(),
1272 }
1273 }
1274}
1275
1276impl Tenant {
1277 pub fn snapshot(&self) -> TenantSnapshot {
1279 TenantSnapshot {
1280 id: self.id.0.clone(),
1281 name: self.name.clone(),
1282 api_key_hash: self.api_key_hash.clone(),
1283 quota: self.quota.clone(),
1284 events_processed: self.usage.events_processed,
1285 output_events_emitted: self.usage.output_events_emitted,
1286 pipelines: self.pipelines.values().map(|p| p.snapshot()).collect(),
1287 created_at_ms: Some(chrono::Utc::now().timestamp_millis()),
1288 events_in_window: self.usage.events_in_window,
1289 }
1290 }
1291}
1292
1293#[cfg(test)]
1294mod tests {
1295 use super::*;
1296
1297 #[test]
1298 fn test_tenant_id_generate() {
1299 let id1 = TenantId::generate();
1300 let id2 = TenantId::generate();
1301 assert_ne!(id1, id2);
1302 }
1303
1304 #[test]
1305 fn test_tenant_id_display() {
1306 let id = TenantId::new("test-123");
1307 assert_eq!(format!("{id}"), "test-123");
1308 assert_eq!(id.as_str(), "test-123");
1309 }
1310
1311 #[test]
1312 fn test_tenant_quota_tiers() {
1313 let free = TenantQuota::free();
1314 let pro = TenantQuota::pro();
1315 let enterprise = TenantQuota::enterprise();
1316
1317 assert!(free.max_pipelines < pro.max_pipelines);
1318 assert!(pro.max_pipelines < enterprise.max_pipelines);
1319 assert!(free.max_events_per_second < pro.max_events_per_second);
1320 assert!(pro.max_events_per_second < enterprise.max_events_per_second);
1321 }
1322
1323 #[test]
1324 fn test_usage_record_event() {
1325 let mut usage = TenantUsage::default();
1326 assert!(usage.record_event(100));
1327 assert_eq!(usage.events_processed, 1);
1328 assert_eq!(usage.events_in_window, 1);
1329 }
1330
1331 #[test]
1332 fn test_usage_rate_limit() {
1333 let mut usage = TenantUsage::default();
1334 assert!(usage.record_event(2));
1336 assert!(usage.record_event(2));
1337 assert!(!usage.record_event(2));
1339 }
1340
1341 #[test]
1342 fn test_usage_no_rate_limit() {
1343 let mut usage = TenantUsage::default();
1344 for _ in 0..1000 {
1346 assert!(usage.record_event(0));
1347 }
1348 }
1349
1350 #[test]
1351 fn test_tenant_manager_create() {
1352 let mut mgr = TenantManager::new();
1353 let id = mgr
1354 .create_tenant("Test Corp".into(), "key-123".into(), TenantQuota::free())
1355 .unwrap();
1356
1357 assert_eq!(mgr.tenant_count(), 1);
1358 assert!(mgr.get_tenant(&id).is_some());
1359 assert_eq!(mgr.get_tenant(&id).unwrap().name, "Test Corp");
1360 }
1361
1362 #[test]
1363 fn test_tenant_manager_api_key_lookup() {
1364 let mut mgr = TenantManager::new();
1365 let id = mgr
1366 .create_tenant("Test".into(), "my-key".into(), TenantQuota::default())
1367 .unwrap();
1368
1369 let found = mgr.get_tenant_by_api_key("my-key");
1370 assert_eq!(found, Some(&id));
1371
1372 assert!(mgr.get_tenant_by_api_key("wrong-key").is_none());
1373 }
1374
1375 #[test]
1376 fn test_tenant_manager_duplicate_api_key() {
1377 let mut mgr = TenantManager::new();
1378 mgr.create_tenant("A".into(), "key-1".into(), TenantQuota::default())
1379 .unwrap();
1380 let result = mgr.create_tenant("B".into(), "key-1".into(), TenantQuota::default());
1381 assert!(result.is_err());
1382 }
1383
1384 #[test]
1385 fn test_tenant_manager_remove() {
1386 let mut mgr = TenantManager::new();
1387 let id = mgr
1388 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1389 .unwrap();
1390
1391 mgr.remove_tenant(&id).unwrap();
1392 assert_eq!(mgr.tenant_count(), 0);
1393 assert!(mgr.get_tenant_by_api_key("key-1").is_none());
1394 }
1395
1396 #[tokio::test]
1397 async fn test_tenant_deploy_pipeline() {
1398 let mut mgr = TenantManager::new();
1399 let id = mgr
1400 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1401 .unwrap();
1402
1403 let tenant = mgr.get_tenant_mut(&id).unwrap();
1404 let vpl = r"
1405 stream Alerts = SensorReading
1406 .where(temperature > 100)
1407 ";
1408 let pipeline_id = tenant
1409 .deploy_pipeline("My Pipeline".into(), vpl.into())
1410 .await
1411 .unwrap();
1412 assert_eq!(tenant.pipelines.len(), 1);
1413 assert_eq!(tenant.usage.active_pipelines, 1);
1414 assert_eq!(
1415 tenant.pipelines[&pipeline_id].status,
1416 PipelineStatus::Running
1417 );
1418 }
1419
1420 #[tokio::test]
1421 async fn test_tenant_pipeline_quota() {
1422 let mut mgr = TenantManager::new();
1423 let quota = TenantQuota {
1424 max_pipelines: 1,
1425 max_events_per_second: 100,
1426 max_streams_per_pipeline: 50,
1427 };
1428 let id = mgr
1429 .create_tenant("Test".into(), "key-1".into(), quota)
1430 .unwrap();
1431
1432 let tenant = mgr.get_tenant_mut(&id).unwrap();
1433 let vpl = "stream A = SensorReading .where(x > 1)";
1434 tenant
1435 .deploy_pipeline("P1".into(), vpl.into())
1436 .await
1437 .unwrap();
1438
1439 let result = tenant.deploy_pipeline("P2".into(), vpl.into()).await;
1441 assert!(result.is_err());
1442 }
1443
1444 #[tokio::test]
1445 async fn test_tenant_remove_pipeline() {
1446 let mut mgr = TenantManager::new();
1447 let id = mgr
1448 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1449 .unwrap();
1450
1451 let tenant = mgr.get_tenant_mut(&id).unwrap();
1452 let vpl = "stream A = SensorReading .where(x > 1)";
1453 let pid = tenant
1454 .deploy_pipeline("P1".into(), vpl.into())
1455 .await
1456 .unwrap();
1457
1458 tenant.remove_pipeline(&pid).unwrap();
1459 assert_eq!(tenant.pipelines.len(), 0);
1460 assert_eq!(tenant.usage.active_pipelines, 0);
1461 }
1462
1463 #[tokio::test]
1464 async fn test_tenant_parse_error() {
1465 let mut mgr = TenantManager::new();
1466 let id = mgr
1467 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1468 .unwrap();
1469
1470 let tenant = mgr.get_tenant_mut(&id).unwrap();
1471 let result = tenant
1472 .deploy_pipeline("Bad".into(), "this is not valid VPL {{{{".into())
1473 .await;
1474 assert!(result.is_err());
1475 }
1476
1477 #[tokio::test]
1478 async fn test_tenant_process_event() {
1479 let mut mgr = TenantManager::new();
1480 let id = mgr
1481 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1482 .unwrap();
1483
1484 let tenant = mgr.get_tenant_mut(&id).unwrap();
1485 let vpl = "stream A = SensorReading .where(temperature > 100)";
1486 let pid = tenant
1487 .deploy_pipeline("P1".into(), vpl.into())
1488 .await
1489 .unwrap();
1490
1491 let event = Event::new("SensorReading").with_field("temperature", 150.0);
1492 tenant.process_event(&pid, event).await.unwrap();
1493 assert_eq!(tenant.usage.events_processed, 1);
1494 }
1495
1496 #[tokio::test]
1497 async fn test_tenant_rate_limit_on_process() {
1498 let mut mgr = TenantManager::new();
1499 let quota = TenantQuota {
1500 max_pipelines: 10,
1501 max_events_per_second: 2,
1502 max_streams_per_pipeline: 50,
1503 };
1504 let id = mgr
1505 .create_tenant("Test".into(), "key-1".into(), quota)
1506 .unwrap();
1507
1508 let tenant = mgr.get_tenant_mut(&id).unwrap();
1509 let vpl = "stream A = SensorReading .where(x > 1)";
1510 let pid = tenant
1511 .deploy_pipeline("P1".into(), vpl.into())
1512 .await
1513 .unwrap();
1514
1515 let event = Event::new("SensorReading").with_field("x", 5);
1516 tenant.process_event(&pid, event.clone()).await.unwrap();
1517 tenant.process_event(&pid, event.clone()).await.unwrap();
1518
1519 let result = tenant.process_event(&pid, event).await;
1521 assert!(result.is_err());
1522 }
1523
1524 #[tokio::test]
1525 async fn test_tenant_reload_pipeline() {
1526 let mut mgr = TenantManager::new();
1527 let id = mgr
1528 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1529 .unwrap();
1530
1531 let tenant = mgr.get_tenant_mut(&id).unwrap();
1532 let vpl1 = "stream A = SensorReading .where(x > 1)";
1533 let pid = tenant
1534 .deploy_pipeline("P1".into(), vpl1.into())
1535 .await
1536 .unwrap();
1537
1538 let vpl2 = "stream B = SensorReading .where(x > 50)";
1539 tenant.reload_pipeline(&pid, vpl2.into()).await.unwrap();
1540
1541 assert_eq!(tenant.pipelines[&pid].source, vpl2);
1542 }
1543
1544 #[test]
1545 fn test_pipeline_status_display() {
1546 assert_eq!(format!("{}", PipelineStatus::Running), "running");
1547 assert_eq!(format!("{}", PipelineStatus::Stopped), "stopped");
1548 assert_eq!(
1549 format!("{}", PipelineStatus::Error("oops".into())),
1550 "error: oops"
1551 );
1552 }
1553
1554 #[test]
1555 fn test_tenant_error_display() {
1556 assert!(format!("{}", TenantError::NotFound("t1".into())).contains("t1"));
1557 assert!(format!("{}", TenantError::RateLimitExceeded).contains("rate limit"));
1558 let engine_err = crate::engine::error::EngineError::Compilation("bad".into());
1559 assert!(format!("{}", TenantError::EngineError(engine_err)).contains("bad"));
1560 }
1561
1562 #[test]
1563 fn test_shared_tenant_manager() {
1564 let mgr = shared_tenant_manager();
1565 assert!(Arc::strong_count(&mgr) == 1);
1566 }
1567
1568 #[test]
1569 fn test_tenant_list() {
1570 let mut mgr = TenantManager::new();
1571 mgr.create_tenant("A".into(), "key-a".into(), TenantQuota::default())
1572 .unwrap();
1573 mgr.create_tenant("B".into(), "key-b".into(), TenantQuota::default())
1574 .unwrap();
1575 assert_eq!(mgr.list_tenants().len(), 2);
1576 }
1577
1578 #[test]
1579 fn test_tenant_manager_default() {
1580 let mgr = TenantManager::default();
1581 assert_eq!(mgr.tenant_count(), 0);
1582 }
1583
1584 #[tokio::test]
1585 async fn test_tenant_snapshot_roundtrip() {
1586 let mut mgr = TenantManager::new();
1587 let id = mgr
1588 .create_tenant("Snap Corp".into(), "snap-key".into(), TenantQuota::pro())
1589 .unwrap();
1590
1591 let tenant = mgr.get_tenant_mut(&id).unwrap();
1592 let vpl = "stream A = SensorReading .where(x > 1)";
1593 tenant
1594 .deploy_pipeline("Pipeline1".into(), vpl.into())
1595 .await
1596 .unwrap();
1597 tenant.usage.events_processed = 42;
1598 tenant.usage.output_events_emitted = 7;
1599
1600 let snapshot = tenant.snapshot();
1601 let json = serde_json::to_vec(&snapshot).unwrap();
1602 let restored: TenantSnapshot = serde_json::from_slice(&json).unwrap();
1603
1604 assert_eq!(restored.id, id.0);
1605 assert_eq!(restored.name, "Snap Corp");
1606 assert_eq!(restored.api_key_hash, hash_api_key("snap-key"));
1607 assert_eq!(restored.events_processed, 42);
1608 assert_eq!(restored.output_events_emitted, 7);
1609 assert_eq!(restored.pipelines.len(), 1);
1610 assert_eq!(restored.pipelines[0].name, "Pipeline1");
1611 assert_eq!(restored.pipelines[0].source, vpl);
1612 assert_eq!(restored.pipelines[0].status, PipelineStatus::Running);
1613 assert_eq!(
1614 restored.quota.max_pipelines,
1615 TenantQuota::pro().max_pipelines
1616 );
1617 }
1618
1619 #[tokio::test]
1620 async fn test_tenant_manager_persistence_and_recovery() {
1621 use crate::persistence::MemoryStore;
1622
1623 let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
1624 let vpl = "stream A = SensorReading .where(x > 1)";
1625
1626 let (tenant_id, pipeline_name) = {
1628 let mut mgr = TenantManager::with_store(Arc::clone(&store));
1629 let id = mgr
1630 .create_tenant(
1631 "Persisted Corp".into(),
1632 "persist-key".into(),
1633 TenantQuota::default(),
1634 )
1635 .unwrap();
1636
1637 let tenant = mgr.get_tenant_mut(&id).unwrap();
1638 tenant
1639 .deploy_pipeline("Persistent Pipeline".into(), vpl.into())
1640 .await
1641 .unwrap();
1642 tenant.usage.events_processed = 100;
1643 tenant.usage.output_events_emitted = 5;
1644 mgr.persist_if_needed(&id);
1645
1646 (id.0.clone(), "Persistent Pipeline".to_string())
1647 };
1648
1649 let mut mgr2 = TenantManager::with_store(Arc::clone(&store));
1651 let recovered = mgr2.recover().unwrap();
1652 assert_eq!(recovered, 1);
1653 assert_eq!(mgr2.tenant_count(), 1);
1654
1655 let tid = TenantId::new(&tenant_id);
1657 let tenant = mgr2.get_tenant(&tid).unwrap();
1658 assert_eq!(tenant.name, "Persisted Corp");
1659 assert_eq!(tenant.api_key_hash, hash_api_key("persist-key"));
1660 assert_eq!(tenant.usage.events_processed, 100);
1661 assert_eq!(tenant.usage.output_events_emitted, 5);
1662 assert_eq!(tenant.pipelines.len(), 1);
1663
1664 let pipeline = tenant.pipelines.values().next().unwrap();
1665 assert_eq!(pipeline.name, pipeline_name);
1666 assert_eq!(pipeline.source, vpl);
1667 assert_eq!(pipeline.status, PipelineStatus::Running);
1668
1669 assert_eq!(mgr2.get_tenant_by_api_key("persist-key"), Some(&tid));
1671 }
1672
1673 #[tokio::test]
1674 async fn test_persistence_survives_restart() {
1675 use crate::persistence::MemoryStore;
1676
1677 let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
1678 let vpl = "stream A = SensorReading .where(x > 1)";
1679
1680 {
1682 let mut mgr = TenantManager::with_store(Arc::clone(&store));
1683 let id1 = mgr
1684 .create_tenant("Tenant A".into(), "key-a".into(), TenantQuota::free())
1685 .unwrap();
1686 let id2 = mgr
1687 .create_tenant("Tenant B".into(), "key-b".into(), TenantQuota::pro())
1688 .unwrap();
1689
1690 let tenant_a = mgr.get_tenant_mut(&id1).unwrap();
1691 tenant_a
1692 .deploy_pipeline("P1".into(), vpl.into())
1693 .await
1694 .unwrap();
1695 mgr.persist_if_needed(&id1);
1696
1697 let tenant_b = mgr.get_tenant_mut(&id2).unwrap();
1698 tenant_b
1699 .deploy_pipeline("P2".into(), vpl.into())
1700 .await
1701 .unwrap();
1702 tenant_b
1703 .deploy_pipeline("P3".into(), vpl.into())
1704 .await
1705 .unwrap();
1706 mgr.persist_if_needed(&id2);
1707 }
1708
1709 {
1711 let mut mgr = TenantManager::with_store(Arc::clone(&store));
1712 let recovered = mgr.recover().unwrap();
1713 assert_eq!(recovered, 2);
1714 assert_eq!(mgr.tenant_count(), 2);
1715
1716 let tid_a = mgr.get_tenant_by_api_key("key-a").unwrap().clone();
1718 let tenant_a = mgr.get_tenant(&tid_a).unwrap();
1719 assert_eq!(tenant_a.name, "Tenant A");
1720 assert_eq!(tenant_a.pipelines.len(), 1);
1721
1722 let tid_b = mgr.get_tenant_by_api_key("key-b").unwrap().clone();
1724 let tenant_b = mgr.get_tenant(&tid_b).unwrap();
1725 assert_eq!(tenant_b.name, "Tenant B");
1726 assert_eq!(tenant_b.pipelines.len(), 2);
1727 }
1728 }
1729
1730 #[test]
1731 fn test_tenant_snapshot_created_at_and_window() {
1732 let mut mgr = TenantManager::new();
1733 let id = mgr
1734 .create_tenant(
1735 "Window Corp".into(),
1736 "window-key".into(),
1737 TenantQuota::default(),
1738 )
1739 .unwrap();
1740
1741 let tenant = mgr.get_tenant_mut(&id).unwrap();
1742 tenant.usage.events_in_window = 42;
1743 tenant.usage.events_processed = 100;
1744
1745 let snapshot = tenant.snapshot();
1746
1747 assert!(snapshot.created_at_ms.is_some());
1749 let ts = snapshot.created_at_ms.unwrap();
1750 assert!(ts > 1_704_067_200_000);
1752
1753 assert_eq!(snapshot.events_in_window, 42);
1755
1756 let json = serde_json::to_vec(&snapshot).unwrap();
1758 let restored: TenantSnapshot = serde_json::from_slice(&json).unwrap();
1759 assert_eq!(restored.events_in_window, 42);
1760 assert_eq!(restored.created_at_ms, snapshot.created_at_ms);
1761
1762 let restored_tenant = TenantManager::restore_tenant_from_snapshot(restored).unwrap();
1764 assert_eq!(restored_tenant.usage.events_in_window, 42);
1765 assert_eq!(restored_tenant.usage.events_processed, 100);
1766 }
1767
1768 #[test]
1769 fn test_tenant_snapshot_backwards_compat() {
1770 let old_json = r#"{
1772 "id": "compat-tenant",
1773 "name": "Compat Corp",
1774 "api_key": "compat-key",
1775 "quota": {
1776 "max_pipelines": 10,
1777 "max_events_per_second": 10000,
1778 "max_streams_per_pipeline": 50
1779 },
1780 "events_processed": 55,
1781 "alerts_generated": 3,
1782 "pipelines": []
1783 }"#;
1784
1785 let snapshot: TenantSnapshot = serde_json::from_str(old_json).unwrap();
1786 assert_eq!(snapshot.id, "compat-tenant");
1787 assert_eq!(snapshot.events_processed, 55);
1788 assert_eq!(snapshot.created_at_ms, None);
1790 assert_eq!(snapshot.events_in_window, 0);
1791
1792 let tenant = TenantManager::restore_tenant_from_snapshot(snapshot).unwrap();
1794 assert_eq!(tenant.name, "Compat Corp");
1795 assert_eq!(tenant.usage.events_processed, 55);
1796 assert_eq!(tenant.usage.events_in_window, 0);
1797 }
1798
1799 #[test]
1800 fn test_recovery_with_invalid_vpl() {
1801 use crate::persistence::MemoryStore;
1802
1803 let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
1804
1805 let snapshot = TenantSnapshot {
1807 id: "bad-tenant".into(),
1808 name: "Bad Corp".into(),
1809 api_key_hash: hash_api_key("bad-key"),
1810 quota: TenantQuota::default(),
1811 events_processed: 0,
1812 output_events_emitted: 0,
1813 pipelines: vec![PipelineSnapshot {
1814 id: "bad-pipeline".into(),
1815 name: "Bad Pipeline".into(),
1816 source: "this is not valid VPL {{{{".into(),
1817 status: PipelineStatus::Running,
1818 global_template_id: None,
1819 }],
1820 created_at_ms: None,
1821 events_in_window: 0,
1822 };
1823
1824 let data = serde_json::to_vec(&snapshot).unwrap();
1825 store.put("tenant:bad-tenant", &data).unwrap();
1826 let index = serde_json::to_vec(&vec!["bad-tenant"]).unwrap();
1827 store.put("tenants:index", &index).unwrap();
1828
1829 let mut mgr = TenantManager::with_store(Arc::clone(&store));
1831 let recovered = mgr.recover().unwrap();
1832 assert_eq!(recovered, 1);
1833
1834 let tid = TenantId::new("bad-tenant");
1835 let tenant = mgr.get_tenant(&tid).unwrap();
1836 assert_eq!(tenant.name, "Bad Corp");
1837 assert_eq!(tenant.pipelines.len(), 0);
1839 }
1840
1841 #[test]
1842 fn test_backpressure_disabled_by_default() {
1843 let mgr = TenantManager::new();
1844 assert_eq!(mgr.max_queue_depth(), 0);
1845 assert!(mgr.check_backpressure().is_ok());
1846 }
1847
1848 #[test]
1849 fn test_backpressure_set_max_queue_depth() {
1850 let mut mgr = TenantManager::new();
1851 mgr.set_max_queue_depth(100);
1852 assert_eq!(mgr.max_queue_depth(), 100);
1853 assert!(mgr.check_backpressure().is_ok());
1855 }
1856
1857 #[test]
1858 fn test_backpressure_exceeded() {
1859 let mut mgr = TenantManager::new();
1860 mgr.set_max_queue_depth(10);
1861
1862 mgr.pending_events.store(10, Ordering::Relaxed);
1864 let result = mgr.check_backpressure();
1865 assert!(result.is_err());
1866
1867 if let Err(TenantError::BackpressureExceeded { current, max }) = result {
1868 assert_eq!(current, 10);
1869 assert_eq!(max, 10);
1870 } else {
1871 panic!("Expected BackpressureExceeded error");
1872 }
1873 }
1874
1875 #[test]
1876 fn test_backpressure_not_exceeded() {
1877 let mut mgr = TenantManager::new();
1878 mgr.set_max_queue_depth(10);
1879 mgr.pending_events.store(9, Ordering::Relaxed);
1880 assert!(mgr.check_backpressure().is_ok());
1881 }
1882
1883 #[test]
1884 fn test_backpressure_unlimited_when_zero() {
1885 let mut mgr = TenantManager::new();
1886 mgr.set_max_queue_depth(0);
1887 mgr.pending_events.store(1_000_000, Ordering::Relaxed);
1889 assert!(mgr.check_backpressure().is_ok());
1890 }
1891
1892 #[test]
1893 fn test_queue_pressure_ratio() {
1894 let mut mgr = TenantManager::new();
1895 mgr.set_max_queue_depth(100);
1896 mgr.pending_events.store(50, Ordering::Relaxed);
1897 let ratio = mgr.queue_pressure_ratio();
1898 assert!((ratio - 0.5).abs() < f64::EPSILON);
1899 }
1900
1901 #[test]
1902 fn test_queue_pressure_ratio_zero_max() {
1903 let mgr = TenantManager::new();
1904 assert_eq!(mgr.queue_pressure_ratio(), 0.0);
1905 }
1906
1907 #[test]
1908 fn test_pending_events_counter() {
1909 let mgr = TenantManager::new();
1910 assert_eq!(mgr.pending_event_count(), 0);
1911 let counter = mgr.pending_events_counter();
1912 counter.fetch_add(5, Ordering::Relaxed);
1913 assert_eq!(mgr.pending_event_count(), 5);
1914 }
1915
1916 #[tokio::test]
1917 async fn test_process_event_with_backpressure_ok() {
1918 let mut mgr = TenantManager::new();
1919 mgr.set_max_queue_depth(100);
1920 let id = mgr
1921 .create_tenant("Test".into(), "key-bp".into(), TenantQuota::default())
1922 .unwrap();
1923
1924 let tenant = mgr.get_tenant_mut(&id).unwrap();
1925 let vpl = "stream A = SensorReading .where(temperature > 100)";
1926 let pid = tenant
1927 .deploy_pipeline("BP Pipeline".into(), vpl.into())
1928 .await
1929 .unwrap();
1930
1931 let event = Event::new("SensorReading").with_field("temperature", 150.0);
1932 let result = mgr.process_event_with_backpressure(&id, &pid, event).await;
1933 assert!(result.is_ok());
1934 assert_eq!(mgr.pending_event_count(), 0);
1936 }
1937
1938 #[tokio::test]
1939 async fn test_process_event_with_backpressure_rejected() {
1940 let mut mgr = TenantManager::new();
1941 mgr.set_max_queue_depth(5);
1942 mgr.pending_events.store(5, Ordering::Relaxed);
1944
1945 let id = mgr
1946 .create_tenant("Test".into(), "key-bp2".into(), TenantQuota::default())
1947 .unwrap();
1948 let tenant = mgr.get_tenant_mut(&id).unwrap();
1949 let vpl = "stream A = SensorReading .where(temperature > 100)";
1950 let pid = tenant
1951 .deploy_pipeline("BP Pipeline 2".into(), vpl.into())
1952 .await
1953 .unwrap();
1954
1955 let event = Event::new("SensorReading").with_field("temperature", 150.0);
1956 let result = mgr.process_event_with_backpressure(&id, &pid, event).await;
1957 assert!(result.is_err());
1958 match result {
1959 Err(TenantError::BackpressureExceeded { current, max }) => {
1960 assert_eq!(current, 5);
1961 assert_eq!(max, 5);
1962 }
1963 _ => panic!("Expected BackpressureExceeded"),
1964 }
1965 }
1966
1967 #[test]
1968 fn test_backpressure_error_display() {
1969 let err = TenantError::BackpressureExceeded {
1970 current: 50000,
1971 max: 50000,
1972 };
1973 let msg = format!("{err}");
1974 assert!(msg.contains("50000"));
1975 assert!(msg.contains("exceeds maximum"));
1976 }
1977
1978 #[tokio::test]
1979 async fn test_collect_pipeline_metrics_returns_event_counts() {
1980 let mut mgr = TenantManager::new();
1981 let id = mgr
1982 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1983 .unwrap();
1984
1985 let vpl = "stream A = SensorReading .where(temperature > 100)";
1987 mgr.deploy_pipeline_on_tenant(&id, "test-pipeline".into(), vpl.into())
1988 .await
1989 .unwrap();
1990
1991 let metrics = mgr.collect_pipeline_metrics().await;
1993 assert_eq!(metrics.len(), 1, "Should have 1 pipeline");
1994 assert_eq!(metrics[0].0, "test-pipeline");
1995 assert_eq!(metrics[0].1, 0, "events_in should be 0 before processing");
1996 assert_eq!(metrics[0].2, 0, "events_out should be 0 before processing");
1997
1998 let tenant = mgr.get_tenant_mut(&id).unwrap();
2000 let pid = tenant.pipelines.keys().next().unwrap().clone();
2001 for i in 0..10 {
2002 let event = Event::new("SensorReading")
2003 .with_field("temperature", (i as f64).mul_add(20.0, 50.0));
2004 tenant.process_event(&pid, event).await.unwrap();
2005 }
2006
2007 let metrics = mgr.collect_pipeline_metrics().await;
2009 assert_eq!(metrics.len(), 1);
2010 assert_eq!(metrics[0].0, "test-pipeline");
2011 assert!(
2012 metrics[0].1 > 0,
2013 "events_in should be > 0 after processing, got {}",
2014 metrics[0].1
2015 );
2016 }
2017
2018 #[tokio::test]
2019 async fn test_collect_pipeline_metrics_multiple_tenants() {
2020 let mut mgr = TenantManager::new();
2021 let id1 = mgr
2022 .create_tenant("Tenant1".into(), "key-1".into(), TenantQuota::default())
2023 .unwrap();
2024 let id2 = mgr
2025 .create_tenant("Tenant2".into(), "key-2".into(), TenantQuota::default())
2026 .unwrap();
2027
2028 let vpl = "stream A = SensorReading .where(temperature > 100)";
2029 mgr.deploy_pipeline_on_tenant(&id1, "pipeline-1".into(), vpl.into())
2030 .await
2031 .unwrap();
2032 mgr.deploy_pipeline_on_tenant(&id2, "pipeline-2".into(), vpl.into())
2033 .await
2034 .unwrap();
2035
2036 let metrics = mgr.collect_pipeline_metrics().await;
2037 assert_eq!(metrics.len(), 2, "Should have 2 pipelines across 2 tenants");
2038 let names: Vec<&str> = metrics.iter().map(|(n, _, _)| n.as_str()).collect();
2039 assert!(names.contains(&"pipeline-1"));
2040 assert!(names.contains(&"pipeline-2"));
2041 }
2042
2043 #[tokio::test]
2044 async fn test_collect_pipeline_metrics_empty_when_no_pipelines() {
2045 let mut mgr = TenantManager::new();
2046 let _id = mgr
2047 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
2048 .unwrap();
2049
2050 let metrics = mgr.collect_pipeline_metrics().await;
2051 assert!(
2052 metrics.is_empty(),
2053 "Should be empty when no pipelines deployed"
2054 );
2055 }
2056
2057 #[tokio::test]
2058 async fn test_deploy_pipeline_on_tenant_and_collect_metrics() {
2059 let mut mgr = TenantManager::new();
2062 let id = mgr
2063 .create_tenant(
2064 "default".into(),
2065 "api-key-123".into(),
2066 TenantQuota::enterprise(),
2067 )
2068 .unwrap();
2069
2070 let vpl = r"
2071 event MarketTick:
2072 symbol: str
2073 price: float
2074 stream Ticks = MarketTick .where(price > 100)
2075 ";
2076 let pipeline_id = mgr
2077 .deploy_pipeline_on_tenant(&id, "financial-cep".into(), vpl.into())
2078 .await
2079 .unwrap();
2080
2081 let metrics = mgr.collect_pipeline_metrics().await;
2083 assert_eq!(metrics.len(), 1);
2084 assert_eq!(metrics[0].0, "financial-cep");
2085
2086 for _ in 0..5 {
2088 let event = Event::new("MarketTick")
2089 .with_field("symbol", "AAPL")
2090 .with_field("price", 150.0);
2091 mgr.process_event_with_backpressure(&id, &pipeline_id, event)
2092 .await
2093 .unwrap();
2094 }
2095
2096 let metrics = mgr.collect_pipeline_metrics().await;
2097 assert_eq!(metrics[0].1, 5, "Should have processed 5 events");
2098 }
2099}