// varpulis_runtime/tenant.rs
1//! Multi-tenant isolation for SaaS deployment
2//!
3//! Provides tenant-scoped engine instances with resource limits and usage tracking.
4
5use crate::context::ContextOrchestrator;
6use crate::engine::Engine;
7use crate::event::Event;
8use crate::metrics::Metrics;
9use crate::persistence::{StateStore, StoreError};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::{mpsc, RwLock};
16use tracing::{error, info, warn};
17use uuid::Uuid;
18
/// Unique identifier for a tenant.
///
/// Thin newtype over `String`; hashable so it can key the tenant map,
/// and serde-enabled so it can appear in API payloads.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct TenantId(pub String);
22
23impl TenantId {
24    pub fn new(id: impl Into<String>) -> Self {
25        Self(id.into())
26    }
27
28    pub fn generate() -> Self {
29        Self(Uuid::new_v4().to_string())
30    }
31
32    pub fn as_str(&self) -> &str {
33        &self.0
34    }
35}
36
37impl std::fmt::Display for TenantId {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(f, "{}", self.0)
40    }
41}
42
/// Resource limits for a tenant.
///
/// A value of 0 for `max_events_per_second` disables rate limiting
/// (see `TenantUsage::record_event`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TenantQuota {
    /// Maximum number of pipelines
    pub max_pipelines: usize,
    /// Maximum events per second
    pub max_events_per_second: u64,
    /// Maximum streams per pipeline
    pub max_streams_per_pipeline: usize,
}
53
54impl Default for TenantQuota {
55    fn default() -> Self {
56        Self {
57            max_pipelines: 10,
58            max_events_per_second: 10_000,
59            max_streams_per_pipeline: 50,
60        }
61    }
62}
63
64impl TenantQuota {
65    /// Free tier limits
66    pub const fn free() -> Self {
67        Self {
68            max_pipelines: 2,
69            max_events_per_second: 100,
70            max_streams_per_pipeline: 5,
71        }
72    }
73
74    /// Pro tier limits
75    pub const fn pro() -> Self {
76        Self {
77            max_pipelines: 20,
78            max_events_per_second: 50_000,
79            max_streams_per_pipeline: 100,
80        }
81    }
82
83    /// Enterprise tier limits
84    pub const fn enterprise() -> Self {
85        Self {
86            max_pipelines: 1000,
87            max_events_per_second: 500_000,
88            max_streams_per_pipeline: 500,
89        }
90    }
91}
92
/// Usage statistics for a tenant.
///
/// `events_in_window`/`window_start` implement a simple fixed one-second
/// window for rate limiting; the other counters are monotonic totals.
#[derive(Debug, Clone, Default)]
pub struct TenantUsage {
    /// Total events processed
    pub events_processed: u64,
    /// Events processed in current window (for rate limiting)
    pub events_in_window: u64,
    /// Window start time (None until the first event is recorded)
    pub window_start: Option<Instant>,
    /// Number of active pipelines
    pub active_pipelines: usize,
    /// Total output events emitted
    pub output_events_emitted: u64,
}
107
108impl TenantUsage {
109    /// Record an event and check rate limit. Returns true if within quota.
110    pub fn record_event(&mut self, max_eps: u64) -> bool {
111        self.events_processed += 1;
112
113        let now = Instant::now();
114        match self.window_start {
115            Some(start) if now.duration_since(start).as_secs() < 1 => {
116                self.events_in_window += 1;
117                if max_eps > 0 && self.events_in_window > max_eps {
118                    return false;
119                }
120            }
121            _ => {
122                self.window_start = Some(now);
123                self.events_in_window = 1;
124            }
125        }
126        true
127    }
128
129    pub const fn record_output_event(&mut self) {
130        self.output_events_emitted += 1;
131    }
132}
133
/// A pipeline deployed by a tenant.
#[derive(Debug)]
pub struct Pipeline {
    /// Pipeline identifier
    pub id: String,
    /// Human-readable name
    pub name: String,
    /// The VPL source code
    pub source: String,
    /// The engine running this pipeline (shared with source ingestion loop)
    pub engine: Arc<tokio::sync::Mutex<Engine>>,
    /// Output event receiver for this pipeline.
    ///
    /// NOTE: for connector-sourced pipelines this is a placeholder channel
    /// (the real receiver is moved into a background drain task at deploy
    /// time), so `try_recv` here yields nothing for those pipelines.
    pub output_rx: mpsc::Receiver<Event>,
    /// Broadcast sender for log streaming (SSE subscribers).
    /// Output events are forwarded here after being drained from output_rx.
    pub log_broadcast: tokio::sync::broadcast::Sender<Event>,
    /// When the pipeline was created
    pub created_at: Instant,
    /// Pipeline status
    pub status: PipelineStatus,
    /// Optional context orchestrator for multi-threaded execution
    pub orchestrator: Option<ContextOrchestrator>,
    /// Managed connector registry (keeps source connections alive)
    pub connector_registry: Option<crate::connector::ManagedConnectorRegistry>,
}
159
/// Pipeline status.
///
/// `Error` carries a human-readable description of what failed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PipelineStatus {
    Running,
    Stopped,
    Error(String),
}
167
168impl std::fmt::Display for PipelineStatus {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        match self {
171            Self::Running => write!(f, "running"),
172            Self::Stopped => write!(f, "stopped"),
173            Self::Error(msg) => write!(f, "error: {msg}"),
174        }
175    }
176}
177
/// Errors from tenant operations.
///
/// `ParseError` and `EngineError` use `#[from]` so `?` converts parser
/// and engine failures automatically.
#[derive(Debug, thiserror::Error)]
pub enum TenantError {
    /// Tenant not found
    #[error("tenant not found: {0}")]
    NotFound(String),
    /// Pipeline not found
    #[error("pipeline not found: {0}")]
    PipelineNotFound(String),
    /// Quota exceeded (pipeline count or streams-per-pipeline)
    #[error("quota exceeded: {0}")]
    QuotaExceeded(String),
    /// Rate limit exceeded (per-tenant events-per-second window)
    #[error("rate limit exceeded")]
    RateLimitExceeded,
    /// Backpressure: queue depth exceeds configured maximum
    #[error("queue depth {current} exceeds maximum {max}")]
    BackpressureExceeded {
        /// Current queue depth
        current: u64,
        /// Configured maximum
        max: u64,
    },
    /// Parse error in VPL source
    #[error("parse error: {0}")]
    ParseError(#[from] varpulis_parser::ParseError),
    /// Engine error
    #[error("engine error: {0}")]
    EngineError(#[from] crate::engine::error::EngineError),
    /// Tenant already exists
    #[error("tenant already exists: {0}")]
    AlreadyExists(String),
}
211
/// A single tenant with its pipelines and quotas.
#[derive(Debug)]
pub struct Tenant {
    /// Tenant identifier
    pub id: TenantId,
    /// Display name
    pub name: String,
    /// API key for authentication
    pub api_key: String,
    /// Resource quotas
    pub quota: TenantQuota,
    /// Usage statistics
    pub usage: TenantUsage,
    /// Active pipelines, keyed by pipeline id
    pub pipelines: HashMap<String, Pipeline>,
    /// When the tenant was created
    pub created_at: Instant,
    /// Optional WebSocket broadcast for output event relay
    /// (set by TenantManager; inherited by pipelines' drain tasks)
    pub(crate) ws_broadcast: Option<Arc<tokio::sync::broadcast::Sender<String>>>,
}
232
impl Tenant {
    /// Create a tenant with empty usage, no pipelines, and no WS broadcast.
    pub fn new(id: TenantId, name: String, api_key: String, quota: TenantQuota) -> Self {
        Self {
            id,
            name,
            api_key,
            quota,
            usage: TenantUsage::default(),
            pipelines: HashMap::new(),
            created_at: Instant::now(),
            ws_broadcast: None,
        }
    }

    /// Deploy a new pipeline from VPL source code
    pub async fn deploy_pipeline(
        &mut self,
        name: String,
        source: String,
    ) -> Result<String, TenantError> {
        self.deploy_pipeline_with_metrics(name, source, None).await
    }

    /// Deploy a new pipeline from VPL source, optionally wiring Prometheus
    /// metrics into the engine.
    ///
    /// Enforces the tenant's pipeline-count and streams-per-pipeline quotas,
    /// parses the source, builds and loads the engine, and — when the
    /// program declares `.from()` source bindings — starts connector
    /// sources plus two background tasks (source ingestion and output
    /// drain). Returns the new pipeline's generated id.
    ///
    /// # Errors
    /// `QuotaExceeded`, `ParseError`, or `EngineError` depending on which
    /// stage fails.
    pub async fn deploy_pipeline_with_metrics(
        &mut self,
        name: String,
        source: String,
        prometheus_metrics: Option<Metrics>,
    ) -> Result<String, TenantError> {
        // Check pipeline quota
        if self.pipelines.len() >= self.quota.max_pipelines {
            return Err(TenantError::QuotaExceeded(format!(
                "max pipelines ({}) reached",
                self.quota.max_pipelines
            )));
        }

        // Parse the VPL source
        let program = varpulis_parser::parse(&source)?;

        // Check streams quota (count of StreamDecl statements in the program)
        let stream_count = program
            .statements
            .iter()
            .filter(|s| matches!(&s.node, varpulis_core::ast::Stmt::StreamDecl { .. }))
            .count();
        if stream_count > self.quota.max_streams_per_pipeline {
            return Err(TenantError::QuotaExceeded(format!(
                "pipeline has {} streams, max is {}",
                stream_count, self.quota.max_streams_per_pipeline
            )));
        }

        // Create a new engine
        let (output_tx, output_rx) = mpsc::channel(1000);
        let output_tx_for_ctx = output_tx.clone();
        let mut engine = Engine::new(output_tx);
        if let Some(m) = prometheus_metrics {
            engine = engine.with_metrics(m);
        }
        engine.load(&program)?;

        // Build context orchestrator if the program declares contexts
        let orchestrator = if engine.has_contexts() {
            // Context orchestrator connects sinks in each context thread
            match ContextOrchestrator::build(
                engine.context_map(),
                &program,
                output_tx_for_ctx,
                1000,
            ) {
                Ok(orch) => Some(orch),
                Err(e) => {
                    return Err(crate::engine::error::EngineError::Pipeline(format!(
                        "Failed to build context orchestrator: {e}"
                    ))
                    .into());
                }
            }
        } else {
            // No contexts - connect sinks on the main engine
            engine.connect_sinks().await?;
            None
        };

        // Start source connectors (MQTT/Kafka subscriptions)
        let bindings = engine.source_bindings().to_vec();
        let mut connector_registry = None;

        if !bindings.is_empty() {
            let (event_tx, event_rx) = mpsc::channel::<Event>(10_000);

            let mut registry = crate::connector::ManagedConnectorRegistry::from_configs(
                engine.connector_configs(),
            )
            .map_err(|e| {
                TenantError::EngineError(crate::engine::error::EngineError::Pipeline(format!(
                    "Registry build error: {e}"
                )))
            })?;

            for binding in &bindings {
                let config = engine.get_connector(&binding.connector_name).cloned();
                let Some(config) = config else {
                    return Err(crate::engine::error::EngineError::Pipeline(format!(
                        "Connector '{}' referenced in .from() but not declared",
                        binding.connector_name
                    ))
                    .into());
                };

                // Topic precedence: per-binding override, then connector
                // config, then a catch-all default.
                let topic = binding
                    .topic_override
                    .as_deref()
                    .or(config.topic.as_deref())
                    .unwrap_or("varpulis/events/#");

                info!(
                    "Starting {} source: {} topic={}",
                    config.connector_type, binding.connector_name, topic
                );

                registry
                    .start_source(
                        &binding.connector_name,
                        topic,
                        event_tx.clone(),
                        &binding.extra_params,
                    )
                    .await
                    .map_err(|e| -> TenantError {
                        crate::engine::error::EngineError::Pipeline(format!(
                            "Source start error: {e}"
                        ))
                        .into()
                    })?;

                // Create shared sinks for this connector
                for sink_keys and inject them into the engine; sink topics
                // are taken from the "<connector>::<topic>" key suffix when
                // present, else the connector config, else "<name>-output".
                let sink_keys = engine.sink_keys_for_connector(&binding.connector_name);
                for sink_key in &sink_keys {
                    let sink_topic = if let Some(t) =
                        sink_key.strip_prefix(&format!("{}::", binding.connector_name))
                    {
                        t.to_string()
                    } else {
                        config
                            .topic
                            .clone()
                            .unwrap_or_else(|| format!("{}-output", binding.connector_name))
                    };

                    let empty_params = std::collections::HashMap::new();
                    match registry.create_sink(&binding.connector_name, &sink_topic, &empty_params)
                    {
                        Ok(sink) => {
                            engine.inject_sink(sink_key, sink);
                        }
                        Err(e) => {
                            // Best-effort: a failed sink should not abort deploy.
                            warn!("Failed to create sink for {}: {}", sink_key, e);
                        }
                    }
                }
            }

            connector_registry = Some(registry);

            // Wrap engine in Arc<Mutex> for sharing with source ingestion loop
            let engine = Arc::new(tokio::sync::Mutex::new(engine));
            let engine_for_source = Arc::clone(&engine);

            // Spawn source event ingestion loop; it ends when all event_tx
            // clones (held by connector sources) are dropped.
            tokio::spawn(async move {
                let mut event_rx = event_rx;
                while let Some(event) = event_rx.recv().await {
                    let mut eng = engine_for_source.lock().await;
                    if let Err(e) = eng.process(event).await {
                        warn!("Source event processing error: {}", e);
                    }
                }
                info!("Source ingestion loop ended");
            });

            let id = Uuid::new_v4().to_string();
            let (log_tx, _) = tokio::sync::broadcast::channel(256);

            // Spawn output event drain task: connector-sourced pipelines produce
            // output events via the engine's output_tx, but nobody calls
            // process_event() to drain them. This task continuously drains
            // output_rx → log_broadcast so WebSocket/SSE subscribers receive them.
            let log_tx_for_drain = log_tx.clone();
            let ws_tx_for_drain = self.ws_broadcast.clone();
            tokio::spawn(async move {
                let mut output_rx = output_rx;
                while let Some(ev) = output_rx.recv().await {
                    let _ = log_tx_for_drain.send(ev.clone());
                    // Forward to WebSocket relay channel if configured
                    if let Some(ref ws_tx) = ws_tx_for_drain {
                        let msg = serde_json::json!({
                            "type": "output_event",
                            "event_type": ev.event_type.to_string(),
                            "data": ev.data,
                            "timestamp": ev.timestamp.to_rfc3339(),
                        });
                        if let Ok(json) = serde_json::to_string(&msg) {
                            if ws_tx.send(json).is_err() {
                                tracing::debug!("No WS subscribers for drain output event");
                            }
                        }
                    }
                }
            });

            // The real output_rx was moved into the drain task above;
            // create a placeholder for the Pipeline struct.
            let (_drain_tx, placeholder_rx) = mpsc::channel(1);

            let pipeline = Pipeline {
                id: id.clone(),
                name,
                source,
                engine,
                output_rx: placeholder_rx,
                log_broadcast: log_tx,
                created_at: Instant::now(),
                status: PipelineStatus::Running,
                orchestrator,
                connector_registry,
            };

            self.pipelines.insert(id.clone(), pipeline);
            self.usage.active_pipelines = self.pipelines.len();

            return Ok(id);
        }

        // No source bindings — standard path without Arc<Mutex>
        let engine = Arc::new(tokio::sync::Mutex::new(engine));
        let id = Uuid::new_v4().to_string();
        let (log_tx, _) = tokio::sync::broadcast::channel(256);
        let pipeline = Pipeline {
            id: id.clone(),
            name,
            source,
            engine,
            output_rx,
            log_broadcast: log_tx,
            created_at: Instant::now(),
            status: PipelineStatus::Running,
            orchestrator,
            connector_registry,
        };

        self.pipelines.insert(id.clone(), pipeline);
        self.usage.active_pipelines = self.pipelines.len();

        Ok(id)
    }

    /// Remove a pipeline. Dropping the Pipeline drops its registry/engine
    /// handles, which lets the background tasks wind down.
    pub fn remove_pipeline(&mut self, pipeline_id: &str) -> Result<(), TenantError> {
        self.pipelines
            .remove(pipeline_id)
            .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
        self.usage.active_pipelines = self.pipelines.len();
        Ok(())
    }

    /// Process an event through a specific pipeline.
    ///
    /// Applies the tenant rate limit first, then routes through the context
    /// orchestrator when present (non-blocking try, falling back to an
    /// awaited send when the channel is full) or directly through the
    /// engine otherwise. Returns the output events drained after processing.
    /// NOTE(review): for connector-sourced pipelines `output_rx` is a
    /// placeholder, so the drain below returns an empty Vec for them.
    pub async fn process_event(
        &mut self,
        pipeline_id: &str,
        event: Event,
    ) -> Result<Vec<Event>, TenantError> {
        // Check rate limit
        if !self.usage.record_event(self.quota.max_events_per_second) {
            return Err(TenantError::RateLimitExceeded);
        }

        let pipeline = self
            .pipelines
            .get_mut(pipeline_id)
            .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;

        if pipeline.status != PipelineStatus::Running {
            return Err(crate::engine::error::EngineError::Pipeline(format!(
                "pipeline is {}",
                pipeline.status
            ))
            .into());
        }

        if let Some(ref orchestrator) = pipeline.orchestrator {
            // Route through context orchestrator — try non-blocking first
            let shared_event = std::sync::Arc::new(event);
            match orchestrator.try_process(shared_event) {
                Ok(()) => {}
                Err(crate::context::DispatchError::ChannelFull(msg)) => {
                    // Extract the event from the ContextMessage and retry with await
                    if let crate::context::ContextMessage::Event(event) = msg {
                        orchestrator
                            .process(event)
                            .await
                            .map_err(|e| -> TenantError {
                                crate::engine::error::EngineError::Pipeline(e).into()
                            })?;
                    }
                }
                Err(crate::context::DispatchError::ChannelClosed(_)) => {
                    return Err(crate::engine::error::EngineError::Pipeline(
                        "Context channel closed".to_string(),
                    )
                    .into());
                }
            }
        } else {
            // Direct engine processing (no contexts, zero overhead)
            pipeline.engine.lock().await.process(event).await?;
        }

        // Drain output events from the channel and broadcast to log subscribers
        let mut output_events = Vec::new();
        while let Ok(ev) = pipeline.output_rx.try_recv() {
            // Best-effort broadcast to SSE log subscribers (ignore if no receivers)
            let _ = pipeline.log_broadcast.send(ev.clone());
            output_events.push(ev);
        }

        Ok(output_events)
    }

    /// Subscribe to a pipeline's output event stream for log streaming.
    /// Returns a broadcast receiver that yields events as they are produced.
    pub fn subscribe_pipeline_logs(
        &self,
        pipeline_id: &str,
    ) -> Result<tokio::sync::broadcast::Receiver<Event>, TenantError> {
        let pipeline = self
            .pipelines
            .get(pipeline_id)
            .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
        Ok(pipeline.log_broadcast.subscribe())
    }

    /// Create a checkpoint of a pipeline's engine state.
    ///
    /// Takes the engine lock, so it briefly pauses event processing for
    /// that pipeline.
    pub async fn checkpoint_pipeline(
        &self,
        pipeline_id: &str,
    ) -> Result<crate::persistence::EngineCheckpoint, TenantError> {
        let pipeline = self
            .pipelines
            .get(pipeline_id)
            .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;

        let engine = pipeline.engine.lock().await;
        Ok(engine.create_checkpoint())
    }

    /// Restore a pipeline's engine state from a checkpoint.
    pub async fn restore_pipeline(
        &mut self,
        pipeline_id: &str,
        checkpoint: &crate::persistence::EngineCheckpoint,
    ) -> Result<(), TenantError> {
        let pipeline = self
            .pipelines
            .get_mut(pipeline_id)
            .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;

        let mut engine = pipeline.engine.lock().await;
        engine
            .restore_checkpoint(checkpoint)
            .map_err(crate::engine::error::EngineError::Store)?;
        Ok(())
    }

    /// Reload a pipeline with new VPL source.
    ///
    /// Parses first so a bad program leaves the running pipeline untouched;
    /// on success the stored source is updated to match.
    pub async fn reload_pipeline(
        &mut self,
        pipeline_id: &str,
        source: String,
    ) -> Result<(), TenantError> {
        let program = varpulis_parser::parse(&source)?;

        let pipeline = self
            .pipelines
            .get_mut(pipeline_id)
            .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;

        pipeline.engine.lock().await.reload(&program)?;
        pipeline.source = source;

        Ok(())
    }
}
627
/// Manages all tenants in the system.
pub struct TenantManager {
    /// All tenants, keyed by id
    tenants: HashMap<TenantId, Tenant>,
    /// API key → TenantId lookup
    api_key_index: HashMap<String, TenantId>,
    /// Optional persistent state store
    store: Option<Arc<dyn StateStore>>,
    /// Shared Prometheus metrics (passed to engines on deploy)
    prometheus_metrics: Option<Metrics>,
    /// Maximum queue depth before rejecting events with backpressure (0 = unlimited)
    max_queue_depth: u64,
    /// Atomic counter of events currently being processed across all pipelines
    pending_events: Arc<AtomicU64>,
    /// Optional global output event broadcast for WebSocket relay.
    /// When set, all pipeline output events are forwarded here as JSON strings.
    ws_broadcast: Option<Arc<tokio::sync::broadcast::Sender<String>>>,
}
645
// Manual Debug: Tenant/Metrics/StateStore are not all Debug-friendly,
// so summarize with counts and presence flags instead of full contents.
impl std::fmt::Debug for TenantManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TenantManager")
            .field("tenant_count", &self.tenants.len())
            .field("max_queue_depth", &self.max_queue_depth)
            .field("pending_events", &self.pending_events)
            .field("has_store", &self.store.is_some())
            .field("has_ws_broadcast", &self.ws_broadcast.is_some())
            .finish_non_exhaustive()
    }
}
657
658impl TenantManager {
659    pub fn new() -> Self {
660        Self {
661            tenants: HashMap::new(),
662            api_key_index: HashMap::new(),
663            store: None,
664            prometheus_metrics: None,
665            max_queue_depth: 0,
666            pending_events: Arc::new(AtomicU64::new(0)),
667            ws_broadcast: None,
668        }
669    }
670
671    /// Create a new tenant manager backed by a state store
672    pub fn with_store(store: Arc<dyn StateStore>) -> Self {
673        Self {
674            tenants: HashMap::new(),
675            api_key_index: HashMap::new(),
676            store: Some(store),
677            prometheus_metrics: None,
678            max_queue_depth: 0,
679            pending_events: Arc::new(AtomicU64::new(0)),
680            ws_broadcast: None,
681        }
682    }
683
    /// Set Prometheus metrics to be shared with all engines.
    /// Only affects pipelines deployed after this call (metrics are
    /// handed to the engine at deploy time).
    pub fn set_prometheus_metrics(&mut self, metrics: Metrics) {
        self.prometheus_metrics = Some(metrics);
    }
688
    /// Set maximum queue depth for backpressure (0 = unlimited).
    /// Takes effect on the next `check_backpressure` call.
    pub const fn set_max_queue_depth(&mut self, max_depth: u64) {
        self.max_queue_depth = max_depth;
    }
693
694    /// Set a WebSocket broadcast channel for relaying output events.
695    /// When set, all connector-sourced pipeline output events are forwarded
696    /// to this channel as serialized JSON strings.
697    pub fn set_ws_broadcast(&mut self, tx: Arc<tokio::sync::broadcast::Sender<String>>) {
698        for tenant in self.tenants.values_mut() {
699            tenant.ws_broadcast = Some(Arc::clone(&tx));
700        }
701        self.ws_broadcast = Some(tx);
702    }
703
    /// Get the maximum queue depth setting (0 = backpressure disabled).
    pub const fn max_queue_depth(&self) -> u64 {
        self.max_queue_depth
    }
708
    /// Get the current number of pending events being processed.
    /// Relaxed load: a momentarily stale value is acceptable here.
    pub fn pending_event_count(&self) -> u64 {
        self.pending_events.load(Ordering::Relaxed)
    }
713
    /// Get a clone of the pending events counter (for sharing with API handlers).
    /// Cheap: clones the `Arc`, not the counter.
    pub fn pending_events_counter(&self) -> Arc<AtomicU64> {
        Arc::clone(&self.pending_events)
    }
718
719    /// Check backpressure: returns Err if queue depth exceeds maximum
720    pub fn check_backpressure(&self) -> Result<(), TenantError> {
721        if self.max_queue_depth == 0 {
722            return Ok(());
723        }
724        let current = self.pending_events.load(Ordering::Relaxed);
725        if current >= self.max_queue_depth {
726            return Err(TenantError::BackpressureExceeded {
727                current,
728                max: self.max_queue_depth,
729            });
730        }
731        Ok(())
732    }
733
734    /// Compute the queue pressure ratio (0.0 to 1.0+). Returns 0.0 if max_queue_depth is 0.
735    pub fn queue_pressure_ratio(&self) -> f64 {
736        if self.max_queue_depth == 0 {
737            return 0.0;
738        }
739        self.pending_events.load(Ordering::Relaxed) as f64 / self.max_queue_depth as f64
740    }
741
742    /// Create a new tenant
743    pub fn create_tenant(
744        &mut self,
745        name: String,
746        api_key: String,
747        quota: TenantQuota,
748    ) -> Result<TenantId, TenantError> {
749        if self.api_key_index.contains_key(&api_key) {
750            return Err(TenantError::AlreadyExists(
751                "API key already in use".to_string(),
752            ));
753        }
754
755        let id = TenantId::generate();
756        let mut tenant = Tenant::new(id.clone(), name, api_key.clone(), quota);
757        tenant.ws_broadcast = self.ws_broadcast.clone();
758        self.tenants.insert(id.clone(), tenant);
759        self.api_key_index.insert(api_key, id.clone());
760        self.persist_if_needed(&id);
761        Ok(id)
762    }
763
    /// Get a tenant id by API key (authentication lookup).
    pub fn get_tenant_by_api_key(&self, api_key: &str) -> Option<&TenantId> {
        self.api_key_index.get(api_key)
    }
768
    /// Get a tenant by ID.
    pub fn get_tenant(&self, id: &TenantId) -> Option<&Tenant> {
        self.tenants.get(id)
    }
773
    /// Get a mutable tenant by ID.
    pub fn get_tenant_mut(&mut self, id: &TenantId) -> Option<&mut Tenant> {
        self.tenants.get_mut(id)
    }
778
779    /// Deploy a pipeline on a tenant, passing through Prometheus metrics
780    pub async fn deploy_pipeline_on_tenant(
781        &mut self,
782        tenant_id: &TenantId,
783        name: String,
784        source: String,
785    ) -> Result<String, TenantError> {
786        let metrics = self.prometheus_metrics.clone();
787        let ws_broadcast = self.ws_broadcast.clone();
788        let tenant = self
789            .tenants
790            .get_mut(tenant_id)
791            .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
792        // Ensure tenant has the WS broadcast channel
793        if tenant.ws_broadcast.is_none() {
794            tenant.ws_broadcast = ws_broadcast;
795        }
796        tenant
797            .deploy_pipeline_with_metrics(name, source, metrics)
798            .await
799    }
800
801    /// Process an event through a pipeline with backpressure checking.
802    ///
803    /// Checks queue depth before processing, increments a pending counter during
804    /// processing, and decrements it afterward. Returns `BackpressureExceeded`
805    /// if the queue depth exceeds `max_queue_depth`.
806    pub async fn process_event_with_backpressure(
807        &mut self,
808        tenant_id: &TenantId,
809        pipeline_id: &str,
810        event: Event,
811    ) -> Result<Vec<Event>, TenantError> {
812        self.check_backpressure()?;
813        self.pending_events.fetch_add(1, Ordering::Relaxed);
814        let tenant = self
815            .tenants
816            .get_mut(tenant_id)
817            .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
818        let result = tenant.process_event(pipeline_id, event).await;
819        self.pending_events.fetch_sub(1, Ordering::Relaxed);
820
821        // Update pressure ratio metric
822        if let Some(ref metrics) = self.prometheus_metrics {
823            metrics
824                .queue_pressure_ratio
825                .with_label_values(&["_all"])
826                .set(self.queue_pressure_ratio());
827        }
828
829        result
830    }
831
832    /// Remove a tenant and all its pipelines
833    pub fn remove_tenant(&mut self, id: &TenantId) -> Result<(), TenantError> {
834        let tenant = self
835            .tenants
836            .remove(id)
837            .ok_or_else(|| TenantError::NotFound(id.to_string()))?;
838        self.api_key_index.remove(&tenant.api_key);
839        if let Some(ref store) = self.store {
840            if let Err(e) = Self::delete_tenant_state(store.as_ref(), id) {
841                warn!("Failed to delete persisted state for tenant {}: {}", id, e);
842            }
843        }
844        Ok(())
845    }
846
847    /// List all tenants
848    pub fn list_tenants(&self) -> Vec<&Tenant> {
849        self.tenants.values().collect()
850    }
851
    /// Get total number of tenants.
    pub fn tenant_count(&self) -> usize {
        self.tenants.len()
    }
856
857    /// Collect pipeline metrics across all tenants (pipeline_name, events_in, events_out).
858    pub async fn collect_pipeline_metrics(&self) -> Vec<(String, u64, u64)> {
859        let mut metrics = Vec::new();
860        for tenant in self.tenants.values() {
861            for pipeline in tenant.pipelines.values() {
862                let engine = pipeline.engine.lock().await;
863                let (events_in, events_out) = engine.event_counters();
864                metrics.push((pipeline.name.clone(), events_in, events_out));
865            }
866        }
867        metrics
868    }
869
870    /// Collect connector health across all tenants.
871    ///
872    /// Returns `(pipeline_name, connector_name, connector_type, health_report)` tuples.
873    pub fn collect_connector_health(
874        &self,
875    ) -> Vec<(
876        String,
877        String,
878        String,
879        crate::connector::ConnectorHealthReport,
880    )> {
881        let mut results = Vec::new();
882        for tenant in self.tenants.values() {
883            for pipeline in tenant.pipelines.values() {
884                if let Some(ref registry) = pipeline.connector_registry {
885                    for (conn_name, conn_type, report) in registry.health_reports() {
886                        results.push((
887                            pipeline.name.clone(),
888                            conn_name.to_string(),
889                            conn_type.to_string(),
890                            report,
891                        ));
892                    }
893                }
894            }
895        }
896        results
897    }
898
899    /// Persist a tenant's current state to the store (if configured)
900    pub fn persist_if_needed(&self, tenant_id: &TenantId) {
901        if let Some(ref store) = self.store {
902            if let Some(tenant) = self.tenants.get(tenant_id) {
903                if let Err(e) = Self::persist_tenant_to_store(store.as_ref(), tenant) {
904                    warn!("Failed to persist tenant {}: {}", tenant_id, e);
905                }
906            }
907        }
908    }
909
910    /// Recover all tenants from the state store
911    ///
912    /// Returns the number of tenants successfully recovered.
913    pub fn recover(&mut self) -> Result<usize, StoreError> {
914        let store = match &self.store {
915            Some(s) => Arc::clone(s),
916            None => return Ok(0),
917        };
918
919        // Load tenant index
920        let index_data = match store.get("tenants:index")? {
921            Some(data) => data,
922            None => return Ok(0),
923        };
924
925        let tenant_ids: Vec<String> = serde_json::from_slice(&index_data)
926            .map_err(|e| StoreError::SerializationError(e.to_string()))?;
927
928        let mut recovered = 0;
929        let mut failed = 0;
930
931        for tid in &tenant_ids {
932            let key = format!("tenant:{tid}");
933            let data = match store.get(&key)? {
934                Some(d) => d,
935                None => {
936                    warn!("Tenant {} listed in index but not found in store", tid);
937                    failed += 1;
938                    continue;
939                }
940            };
941
942            let snapshot: TenantSnapshot = match serde_json::from_slice(&data) {
943                Ok(s) => s,
944                Err(e) => {
945                    warn!("Failed to deserialize tenant {}: {}", tid, e);
946                    failed += 1;
947                    continue;
948                }
949            };
950
951            match Self::restore_tenant_from_snapshot(snapshot) {
952                Ok(tenant) => {
953                    let tenant_id = tenant.id.clone();
954                    self.api_key_index
955                        .insert(tenant.api_key.clone(), tenant_id.clone());
956                    let pipeline_count = tenant.pipelines.len();
957                    self.tenants.insert(tenant_id.clone(), tenant);
958                    info!(
959                        "Recovered tenant {} with {} pipeline(s)",
960                        tenant_id, pipeline_count
961                    );
962                    recovered += 1;
963                }
964                Err(e) => {
965                    warn!("Failed to restore tenant {}: {}", tid, e);
966                    failed += 1;
967                }
968            }
969        }
970
971        if failed > 0 {
972            warn!(
973                "Recovery complete: {} recovered, {} failed",
974                recovered, failed
975            );
976        }
977
978        Ok(recovered)
979    }
980
981    fn persist_tenant_to_store(store: &dyn StateStore, tenant: &Tenant) -> Result<(), StoreError> {
982        let snapshot = tenant.snapshot();
983        let data = serde_json::to_vec(&snapshot)
984            .map_err(|e| StoreError::SerializationError(e.to_string()))?;
985        let key = format!("tenant:{}", snapshot.id);
986        store.put(&key, &data)?;
987
988        // Update tenant index
989        Self::update_tenant_index_add(store, &snapshot.id)?;
990
991        store.flush()
992    }
993
994    fn delete_tenant_state(store: &dyn StateStore, id: &TenantId) -> Result<(), StoreError> {
995        let key = format!("tenant:{}", id.0);
996        store.delete(&key)?;
997
998        Self::update_tenant_index_remove(store, &id.0)?;
999
1000        store.flush()
1001    }
1002
1003    fn update_tenant_index_add(store: &dyn StateStore, tenant_id: &str) -> Result<(), StoreError> {
1004        let mut ids = Self::load_tenant_index(store)?;
1005        let id_str = tenant_id.to_string();
1006        if !ids.contains(&id_str) {
1007            ids.push(id_str);
1008        }
1009        let data =
1010            serde_json::to_vec(&ids).map_err(|e| StoreError::SerializationError(e.to_string()))?;
1011        store.put("tenants:index", &data)
1012    }
1013
1014    fn update_tenant_index_remove(
1015        store: &dyn StateStore,
1016        tenant_id: &str,
1017    ) -> Result<(), StoreError> {
1018        let mut ids = Self::load_tenant_index(store)?;
1019        ids.retain(|id| id != tenant_id);
1020        let data =
1021            serde_json::to_vec(&ids).map_err(|e| StoreError::SerializationError(e.to_string()))?;
1022        store.put("tenants:index", &data)
1023    }
1024
1025    fn load_tenant_index(store: &dyn StateStore) -> Result<Vec<String>, StoreError> {
1026        match store.get("tenants:index")? {
1027            Some(data) => serde_json::from_slice(&data)
1028                .map_err(|e| StoreError::SerializationError(e.to_string())),
1029            None => Ok(Vec::new()),
1030        }
1031    }
1032
1033    fn restore_tenant_from_snapshot(snapshot: TenantSnapshot) -> Result<Tenant, TenantError> {
1034        let tenant_id = TenantId::new(&snapshot.id);
1035
1036        let mut tenant = Tenant::new(tenant_id, snapshot.name, snapshot.api_key, snapshot.quota);
1037        tenant.usage.events_processed = snapshot.events_processed;
1038        tenant.usage.output_events_emitted = snapshot.output_events_emitted;
1039        tenant.usage.events_in_window = snapshot.events_in_window;
1040
1041        for ps in snapshot.pipelines {
1042            match Self::restore_pipeline_from_snapshot(ps.clone()) {
1043                Ok(pipeline) => {
1044                    tenant.pipelines.insert(pipeline.id.clone(), pipeline);
1045                }
1046                Err(e) => {
1047                    warn!(
1048                        "Failed to restore pipeline '{}' ({}): {}",
1049                        ps.name, ps.id, e
1050                    );
1051                }
1052            }
1053        }
1054
1055        tenant.usage.active_pipelines = tenant.pipelines.len();
1056        Ok(tenant)
1057    }
1058
1059    fn restore_pipeline_from_snapshot(snapshot: PipelineSnapshot) -> Result<Pipeline, TenantError> {
1060        let program = varpulis_parser::parse(&snapshot.source)?;
1061
1062        let (output_tx, output_rx) = mpsc::channel(1000);
1063        let mut engine = Engine::new(output_tx);
1064        engine.load(&program)?;
1065
1066        let (log_tx, _) = tokio::sync::broadcast::channel(256);
1067        Ok(Pipeline {
1068            id: snapshot.id,
1069            name: snapshot.name,
1070            source: snapshot.source,
1071            engine: Arc::new(tokio::sync::Mutex::new(engine)),
1072            output_rx,
1073            log_broadcast: log_tx,
1074            created_at: Instant::now(),
1075            status: snapshot.status,
1076            orchestrator: None,
1077            connector_registry: None,
1078        })
1079    }
1080}
1081
1082impl Default for TenantManager {
1083    fn default() -> Self {
1084        Self::new()
1085    }
1086}
1087
1088/// Thread-safe tenant manager for use in async server context
1089pub type SharedTenantManager = Arc<RwLock<TenantManager>>;
1090
1091/// Create a new shared tenant manager
1092pub fn shared_tenant_manager() -> SharedTenantManager {
1093    Arc::new(RwLock::new(TenantManager::new()))
1094}
1095
1096/// Create a shared tenant manager backed by a state store, recovering any persisted state
1097pub fn shared_tenant_manager_with_store(store: Arc<dyn StateStore>) -> SharedTenantManager {
1098    let mut mgr = TenantManager::with_store(store);
1099    match mgr.recover() {
1100        Ok(count) if count > 0 => {
1101            info!("Recovered {} tenant(s) from persistent state", count);
1102        }
1103        Ok(_) => {
1104            info!("No persisted tenant state found, starting fresh");
1105        }
1106        Err(e) => {
1107            error!("Failed to recover tenant state: {}", e);
1108        }
1109    }
1110    Arc::new(RwLock::new(mgr))
1111}
1112
1113// =============================================================================
1114// Snapshot Types for Persistence
1115// =============================================================================
1116
/// Serializable snapshot of a tenant's state
///
/// This is the persisted wire format: field names (and the serde aliases /
/// defaults below) must stay stable so that older snapshots keep
/// deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TenantSnapshot {
    /// Tenant identifier (the inner string of `TenantId`).
    pub id: String,
    /// Human-readable tenant name.
    pub name: String,
    /// API key used for tenant lookup.
    pub api_key: String,
    /// Resource limits in force for this tenant.
    pub quota: TenantQuota,
    /// Lifetime count of events processed.
    pub events_processed: u64,
    /// Lifetime count of output events; the alias accepts the legacy
    /// `alerts_generated` field name from older snapshots.
    #[serde(alias = "alerts_generated")]
    pub output_events_emitted: u64,
    /// Snapshots of every deployed pipeline.
    pub pipelines: Vec<PipelineSnapshot>,
    /// Milliseconds since the Unix epoch when the snapshot was taken;
    /// `default` keeps snapshots written before this field deserializable.
    #[serde(default)]
    pub created_at_ms: Option<i64>,
    /// Events counted in the current rate-limit window; defaults to 0 for
    /// older snapshots.
    #[serde(default)]
    pub events_in_window: u64,
}
1133
/// Serializable snapshot of a pipeline
///
/// Only durable data is stored; runtime state (engine, channels) is rebuilt
/// on recovery by recompiling `source`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineSnapshot {
    /// Pipeline identifier (unique within a tenant).
    pub id: String,
    /// Human-readable pipeline name.
    pub name: String,
    /// Original VPL source; recompiled on recovery.
    pub source: String,
    /// Last known run status.
    pub status: PipelineStatus,
}
1142
1143impl Pipeline {
1144    /// Create a serializable snapshot of this pipeline
1145    pub fn snapshot(&self) -> PipelineSnapshot {
1146        PipelineSnapshot {
1147            id: self.id.clone(),
1148            name: self.name.clone(),
1149            source: self.source.clone(),
1150            status: self.status.clone(),
1151        }
1152    }
1153}
1154
1155impl Tenant {
1156    /// Create a serializable snapshot of this tenant and all its pipelines
1157    pub fn snapshot(&self) -> TenantSnapshot {
1158        TenantSnapshot {
1159            id: self.id.0.clone(),
1160            name: self.name.clone(),
1161            api_key: self.api_key.clone(),
1162            quota: self.quota.clone(),
1163            events_processed: self.usage.events_processed,
1164            output_events_emitted: self.usage.output_events_emitted,
1165            pipelines: self.pipelines.values().map(|p| p.snapshot()).collect(),
1166            created_at_ms: Some(chrono::Utc::now().timestamp_millis()),
1167            events_in_window: self.usage.events_in_window,
1168        }
1169    }
1170}
1171
1172#[cfg(test)]
1173mod tests {
1174    use super::*;
1175
    // Generated ids are random UUIDs, so two must never collide.
    #[test]
    fn test_tenant_id_generate() {
        let id1 = TenantId::generate();
        let id2 = TenantId::generate();
        assert_ne!(id1, id2);
    }

    // Display and as_str both expose the raw inner string.
    #[test]
    fn test_tenant_id_display() {
        let id = TenantId::new("test-123");
        assert_eq!(format!("{id}"), "test-123");
        assert_eq!(id.as_str(), "test-123");
    }

    // Quota tiers must be strictly ordered: free < pro < enterprise.
    #[test]
    fn test_tenant_quota_tiers() {
        let free = TenantQuota::free();
        let pro = TenantQuota::pro();
        let enterprise = TenantQuota::enterprise();

        assert!(free.max_pipelines < pro.max_pipelines);
        assert!(pro.max_pipelines < enterprise.max_pipelines);
        assert!(free.max_events_per_second < pro.max_events_per_second);
        assert!(pro.max_events_per_second < enterprise.max_events_per_second);
    }

    // Recording one event bumps both the lifetime and window counters.
    #[test]
    fn test_usage_record_event() {
        let mut usage = TenantUsage::default();
        assert!(usage.record_event(100));
        assert_eq!(usage.events_processed, 1);
        assert_eq!(usage.events_in_window, 1);
    }

    // record_event returns false once the per-second limit is hit.
    #[test]
    fn test_usage_rate_limit() {
        let mut usage = TenantUsage::default();
        // Allow 2 events per second
        assert!(usage.record_event(2));
        assert!(usage.record_event(2));
        // Third should be rejected
        assert!(!usage.record_event(2));
    }

    // A limit of 0 disables rate limiting entirely.
    #[test]
    fn test_usage_no_rate_limit() {
        let mut usage = TenantUsage::default();
        // 0 means disabled
        for _ in 0..1000 {
            assert!(usage.record_event(0));
        }
    }
1228
    // Creating a tenant makes it retrievable by id with its given name.
    #[test]
    fn test_tenant_manager_create() {
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test Corp".into(), "key-123".into(), TenantQuota::free())
            .unwrap();

        assert_eq!(mgr.tenant_count(), 1);
        assert!(mgr.get_tenant(&id).is_some());
        assert_eq!(mgr.get_tenant(&id).unwrap().name, "Test Corp");
    }

    // The API-key index maps a key to the owning tenant id, and only that key.
    #[test]
    fn test_tenant_manager_api_key_lookup() {
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "my-key".into(), TenantQuota::default())
            .unwrap();

        let found = mgr.get_tenant_by_api_key("my-key");
        assert_eq!(found, Some(&id));

        assert!(mgr.get_tenant_by_api_key("wrong-key").is_none());
    }

    // Two tenants may not share an API key.
    #[test]
    fn test_tenant_manager_duplicate_api_key() {
        let mut mgr = TenantManager::new();
        mgr.create_tenant("A".into(), "key-1".into(), TenantQuota::default())
            .unwrap();
        let result = mgr.create_tenant("B".into(), "key-1".into(), TenantQuota::default());
        assert!(result.is_err());
    }

    // Removal drops both the tenant and its API-key index entry.
    #[test]
    fn test_tenant_manager_remove() {
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
            .unwrap();

        mgr.remove_tenant(&id).unwrap();
        assert_eq!(mgr.tenant_count(), 0);
        assert!(mgr.get_tenant_by_api_key("key-1").is_none());
    }
1274
    // Deploying valid VPL registers a running pipeline and updates usage.
    #[tokio::test]
    async fn test_tenant_deploy_pipeline() {
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl = r"
            stream Alerts = SensorReading
                .where(temperature > 100)
        ";
        let pipeline_id = tenant
            .deploy_pipeline("My Pipeline".into(), vpl.into())
            .await
            .unwrap();
        assert_eq!(tenant.pipelines.len(), 1);
        assert_eq!(tenant.usage.active_pipelines, 1);
        assert_eq!(
            tenant.pipelines[&pipeline_id].status,
            PipelineStatus::Running
        );
    }

    // Deployments beyond max_pipelines are rejected.
    #[tokio::test]
    async fn test_tenant_pipeline_quota() {
        let mut mgr = TenantManager::new();
        let quota = TenantQuota {
            max_pipelines: 1,
            max_events_per_second: 100,
            max_streams_per_pipeline: 50,
        };
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), quota)
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl = "stream A = SensorReading .where(x > 1)";
        tenant
            .deploy_pipeline("P1".into(), vpl.into())
            .await
            .unwrap();

        // Second should fail
        let result = tenant.deploy_pipeline("P2".into(), vpl.into()).await;
        assert!(result.is_err());
    }

    // Removing a pipeline updates both the map and the usage counter.
    #[tokio::test]
    async fn test_tenant_remove_pipeline() {
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl = "stream A = SensorReading .where(x > 1)";
        let pid = tenant
            .deploy_pipeline("P1".into(), vpl.into())
            .await
            .unwrap();

        tenant.remove_pipeline(&pid).unwrap();
        assert_eq!(tenant.pipelines.len(), 0);
        assert_eq!(tenant.usage.active_pipelines, 0);
    }

    // Invalid VPL source must surface as a deploy error, not a panic.
    #[tokio::test]
    async fn test_tenant_parse_error() {
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let result = tenant
            .deploy_pipeline("Bad".into(), "this is not valid VPL {{{{".into())
            .await;
        assert!(result.is_err());
    }
1355
    // Processing an event through a deployed pipeline bumps the usage counter.
    #[tokio::test]
    async fn test_tenant_process_event() {
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl = "stream A = SensorReading .where(temperature > 100)";
        let pid = tenant
            .deploy_pipeline("P1".into(), vpl.into())
            .await
            .unwrap();

        let event = Event::new("SensorReading").with_field("temperature", 150.0);
        tenant.process_event(&pid, event).await.unwrap();
        assert_eq!(tenant.usage.events_processed, 1);
    }

    // With a 2 events/s quota, the third event in the window is rejected.
    #[tokio::test]
    async fn test_tenant_rate_limit_on_process() {
        let mut mgr = TenantManager::new();
        let quota = TenantQuota {
            max_pipelines: 10,
            max_events_per_second: 2,
            max_streams_per_pipeline: 50,
        };
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), quota)
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl = "stream A = SensorReading .where(x > 1)";
        let pid = tenant
            .deploy_pipeline("P1".into(), vpl.into())
            .await
            .unwrap();

        let event = Event::new("SensorReading").with_field("x", 5);
        tenant.process_event(&pid, event.clone()).await.unwrap();
        tenant.process_event(&pid, event.clone()).await.unwrap();

        // Third should hit rate limit
        let result = tenant.process_event(&pid, event).await;
        assert!(result.is_err());
    }

    // Reloading swaps in new source under the same pipeline id.
    #[tokio::test]
    async fn test_tenant_reload_pipeline() {
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl1 = "stream A = SensorReading .where(x > 1)";
        let pid = tenant
            .deploy_pipeline("P1".into(), vpl1.into())
            .await
            .unwrap();

        let vpl2 = "stream B = SensorReading .where(x > 50)";
        tenant.reload_pipeline(&pid, vpl2.into()).await.unwrap();

        assert_eq!(tenant.pipelines[&pid].source, vpl2);
    }
1422
    // Display strings for each PipelineStatus variant.
    #[test]
    fn test_pipeline_status_display() {
        assert_eq!(format!("{}", PipelineStatus::Running), "running");
        assert_eq!(format!("{}", PipelineStatus::Stopped), "stopped");
        assert_eq!(
            format!("{}", PipelineStatus::Error("oops".into())),
            "error: oops"
        );
    }

    // Error messages should carry their payload / a recognizable phrase.
    #[test]
    fn test_tenant_error_display() {
        assert!(format!("{}", TenantError::NotFound("t1".into())).contains("t1"));
        assert!(format!("{}", TenantError::RateLimitExceeded).contains("rate limit"));
        let engine_err = crate::engine::error::EngineError::Compilation("bad".into());
        assert!(format!("{}", TenantError::EngineError(engine_err)).contains("bad"));
    }

    // A fresh shared manager has exactly one Arc owner.
    #[test]
    fn test_shared_tenant_manager() {
        let mgr = shared_tenant_manager();
        assert!(Arc::strong_count(&mgr) == 1);
    }

    // list_tenants returns one entry per registered tenant.
    #[test]
    fn test_tenant_list() {
        let mut mgr = TenantManager::new();
        mgr.create_tenant("A".into(), "key-a".into(), TenantQuota::default())
            .unwrap();
        mgr.create_tenant("B".into(), "key-b".into(), TenantQuota::default())
            .unwrap();
        assert_eq!(mgr.list_tenants().len(), 2);
    }

    // Default yields an empty manager, same as new().
    #[test]
    fn test_tenant_manager_default() {
        let mgr = TenantManager::default();
        assert_eq!(mgr.tenant_count(), 0);
    }
1462
    // A snapshot serialized to JSON and back must preserve identity, usage
    // counters, quota, and pipeline data.
    #[tokio::test]
    async fn test_tenant_snapshot_roundtrip() {
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Snap Corp".into(), "snap-key".into(), TenantQuota::pro())
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl = "stream A = SensorReading .where(x > 1)";
        tenant
            .deploy_pipeline("Pipeline1".into(), vpl.into())
            .await
            .unwrap();
        tenant.usage.events_processed = 42;
        tenant.usage.output_events_emitted = 7;

        let snapshot = tenant.snapshot();
        let json = serde_json::to_vec(&snapshot).unwrap();
        let restored: TenantSnapshot = serde_json::from_slice(&json).unwrap();

        assert_eq!(restored.id, id.0);
        assert_eq!(restored.name, "Snap Corp");
        assert_eq!(restored.api_key, "snap-key");
        assert_eq!(restored.events_processed, 42);
        assert_eq!(restored.output_events_emitted, 7);
        assert_eq!(restored.pipelines.len(), 1);
        assert_eq!(restored.pipelines[0].name, "Pipeline1");
        assert_eq!(restored.pipelines[0].source, vpl);
        assert_eq!(restored.pipelines[0].status, PipelineStatus::Running);
        assert_eq!(
            restored.quota.max_pipelines,
            TenantQuota::pro().max_pipelines
        );
    }
1497
    // End-to-end persistence: write a tenant through one manager, recover it
    // through a second one sharing the same store, and verify all data plus
    // the rebuilt API-key index.
    #[tokio::test]
    async fn test_tenant_manager_persistence_and_recovery() {
        use crate::persistence::MemoryStore;

        let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
        let vpl = "stream A = SensorReading .where(x > 1)";

        // Create tenants and pipelines with persistence
        let (tenant_id, pipeline_name) = {
            let mut mgr = TenantManager::with_store(Arc::clone(&store));
            let id = mgr
                .create_tenant(
                    "Persisted Corp".into(),
                    "persist-key".into(),
                    TenantQuota::default(),
                )
                .unwrap();

            let tenant = mgr.get_tenant_mut(&id).unwrap();
            tenant
                .deploy_pipeline("Persistent Pipeline".into(), vpl.into())
                .await
                .unwrap();
            tenant.usage.events_processed = 100;
            tenant.usage.output_events_emitted = 5;
            mgr.persist_if_needed(&id);

            (id.0.clone(), "Persistent Pipeline".to_string())
        };

        // Recover into a new manager
        let mut mgr2 = TenantManager::with_store(Arc::clone(&store));
        let recovered = mgr2.recover().unwrap();
        assert_eq!(recovered, 1);
        assert_eq!(mgr2.tenant_count(), 1);

        // Verify tenant data
        let tid = TenantId::new(&tenant_id);
        let tenant = mgr2.get_tenant(&tid).unwrap();
        assert_eq!(tenant.name, "Persisted Corp");
        assert_eq!(tenant.api_key, "persist-key");
        assert_eq!(tenant.usage.events_processed, 100);
        assert_eq!(tenant.usage.output_events_emitted, 5);
        assert_eq!(tenant.pipelines.len(), 1);

        let pipeline = tenant.pipelines.values().next().unwrap();
        assert_eq!(pipeline.name, pipeline_name);
        assert_eq!(pipeline.source, vpl);
        assert_eq!(pipeline.status, PipelineStatus::Running);

        // Verify API key index was rebuilt
        assert_eq!(mgr2.get_tenant_by_api_key("persist-key"), Some(&tid));
    }

    // Multiple tenants with multiple pipelines survive a simulated restart
    // (manager dropped, new manager recovers from the same store).
    #[tokio::test]
    async fn test_persistence_survives_restart() {
        use crate::persistence::MemoryStore;

        let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
        let vpl = "stream A = SensorReading .where(x > 1)";

        // Phase 1: Create data
        {
            let mut mgr = TenantManager::with_store(Arc::clone(&store));
            let id1 = mgr
                .create_tenant("Tenant A".into(), "key-a".into(), TenantQuota::free())
                .unwrap();
            let id2 = mgr
                .create_tenant("Tenant B".into(), "key-b".into(), TenantQuota::pro())
                .unwrap();

            let tenant_a = mgr.get_tenant_mut(&id1).unwrap();
            tenant_a
                .deploy_pipeline("P1".into(), vpl.into())
                .await
                .unwrap();
            mgr.persist_if_needed(&id1);

            let tenant_b = mgr.get_tenant_mut(&id2).unwrap();
            tenant_b
                .deploy_pipeline("P2".into(), vpl.into())
                .await
                .unwrap();
            tenant_b
                .deploy_pipeline("P3".into(), vpl.into())
                .await
                .unwrap();
            mgr.persist_if_needed(&id2);
        }

        // Phase 2: Recover (simulating restart)
        {
            let mut mgr = TenantManager::with_store(Arc::clone(&store));
            let recovered = mgr.recover().unwrap();
            assert_eq!(recovered, 2);
            assert_eq!(mgr.tenant_count(), 2);

            // Verify tenant A
            let tid_a = mgr.get_tenant_by_api_key("key-a").unwrap().clone();
            let tenant_a = mgr.get_tenant(&tid_a).unwrap();
            assert_eq!(tenant_a.name, "Tenant A");
            assert_eq!(tenant_a.pipelines.len(), 1);

            // Verify tenant B
            let tid_b = mgr.get_tenant_by_api_key("key-b").unwrap().clone();
            let tenant_b = mgr.get_tenant(&tid_b).unwrap();
            assert_eq!(tenant_b.name, "Tenant B");
            assert_eq!(tenant_b.pipelines.len(), 2);
        }
    }
1608
    // created_at_ms and events_in_window are captured, survive a JSON
    // round-trip, and carry over into a restored tenant.
    #[test]
    fn test_tenant_snapshot_created_at_and_window() {
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant(
                "Window Corp".into(),
                "window-key".into(),
                TenantQuota::default(),
            )
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        tenant.usage.events_in_window = 42;
        tenant.usage.events_processed = 100;

        let snapshot = tenant.snapshot();

        // created_at_ms should be populated
        assert!(snapshot.created_at_ms.is_some());
        let ts = snapshot.created_at_ms.unwrap();
        // Should be a reasonable recent timestamp (after 2024-01-01)
        assert!(ts > 1_704_067_200_000);

        // events_in_window should be captured
        assert_eq!(snapshot.events_in_window, 42);

        // Round-trip through JSON
        let json = serde_json::to_vec(&snapshot).unwrap();
        let restored: TenantSnapshot = serde_json::from_slice(&json).unwrap();
        assert_eq!(restored.events_in_window, 42);
        assert_eq!(restored.created_at_ms, snapshot.created_at_ms);

        // Restore tenant and verify events_in_window is carried over
        let restored_tenant = TenantManager::restore_tenant_from_snapshot(restored).unwrap();
        assert_eq!(restored_tenant.usage.events_in_window, 42);
        assert_eq!(restored_tenant.usage.events_processed, 100);
    }

    // Old-format JSON (legacy `alerts_generated` field, no created_at_ms /
    // events_in_window) must still deserialize and restore via serde alias
    // and defaults.
    #[test]
    fn test_tenant_snapshot_backwards_compat() {
        // Simulate old JSON without the new fields
        let old_json = r#"{
            "id": "compat-tenant",
            "name": "Compat Corp",
            "api_key": "compat-key",
            "quota": {
                "max_pipelines": 10,
                "max_events_per_second": 10000,
                "max_streams_per_pipeline": 50
            },
            "events_processed": 55,
            "alerts_generated": 3,
            "pipelines": []
        }"#;

        let snapshot: TenantSnapshot = serde_json::from_str(old_json).unwrap();
        assert_eq!(snapshot.id, "compat-tenant");
        assert_eq!(snapshot.events_processed, 55);
        // New fields should default
        assert_eq!(snapshot.created_at_ms, None);
        assert_eq!(snapshot.events_in_window, 0);

        // Should restore fine
        let tenant = TenantManager::restore_tenant_from_snapshot(snapshot).unwrap();
        assert_eq!(tenant.name, "Compat Corp");
        assert_eq!(tenant.usage.events_processed, 55);
        assert_eq!(tenant.usage.events_in_window, 0);
    }

    // A persisted pipeline with unparseable source is skipped during
    // recovery; the owning tenant still recovers.
    #[test]
    fn test_recovery_with_invalid_vpl() {
        use crate::persistence::MemoryStore;

        let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());

        // Manually store a tenant with invalid VPL
        let snapshot = TenantSnapshot {
            id: "bad-tenant".into(),
            name: "Bad Corp".into(),
            api_key: "bad-key".into(),
            quota: TenantQuota::default(),
            events_processed: 0,
            output_events_emitted: 0,
            pipelines: vec![PipelineSnapshot {
                id: "bad-pipeline".into(),
                name: "Bad Pipeline".into(),
                source: "this is not valid VPL {{{{".into(),
                status: PipelineStatus::Running,
            }],
            created_at_ms: None,
            events_in_window: 0,
        };

        let data = serde_json::to_vec(&snapshot).unwrap();
        store.put("tenant:bad-tenant", &data).unwrap();
        let index = serde_json::to_vec(&vec!["bad-tenant"]).unwrap();
        store.put("tenants:index", &index).unwrap();

        // Recovery should succeed (tenant recovered, bad pipeline skipped)
        let mut mgr = TenantManager::with_store(Arc::clone(&store));
        let recovered = mgr.recover().unwrap();
        assert_eq!(recovered, 1);

        let tid = TenantId::new("bad-tenant");
        let tenant = mgr.get_tenant(&tid).unwrap();
        assert_eq!(tenant.name, "Bad Corp");
        // The invalid pipeline should have been skipped
        assert_eq!(tenant.pipelines.len(), 0);
    }
1718
1719    #[test]
1720    fn test_backpressure_disabled_by_default() {
1721        let mgr = TenantManager::new();
1722        assert_eq!(mgr.max_queue_depth(), 0);
1723        assert!(mgr.check_backpressure().is_ok());
1724    }
1725
1726    #[test]
1727    fn test_backpressure_set_max_queue_depth() {
1728        let mut mgr = TenantManager::new();
1729        mgr.set_max_queue_depth(100);
1730        assert_eq!(mgr.max_queue_depth(), 100);
1731        // No pending events, should pass
1732        assert!(mgr.check_backpressure().is_ok());
1733    }
1734
1735    #[test]
1736    fn test_backpressure_exceeded() {
1737        let mut mgr = TenantManager::new();
1738        mgr.set_max_queue_depth(10);
1739
1740        // Simulate 10 pending events
1741        mgr.pending_events.store(10, Ordering::Relaxed);
1742        let result = mgr.check_backpressure();
1743        assert!(result.is_err());
1744
1745        if let Err(TenantError::BackpressureExceeded { current, max }) = result {
1746            assert_eq!(current, 10);
1747            assert_eq!(max, 10);
1748        } else {
1749            panic!("Expected BackpressureExceeded error");
1750        }
1751    }
1752
1753    #[test]
1754    fn test_backpressure_not_exceeded() {
1755        let mut mgr = TenantManager::new();
1756        mgr.set_max_queue_depth(10);
1757        mgr.pending_events.store(9, Ordering::Relaxed);
1758        assert!(mgr.check_backpressure().is_ok());
1759    }
1760
1761    #[test]
1762    fn test_backpressure_unlimited_when_zero() {
1763        let mut mgr = TenantManager::new();
1764        mgr.set_max_queue_depth(0);
1765        // Even with huge pending count, should not trigger
1766        mgr.pending_events.store(1_000_000, Ordering::Relaxed);
1767        assert!(mgr.check_backpressure().is_ok());
1768    }
1769
1770    #[test]
1771    fn test_queue_pressure_ratio() {
1772        let mut mgr = TenantManager::new();
1773        mgr.set_max_queue_depth(100);
1774        mgr.pending_events.store(50, Ordering::Relaxed);
1775        let ratio = mgr.queue_pressure_ratio();
1776        assert!((ratio - 0.5).abs() < f64::EPSILON);
1777    }
1778
1779    #[test]
1780    fn test_queue_pressure_ratio_zero_max() {
1781        let mgr = TenantManager::new();
1782        assert_eq!(mgr.queue_pressure_ratio(), 0.0);
1783    }
1784
1785    #[test]
1786    fn test_pending_events_counter() {
1787        let mgr = TenantManager::new();
1788        assert_eq!(mgr.pending_event_count(), 0);
1789        let counter = mgr.pending_events_counter();
1790        counter.fetch_add(5, Ordering::Relaxed);
1791        assert_eq!(mgr.pending_event_count(), 5);
1792    }
1793
1794    #[tokio::test]
1795    async fn test_process_event_with_backpressure_ok() {
1796        let mut mgr = TenantManager::new();
1797        mgr.set_max_queue_depth(100);
1798        let id = mgr
1799            .create_tenant("Test".into(), "key-bp".into(), TenantQuota::default())
1800            .unwrap();
1801
1802        let tenant = mgr.get_tenant_mut(&id).unwrap();
1803        let vpl = "stream A = SensorReading .where(temperature > 100)";
1804        let pid = tenant
1805            .deploy_pipeline("BP Pipeline".into(), vpl.into())
1806            .await
1807            .unwrap();
1808
1809        let event = Event::new("SensorReading").with_field("temperature", 150.0);
1810        let result = mgr.process_event_with_backpressure(&id, &pid, event).await;
1811        assert!(result.is_ok());
1812        // Pending count should be back to 0 after processing
1813        assert_eq!(mgr.pending_event_count(), 0);
1814    }
1815
1816    #[tokio::test]
1817    async fn test_process_event_with_backpressure_rejected() {
1818        let mut mgr = TenantManager::new();
1819        mgr.set_max_queue_depth(5);
1820        // Simulate queue being full
1821        mgr.pending_events.store(5, Ordering::Relaxed);
1822
1823        let id = mgr
1824            .create_tenant("Test".into(), "key-bp2".into(), TenantQuota::default())
1825            .unwrap();
1826        let tenant = mgr.get_tenant_mut(&id).unwrap();
1827        let vpl = "stream A = SensorReading .where(temperature > 100)";
1828        let pid = tenant
1829            .deploy_pipeline("BP Pipeline 2".into(), vpl.into())
1830            .await
1831            .unwrap();
1832
1833        let event = Event::new("SensorReading").with_field("temperature", 150.0);
1834        let result = mgr.process_event_with_backpressure(&id, &pid, event).await;
1835        assert!(result.is_err());
1836        match result {
1837            Err(TenantError::BackpressureExceeded { current, max }) => {
1838                assert_eq!(current, 5);
1839                assert_eq!(max, 5);
1840            }
1841            _ => panic!("Expected BackpressureExceeded"),
1842        }
1843    }
1844
1845    #[test]
1846    fn test_backpressure_error_display() {
1847        let err = TenantError::BackpressureExceeded {
1848            current: 50000,
1849            max: 50000,
1850        };
1851        let msg = format!("{err}");
1852        assert!(msg.contains("50000"));
1853        assert!(msg.contains("exceeds maximum"));
1854    }
1855
1856    #[tokio::test]
1857    async fn test_collect_pipeline_metrics_returns_event_counts() {
1858        let mut mgr = TenantManager::new();
1859        let id = mgr
1860            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1861            .unwrap();
1862
1863        // Deploy a simple filter pipeline
1864        let vpl = "stream A = SensorReading .where(temperature > 100)";
1865        mgr.deploy_pipeline_on_tenant(&id, "test-pipeline".into(), vpl.into())
1866            .await
1867            .unwrap();
1868
1869        // Verify metrics before any events: pipeline exists with 0 events
1870        let metrics = mgr.collect_pipeline_metrics().await;
1871        assert_eq!(metrics.len(), 1, "Should have 1 pipeline");
1872        assert_eq!(metrics[0].0, "test-pipeline");
1873        assert_eq!(metrics[0].1, 0, "events_in should be 0 before processing");
1874        assert_eq!(metrics[0].2, 0, "events_out should be 0 before processing");
1875
1876        // Process events through the pipeline
1877        let tenant = mgr.get_tenant_mut(&id).unwrap();
1878        let pid = tenant.pipelines.keys().next().unwrap().clone();
1879        for i in 0..10 {
1880            let event = Event::new("SensorReading")
1881                .with_field("temperature", (i as f64).mul_add(20.0, 50.0));
1882            tenant.process_event(&pid, event).await.unwrap();
1883        }
1884
1885        // Re-collect metrics — should reflect processed events
1886        let metrics = mgr.collect_pipeline_metrics().await;
1887        assert_eq!(metrics.len(), 1);
1888        assert_eq!(metrics[0].0, "test-pipeline");
1889        assert!(
1890            metrics[0].1 > 0,
1891            "events_in should be > 0 after processing, got {}",
1892            metrics[0].1
1893        );
1894    }
1895
1896    #[tokio::test]
1897    async fn test_collect_pipeline_metrics_multiple_tenants() {
1898        let mut mgr = TenantManager::new();
1899        let id1 = mgr
1900            .create_tenant("Tenant1".into(), "key-1".into(), TenantQuota::default())
1901            .unwrap();
1902        let id2 = mgr
1903            .create_tenant("Tenant2".into(), "key-2".into(), TenantQuota::default())
1904            .unwrap();
1905
1906        let vpl = "stream A = SensorReading .where(temperature > 100)";
1907        mgr.deploy_pipeline_on_tenant(&id1, "pipeline-1".into(), vpl.into())
1908            .await
1909            .unwrap();
1910        mgr.deploy_pipeline_on_tenant(&id2, "pipeline-2".into(), vpl.into())
1911            .await
1912            .unwrap();
1913
1914        let metrics = mgr.collect_pipeline_metrics().await;
1915        assert_eq!(metrics.len(), 2, "Should have 2 pipelines across 2 tenants");
1916        let names: Vec<&str> = metrics.iter().map(|(n, _, _)| n.as_str()).collect();
1917        assert!(names.contains(&"pipeline-1"));
1918        assert!(names.contains(&"pipeline-2"));
1919    }
1920
1921    #[tokio::test]
1922    async fn test_collect_pipeline_metrics_empty_when_no_pipelines() {
1923        let mut mgr = TenantManager::new();
1924        let _id = mgr
1925            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1926            .unwrap();
1927
1928        let metrics = mgr.collect_pipeline_metrics().await;
1929        assert!(
1930            metrics.is_empty(),
1931            "Should be empty when no pipelines deployed"
1932        );
1933    }
1934
1935    #[tokio::test]
1936    async fn test_deploy_pipeline_on_tenant_and_collect_metrics() {
1937        // Test via TenantManager.deploy_pipeline_on_tenant → collect_pipeline_metrics
1938        // This mirrors the exact code path used in production (coordinator deploys to worker)
1939        let mut mgr = TenantManager::new();
1940        let id = mgr
1941            .create_tenant(
1942                "default".into(),
1943                "api-key-123".into(),
1944                TenantQuota::enterprise(),
1945            )
1946            .unwrap();
1947
1948        let vpl = r"
1949            event MarketTick:
1950                symbol: str
1951                price: float
1952            stream Ticks = MarketTick .where(price > 100)
1953        ";
1954        let pipeline_id = mgr
1955            .deploy_pipeline_on_tenant(&id, "financial-cep".into(), vpl.into())
1956            .await
1957            .unwrap();
1958
1959        // Verify the pipeline appears in metrics
1960        let metrics = mgr.collect_pipeline_metrics().await;
1961        assert_eq!(metrics.len(), 1);
1962        assert_eq!(metrics[0].0, "financial-cep");
1963
1964        // Process events and verify counts update
1965        for _ in 0..5 {
1966            let event = Event::new("MarketTick")
1967                .with_field("symbol", "AAPL")
1968                .with_field("price", 150.0);
1969            mgr.process_event_with_backpressure(&id, &pipeline_id, event)
1970                .await
1971                .unwrap();
1972        }
1973
1974        let metrics = mgr.collect_pipeline_metrics().await;
1975        assert_eq!(metrics[0].1, 5, "Should have processed 5 events");
1976    }
1977}