1use crate::context::ContextOrchestrator;
6use crate::engine::Engine;
7use crate::event::Event;
8use crate::metrics::Metrics;
9use crate::persistence::{StateStore, StoreError};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::{mpsc, RwLock};
16use tracing::{error, info, warn};
17use uuid::Uuid;
18
19#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
21pub struct TenantId(pub String);
22
23impl TenantId {
24 pub fn new(id: impl Into<String>) -> Self {
25 Self(id.into())
26 }
27
28 pub fn generate() -> Self {
29 Self(Uuid::new_v4().to_string())
30 }
31
32 pub fn as_str(&self) -> &str {
33 &self.0
34 }
35}
36
37impl std::fmt::Display for TenantId {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 write!(f, "{}", self.0)
40 }
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct TenantQuota {
46 pub max_pipelines: usize,
48 pub max_events_per_second: u64,
50 pub max_streams_per_pipeline: usize,
52}
53
54impl Default for TenantQuota {
55 fn default() -> Self {
56 Self {
57 max_pipelines: 10,
58 max_events_per_second: 10_000,
59 max_streams_per_pipeline: 50,
60 }
61 }
62}
63
64impl TenantQuota {
65 pub const fn free() -> Self {
67 Self {
68 max_pipelines: 2,
69 max_events_per_second: 100,
70 max_streams_per_pipeline: 5,
71 }
72 }
73
74 pub const fn pro() -> Self {
76 Self {
77 max_pipelines: 20,
78 max_events_per_second: 50_000,
79 max_streams_per_pipeline: 100,
80 }
81 }
82
83 pub const fn enterprise() -> Self {
85 Self {
86 max_pipelines: 1000,
87 max_events_per_second: 500_000,
88 max_streams_per_pipeline: 500,
89 }
90 }
91}
92
/// Running usage counters for a tenant.
#[derive(Debug, Clone, Default)]
pub struct TenantUsage {
    /// Total number of events passed to [`TenantUsage::record_event`],
    /// including those rejected by the rate limit.
    pub events_processed: u64,
    /// Number of events counted in the current one-second window.
    pub events_in_window: u64,
    /// Start of the current rate-limit window, `None` until the first event.
    pub window_start: Option<Instant>,
    /// Number of pipelines currently deployed for the tenant.
    pub active_pipelines: usize,
    /// Total number of output events recorded via
    /// [`TenantUsage::record_output_event`].
    pub output_events_emitted: u64,
}

impl TenantUsage {
    /// Counts one incoming event and applies the per-second rate limit.
    ///
    /// Returns `true` when the event is within the limit and `false` when the
    /// current one-second window already holds more than `max_eps` events.
    /// A `max_eps` of `0` disables the limit. The event that opens a new
    /// window is always accepted.
    pub fn record_event(&mut self, max_eps: u64) -> bool {
        self.events_processed += 1;

        let now = Instant::now();
        let window_open = self
            .window_start
            .is_some_and(|start| now.duration_since(start).as_secs() < 1);

        if !window_open {
            // No window yet, or the previous one has expired: start a new one.
            self.window_start = Some(now);
            self.events_in_window = 1;
            return true;
        }

        self.events_in_window += 1;
        max_eps == 0 || self.events_in_window <= max_eps
    }

    /// Counts one emitted output event.
    pub const fn record_output_event(&mut self) {
        self.output_events_emitted += 1;
    }
}
133
/// A deployed pipeline owned by a tenant.
#[derive(Debug)]
pub struct Pipeline {
    /// Pipeline identifier (a UUID string generated at deploy time).
    pub id: String,
    /// Human-readable pipeline name supplied by the caller.
    pub name: String,
    /// Program source text the pipeline was parsed from.
    pub source: String,
    /// Engine executing the pipeline, shared behind an async mutex.
    pub engine: Arc<tokio::sync::Mutex<Engine>>,
    /// Receiver for the engine's output events. For pipelines with source
    /// bindings this is a closed placeholder, because a background task
    /// drains the real receiver.
    pub output_rx: mpsc::Receiver<Event>,
    /// Broadcast channel on which output events are republished for log
    /// subscribers.
    pub log_broadcast: tokio::sync::broadcast::Sender<Event>,
    /// Moment this in-memory pipeline object was created.
    pub created_at: Instant,
    /// Current lifecycle status.
    pub status: PipelineStatus,
    /// Context orchestrator, present when the engine reports contexts.
    pub orchestrator: Option<ContextOrchestrator>,
    /// Connector registry, present when the pipeline has source bindings.
    pub connector_registry: Option<crate::connector::ManagedConnectorRegistry>,
}
159
160#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
162pub enum PipelineStatus {
163 Running,
164 Stopped,
165 Error(String),
166}
167
168impl std::fmt::Display for PipelineStatus {
169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170 match self {
171 Self::Running => write!(f, "running"),
172 Self::Stopped => write!(f, "stopped"),
173 Self::Error(msg) => write!(f, "error: {msg}"),
174 }
175 }
176}
177
/// Errors returned by tenant and pipeline operations.
#[derive(Debug, thiserror::Error)]
pub enum TenantError {
    /// No tenant exists with the given id.
    #[error("tenant not found: {0}")]
    NotFound(String),
    /// No pipeline exists with the given id on the tenant.
    #[error("pipeline not found: {0}")]
    PipelineNotFound(String),
    /// A tenant quota (pipeline count or stream count) would be exceeded.
    #[error("quota exceeded: {0}")]
    QuotaExceeded(String),
    /// The tenant's events-per-second limit was exceeded.
    #[error("rate limit exceeded")]
    RateLimitExceeded,
    /// The number of in-flight events reached the configured maximum.
    #[error("queue depth {current} exceeds maximum {max}")]
    BackpressureExceeded {
        /// Pending event count observed at check time.
        current: u64,
        /// Configured maximum queue depth.
        max: u64,
    },
    /// The pipeline source failed to parse.
    #[error("parse error: {0}")]
    ParseError(#[from] varpulis_parser::ParseError),
    /// The engine reported an error.
    #[error("engine error: {0}")]
    EngineError(#[from] crate::engine::error::EngineError),
    /// A conflicting tenant (e.g. same API key) already exists.
    #[error("tenant already exists: {0}")]
    AlreadyExists(String),
}
211
/// A tenant together with its quota, usage counters and deployed pipelines.
#[derive(Debug)]
pub struct Tenant {
    /// Unique tenant identifier.
    pub id: TenantId,
    /// Display name of the tenant.
    pub name: String,
    /// API key associated with the tenant.
    pub api_key: String,
    /// Resource limits enforced for this tenant.
    pub quota: TenantQuota,
    /// Running usage counters.
    pub usage: TenantUsage,
    /// Deployed pipelines keyed by pipeline id.
    pub pipelines: HashMap<String, Pipeline>,
    /// Moment this in-memory tenant object was created.
    pub created_at: Instant,
    /// Optional broadcast sender to which output events of source-bound
    /// pipelines are forwarded as JSON strings.
    pub(crate) ws_broadcast: Option<Arc<tokio::sync::broadcast::Sender<String>>>,
}
232
233impl Tenant {
234 pub fn new(id: TenantId, name: String, api_key: String, quota: TenantQuota) -> Self {
235 Self {
236 id,
237 name,
238 api_key,
239 quota,
240 usage: TenantUsage::default(),
241 pipelines: HashMap::new(),
242 created_at: Instant::now(),
243 ws_broadcast: None,
244 }
245 }
246
247 pub async fn deploy_pipeline(
249 &mut self,
250 name: String,
251 source: String,
252 ) -> Result<String, TenantError> {
253 self.deploy_pipeline_with_metrics(name, source, None).await
254 }
255
256 pub async fn deploy_pipeline_with_metrics(
257 &mut self,
258 name: String,
259 source: String,
260 prometheus_metrics: Option<Metrics>,
261 ) -> Result<String, TenantError> {
262 if self.pipelines.len() >= self.quota.max_pipelines {
264 return Err(TenantError::QuotaExceeded(format!(
265 "max pipelines ({}) reached",
266 self.quota.max_pipelines
267 )));
268 }
269
270 let program = varpulis_parser::parse(&source)?;
272
273 let stream_count = program
275 .statements
276 .iter()
277 .filter(|s| matches!(&s.node, varpulis_core::ast::Stmt::StreamDecl { .. }))
278 .count();
279 if stream_count > self.quota.max_streams_per_pipeline {
280 return Err(TenantError::QuotaExceeded(format!(
281 "pipeline has {} streams, max is {}",
282 stream_count, self.quota.max_streams_per_pipeline
283 )));
284 }
285
286 let (output_tx, output_rx) = mpsc::channel(1000);
288 let output_tx_for_ctx = output_tx.clone();
289 let mut engine = Engine::new(output_tx);
290 if let Some(m) = prometheus_metrics {
291 engine = engine.with_metrics(m);
292 }
293 engine.load(&program)?;
294
295 let orchestrator = if engine.has_contexts() {
297 match ContextOrchestrator::build(
299 engine.context_map(),
300 &program,
301 output_tx_for_ctx,
302 1000,
303 ) {
304 Ok(orch) => Some(orch),
305 Err(e) => {
306 return Err(crate::engine::error::EngineError::Pipeline(format!(
307 "Failed to build context orchestrator: {e}"
308 ))
309 .into());
310 }
311 }
312 } else {
313 engine.connect_sinks().await?;
315 None
316 };
317
318 let bindings = engine.source_bindings().to_vec();
320 let mut connector_registry = None;
321
322 if !bindings.is_empty() {
323 let (event_tx, event_rx) = mpsc::channel::<Event>(10_000);
324
325 let mut registry = crate::connector::ManagedConnectorRegistry::from_configs(
326 engine.connector_configs(),
327 )
328 .map_err(|e| {
329 TenantError::EngineError(crate::engine::error::EngineError::Pipeline(format!(
330 "Registry build error: {e}"
331 )))
332 })?;
333
334 for binding in &bindings {
335 let config = engine.get_connector(&binding.connector_name).cloned();
336 let Some(config) = config else {
337 return Err(crate::engine::error::EngineError::Pipeline(format!(
338 "Connector '{}' referenced in .from() but not declared",
339 binding.connector_name
340 ))
341 .into());
342 };
343
344 let topic = binding
345 .topic_override
346 .as_deref()
347 .or(config.topic.as_deref())
348 .unwrap_or("varpulis/events/#");
349
350 info!(
351 "Starting {} source: {} topic={}",
352 config.connector_type, binding.connector_name, topic
353 );
354
355 registry
356 .start_source(
357 &binding.connector_name,
358 topic,
359 event_tx.clone(),
360 &binding.extra_params,
361 )
362 .await
363 .map_err(|e| -> TenantError {
364 crate::engine::error::EngineError::Pipeline(format!(
365 "Source start error: {e}"
366 ))
367 .into()
368 })?;
369
370 let sink_keys = engine.sink_keys_for_connector(&binding.connector_name);
372 for sink_key in &sink_keys {
373 let sink_topic = if let Some(t) =
374 sink_key.strip_prefix(&format!("{}::", binding.connector_name))
375 {
376 t.to_string()
377 } else {
378 config
379 .topic
380 .clone()
381 .unwrap_or_else(|| format!("{}-output", binding.connector_name))
382 };
383
384 let empty_params = std::collections::HashMap::new();
385 match registry.create_sink(&binding.connector_name, &sink_topic, &empty_params)
386 {
387 Ok(sink) => {
388 engine.inject_sink(sink_key, sink);
389 }
390 Err(e) => {
391 warn!("Failed to create sink for {}: {}", sink_key, e);
392 }
393 }
394 }
395 }
396
397 connector_registry = Some(registry);
398
399 let engine = Arc::new(tokio::sync::Mutex::new(engine));
401 let engine_for_source = Arc::clone(&engine);
402
403 tokio::spawn(async move {
405 let mut event_rx = event_rx;
406 while let Some(event) = event_rx.recv().await {
407 let mut eng = engine_for_source.lock().await;
408 if let Err(e) = eng.process(event).await {
409 warn!("Source event processing error: {}", e);
410 }
411 }
412 info!("Source ingestion loop ended");
413 });
414
415 let id = Uuid::new_v4().to_string();
416 let (log_tx, _) = tokio::sync::broadcast::channel(256);
417
418 let log_tx_for_drain = log_tx.clone();
423 let ws_tx_for_drain = self.ws_broadcast.clone();
424 tokio::spawn(async move {
425 let mut output_rx = output_rx;
426 while let Some(ev) = output_rx.recv().await {
427 let _ = log_tx_for_drain.send(ev.clone());
428 if let Some(ref ws_tx) = ws_tx_for_drain {
430 let msg = serde_json::json!({
431 "type": "output_event",
432 "event_type": ev.event_type.to_string(),
433 "data": ev.data,
434 "timestamp": ev.timestamp.to_rfc3339(),
435 });
436 if let Ok(json) = serde_json::to_string(&msg) {
437 if ws_tx.send(json).is_err() {
438 tracing::debug!("No WS subscribers for drain output event");
439 }
440 }
441 }
442 }
443 });
444
445 let (_drain_tx, placeholder_rx) = mpsc::channel(1);
448
449 let pipeline = Pipeline {
450 id: id.clone(),
451 name,
452 source,
453 engine,
454 output_rx: placeholder_rx,
455 log_broadcast: log_tx,
456 created_at: Instant::now(),
457 status: PipelineStatus::Running,
458 orchestrator,
459 connector_registry,
460 };
461
462 self.pipelines.insert(id.clone(), pipeline);
463 self.usage.active_pipelines = self.pipelines.len();
464
465 return Ok(id);
466 }
467
468 let engine = Arc::new(tokio::sync::Mutex::new(engine));
470 let id = Uuid::new_v4().to_string();
471 let (log_tx, _) = tokio::sync::broadcast::channel(256);
472 let pipeline = Pipeline {
473 id: id.clone(),
474 name,
475 source,
476 engine,
477 output_rx,
478 log_broadcast: log_tx,
479 created_at: Instant::now(),
480 status: PipelineStatus::Running,
481 orchestrator,
482 connector_registry,
483 };
484
485 self.pipelines.insert(id.clone(), pipeline);
486 self.usage.active_pipelines = self.pipelines.len();
487
488 Ok(id)
489 }
490
491 pub fn remove_pipeline(&mut self, pipeline_id: &str) -> Result<(), TenantError> {
493 self.pipelines
494 .remove(pipeline_id)
495 .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
496 self.usage.active_pipelines = self.pipelines.len();
497 Ok(())
498 }
499
500 pub async fn process_event(
502 &mut self,
503 pipeline_id: &str,
504 event: Event,
505 ) -> Result<Vec<Event>, TenantError> {
506 if !self.usage.record_event(self.quota.max_events_per_second) {
508 return Err(TenantError::RateLimitExceeded);
509 }
510
511 let pipeline = self
512 .pipelines
513 .get_mut(pipeline_id)
514 .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
515
516 if pipeline.status != PipelineStatus::Running {
517 return Err(crate::engine::error::EngineError::Pipeline(format!(
518 "pipeline is {}",
519 pipeline.status
520 ))
521 .into());
522 }
523
524 if let Some(ref orchestrator) = pipeline.orchestrator {
525 let shared_event = std::sync::Arc::new(event);
527 match orchestrator.try_process(shared_event) {
528 Ok(()) => {}
529 Err(crate::context::DispatchError::ChannelFull(msg)) => {
530 if let crate::context::ContextMessage::Event(event) = msg {
532 orchestrator
533 .process(event)
534 .await
535 .map_err(|e| -> TenantError {
536 crate::engine::error::EngineError::Pipeline(e).into()
537 })?;
538 }
539 }
540 Err(crate::context::DispatchError::ChannelClosed(_)) => {
541 return Err(crate::engine::error::EngineError::Pipeline(
542 "Context channel closed".to_string(),
543 )
544 .into());
545 }
546 }
547 } else {
548 pipeline.engine.lock().await.process(event).await?;
550 }
551
552 let mut output_events = Vec::new();
554 while let Ok(ev) = pipeline.output_rx.try_recv() {
555 let _ = pipeline.log_broadcast.send(ev.clone());
557 output_events.push(ev);
558 }
559
560 Ok(output_events)
561 }
562
563 pub fn subscribe_pipeline_logs(
566 &self,
567 pipeline_id: &str,
568 ) -> Result<tokio::sync::broadcast::Receiver<Event>, TenantError> {
569 let pipeline = self
570 .pipelines
571 .get(pipeline_id)
572 .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
573 Ok(pipeline.log_broadcast.subscribe())
574 }
575
576 pub async fn checkpoint_pipeline(
578 &self,
579 pipeline_id: &str,
580 ) -> Result<crate::persistence::EngineCheckpoint, TenantError> {
581 let pipeline = self
582 .pipelines
583 .get(pipeline_id)
584 .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
585
586 let engine = pipeline.engine.lock().await;
587 Ok(engine.create_checkpoint())
588 }
589
590 pub async fn restore_pipeline(
592 &mut self,
593 pipeline_id: &str,
594 checkpoint: &crate::persistence::EngineCheckpoint,
595 ) -> Result<(), TenantError> {
596 let pipeline = self
597 .pipelines
598 .get_mut(pipeline_id)
599 .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
600
601 let mut engine = pipeline.engine.lock().await;
602 engine
603 .restore_checkpoint(checkpoint)
604 .map_err(crate::engine::error::EngineError::Store)?;
605 Ok(())
606 }
607
608 pub async fn reload_pipeline(
610 &mut self,
611 pipeline_id: &str,
612 source: String,
613 ) -> Result<(), TenantError> {
614 let program = varpulis_parser::parse(&source)?;
615
616 let pipeline = self
617 .pipelines
618 .get_mut(pipeline_id)
619 .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
620
621 pipeline.engine.lock().await.reload(&program)?;
622 pipeline.source = source;
623
624 Ok(())
625 }
626}
627
/// Owns all tenants and the cross-tenant state (persistence, metrics,
/// backpressure accounting, WebSocket broadcast).
pub struct TenantManager {
    /// Tenants keyed by id.
    tenants: HashMap<TenantId, Tenant>,
    /// Reverse index from API key to tenant id.
    api_key_index: HashMap<String, TenantId>,
    /// Optional store used to persist and recover tenant snapshots.
    store: Option<Arc<dyn StateStore>>,
    /// Optional Prometheus metrics handed to newly deployed pipelines.
    prometheus_metrics: Option<Metrics>,
    /// Maximum number of in-flight events; `0` disables backpressure.
    max_queue_depth: u64,
    /// Number of events currently being processed.
    pending_events: Arc<AtomicU64>,
    /// Optional broadcast sender propagated to tenants for output events.
    ws_broadcast: Option<Arc<tokio::sync::broadcast::Sender<String>>>,
}
645
646impl std::fmt::Debug for TenantManager {
647 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
648 f.debug_struct("TenantManager")
649 .field("tenant_count", &self.tenants.len())
650 .field("max_queue_depth", &self.max_queue_depth)
651 .field("pending_events", &self.pending_events)
652 .field("has_store", &self.store.is_some())
653 .field("has_ws_broadcast", &self.ws_broadcast.is_some())
654 .finish_non_exhaustive()
655 }
656}
657
658impl TenantManager {
659 pub fn new() -> Self {
660 Self {
661 tenants: HashMap::new(),
662 api_key_index: HashMap::new(),
663 store: None,
664 prometheus_metrics: None,
665 max_queue_depth: 0,
666 pending_events: Arc::new(AtomicU64::new(0)),
667 ws_broadcast: None,
668 }
669 }
670
671 pub fn with_store(store: Arc<dyn StateStore>) -> Self {
673 Self {
674 tenants: HashMap::new(),
675 api_key_index: HashMap::new(),
676 store: Some(store),
677 prometheus_metrics: None,
678 max_queue_depth: 0,
679 pending_events: Arc::new(AtomicU64::new(0)),
680 ws_broadcast: None,
681 }
682 }
683
684 pub fn set_prometheus_metrics(&mut self, metrics: Metrics) {
686 self.prometheus_metrics = Some(metrics);
687 }
688
689 pub const fn set_max_queue_depth(&mut self, max_depth: u64) {
691 self.max_queue_depth = max_depth;
692 }
693
694 pub fn set_ws_broadcast(&mut self, tx: Arc<tokio::sync::broadcast::Sender<String>>) {
698 for tenant in self.tenants.values_mut() {
699 tenant.ws_broadcast = Some(Arc::clone(&tx));
700 }
701 self.ws_broadcast = Some(tx);
702 }
703
704 pub const fn max_queue_depth(&self) -> u64 {
706 self.max_queue_depth
707 }
708
709 pub fn pending_event_count(&self) -> u64 {
711 self.pending_events.load(Ordering::Relaxed)
712 }
713
714 pub fn pending_events_counter(&self) -> Arc<AtomicU64> {
716 Arc::clone(&self.pending_events)
717 }
718
719 pub fn check_backpressure(&self) -> Result<(), TenantError> {
721 if self.max_queue_depth == 0 {
722 return Ok(());
723 }
724 let current = self.pending_events.load(Ordering::Relaxed);
725 if current >= self.max_queue_depth {
726 return Err(TenantError::BackpressureExceeded {
727 current,
728 max: self.max_queue_depth,
729 });
730 }
731 Ok(())
732 }
733
734 pub fn queue_pressure_ratio(&self) -> f64 {
736 if self.max_queue_depth == 0 {
737 return 0.0;
738 }
739 self.pending_events.load(Ordering::Relaxed) as f64 / self.max_queue_depth as f64
740 }
741
742 pub fn create_tenant(
744 &mut self,
745 name: String,
746 api_key: String,
747 quota: TenantQuota,
748 ) -> Result<TenantId, TenantError> {
749 if self.api_key_index.contains_key(&api_key) {
750 return Err(TenantError::AlreadyExists(
751 "API key already in use".to_string(),
752 ));
753 }
754
755 let id = TenantId::generate();
756 let mut tenant = Tenant::new(id.clone(), name, api_key.clone(), quota);
757 tenant.ws_broadcast = self.ws_broadcast.clone();
758 self.tenants.insert(id.clone(), tenant);
759 self.api_key_index.insert(api_key, id.clone());
760 self.persist_if_needed(&id);
761 Ok(id)
762 }
763
764 pub fn get_tenant_by_api_key(&self, api_key: &str) -> Option<&TenantId> {
766 self.api_key_index.get(api_key)
767 }
768
769 pub fn get_tenant(&self, id: &TenantId) -> Option<&Tenant> {
771 self.tenants.get(id)
772 }
773
774 pub fn get_tenant_mut(&mut self, id: &TenantId) -> Option<&mut Tenant> {
776 self.tenants.get_mut(id)
777 }
778
779 pub async fn deploy_pipeline_on_tenant(
781 &mut self,
782 tenant_id: &TenantId,
783 name: String,
784 source: String,
785 ) -> Result<String, TenantError> {
786 let metrics = self.prometheus_metrics.clone();
787 let ws_broadcast = self.ws_broadcast.clone();
788 let tenant = self
789 .tenants
790 .get_mut(tenant_id)
791 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
792 if tenant.ws_broadcast.is_none() {
794 tenant.ws_broadcast = ws_broadcast;
795 }
796 tenant
797 .deploy_pipeline_with_metrics(name, source, metrics)
798 .await
799 }
800
801 pub async fn process_event_with_backpressure(
807 &mut self,
808 tenant_id: &TenantId,
809 pipeline_id: &str,
810 event: Event,
811 ) -> Result<Vec<Event>, TenantError> {
812 self.check_backpressure()?;
813 self.pending_events.fetch_add(1, Ordering::Relaxed);
814 let tenant = self
815 .tenants
816 .get_mut(tenant_id)
817 .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
818 let result = tenant.process_event(pipeline_id, event).await;
819 self.pending_events.fetch_sub(1, Ordering::Relaxed);
820
821 if let Some(ref metrics) = self.prometheus_metrics {
823 metrics
824 .queue_pressure_ratio
825 .with_label_values(&["_all"])
826 .set(self.queue_pressure_ratio());
827 }
828
829 result
830 }
831
832 pub fn remove_tenant(&mut self, id: &TenantId) -> Result<(), TenantError> {
834 let tenant = self
835 .tenants
836 .remove(id)
837 .ok_or_else(|| TenantError::NotFound(id.to_string()))?;
838 self.api_key_index.remove(&tenant.api_key);
839 if let Some(ref store) = self.store {
840 if let Err(e) = Self::delete_tenant_state(store.as_ref(), id) {
841 warn!("Failed to delete persisted state for tenant {}: {}", id, e);
842 }
843 }
844 Ok(())
845 }
846
847 pub fn list_tenants(&self) -> Vec<&Tenant> {
849 self.tenants.values().collect()
850 }
851
852 pub fn tenant_count(&self) -> usize {
854 self.tenants.len()
855 }
856
857 pub async fn collect_pipeline_metrics(&self) -> Vec<(String, u64, u64)> {
859 let mut metrics = Vec::new();
860 for tenant in self.tenants.values() {
861 for pipeline in tenant.pipelines.values() {
862 let engine = pipeline.engine.lock().await;
863 let (events_in, events_out) = engine.event_counters();
864 metrics.push((pipeline.name.clone(), events_in, events_out));
865 }
866 }
867 metrics
868 }
869
870 pub fn collect_connector_health(
874 &self,
875 ) -> Vec<(
876 String,
877 String,
878 String,
879 crate::connector::ConnectorHealthReport,
880 )> {
881 let mut results = Vec::new();
882 for tenant in self.tenants.values() {
883 for pipeline in tenant.pipelines.values() {
884 if let Some(ref registry) = pipeline.connector_registry {
885 for (conn_name, conn_type, report) in registry.health_reports() {
886 results.push((
887 pipeline.name.clone(),
888 conn_name.to_string(),
889 conn_type.to_string(),
890 report,
891 ));
892 }
893 }
894 }
895 }
896 results
897 }
898
899 pub fn persist_if_needed(&self, tenant_id: &TenantId) {
901 if let Some(ref store) = self.store {
902 if let Some(tenant) = self.tenants.get(tenant_id) {
903 if let Err(e) = Self::persist_tenant_to_store(store.as_ref(), tenant) {
904 warn!("Failed to persist tenant {}: {}", tenant_id, e);
905 }
906 }
907 }
908 }
909
910 pub fn recover(&mut self) -> Result<usize, StoreError> {
914 let store = match &self.store {
915 Some(s) => Arc::clone(s),
916 None => return Ok(0),
917 };
918
919 let index_data = match store.get("tenants:index")? {
921 Some(data) => data,
922 None => return Ok(0),
923 };
924
925 let tenant_ids: Vec<String> = serde_json::from_slice(&index_data)
926 .map_err(|e| StoreError::SerializationError(e.to_string()))?;
927
928 let mut recovered = 0;
929 let mut failed = 0;
930
931 for tid in &tenant_ids {
932 let key = format!("tenant:{tid}");
933 let data = match store.get(&key)? {
934 Some(d) => d,
935 None => {
936 warn!("Tenant {} listed in index but not found in store", tid);
937 failed += 1;
938 continue;
939 }
940 };
941
942 let snapshot: TenantSnapshot = match serde_json::from_slice(&data) {
943 Ok(s) => s,
944 Err(e) => {
945 warn!("Failed to deserialize tenant {}: {}", tid, e);
946 failed += 1;
947 continue;
948 }
949 };
950
951 match Self::restore_tenant_from_snapshot(snapshot) {
952 Ok(tenant) => {
953 let tenant_id = tenant.id.clone();
954 self.api_key_index
955 .insert(tenant.api_key.clone(), tenant_id.clone());
956 let pipeline_count = tenant.pipelines.len();
957 self.tenants.insert(tenant_id.clone(), tenant);
958 info!(
959 "Recovered tenant {} with {} pipeline(s)",
960 tenant_id, pipeline_count
961 );
962 recovered += 1;
963 }
964 Err(e) => {
965 warn!("Failed to restore tenant {}: {}", tid, e);
966 failed += 1;
967 }
968 }
969 }
970
971 if failed > 0 {
972 warn!(
973 "Recovery complete: {} recovered, {} failed",
974 recovered, failed
975 );
976 }
977
978 Ok(recovered)
979 }
980
981 fn persist_tenant_to_store(store: &dyn StateStore, tenant: &Tenant) -> Result<(), StoreError> {
982 let snapshot = tenant.snapshot();
983 let data = serde_json::to_vec(&snapshot)
984 .map_err(|e| StoreError::SerializationError(e.to_string()))?;
985 let key = format!("tenant:{}", snapshot.id);
986 store.put(&key, &data)?;
987
988 Self::update_tenant_index_add(store, &snapshot.id)?;
990
991 store.flush()
992 }
993
994 fn delete_tenant_state(store: &dyn StateStore, id: &TenantId) -> Result<(), StoreError> {
995 let key = format!("tenant:{}", id.0);
996 store.delete(&key)?;
997
998 Self::update_tenant_index_remove(store, &id.0)?;
999
1000 store.flush()
1001 }
1002
1003 fn update_tenant_index_add(store: &dyn StateStore, tenant_id: &str) -> Result<(), StoreError> {
1004 let mut ids = Self::load_tenant_index(store)?;
1005 let id_str = tenant_id.to_string();
1006 if !ids.contains(&id_str) {
1007 ids.push(id_str);
1008 }
1009 let data =
1010 serde_json::to_vec(&ids).map_err(|e| StoreError::SerializationError(e.to_string()))?;
1011 store.put("tenants:index", &data)
1012 }
1013
1014 fn update_tenant_index_remove(
1015 store: &dyn StateStore,
1016 tenant_id: &str,
1017 ) -> Result<(), StoreError> {
1018 let mut ids = Self::load_tenant_index(store)?;
1019 ids.retain(|id| id != tenant_id);
1020 let data =
1021 serde_json::to_vec(&ids).map_err(|e| StoreError::SerializationError(e.to_string()))?;
1022 store.put("tenants:index", &data)
1023 }
1024
1025 fn load_tenant_index(store: &dyn StateStore) -> Result<Vec<String>, StoreError> {
1026 match store.get("tenants:index")? {
1027 Some(data) => serde_json::from_slice(&data)
1028 .map_err(|e| StoreError::SerializationError(e.to_string())),
1029 None => Ok(Vec::new()),
1030 }
1031 }
1032
1033 fn restore_tenant_from_snapshot(snapshot: TenantSnapshot) -> Result<Tenant, TenantError> {
1034 let tenant_id = TenantId::new(&snapshot.id);
1035
1036 let mut tenant = Tenant::new(tenant_id, snapshot.name, snapshot.api_key, snapshot.quota);
1037 tenant.usage.events_processed = snapshot.events_processed;
1038 tenant.usage.output_events_emitted = snapshot.output_events_emitted;
1039 tenant.usage.events_in_window = snapshot.events_in_window;
1040
1041 for ps in snapshot.pipelines {
1042 match Self::restore_pipeline_from_snapshot(ps.clone()) {
1043 Ok(pipeline) => {
1044 tenant.pipelines.insert(pipeline.id.clone(), pipeline);
1045 }
1046 Err(e) => {
1047 warn!(
1048 "Failed to restore pipeline '{}' ({}): {}",
1049 ps.name, ps.id, e
1050 );
1051 }
1052 }
1053 }
1054
1055 tenant.usage.active_pipelines = tenant.pipelines.len();
1056 Ok(tenant)
1057 }
1058
1059 fn restore_pipeline_from_snapshot(snapshot: PipelineSnapshot) -> Result<Pipeline, TenantError> {
1060 let program = varpulis_parser::parse(&snapshot.source)?;
1061
1062 let (output_tx, output_rx) = mpsc::channel(1000);
1063 let mut engine = Engine::new(output_tx);
1064 engine.load(&program)?;
1065
1066 let (log_tx, _) = tokio::sync::broadcast::channel(256);
1067 Ok(Pipeline {
1068 id: snapshot.id,
1069 name: snapshot.name,
1070 source: snapshot.source,
1071 engine: Arc::new(tokio::sync::Mutex::new(engine)),
1072 output_rx,
1073 log_broadcast: log_tx,
1074 created_at: Instant::now(),
1075 status: snapshot.status,
1076 orchestrator: None,
1077 connector_registry: None,
1078 })
1079 }
1080}
1081
1082impl Default for TenantManager {
1083 fn default() -> Self {
1084 Self::new()
1085 }
1086}
1087
/// A [`TenantManager`] shared across tasks behind an async read-write lock.
pub type SharedTenantManager = Arc<RwLock<TenantManager>>;
1090
1091pub fn shared_tenant_manager() -> SharedTenantManager {
1093 Arc::new(RwLock::new(TenantManager::new()))
1094}
1095
1096pub fn shared_tenant_manager_with_store(store: Arc<dyn StateStore>) -> SharedTenantManager {
1098 let mut mgr = TenantManager::with_store(store);
1099 match mgr.recover() {
1100 Ok(count) if count > 0 => {
1101 info!("Recovered {} tenant(s) from persistent state", count);
1102 }
1103 Ok(_) => {
1104 info!("No persisted tenant state found, starting fresh");
1105 }
1106 Err(e) => {
1107 error!("Failed to recover tenant state: {}", e);
1108 }
1109 }
1110 Arc::new(RwLock::new(mgr))
1111}
1112
/// Serializable form of a tenant, used for persistence and recovery.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TenantSnapshot {
    /// Tenant id as a string.
    pub id: String,
    /// Tenant display name.
    pub name: String,
    /// Tenant API key.
    pub api_key: String,
    /// Tenant quota.
    pub quota: TenantQuota,
    /// Total events processed at snapshot time.
    pub events_processed: u64,
    /// Total output events emitted; also accepted under the alternative
    /// field name `alerts_generated` when deserializing.
    #[serde(alias = "alerts_generated")]
    pub output_events_emitted: u64,
    /// Snapshots of the tenant's pipelines.
    pub pipelines: Vec<PipelineSnapshot>,
    /// Timestamp in milliseconds; defaults to `None` when absent from the
    /// serialized data.
    #[serde(default)]
    pub created_at_ms: Option<i64>,
    /// Events counted in the rate-limit window at snapshot time; defaults to
    /// `0` when absent from the serialized data.
    #[serde(default)]
    pub events_in_window: u64,
}
1133
/// Serializable form of a pipeline: enough to rebuild it from source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineSnapshot {
    /// Pipeline id.
    pub id: String,
    /// Pipeline name.
    pub name: String,
    /// Program source text the pipeline is rebuilt from.
    pub source: String,
    /// Pipeline status at snapshot time.
    pub status: PipelineStatus,
}
1142
1143impl Pipeline {
1144 pub fn snapshot(&self) -> PipelineSnapshot {
1146 PipelineSnapshot {
1147 id: self.id.clone(),
1148 name: self.name.clone(),
1149 source: self.source.clone(),
1150 status: self.status.clone(),
1151 }
1152 }
1153}
1154
1155impl Tenant {
1156 pub fn snapshot(&self) -> TenantSnapshot {
1158 TenantSnapshot {
1159 id: self.id.0.clone(),
1160 name: self.name.clone(),
1161 api_key: self.api_key.clone(),
1162 quota: self.quota.clone(),
1163 events_processed: self.usage.events_processed,
1164 output_events_emitted: self.usage.output_events_emitted,
1165 pipelines: self.pipelines.values().map(|p| p.snapshot()).collect(),
1166 created_at_ms: Some(chrono::Utc::now().timestamp_millis()),
1167 events_in_window: self.usage.events_in_window,
1168 }
1169 }
1170}
1171
1172#[cfg(test)]
1173mod tests {
1174 use super::*;
1175
1176 #[test]
1177 fn test_tenant_id_generate() {
1178 let id1 = TenantId::generate();
1179 let id2 = TenantId::generate();
1180 assert_ne!(id1, id2);
1181 }
1182
1183 #[test]
1184 fn test_tenant_id_display() {
1185 let id = TenantId::new("test-123");
1186 assert_eq!(format!("{id}"), "test-123");
1187 assert_eq!(id.as_str(), "test-123");
1188 }
1189
1190 #[test]
1191 fn test_tenant_quota_tiers() {
1192 let free = TenantQuota::free();
1193 let pro = TenantQuota::pro();
1194 let enterprise = TenantQuota::enterprise();
1195
1196 assert!(free.max_pipelines < pro.max_pipelines);
1197 assert!(pro.max_pipelines < enterprise.max_pipelines);
1198 assert!(free.max_events_per_second < pro.max_events_per_second);
1199 assert!(pro.max_events_per_second < enterprise.max_events_per_second);
1200 }
1201
1202 #[test]
1203 fn test_usage_record_event() {
1204 let mut usage = TenantUsage::default();
1205 assert!(usage.record_event(100));
1206 assert_eq!(usage.events_processed, 1);
1207 assert_eq!(usage.events_in_window, 1);
1208 }
1209
1210 #[test]
1211 fn test_usage_rate_limit() {
1212 let mut usage = TenantUsage::default();
1213 assert!(usage.record_event(2));
1215 assert!(usage.record_event(2));
1216 assert!(!usage.record_event(2));
1218 }
1219
1220 #[test]
1221 fn test_usage_no_rate_limit() {
1222 let mut usage = TenantUsage::default();
1223 for _ in 0..1000 {
1225 assert!(usage.record_event(0));
1226 }
1227 }
1228
1229 #[test]
1230 fn test_tenant_manager_create() {
1231 let mut mgr = TenantManager::new();
1232 let id = mgr
1233 .create_tenant("Test Corp".into(), "key-123".into(), TenantQuota::free())
1234 .unwrap();
1235
1236 assert_eq!(mgr.tenant_count(), 1);
1237 assert!(mgr.get_tenant(&id).is_some());
1238 assert_eq!(mgr.get_tenant(&id).unwrap().name, "Test Corp");
1239 }
1240
1241 #[test]
1242 fn test_tenant_manager_api_key_lookup() {
1243 let mut mgr = TenantManager::new();
1244 let id = mgr
1245 .create_tenant("Test".into(), "my-key".into(), TenantQuota::default())
1246 .unwrap();
1247
1248 let found = mgr.get_tenant_by_api_key("my-key");
1249 assert_eq!(found, Some(&id));
1250
1251 assert!(mgr.get_tenant_by_api_key("wrong-key").is_none());
1252 }
1253
1254 #[test]
1255 fn test_tenant_manager_duplicate_api_key() {
1256 let mut mgr = TenantManager::new();
1257 mgr.create_tenant("A".into(), "key-1".into(), TenantQuota::default())
1258 .unwrap();
1259 let result = mgr.create_tenant("B".into(), "key-1".into(), TenantQuota::default());
1260 assert!(result.is_err());
1261 }
1262
1263 #[test]
1264 fn test_tenant_manager_remove() {
1265 let mut mgr = TenantManager::new();
1266 let id = mgr
1267 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1268 .unwrap();
1269
1270 mgr.remove_tenant(&id).unwrap();
1271 assert_eq!(mgr.tenant_count(), 0);
1272 assert!(mgr.get_tenant_by_api_key("key-1").is_none());
1273 }
1274
1275 #[tokio::test]
1276 async fn test_tenant_deploy_pipeline() {
1277 let mut mgr = TenantManager::new();
1278 let id = mgr
1279 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1280 .unwrap();
1281
1282 let tenant = mgr.get_tenant_mut(&id).unwrap();
1283 let vpl = r"
1284 stream Alerts = SensorReading
1285 .where(temperature > 100)
1286 ";
1287 let pipeline_id = tenant
1288 .deploy_pipeline("My Pipeline".into(), vpl.into())
1289 .await
1290 .unwrap();
1291 assert_eq!(tenant.pipelines.len(), 1);
1292 assert_eq!(tenant.usage.active_pipelines, 1);
1293 assert_eq!(
1294 tenant.pipelines[&pipeline_id].status,
1295 PipelineStatus::Running
1296 );
1297 }
1298
1299 #[tokio::test]
1300 async fn test_tenant_pipeline_quota() {
1301 let mut mgr = TenantManager::new();
1302 let quota = TenantQuota {
1303 max_pipelines: 1,
1304 max_events_per_second: 100,
1305 max_streams_per_pipeline: 50,
1306 };
1307 let id = mgr
1308 .create_tenant("Test".into(), "key-1".into(), quota)
1309 .unwrap();
1310
1311 let tenant = mgr.get_tenant_mut(&id).unwrap();
1312 let vpl = "stream A = SensorReading .where(x > 1)";
1313 tenant
1314 .deploy_pipeline("P1".into(), vpl.into())
1315 .await
1316 .unwrap();
1317
1318 let result = tenant.deploy_pipeline("P2".into(), vpl.into()).await;
1320 assert!(result.is_err());
1321 }
1322
1323 #[tokio::test]
1324 async fn test_tenant_remove_pipeline() {
1325 let mut mgr = TenantManager::new();
1326 let id = mgr
1327 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1328 .unwrap();
1329
1330 let tenant = mgr.get_tenant_mut(&id).unwrap();
1331 let vpl = "stream A = SensorReading .where(x > 1)";
1332 let pid = tenant
1333 .deploy_pipeline("P1".into(), vpl.into())
1334 .await
1335 .unwrap();
1336
1337 tenant.remove_pipeline(&pid).unwrap();
1338 assert_eq!(tenant.pipelines.len(), 0);
1339 assert_eq!(tenant.usage.active_pipelines, 0);
1340 }
1341
1342 #[tokio::test]
1343 async fn test_tenant_parse_error() {
1344 let mut mgr = TenantManager::new();
1345 let id = mgr
1346 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1347 .unwrap();
1348
1349 let tenant = mgr.get_tenant_mut(&id).unwrap();
1350 let result = tenant
1351 .deploy_pipeline("Bad".into(), "this is not valid VPL {{{{".into())
1352 .await;
1353 assert!(result.is_err());
1354 }
1355
1356 #[tokio::test]
1357 async fn test_tenant_process_event() {
1358 let mut mgr = TenantManager::new();
1359 let id = mgr
1360 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1361 .unwrap();
1362
1363 let tenant = mgr.get_tenant_mut(&id).unwrap();
1364 let vpl = "stream A = SensorReading .where(temperature > 100)";
1365 let pid = tenant
1366 .deploy_pipeline("P1".into(), vpl.into())
1367 .await
1368 .unwrap();
1369
1370 let event = Event::new("SensorReading").with_field("temperature", 150.0);
1371 tenant.process_event(&pid, event).await.unwrap();
1372 assert_eq!(tenant.usage.events_processed, 1);
1373 }
1374
1375 #[tokio::test]
1376 async fn test_tenant_rate_limit_on_process() {
1377 let mut mgr = TenantManager::new();
1378 let quota = TenantQuota {
1379 max_pipelines: 10,
1380 max_events_per_second: 2,
1381 max_streams_per_pipeline: 50,
1382 };
1383 let id = mgr
1384 .create_tenant("Test".into(), "key-1".into(), quota)
1385 .unwrap();
1386
1387 let tenant = mgr.get_tenant_mut(&id).unwrap();
1388 let vpl = "stream A = SensorReading .where(x > 1)";
1389 let pid = tenant
1390 .deploy_pipeline("P1".into(), vpl.into())
1391 .await
1392 .unwrap();
1393
1394 let event = Event::new("SensorReading").with_field("x", 5);
1395 tenant.process_event(&pid, event.clone()).await.unwrap();
1396 tenant.process_event(&pid, event.clone()).await.unwrap();
1397
1398 let result = tenant.process_event(&pid, event).await;
1400 assert!(result.is_err());
1401 }
1402
1403 #[tokio::test]
1404 async fn test_tenant_reload_pipeline() {
1405 let mut mgr = TenantManager::new();
1406 let id = mgr
1407 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1408 .unwrap();
1409
1410 let tenant = mgr.get_tenant_mut(&id).unwrap();
1411 let vpl1 = "stream A = SensorReading .where(x > 1)";
1412 let pid = tenant
1413 .deploy_pipeline("P1".into(), vpl1.into())
1414 .await
1415 .unwrap();
1416
1417 let vpl2 = "stream B = SensorReading .where(x > 50)";
1418 tenant.reload_pipeline(&pid, vpl2.into()).await.unwrap();
1419
1420 assert_eq!(tenant.pipelines[&pid].source, vpl2);
1421 }
1422
/// Each `PipelineStatus` variant renders to its expected display text.
#[test]
fn test_pipeline_status_display() {
    let cases = [
        (PipelineStatus::Running, "running"),
        (PipelineStatus::Stopped, "stopped"),
        (PipelineStatus::Error("oops".into()), "error: oops"),
    ];

    for (status, expected) in cases {
        assert_eq!(status.to_string(), expected);
    }
}
1432
/// `TenantError` display output carries the relevant detail for each variant
/// checked here: the missing id, the rate-limit wording, and the wrapped
/// engine error message.
#[test]
fn test_tenant_error_display() {
    let not_found = TenantError::NotFound("t1".into()).to_string();
    assert!(not_found.contains("t1"));

    let rate_limited = TenantError::RateLimitExceeded.to_string();
    assert!(rate_limited.contains("rate limit"));

    let inner = crate::engine::error::EngineError::Compilation("bad".into());
    let wrapped = TenantError::EngineError(inner).to_string();
    assert!(wrapped.contains("bad"));
}
1440
/// A freshly created shared manager handle is the only strong reference to
/// its `Arc`.
#[test]
fn test_shared_tenant_manager() {
    let mgr = shared_tenant_manager();
    // `assert_eq!` prints the actual count on failure, which a bare
    // `assert!(a == b)` does not.
    assert_eq!(Arc::strong_count(&mgr), 1);
}
1446
/// Listing tenants returns one entry per created tenant.
#[test]
fn test_tenant_list() {
    let mut manager = TenantManager::new();

    for (name, api_key) in [("A", "key-a"), ("B", "key-b")] {
        manager
            .create_tenant(name.into(), api_key.into(), TenantQuota::default())
            .unwrap();
    }

    let listed = manager.list_tenants();
    assert_eq!(listed.len(), 2);
}
1456
/// A manager built through `Default` starts with no tenants.
#[test]
fn test_tenant_manager_default() {
    let manager = TenantManager::default();
    let count = manager.tenant_count();
    assert_eq!(count, 0);
}
1462
1463 #[tokio::test]
1464 async fn test_tenant_snapshot_roundtrip() {
1465 let mut mgr = TenantManager::new();
1466 let id = mgr
1467 .create_tenant("Snap Corp".into(), "snap-key".into(), TenantQuota::pro())
1468 .unwrap();
1469
1470 let tenant = mgr.get_tenant_mut(&id).unwrap();
1471 let vpl = "stream A = SensorReading .where(x > 1)";
1472 tenant
1473 .deploy_pipeline("Pipeline1".into(), vpl.into())
1474 .await
1475 .unwrap();
1476 tenant.usage.events_processed = 42;
1477 tenant.usage.output_events_emitted = 7;
1478
1479 let snapshot = tenant.snapshot();
1480 let json = serde_json::to_vec(&snapshot).unwrap();
1481 let restored: TenantSnapshot = serde_json::from_slice(&json).unwrap();
1482
1483 assert_eq!(restored.id, id.0);
1484 assert_eq!(restored.name, "Snap Corp");
1485 assert_eq!(restored.api_key, "snap-key");
1486 assert_eq!(restored.events_processed, 42);
1487 assert_eq!(restored.output_events_emitted, 7);
1488 assert_eq!(restored.pipelines.len(), 1);
1489 assert_eq!(restored.pipelines[0].name, "Pipeline1");
1490 assert_eq!(restored.pipelines[0].source, vpl);
1491 assert_eq!(restored.pipelines[0].status, PipelineStatus::Running);
1492 assert_eq!(
1493 restored.quota.max_pipelines,
1494 TenantQuota::pro().max_pipelines
1495 );
1496 }
1497
1498 #[tokio::test]
1499 async fn test_tenant_manager_persistence_and_recovery() {
1500 use crate::persistence::MemoryStore;
1501
1502 let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
1503 let vpl = "stream A = SensorReading .where(x > 1)";
1504
1505 let (tenant_id, pipeline_name) = {
1507 let mut mgr = TenantManager::with_store(Arc::clone(&store));
1508 let id = mgr
1509 .create_tenant(
1510 "Persisted Corp".into(),
1511 "persist-key".into(),
1512 TenantQuota::default(),
1513 )
1514 .unwrap();
1515
1516 let tenant = mgr.get_tenant_mut(&id).unwrap();
1517 tenant
1518 .deploy_pipeline("Persistent Pipeline".into(), vpl.into())
1519 .await
1520 .unwrap();
1521 tenant.usage.events_processed = 100;
1522 tenant.usage.output_events_emitted = 5;
1523 mgr.persist_if_needed(&id);
1524
1525 (id.0.clone(), "Persistent Pipeline".to_string())
1526 };
1527
1528 let mut mgr2 = TenantManager::with_store(Arc::clone(&store));
1530 let recovered = mgr2.recover().unwrap();
1531 assert_eq!(recovered, 1);
1532 assert_eq!(mgr2.tenant_count(), 1);
1533
1534 let tid = TenantId::new(&tenant_id);
1536 let tenant = mgr2.get_tenant(&tid).unwrap();
1537 assert_eq!(tenant.name, "Persisted Corp");
1538 assert_eq!(tenant.api_key, "persist-key");
1539 assert_eq!(tenant.usage.events_processed, 100);
1540 assert_eq!(tenant.usage.output_events_emitted, 5);
1541 assert_eq!(tenant.pipelines.len(), 1);
1542
1543 let pipeline = tenant.pipelines.values().next().unwrap();
1544 assert_eq!(pipeline.name, pipeline_name);
1545 assert_eq!(pipeline.source, vpl);
1546 assert_eq!(pipeline.status, PipelineStatus::Running);
1547
1548 assert_eq!(mgr2.get_tenant_by_api_key("persist-key"), Some(&tid));
1550 }
1551
1552 #[tokio::test]
1553 async fn test_persistence_survives_restart() {
1554 use crate::persistence::MemoryStore;
1555
1556 let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
1557 let vpl = "stream A = SensorReading .where(x > 1)";
1558
1559 {
1561 let mut mgr = TenantManager::with_store(Arc::clone(&store));
1562 let id1 = mgr
1563 .create_tenant("Tenant A".into(), "key-a".into(), TenantQuota::free())
1564 .unwrap();
1565 let id2 = mgr
1566 .create_tenant("Tenant B".into(), "key-b".into(), TenantQuota::pro())
1567 .unwrap();
1568
1569 let tenant_a = mgr.get_tenant_mut(&id1).unwrap();
1570 tenant_a
1571 .deploy_pipeline("P1".into(), vpl.into())
1572 .await
1573 .unwrap();
1574 mgr.persist_if_needed(&id1);
1575
1576 let tenant_b = mgr.get_tenant_mut(&id2).unwrap();
1577 tenant_b
1578 .deploy_pipeline("P2".into(), vpl.into())
1579 .await
1580 .unwrap();
1581 tenant_b
1582 .deploy_pipeline("P3".into(), vpl.into())
1583 .await
1584 .unwrap();
1585 mgr.persist_if_needed(&id2);
1586 }
1587
1588 {
1590 let mut mgr = TenantManager::with_store(Arc::clone(&store));
1591 let recovered = mgr.recover().unwrap();
1592 assert_eq!(recovered, 2);
1593 assert_eq!(mgr.tenant_count(), 2);
1594
1595 let tid_a = mgr.get_tenant_by_api_key("key-a").unwrap().clone();
1597 let tenant_a = mgr.get_tenant(&tid_a).unwrap();
1598 assert_eq!(tenant_a.name, "Tenant A");
1599 assert_eq!(tenant_a.pipelines.len(), 1);
1600
1601 let tid_b = mgr.get_tenant_by_api_key("key-b").unwrap().clone();
1603 let tenant_b = mgr.get_tenant(&tid_b).unwrap();
1604 assert_eq!(tenant_b.name, "Tenant B");
1605 assert_eq!(tenant_b.pipelines.len(), 2);
1606 }
1607 }
1608
/// A snapshot records a creation timestamp and the current window counter,
/// and both survive JSON serialisation and restoration into a tenant.
#[test]
fn test_tenant_snapshot_created_at_and_window() {
    let mut manager = TenantManager::new();
    let tenant_id = manager
        .create_tenant(
            "Window Corp".into(),
            "window-key".into(),
            TenantQuota::default(),
        )
        .unwrap();

    let tenant = manager.get_tenant_mut(&tenant_id).unwrap();
    tenant.usage.events_in_window = 42;
    tenant.usage.events_processed = 100;

    let snapshot = tenant.snapshot();

    // The timestamp must be set and later than 2024-01-01T00:00:00Z
    // (1_704_067_200_000 ms since the Unix epoch).
    let created_at = snapshot.created_at_ms;
    assert!(created_at.is_some());
    assert!(created_at.unwrap() > 1_704_067_200_000);

    assert_eq!(snapshot.events_in_window, 42);

    // JSON round trip keeps both fields.
    let encoded = serde_json::to_vec(&snapshot).unwrap();
    let decoded: TenantSnapshot = serde_json::from_slice(&encoded).unwrap();
    assert_eq!(decoded.events_in_window, 42);
    assert_eq!(decoded.created_at_ms, snapshot.created_at_ms);

    // Restoring a tenant from the decoded snapshot keeps the usage counters.
    let rebuilt = TenantManager::restore_tenant_from_snapshot(decoded).unwrap();
    assert_eq!(rebuilt.usage.events_in_window, 42);
    assert_eq!(rebuilt.usage.events_processed, 100);
}
1646
/// A JSON payload that carries neither `created_at_ms` nor `events_in_window`
/// still deserialises, yielding `None` and `0` for those fields, and can be
/// restored into a tenant.
#[test]
fn test_tenant_snapshot_backwards_compat() {
    let legacy_payload = r#"{
            "id": "compat-tenant",
            "name": "Compat Corp",
            "api_key": "compat-key",
            "quota": {
                "max_pipelines": 10,
                "max_events_per_second": 10000,
                "max_streams_per_pipeline": 50
            },
            "events_processed": 55,
            "alerts_generated": 3,
            "pipelines": []
        }"#;

    let parsed: TenantSnapshot = serde_json::from_str(legacy_payload).unwrap();
    assert_eq!(parsed.id, "compat-tenant");
    assert_eq!(parsed.events_processed, 55);
    // Fields absent from the payload come back as their empty values.
    assert_eq!(parsed.created_at_ms, None);
    assert_eq!(parsed.events_in_window, 0);

    let rebuilt = TenantManager::restore_tenant_from_snapshot(parsed).unwrap();
    assert_eq!(rebuilt.name, "Compat Corp");
    assert_eq!(rebuilt.usage.events_processed, 55);
    assert_eq!(rebuilt.usage.events_in_window, 0);
}
1677
/// When a stored snapshot contains a pipeline whose source is not valid VPL,
/// recovery still restores the tenant but without that pipeline.
#[test]
fn test_recovery_with_invalid_vpl() {
    use crate::persistence::MemoryStore;

    let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());

    // Hand-craft a snapshot whose only pipeline cannot be parsed.
    let broken_pipeline = PipelineSnapshot {
        id: "bad-pipeline".into(),
        name: "Bad Pipeline".into(),
        source: "this is not valid VPL {{{{".into(),
        status: PipelineStatus::Running,
    };
    let snapshot = TenantSnapshot {
        id: "bad-tenant".into(),
        name: "Bad Corp".into(),
        api_key: "bad-key".into(),
        quota: TenantQuota::default(),
        events_processed: 0,
        output_events_emitted: 0,
        pipelines: vec![broken_pipeline],
        created_at_ms: None,
        events_in_window: 0,
    };

    // Write the snapshot and the tenant index straight into the store.
    let snapshot_bytes = serde_json::to_vec(&snapshot).unwrap();
    store.put("tenant:bad-tenant", &snapshot_bytes).unwrap();
    let index_bytes = serde_json::to_vec(&vec!["bad-tenant"]).unwrap();
    store.put("tenants:index", &index_bytes).unwrap();

    let mut manager = TenantManager::with_store(Arc::clone(&store));
    let recovered_count = manager.recover().unwrap();
    assert_eq!(recovered_count, 1);

    let tenant_id = TenantId::new("bad-tenant");
    let tenant = manager.get_tenant(&tenant_id).unwrap();
    assert_eq!(tenant.name, "Bad Corp");
    // The unparseable pipeline was not restored.
    assert_eq!(tenant.pipelines.len(), 0);
}
1718
/// A new manager has a max queue depth of 0 and passes the backpressure check.
#[test]
fn test_backpressure_disabled_by_default() {
    let manager = TenantManager::new();

    let depth = manager.max_queue_depth();
    assert_eq!(depth, 0);

    let check = manager.check_backpressure();
    assert!(check.is_ok());
}
1725
/// Setting the max queue depth is reflected by the getter, and the
/// backpressure check still passes afterwards.
#[test]
fn test_backpressure_set_max_queue_depth() {
    let mut manager = TenantManager::new();
    manager.set_max_queue_depth(100);

    let depth = manager.max_queue_depth();
    assert_eq!(depth, 100);

    let check = manager.check_backpressure();
    assert!(check.is_ok());
}
1734
/// With the pending count equal to the max queue depth, the backpressure
/// check fails with `BackpressureExceeded` carrying both values.
#[test]
fn test_backpressure_exceeded() {
    let mut manager = TenantManager::new();
    manager.set_max_queue_depth(10);

    // Put the pending counter exactly at the limit.
    manager.pending_events.store(10, Ordering::Relaxed);

    let outcome = manager.check_backpressure();
    assert!(outcome.is_err());

    match outcome {
        Err(TenantError::BackpressureExceeded { current, max }) => {
            assert_eq!(current, 10);
            assert_eq!(max, 10);
        }
        _ => panic!("Expected BackpressureExceeded error"),
    }
}
1752
/// With the pending count one below the max queue depth, the backpressure
/// check passes.
#[test]
fn test_backpressure_not_exceeded() {
    let mut manager = TenantManager::new();
    manager.set_max_queue_depth(10);
    manager.pending_events.store(9, Ordering::Relaxed);

    let check = manager.check_backpressure();
    assert!(check.is_ok());
}
1760
/// A max queue depth of 0 means the backpressure check passes even with a
/// very large pending count.
#[test]
fn test_backpressure_unlimited_when_zero() {
    let mut manager = TenantManager::new();
    manager.set_max_queue_depth(0);
    manager.pending_events.store(1_000_000, Ordering::Relaxed);

    let check = manager.check_backpressure();
    assert!(check.is_ok());
}
1769
/// With 50 pending events and a max depth of 100, the pressure ratio is 0.5.
#[test]
fn test_queue_pressure_ratio() {
    let mut manager = TenantManager::new();
    manager.set_max_queue_depth(100);
    manager.pending_events.store(50, Ordering::Relaxed);

    // Compare floats through the absolute difference rather than `==`.
    let deviation = (manager.queue_pressure_ratio() - 0.5).abs();
    assert!(deviation < f64::EPSILON);
}
1778
/// A new manager (max queue depth 0) reports a pressure ratio of 0.0.
#[test]
fn test_queue_pressure_ratio_zero_max() {
    let manager = TenantManager::new();
    let ratio = manager.queue_pressure_ratio();
    assert_eq!(ratio, 0.0);
}
1784
/// The counter handle returned by the manager feeds `pending_event_count`:
/// adding 5 through the handle is visible through the manager.
#[test]
fn test_pending_events_counter() {
    let manager = TenantManager::new();
    assert_eq!(manager.pending_event_count(), 0);

    manager
        .pending_events_counter()
        .fetch_add(5, Ordering::Relaxed);

    assert_eq!(manager.pending_event_count(), 5);
}
1793
1794 #[tokio::test]
1795 async fn test_process_event_with_backpressure_ok() {
1796 let mut mgr = TenantManager::new();
1797 mgr.set_max_queue_depth(100);
1798 let id = mgr
1799 .create_tenant("Test".into(), "key-bp".into(), TenantQuota::default())
1800 .unwrap();
1801
1802 let tenant = mgr.get_tenant_mut(&id).unwrap();
1803 let vpl = "stream A = SensorReading .where(temperature > 100)";
1804 let pid = tenant
1805 .deploy_pipeline("BP Pipeline".into(), vpl.into())
1806 .await
1807 .unwrap();
1808
1809 let event = Event::new("SensorReading").with_field("temperature", 150.0);
1810 let result = mgr.process_event_with_backpressure(&id, &pid, event).await;
1811 assert!(result.is_ok());
1812 assert_eq!(mgr.pending_event_count(), 0);
1814 }
1815
1816 #[tokio::test]
1817 async fn test_process_event_with_backpressure_rejected() {
1818 let mut mgr = TenantManager::new();
1819 mgr.set_max_queue_depth(5);
1820 mgr.pending_events.store(5, Ordering::Relaxed);
1822
1823 let id = mgr
1824 .create_tenant("Test".into(), "key-bp2".into(), TenantQuota::default())
1825 .unwrap();
1826 let tenant = mgr.get_tenant_mut(&id).unwrap();
1827 let vpl = "stream A = SensorReading .where(temperature > 100)";
1828 let pid = tenant
1829 .deploy_pipeline("BP Pipeline 2".into(), vpl.into())
1830 .await
1831 .unwrap();
1832
1833 let event = Event::new("SensorReading").with_field("temperature", 150.0);
1834 let result = mgr.process_event_with_backpressure(&id, &pid, event).await;
1835 assert!(result.is_err());
1836 match result {
1837 Err(TenantError::BackpressureExceeded { current, max }) => {
1838 assert_eq!(current, 5);
1839 assert_eq!(max, 5);
1840 }
1841 _ => panic!("Expected BackpressureExceeded"),
1842 }
1843 }
1844
/// The `BackpressureExceeded` display text includes the numeric value and
/// the phrase "exceeds maximum".
#[test]
fn test_backpressure_error_display() {
    let rendered = TenantError::BackpressureExceeded {
        current: 50000,
        max: 50000,
    }
    .to_string();

    for fragment in ["50000", "exceeds maximum"] {
        assert!(rendered.contains(fragment));
    }
}
1855
1856 #[tokio::test]
1857 async fn test_collect_pipeline_metrics_returns_event_counts() {
1858 let mut mgr = TenantManager::new();
1859 let id = mgr
1860 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1861 .unwrap();
1862
1863 let vpl = "stream A = SensorReading .where(temperature > 100)";
1865 mgr.deploy_pipeline_on_tenant(&id, "test-pipeline".into(), vpl.into())
1866 .await
1867 .unwrap();
1868
1869 let metrics = mgr.collect_pipeline_metrics().await;
1871 assert_eq!(metrics.len(), 1, "Should have 1 pipeline");
1872 assert_eq!(metrics[0].0, "test-pipeline");
1873 assert_eq!(metrics[0].1, 0, "events_in should be 0 before processing");
1874 assert_eq!(metrics[0].2, 0, "events_out should be 0 before processing");
1875
1876 let tenant = mgr.get_tenant_mut(&id).unwrap();
1878 let pid = tenant.pipelines.keys().next().unwrap().clone();
1879 for i in 0..10 {
1880 let event = Event::new("SensorReading")
1881 .with_field("temperature", (i as f64).mul_add(20.0, 50.0));
1882 tenant.process_event(&pid, event).await.unwrap();
1883 }
1884
1885 let metrics = mgr.collect_pipeline_metrics().await;
1887 assert_eq!(metrics.len(), 1);
1888 assert_eq!(metrics[0].0, "test-pipeline");
1889 assert!(
1890 metrics[0].1 > 0,
1891 "events_in should be > 0 after processing, got {}",
1892 metrics[0].1
1893 );
1894 }
1895
1896 #[tokio::test]
1897 async fn test_collect_pipeline_metrics_multiple_tenants() {
1898 let mut mgr = TenantManager::new();
1899 let id1 = mgr
1900 .create_tenant("Tenant1".into(), "key-1".into(), TenantQuota::default())
1901 .unwrap();
1902 let id2 = mgr
1903 .create_tenant("Tenant2".into(), "key-2".into(), TenantQuota::default())
1904 .unwrap();
1905
1906 let vpl = "stream A = SensorReading .where(temperature > 100)";
1907 mgr.deploy_pipeline_on_tenant(&id1, "pipeline-1".into(), vpl.into())
1908 .await
1909 .unwrap();
1910 mgr.deploy_pipeline_on_tenant(&id2, "pipeline-2".into(), vpl.into())
1911 .await
1912 .unwrap();
1913
1914 let metrics = mgr.collect_pipeline_metrics().await;
1915 assert_eq!(metrics.len(), 2, "Should have 2 pipelines across 2 tenants");
1916 let names: Vec<&str> = metrics.iter().map(|(n, _, _)| n.as_str()).collect();
1917 assert!(names.contains(&"pipeline-1"));
1918 assert!(names.contains(&"pipeline-2"));
1919 }
1920
1921 #[tokio::test]
1922 async fn test_collect_pipeline_metrics_empty_when_no_pipelines() {
1923 let mut mgr = TenantManager::new();
1924 let _id = mgr
1925 .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1926 .unwrap();
1927
1928 let metrics = mgr.collect_pipeline_metrics().await;
1929 assert!(
1930 metrics.is_empty(),
1931 "Should be empty when no pipelines deployed"
1932 );
1933 }
1934
1935 #[tokio::test]
1936 async fn test_deploy_pipeline_on_tenant_and_collect_metrics() {
1937 let mut mgr = TenantManager::new();
1940 let id = mgr
1941 .create_tenant(
1942 "default".into(),
1943 "api-key-123".into(),
1944 TenantQuota::enterprise(),
1945 )
1946 .unwrap();
1947
1948 let vpl = r"
1949 event MarketTick:
1950 symbol: str
1951 price: float
1952 stream Ticks = MarketTick .where(price > 100)
1953 ";
1954 let pipeline_id = mgr
1955 .deploy_pipeline_on_tenant(&id, "financial-cep".into(), vpl.into())
1956 .await
1957 .unwrap();
1958
1959 let metrics = mgr.collect_pipeline_metrics().await;
1961 assert_eq!(metrics.len(), 1);
1962 assert_eq!(metrics[0].0, "financial-cep");
1963
1964 for _ in 0..5 {
1966 let event = Event::new("MarketTick")
1967 .with_field("symbol", "AAPL")
1968 .with_field("price", 150.0);
1969 mgr.process_event_with_backpressure(&id, &pipeline_id, event)
1970 .await
1971 .unwrap();
1972 }
1973
1974 let metrics = mgr.collect_pipeline_metrics().await;
1975 assert_eq!(metrics[0].1, 5, "Should have processed 5 events");
1976 }
1977}