//! Multi-tenant isolation for SaaS deployment
//!
//! Provides tenant-scoped engine instances with resource limits and usage tracking.

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, RwLock};
use tracing::{error, info, warn};
use uuid::Uuid;

use crate::context::ContextOrchestrator;
use crate::engine::Engine;
use crate::event::Event;
use crate::metrics::Metrics;
use crate::persistence::{StateStore, StoreError};

21/// Unique identifier for a tenant
22#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
23pub struct TenantId(pub String);
24
25impl TenantId {
26    pub fn new(id: impl Into<String>) -> Self {
27        Self(id.into())
28    }
29
30    pub fn generate() -> Self {
31        Self(Uuid::new_v4().to_string())
32    }
33
34    pub fn as_str(&self) -> &str {
35        &self.0
36    }
37}
38
39impl std::fmt::Display for TenantId {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        write!(f, "{}", self.0)
42    }
43}
44
45/// Resource limits for a tenant
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct TenantQuota {
48    /// Maximum number of pipelines
49    pub max_pipelines: usize,
50    /// Maximum events per second
51    pub max_events_per_second: u64,
52    /// Maximum streams per pipeline
53    pub max_streams_per_pipeline: usize,
54}
55
56impl Default for TenantQuota {
57    fn default() -> Self {
58        Self {
59            max_pipelines: 10,
60            max_events_per_second: 10_000,
61            max_streams_per_pipeline: 50,
62        }
63    }
64}
65
66impl TenantQuota {
67    /// Free tier limits (30-day trial)
68    pub const fn free() -> Self {
69        Self {
70            max_pipelines: 5,
71            max_events_per_second: 500,
72            max_streams_per_pipeline: 10,
73        }
74    }
75
76    /// Pro tier limits ($49/mo)
77    pub const fn pro() -> Self {
78        Self {
79            max_pipelines: 20,
80            max_events_per_second: 50_000,
81            max_streams_per_pipeline: 100,
82        }
83    }
84
85    /// Business tier limits ($199/mo)
86    pub const fn business() -> Self {
87        Self {
88            max_pipelines: 100,
89            max_events_per_second: 200_000,
90            max_streams_per_pipeline: 200,
91        }
92    }
93
94    /// Enterprise tier limits (custom)
95    pub const fn enterprise() -> Self {
96        Self {
97            max_pipelines: 1000,
98            max_events_per_second: 500_000,
99            max_streams_per_pipeline: 500,
100        }
101    }
102
103    /// Get quota for a tier name string.
104    pub fn for_tier(tier: &str) -> Self {
105        match tier {
106            "pro" => Self::pro(),
107            "business" => Self::business(),
108            "enterprise" => Self::enterprise(),
109            _ => Self::free(),
110        }
111    }
112}
113
/// Usage statistics for a tenant.
#[derive(Debug, Clone, Default)]
pub struct TenantUsage {
    /// Lifetime count of events submitted (including rate-limited ones).
    pub events_processed: u64,
    /// Events counted in the current one-second rate-limit window.
    pub events_in_window: u64,
    /// Start of the current rate-limit window, if one has begun.
    pub window_start: Option<Instant>,
    /// Number of currently deployed pipelines.
    pub active_pipelines: usize,
    /// Lifetime count of output events emitted.
    pub output_events_emitted: u64,
}

impl TenantUsage {
    /// Count one incoming event against the per-second rate window.
    ///
    /// Returns `false` when `max_eps` is non-zero and the running count for
    /// the current one-second window exceeds it. The event is still counted
    /// in `events_processed` either way; `max_eps == 0` means unlimited.
    pub fn record_event(&mut self, max_eps: u64) -> bool {
        self.events_processed += 1;

        let now = Instant::now();
        let window_is_current = self
            .window_start
            .is_some_and(|start| now.duration_since(start).as_secs() < 1);

        if window_is_current {
            self.events_in_window += 1;
            max_eps == 0 || self.events_in_window <= max_eps
        } else {
            // The window expired (or never started): open a fresh one.
            // The first event of a window always fits.
            self.window_start = Some(now);
            self.events_in_window = 1;
            true
        }
    }

    /// Count one emitted output event.
    pub const fn record_output_event(&mut self) {
        self.output_events_emitted += 1;
    }
}
154
/// A pipeline deployed by a tenant
#[derive(Debug)]
pub struct Pipeline {
    /// Pipeline identifier (a generated UUID v4 string)
    pub id: String,
    /// Human-readable name
    pub name: String,
    /// The VPL source code this pipeline was built from
    pub source: String,
    /// The engine running this pipeline (shared with source ingestion loop)
    pub engine: Arc<tokio::sync::Mutex<Engine>>,
    /// Output event receiver for this pipeline.
    /// NOTE: for connector-sourced pipelines this holds a placeholder channel;
    /// the real receiver is moved into a background drain task at deploy time.
    pub output_rx: mpsc::Receiver<Event>,
    /// Broadcast sender for log streaming (SSE subscribers).
    /// Output events are forwarded here after being drained from output_rx.
    pub log_broadcast: tokio::sync::broadcast::Sender<Event>,
    /// When the pipeline was created
    pub created_at: Instant,
    /// Pipeline status
    pub status: PipelineStatus,
    /// Optional context orchestrator for multi-threaded execution
    pub orchestrator: Option<ContextOrchestrator>,
    /// Managed connector registry (keeps source connections alive)
    pub connector_registry: Option<crate::connector::ManagedConnectorRegistry>,
    /// If set, this pipeline is a copy of a global template (admin-managed, read-only).
    pub global_template_id: Option<String>,
}
182
183/// Pipeline status
184#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
185pub enum PipelineStatus {
186    Running,
187    Stopped,
188    Error(String),
189}
190
191impl std::fmt::Display for PipelineStatus {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        match self {
194            Self::Running => write!(f, "running"),
195            Self::Stopped => write!(f, "stopped"),
196            Self::Error(msg) => write!(f, "error: {msg}"),
197        }
198    }
199}
200
/// Errors from tenant operations.
///
/// Display strings come from the `thiserror` `#[error]` attributes; the
/// `#[from]` conversions on `ParseError`/`EngineError` are what let `?`
/// propagate parser and engine failures throughout this module.
#[derive(Debug, thiserror::Error)]
pub enum TenantError {
    /// Tenant not found
    #[error("tenant not found: {0}")]
    NotFound(String),
    /// Pipeline not found
    #[error("pipeline not found: {0}")]
    PipelineNotFound(String),
    /// Quota exceeded (pipeline count or streams-per-pipeline limit)
    #[error("quota exceeded: {0}")]
    QuotaExceeded(String),
    /// Rate limit exceeded (events-per-second quota)
    #[error("rate limit exceeded")]
    RateLimitExceeded,
    /// Backpressure: queue depth exceeds configured maximum
    #[error("queue depth {current} exceeds maximum {max}")]
    BackpressureExceeded {
        /// Current queue depth
        current: u64,
        /// Configured maximum
        max: u64,
    },
    /// Parse error in VPL source
    #[error("parse error: {0}")]
    ParseError(#[from] varpulis_parser::ParseError),
    /// Engine error
    #[error("engine error: {0}")]
    EngineError(#[from] crate::engine::error::EngineError),
    /// Tenant already exists
    #[error("tenant already exists: {0}")]
    AlreadyExists(String),
}
234
235/// Compute a SHA-256 hex hash of a raw API key.
236pub fn hash_api_key(raw: &str) -> String {
237    use sha2::Digest;
238    hex::encode(sha2::Sha256::digest(raw.as_bytes()))
239}
240
/// A single tenant with its pipelines and quotas
#[derive(Debug)]
pub struct Tenant {
    /// Tenant identifier
    pub id: TenantId,
    /// Display name
    pub name: String,
    /// SHA-256 hex hash of the API key (plaintext is never stored; see `hash_api_key`)
    pub api_key_hash: String,
    /// Resource quotas
    pub quota: TenantQuota,
    /// Usage statistics (rate-limit window, counters)
    pub usage: TenantUsage,
    /// Active pipelines, keyed by pipeline id
    pub pipelines: HashMap<String, Pipeline>,
    /// When the tenant was created
    pub created_at: Instant,
    /// Optional WebSocket broadcast for output event relay
    /// (crate-visible so `TenantManager` can install it after construction)
    pub(crate) ws_broadcast: Option<Arc<tokio::sync::broadcast::Sender<String>>>,
    /// Optional topic prefix for Kafka/MQTT/NATS topic isolation
    pub topic_prefix: Option<String>,
}
263
264impl Tenant {
265    pub fn new(id: TenantId, name: String, api_key: String, quota: TenantQuota) -> Self {
266        Self {
267            id,
268            name,
269            api_key_hash: hash_api_key(&api_key),
270            quota,
271            usage: TenantUsage::default(),
272            pipelines: HashMap::new(),
273            created_at: Instant::now(),
274            ws_broadcast: None,
275            topic_prefix: None,
276        }
277    }
278
279    /// Create a tenant with a pre-computed hash (e.g. loaded from DB).
280    pub fn new_with_hash(
281        id: TenantId,
282        name: String,
283        api_key_hash: String,
284        quota: TenantQuota,
285    ) -> Self {
286        Self {
287            id,
288            name,
289            api_key_hash,
290            quota,
291            usage: TenantUsage::default(),
292            pipelines: HashMap::new(),
293            created_at: Instant::now(),
294            ws_broadcast: None,
295            topic_prefix: None,
296        }
297    }
298
299    /// Deploy a new pipeline from VPL source code
300    pub async fn deploy_pipeline(
301        &mut self,
302        name: String,
303        source: String,
304    ) -> Result<String, TenantError> {
305        self.deploy_pipeline_with_metrics(name, source, None).await
306    }
307
    /// Deploy a pipeline from VPL source, optionally wiring a shared
    /// Prometheus metrics handle into the new engine.
    ///
    /// Enforces the tenant's pipeline-count and streams-per-pipeline quotas,
    /// parses the source, builds the engine (plus a context orchestrator when
    /// the program declares contexts), and starts any declared source
    /// connectors. Returns the generated pipeline id.
    ///
    /// # Errors
    /// - [`TenantError::QuotaExceeded`] when pipeline or stream quotas are hit
    /// - [`TenantError::ParseError`] when the VPL source fails to parse
    /// - [`TenantError::EngineError`] for engine/orchestrator/connector failures
    pub async fn deploy_pipeline_with_metrics(
        &mut self,
        name: String,
        source: String,
        prometheus_metrics: Option<Metrics>,
    ) -> Result<String, TenantError> {
        // Check pipeline quota
        if self.pipelines.len() >= self.quota.max_pipelines {
            return Err(TenantError::QuotaExceeded(format!(
                "max pipelines ({}) reached",
                self.quota.max_pipelines
            )));
        }

        // Parse the VPL source
        let program = varpulis_parser::parse(&source)?;

        // Check streams quota (count only stream declarations in the program)
        let stream_count = program
            .statements
            .iter()
            .filter(|s| matches!(&s.node, varpulis_core::ast::Stmt::StreamDecl { .. }))
            .count();
        if stream_count > self.quota.max_streams_per_pipeline {
            return Err(TenantError::QuotaExceeded(format!(
                "pipeline has {} streams, max is {}",
                stream_count, self.quota.max_streams_per_pipeline
            )));
        }

        // Create a new engine; output events flow through this bounded channel
        let (output_tx, output_rx) = mpsc::channel(1000);
        let output_tx_for_ctx = output_tx.clone();
        let mut engine = Engine::new(output_tx);
        if let Some(m) = prometheus_metrics {
            engine = engine.with_metrics(m);
        }
        if let Some(ref prefix) = self.topic_prefix {
            engine.set_topic_prefix(prefix);
        }
        engine.load(&program)?;

        // Build context orchestrator if the program declares contexts
        let orchestrator = if engine.has_contexts() {
            // Context orchestrator connects sinks in each context thread
            match ContextOrchestrator::build(
                engine.context_map(),
                &program,
                output_tx_for_ctx,
                1000,
            ) {
                Ok(orch) => Some(orch),
                Err(e) => {
                    return Err(crate::engine::error::EngineError::Pipeline(format!(
                        "Failed to build context orchestrator: {e}"
                    ))
                    .into());
                }
            }
        } else {
            // No contexts - connect sinks on the main engine
            engine.connect_sinks().await?;
            None
        };

        // Start source connectors (MQTT/Kafka subscriptions)
        let bindings = engine.source_bindings().to_vec();
        let mut connector_registry = None;

        if !bindings.is_empty() {
            // Channel carrying events from connector sources into the engine
            let (event_tx, event_rx) = mpsc::channel::<Event>(10_000);

            let mut registry = crate::connector::ManagedConnectorRegistry::from_configs(
                engine.connector_configs(),
            )
            .map_err(|e| {
                TenantError::EngineError(crate::engine::error::EngineError::Pipeline(format!(
                    "Registry build error: {e}"
                )))
            })?;

            for binding in &bindings {
                let config = engine.get_connector(&binding.connector_name).cloned();
                let Some(config) = config else {
                    return Err(crate::engine::error::EngineError::Pipeline(format!(
                        "Connector '{}' referenced in .from() but not declared",
                        binding.connector_name
                    ))
                    .into());
                };

                // Topic precedence: per-binding override, then the connector's
                // configured topic, then the catch-all default.
                let topic = binding
                    .topic_override
                    .as_deref()
                    .or(config.topic.as_deref())
                    .unwrap_or("varpulis/events/#");

                info!(
                    "Starting {} source: {} topic={}",
                    config.connector_type, binding.connector_name, topic
                );

                registry
                    .start_source(
                        &binding.connector_name,
                        topic,
                        event_tx.clone(),
                        &binding.extra_params,
                    )
                    .await
                    .map_err(|e| -> TenantError {
                        crate::engine::error::EngineError::Pipeline(format!(
                            "Source start error: {e}"
                        ))
                        .into()
                    })?;

                // Create shared sinks for this connector
                let sink_keys = engine.sink_keys_for_connector(&binding.connector_name);
                for sink_key in &sink_keys {
                    // Sink keys of the form "<connector>::<topic>" carry their
                    // own topic; otherwise fall back to the connector config.
                    let sink_topic = if let Some(t) =
                        sink_key.strip_prefix(&format!("{}::", binding.connector_name))
                    {
                        t.to_string()
                    } else {
                        config
                            .topic
                            .clone()
                            .unwrap_or_else(|| format!("{}-output", binding.connector_name))
                    };

                    let empty_params = std::collections::HashMap::new();
                    match registry.create_sink(&binding.connector_name, &sink_topic, &empty_params)
                    {
                        Ok(sink) => {
                            engine.inject_sink(sink_key, sink);
                        }
                        Err(e) => {
                            // Best-effort: a failed sink does not abort deployment
                            warn!("Failed to create sink for {}: {}", sink_key, e);
                        }
                    }
                }
            }

            connector_registry = Some(registry);

            // Wrap engine in Arc<Mutex> for sharing with source ingestion loop
            let engine = Arc::new(tokio::sync::Mutex::new(engine));
            let engine_for_source = Arc::clone(&engine);

            // Spawn source event ingestion loop; it ends when all source
            // senders are dropped (e.g. when the registry is torn down).
            tokio::spawn(async move {
                let mut event_rx = event_rx;
                while let Some(event) = event_rx.recv().await {
                    let mut eng = engine_for_source.lock().await;
                    if let Err(e) = eng.process(event).await {
                        warn!("Source event processing error: {}", e);
                    }
                }
                info!("Source ingestion loop ended");
            });

            let id = Uuid::new_v4().to_string();
            let (log_tx, _) = tokio::sync::broadcast::channel(256);

            // Spawn output event drain task: connector-sourced pipelines produce
            // output events via the engine's output_tx, but nobody calls
            // process_event() to drain them. This task continuously drains
            // output_rx → log_broadcast so WebSocket/SSE subscribers receive them.
            let log_tx_for_drain = log_tx.clone();
            let ws_tx_for_drain = self.ws_broadcast.clone();
            tokio::spawn(async move {
                let mut output_rx = output_rx;
                while let Some(ev) = output_rx.recv().await {
                    let _ = log_tx_for_drain.send(ev.clone());
                    // Forward to WebSocket relay channel if configured
                    if let Some(ref ws_tx) = ws_tx_for_drain {
                        let msg = serde_json::json!({
                            "type": "output_event",
                            "event_type": ev.event_type.to_string(),
                            "data": ev.data,
                            "timestamp": ev.timestamp.to_rfc3339(),
                        });
                        if let Ok(json) = serde_json::to_string(&msg) {
                            if ws_tx.send(json).is_err() {
                                tracing::debug!("No WS subscribers for drain output event");
                            }
                        }
                    }
                }
            });

            // The real output_rx was moved into the drain task above;
            // create a placeholder for the Pipeline struct.
            let (_drain_tx, placeholder_rx) = mpsc::channel(1);

            let pipeline = Pipeline {
                id: id.clone(),
                name,
                source,
                engine,
                output_rx: placeholder_rx,
                log_broadcast: log_tx,
                created_at: Instant::now(),
                status: PipelineStatus::Running,
                orchestrator,
                connector_registry,
                global_template_id: None,
            };

            self.pipelines.insert(id.clone(), pipeline);
            self.usage.active_pipelines = self.pipelines.len();

            return Ok(id);
        }

        // No source bindings — standard path without Arc<Mutex>
        let engine = Arc::new(tokio::sync::Mutex::new(engine));
        let id = Uuid::new_v4().to_string();
        let (log_tx, _) = tokio::sync::broadcast::channel(256);
        let pipeline = Pipeline {
            id: id.clone(),
            name,
            source,
            engine,
            output_rx,
            log_broadcast: log_tx,
            created_at: Instant::now(),
            status: PipelineStatus::Running,
            orchestrator,
            connector_registry,
            global_template_id: None,
        };

        self.pipelines.insert(id.clone(), pipeline);
        self.usage.active_pipelines = self.pipelines.len();

        Ok(id)
    }
547
548    /// Remove a pipeline
549    pub fn remove_pipeline(&mut self, pipeline_id: &str) -> Result<(), TenantError> {
550        self.pipelines
551            .remove(pipeline_id)
552            .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
553        self.usage.active_pipelines = self.pipelines.len();
554        Ok(())
555    }
556
    /// Process an event through a specific pipeline
    ///
    /// Enforces the tenant's events-per-second rate limit first, then routes
    /// the event either through the pipeline's context orchestrator (when the
    /// program declared contexts) or directly through the engine. Returns the
    /// output events drained from the pipeline's output channel.
    ///
    /// # Errors
    /// - [`TenantError::RateLimitExceeded`] when the per-second quota is hit
    /// - [`TenantError::PipelineNotFound`] for an unknown pipeline id
    /// - [`TenantError::EngineError`] when the pipeline is not running or
    ///   processing fails
    pub async fn process_event(
        &mut self,
        pipeline_id: &str,
        event: Event,
    ) -> Result<Vec<Event>, TenantError> {
        // Check rate limit (the event is still counted even when rejected)
        if !self.usage.record_event(self.quota.max_events_per_second) {
            return Err(TenantError::RateLimitExceeded);
        }

        let pipeline = self
            .pipelines
            .get_mut(pipeline_id)
            .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;

        if pipeline.status != PipelineStatus::Running {
            return Err(crate::engine::error::EngineError::Pipeline(format!(
                "pipeline is {}",
                pipeline.status
            ))
            .into());
        }

        if let Some(ref orchestrator) = pipeline.orchestrator {
            // Route through context orchestrator — try non-blocking first
            let shared_event = std::sync::Arc::new(event);
            match orchestrator.try_process(shared_event) {
                Ok(()) => {}
                Err(crate::context::DispatchError::ChannelFull(msg)) => {
                    // Extract the event from the ContextMessage and retry with await
                    // (blocking fallback when the fast path's channel is full)
                    if let crate::context::ContextMessage::Event(event) = msg {
                        orchestrator
                            .process(event)
                            .await
                            .map_err(|e| -> TenantError {
                                crate::engine::error::EngineError::Pipeline(e).into()
                            })?;
                    }
                }
                Err(crate::context::DispatchError::ChannelClosed(_)) => {
                    return Err(crate::engine::error::EngineError::Pipeline(
                        "Context channel closed".to_string(),
                    )
                    .into());
                }
            }
        } else {
            // Direct engine processing (no contexts, zero overhead)
            pipeline.engine.lock().await.process(event).await?;
        }

        // Drain output events from the channel and broadcast to log subscribers.
        // For connector-sourced pipelines output_rx is a placeholder and yields
        // nothing here; a background drain task forwards their output instead.
        let mut output_events = Vec::new();
        while let Ok(ev) = pipeline.output_rx.try_recv() {
            // Best-effort broadcast to SSE log subscribers (ignore if no receivers)
            let _ = pipeline.log_broadcast.send(ev.clone());
            output_events.push(ev);
        }

        Ok(output_events)
    }
619
620    /// Subscribe to a pipeline's output event stream for log streaming.
621    /// Returns a broadcast receiver that yields events as they are produced.
622    pub fn subscribe_pipeline_logs(
623        &self,
624        pipeline_id: &str,
625    ) -> Result<tokio::sync::broadcast::Receiver<Event>, TenantError> {
626        let pipeline = self
627            .pipelines
628            .get(pipeline_id)
629            .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
630        Ok(pipeline.log_broadcast.subscribe())
631    }
632
633    /// Create a checkpoint of a pipeline's engine state.
634    pub async fn checkpoint_pipeline(
635        &self,
636        pipeline_id: &str,
637    ) -> Result<crate::persistence::EngineCheckpoint, TenantError> {
638        let pipeline = self
639            .pipelines
640            .get(pipeline_id)
641            .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
642
643        let engine = pipeline.engine.lock().await;
644        Ok(engine.create_checkpoint())
645    }
646
647    /// Restore a pipeline's engine state from a checkpoint.
648    pub async fn restore_pipeline(
649        &mut self,
650        pipeline_id: &str,
651        checkpoint: &crate::persistence::EngineCheckpoint,
652    ) -> Result<(), TenantError> {
653        let pipeline = self
654            .pipelines
655            .get_mut(pipeline_id)
656            .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
657
658        let mut engine = pipeline.engine.lock().await;
659        engine
660            .restore_checkpoint(checkpoint)
661            .map_err(crate::engine::error::EngineError::Store)?;
662        Ok(())
663    }
664
665    /// Reload a pipeline with new VPL source
666    pub async fn reload_pipeline(
667        &mut self,
668        pipeline_id: &str,
669        source: String,
670    ) -> Result<(), TenantError> {
671        let program = varpulis_parser::parse(&source)?;
672
673        let pipeline = self
674            .pipelines
675            .get_mut(pipeline_id)
676            .ok_or_else(|| TenantError::PipelineNotFound(pipeline_id.to_string()))?;
677
678        pipeline.engine.lock().await.reload(&program)?;
679        pipeline.source = source;
680
681        Ok(())
682    }
683}
684
/// Manages all tenants in the system
pub struct TenantManager {
    /// All tenants, keyed by tenant id
    tenants: HashMap<TenantId, Tenant>,
    /// API key hash → TenantId lookup (plaintext keys are never stored)
    api_key_index: HashMap<String, TenantId>,
    /// Optional persistent state store
    store: Option<Arc<dyn StateStore>>,
    /// Shared Prometheus metrics (passed to engines on deploy)
    prometheus_metrics: Option<Metrics>,
    /// Maximum queue depth before rejecting events with backpressure (0 = unlimited)
    max_queue_depth: u64,
    /// Atomic counter of events currently being processed across all pipelines
    /// (shared with API handlers via `pending_events_counter`)
    pending_events: Arc<AtomicU64>,
    /// Optional global output event broadcast for WebSocket relay.
    /// When set, all pipeline output events are forwarded here as JSON strings.
    ws_broadcast: Option<Arc<tokio::sync::broadcast::Sender<String>>>,
}
702
703impl std::fmt::Debug for TenantManager {
704    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
705        f.debug_struct("TenantManager")
706            .field("tenant_count", &self.tenants.len())
707            .field("max_queue_depth", &self.max_queue_depth)
708            .field("pending_events", &self.pending_events)
709            .field("has_store", &self.store.is_some())
710            .field("has_ws_broadcast", &self.ws_broadcast.is_some())
711            .finish_non_exhaustive()
712    }
713}
714
715impl TenantManager {
716    pub fn new() -> Self {
717        Self {
718            tenants: HashMap::new(),
719            api_key_index: HashMap::new(),
720            store: None,
721            prometheus_metrics: None,
722            max_queue_depth: 0,
723            pending_events: Arc::new(AtomicU64::new(0)),
724            ws_broadcast: None,
725        }
726    }
727
728    /// Create a new tenant manager backed by a state store
729    pub fn with_store(store: Arc<dyn StateStore>) -> Self {
730        Self {
731            tenants: HashMap::new(),
732            api_key_index: HashMap::new(),
733            store: Some(store),
734            prometheus_metrics: None,
735            max_queue_depth: 0,
736            pending_events: Arc::new(AtomicU64::new(0)),
737            ws_broadcast: None,
738        }
739    }
740
741    /// Set Prometheus metrics to be shared with all engines
742    pub fn set_prometheus_metrics(&mut self, metrics: Metrics) {
743        self.prometheus_metrics = Some(metrics);
744    }
745
746    /// Set maximum queue depth for backpressure (0 = unlimited)
747    pub const fn set_max_queue_depth(&mut self, max_depth: u64) {
748        self.max_queue_depth = max_depth;
749    }
750
751    /// Set a WebSocket broadcast channel for relaying output events.
752    /// When set, all connector-sourced pipeline output events are forwarded
753    /// to this channel as serialized JSON strings.
754    pub fn set_ws_broadcast(&mut self, tx: Arc<tokio::sync::broadcast::Sender<String>>) {
755        for tenant in self.tenants.values_mut() {
756            tenant.ws_broadcast = Some(Arc::clone(&tx));
757        }
758        self.ws_broadcast = Some(tx);
759    }
760
761    /// Get the maximum queue depth setting
762    pub const fn max_queue_depth(&self) -> u64 {
763        self.max_queue_depth
764    }
765
766    /// Get the current number of pending events being processed
767    pub fn pending_event_count(&self) -> u64 {
768        self.pending_events.load(Ordering::Relaxed)
769    }
770
771    /// Get a clone of the pending events counter (for sharing with API handlers)
772    pub fn pending_events_counter(&self) -> Arc<AtomicU64> {
773        Arc::clone(&self.pending_events)
774    }
775
776    /// Check backpressure: returns Err if queue depth exceeds maximum
777    pub fn check_backpressure(&self) -> Result<(), TenantError> {
778        if self.max_queue_depth == 0 {
779            return Ok(());
780        }
781        let current = self.pending_events.load(Ordering::Relaxed);
782        if current >= self.max_queue_depth {
783            return Err(TenantError::BackpressureExceeded {
784                current,
785                max: self.max_queue_depth,
786            });
787        }
788        Ok(())
789    }
790
791    /// Compute the queue pressure ratio (0.0 to 1.0+). Returns 0.0 if max_queue_depth is 0.
792    pub fn queue_pressure_ratio(&self) -> f64 {
793        if self.max_queue_depth == 0 {
794            return 0.0;
795        }
796        self.pending_events.load(Ordering::Relaxed) as f64 / self.max_queue_depth as f64
797    }
798
799    /// Create a new tenant. The raw API key is hashed before storage.
800    pub fn create_tenant(
801        &mut self,
802        name: String,
803        api_key: String,
804        quota: TenantQuota,
805    ) -> Result<TenantId, TenantError> {
806        let key_hash = hash_api_key(&api_key);
807        if self.api_key_index.contains_key(&key_hash) {
808            return Err(TenantError::AlreadyExists(
809                "API key already in use".to_string(),
810            ));
811        }
812
813        let id = TenantId::generate();
814        let mut tenant = Tenant::new_with_hash(id.clone(), name, key_hash.clone(), quota);
815        tenant.ws_broadcast = self.ws_broadcast.clone();
816        self.tenants.insert(id.clone(), tenant);
817        self.api_key_index.insert(key_hash, id.clone());
818        self.persist_if_needed(&id);
819        Ok(id)
820    }
821
822    /// Create a tenant with a specific ID (e.g. DB org UUID) and pre-hashed API key.
823    pub fn create_tenant_with_id(
824        &mut self,
825        id: TenantId,
826        name: String,
827        api_key_hash: String,
828        quota: TenantQuota,
829    ) -> Result<(), TenantError> {
830        if self.api_key_index.contains_key(&api_key_hash) {
831            return Err(TenantError::AlreadyExists(
832                "API key already in use".to_string(),
833            ));
834        }
835
836        let mut tenant = Tenant::new_with_hash(id.clone(), name, api_key_hash.clone(), quota);
837        tenant.ws_broadcast = self.ws_broadcast.clone();
838        self.tenants.insert(id.clone(), tenant);
839        self.api_key_index.insert(api_key_hash, id);
840        Ok(())
841    }
842
843    /// Update a tenant's quota (e.g., after admin limit change).
844    pub fn update_tenant_quota(&mut self, id: &TenantId, quota: TenantQuota) -> bool {
845        if let Some(tenant) = self.tenants.get_mut(id) {
846            tenant.quota = quota;
847            true
848        } else {
849            false
850        }
851    }
852
853    /// Get a tenant by raw API key (hashes internally before lookup).
854    pub fn get_tenant_by_api_key(&self, api_key: &str) -> Option<&TenantId> {
855        let key_hash = hash_api_key(api_key);
856        self.api_key_index.get(&key_hash)
857    }
858
    /// Get a tenant by ID
    ///
    /// Returns `None` if no tenant with this ID is registered.
    pub fn get_tenant(&self, id: &TenantId) -> Option<&Tenant> {
        self.tenants.get(id)
    }
863
    /// Get a mutable tenant by ID
    ///
    /// Returns `None` if no tenant with this ID is registered. Callers that
    /// mutate persisted fields should follow up with `persist_if_needed`.
    pub fn get_tenant_mut(&mut self, id: &TenantId) -> Option<&mut Tenant> {
        self.tenants.get_mut(id)
    }
868
869    /// Deploy a pipeline on a tenant, passing through Prometheus metrics
870    pub async fn deploy_pipeline_on_tenant(
871        &mut self,
872        tenant_id: &TenantId,
873        name: String,
874        source: String,
875    ) -> Result<String, TenantError> {
876        let metrics = self.prometheus_metrics.clone();
877        let ws_broadcast = self.ws_broadcast.clone();
878        let tenant = self
879            .tenants
880            .get_mut(tenant_id)
881            .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
882        // Ensure tenant has the WS broadcast channel
883        if tenant.ws_broadcast.is_none() {
884            tenant.ws_broadcast = ws_broadcast;
885        }
886        tenant
887            .deploy_pipeline_with_metrics(name, source, metrics)
888            .await
889    }
890
891    /// Deploy a global pipeline copy on a tenant (sets global_template_id).
892    pub async fn deploy_global_pipeline_on_tenant(
893        &mut self,
894        tenant_id: &TenantId,
895        name: String,
896        source: String,
897        template_id: String,
898    ) -> Result<String, TenantError> {
899        let pipeline_id = self
900            .deploy_pipeline_on_tenant(tenant_id, name, source)
901            .await?;
902        // Set the global_template_id on the newly deployed pipeline
903        if let Some(tenant) = self.tenants.get_mut(tenant_id) {
904            if let Some(pipeline) = tenant.pipelines.get_mut(&pipeline_id) {
905                pipeline.global_template_id = Some(template_id);
906            }
907        }
908        Ok(pipeline_id)
909    }
910
911    /// Process an event through a pipeline with backpressure checking.
912    ///
913    /// Checks queue depth before processing, increments a pending counter during
914    /// processing, and decrements it afterward. Returns `BackpressureExceeded`
915    /// if the queue depth exceeds `max_queue_depth`.
916    pub async fn process_event_with_backpressure(
917        &mut self,
918        tenant_id: &TenantId,
919        pipeline_id: &str,
920        event: Event,
921    ) -> Result<Vec<Event>, TenantError> {
922        self.check_backpressure()?;
923        self.pending_events.fetch_add(1, Ordering::Relaxed);
924        let tenant = self
925            .tenants
926            .get_mut(tenant_id)
927            .ok_or_else(|| TenantError::NotFound(tenant_id.to_string()))?;
928        let result = tenant.process_event(pipeline_id, event).await;
929        self.pending_events.fetch_sub(1, Ordering::Relaxed);
930
931        // Update pressure ratio metric
932        if let Some(ref metrics) = self.prometheus_metrics {
933            metrics
934                .queue_pressure_ratio
935                .with_label_values(&["_all"])
936                .set(self.queue_pressure_ratio());
937        }
938
939        result
940    }
941
942    /// Remove a tenant and all its pipelines
943    pub fn remove_tenant(&mut self, id: &TenantId) -> Result<(), TenantError> {
944        let tenant = self
945            .tenants
946            .remove(id)
947            .ok_or_else(|| TenantError::NotFound(id.to_string()))?;
948        self.api_key_index.remove(&tenant.api_key_hash);
949        if let Some(ref store) = self.store {
950            if let Err(e) = Self::delete_tenant_state(store.as_ref(), id) {
951                warn!("Failed to delete persisted state for tenant {}: {}", id, e);
952            }
953        }
954        Ok(())
955    }
956
    /// List all tenants
    ///
    /// Iteration order follows the underlying map and is not stable.
    pub fn list_tenants(&self) -> Vec<&Tenant> {
        self.tenants.values().collect()
    }
961
    /// Get total number of tenants
    pub fn tenant_count(&self) -> usize {
        self.tenants.len()
    }
966
967    /// Collect pipeline metrics across all tenants (pipeline_name, events_in, events_out).
968    pub async fn collect_pipeline_metrics(&self) -> Vec<(String, u64, u64)> {
969        let mut metrics = Vec::new();
970        for tenant in self.tenants.values() {
971            for pipeline in tenant.pipelines.values() {
972                let engine = pipeline.engine.lock().await;
973                let (events_in, events_out) = engine.event_counters();
974                metrics.push((pipeline.name.clone(), events_in, events_out));
975            }
976        }
977        metrics
978    }
979
980    /// Collect connector health across all tenants.
981    ///
982    /// Returns `(pipeline_name, connector_name, connector_type, health_report)` tuples.
983    pub fn collect_connector_health(
984        &self,
985    ) -> Vec<(
986        String,
987        String,
988        String,
989        crate::connector::ConnectorHealthReport,
990    )> {
991        let mut results = Vec::new();
992        for tenant in self.tenants.values() {
993            for pipeline in tenant.pipelines.values() {
994                if let Some(ref registry) = pipeline.connector_registry {
995                    for (conn_name, conn_type, report) in registry.health_reports() {
996                        results.push((
997                            pipeline.name.clone(),
998                            conn_name.to_string(),
999                            conn_type.to_string(),
1000                            report,
1001                        ));
1002                    }
1003                }
1004            }
1005        }
1006        results
1007    }
1008
1009    /// Persist a tenant's current state to the store (if configured)
1010    pub fn persist_if_needed(&self, tenant_id: &TenantId) {
1011        if let Some(ref store) = self.store {
1012            if let Some(tenant) = self.tenants.get(tenant_id) {
1013                if let Err(e) = Self::persist_tenant_to_store(store.as_ref(), tenant) {
1014                    warn!("Failed to persist tenant {}: {}", tenant_id, e);
1015                }
1016            }
1017        }
1018    }
1019
    /// Recover all tenants from the state store
    ///
    /// Returns the number of tenants successfully recovered.
    ///
    /// Recovery is best-effort per tenant: a missing, corrupt, or
    /// non-restorable entry is logged and counted as failed, while the
    /// remaining tenants are still loaded. With no store or no persisted
    /// index this is a no-op returning `Ok(0)`.
    pub fn recover(&mut self) -> Result<usize, StoreError> {
        let store = match &self.store {
            Some(s) => Arc::clone(s),
            None => return Ok(0),
        };

        // Load tenant index
        let index_data = match store.get("tenants:index")? {
            Some(data) => data,
            None => return Ok(0),
        };

        let tenant_ids: Vec<String> = serde_json::from_slice(&index_data)
            .map_err(|e| StoreError::SerializationError(e.to_string()))?;

        let mut recovered = 0;
        let mut failed = 0;

        for tid in &tenant_ids {
            // Each tenant lives under its own key; see persist_tenant_to_store.
            let key = format!("tenant:{tid}");
            let data = match store.get(&key)? {
                Some(d) => d,
                None => {
                    // Index and store can drift if a delete half-completed.
                    warn!("Tenant {} listed in index but not found in store", tid);
                    failed += 1;
                    continue;
                }
            };

            let snapshot: TenantSnapshot = match serde_json::from_slice(&data) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Failed to deserialize tenant {}: {}", tid, e);
                    failed += 1;
                    continue;
                }
            };

            match Self::restore_tenant_from_snapshot(snapshot) {
                Ok(tenant) => {
                    let tenant_id = tenant.id.clone();
                    // Rebuild the API-key index from the restored hash.
                    self.api_key_index
                        .insert(tenant.api_key_hash.clone(), tenant_id.clone());
                    let pipeline_count = tenant.pipelines.len();
                    self.tenants.insert(tenant_id.clone(), tenant);
                    info!(
                        "Recovered tenant {} with {} pipeline(s)",
                        tenant_id, pipeline_count
                    );
                    recovered += 1;
                }
                Err(e) => {
                    warn!("Failed to restore tenant {}: {}", tid, e);
                    failed += 1;
                }
            }
        }

        if failed > 0 {
            warn!(
                "Recovery complete: {} recovered, {} failed",
                recovered, failed
            );
        }

        Ok(recovered)
    }
1090
1091    fn persist_tenant_to_store(store: &dyn StateStore, tenant: &Tenant) -> Result<(), StoreError> {
1092        let snapshot = tenant.snapshot();
1093        let data = serde_json::to_vec(&snapshot)
1094            .map_err(|e| StoreError::SerializationError(e.to_string()))?;
1095        let key = format!("tenant:{}", snapshot.id);
1096        store.put(&key, &data)?;
1097
1098        // Update tenant index
1099        Self::update_tenant_index_add(store, &snapshot.id)?;
1100
1101        store.flush()
1102    }
1103
1104    fn delete_tenant_state(store: &dyn StateStore, id: &TenantId) -> Result<(), StoreError> {
1105        let key = format!("tenant:{}", id.0);
1106        store.delete(&key)?;
1107
1108        Self::update_tenant_index_remove(store, &id.0)?;
1109
1110        store.flush()
1111    }
1112
1113    fn update_tenant_index_add(store: &dyn StateStore, tenant_id: &str) -> Result<(), StoreError> {
1114        let mut ids = Self::load_tenant_index(store)?;
1115        let id_str = tenant_id.to_string();
1116        if !ids.contains(&id_str) {
1117            ids.push(id_str);
1118        }
1119        let data =
1120            serde_json::to_vec(&ids).map_err(|e| StoreError::SerializationError(e.to_string()))?;
1121        store.put("tenants:index", &data)
1122    }
1123
1124    fn update_tenant_index_remove(
1125        store: &dyn StateStore,
1126        tenant_id: &str,
1127    ) -> Result<(), StoreError> {
1128        let mut ids = Self::load_tenant_index(store)?;
1129        ids.retain(|id| id != tenant_id);
1130        let data =
1131            serde_json::to_vec(&ids).map_err(|e| StoreError::SerializationError(e.to_string()))?;
1132        store.put("tenants:index", &data)
1133    }
1134
1135    fn load_tenant_index(store: &dyn StateStore) -> Result<Vec<String>, StoreError> {
1136        match store.get("tenants:index")? {
1137            Some(data) => serde_json::from_slice(&data)
1138                .map_err(|e| StoreError::SerializationError(e.to_string())),
1139            None => Ok(Vec::new()),
1140        }
1141    }
1142
1143    fn restore_tenant_from_snapshot(snapshot: TenantSnapshot) -> Result<Tenant, TenantError> {
1144        let tenant_id = TenantId::new(&snapshot.id);
1145
1146        let mut tenant = Tenant::new_with_hash(
1147            tenant_id,
1148            snapshot.name,
1149            snapshot.api_key_hash,
1150            snapshot.quota,
1151        );
1152        tenant.usage.events_processed = snapshot.events_processed;
1153        tenant.usage.output_events_emitted = snapshot.output_events_emitted;
1154        tenant.usage.events_in_window = snapshot.events_in_window;
1155
1156        for ps in snapshot.pipelines {
1157            match Self::restore_pipeline_from_snapshot(ps.clone()) {
1158                Ok(pipeline) => {
1159                    tenant.pipelines.insert(pipeline.id.clone(), pipeline);
1160                }
1161                Err(e) => {
1162                    warn!(
1163                        "Failed to restore pipeline '{}' ({}): {}",
1164                        ps.name, ps.id, e
1165                    );
1166                }
1167            }
1168        }
1169
1170        tenant.usage.active_pipelines = tenant.pipelines.len();
1171        Ok(tenant)
1172    }
1173
1174    fn restore_pipeline_from_snapshot(snapshot: PipelineSnapshot) -> Result<Pipeline, TenantError> {
1175        let program = varpulis_parser::parse(&snapshot.source)?;
1176
1177        let (output_tx, output_rx) = mpsc::channel(1000);
1178        let mut engine = Engine::new(output_tx);
1179        engine.load(&program)?;
1180
1181        let (log_tx, _) = tokio::sync::broadcast::channel(256);
1182        Ok(Pipeline {
1183            id: snapshot.id,
1184            name: snapshot.name,
1185            source: snapshot.source,
1186            engine: Arc::new(tokio::sync::Mutex::new(engine)),
1187            output_rx,
1188            log_broadcast: log_tx,
1189            created_at: Instant::now(),
1190            status: snapshot.status,
1191            orchestrator: None,
1192            connector_registry: None,
1193            global_template_id: snapshot.global_template_id,
1194        })
1195    }
1196}
1197
impl Default for TenantManager {
    /// Delegates to [`TenantManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
1203
/// Thread-safe tenant manager for use in async server context
///
/// `Arc<RwLock<_>>` lets many readers (lookups) proceed concurrently while
/// writes (create/remove/deploy) take exclusive access.
pub type SharedTenantManager = Arc<RwLock<TenantManager>>;
1206
/// Create a new shared tenant manager
///
/// The manager starts empty and without a backing store; see
/// [`shared_tenant_manager_with_store`] for the persistent variant.
pub fn shared_tenant_manager() -> SharedTenantManager {
    Arc::new(RwLock::new(TenantManager::new()))
}
1211
1212/// Create a shared tenant manager backed by a state store, recovering any persisted state
1213pub fn shared_tenant_manager_with_store(store: Arc<dyn StateStore>) -> SharedTenantManager {
1214    let mut mgr = TenantManager::with_store(store);
1215    match mgr.recover() {
1216        Ok(count) if count > 0 => {
1217            info!("Recovered {} tenant(s) from persistent state", count);
1218        }
1219        Ok(_) => {
1220            info!("No persisted tenant state found, starting fresh");
1221        }
1222        Err(e) => {
1223            error!("Failed to recover tenant state: {}", e);
1224        }
1225    }
1226    Arc::new(RwLock::new(mgr))
1227}
1228
1229// =============================================================================
1230// Snapshot Types for Persistence
1231// =============================================================================
1232
/// Serializable snapshot of a tenant's state
///
/// The unit of persistence: serialized to JSON and stored under the key
/// `tenant:{id}`. Serde aliases keep older snapshot formats readable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TenantSnapshot {
    pub id: String,
    pub name: String,
    /// SHA-256 hash of the API key (never stores plaintext)
    #[serde(alias = "api_key")]
    pub api_key_hash: String,
    /// Resource limits in force when the snapshot was taken.
    pub quota: TenantQuota,
    /// Lifetime count of events processed by this tenant.
    pub events_processed: u64,
    /// Lifetime count of output events (alias accepts legacy field name).
    #[serde(alias = "alerts_generated")]
    pub output_events_emitted: u64,
    pub pipelines: Vec<PipelineSnapshot>,
    /// Millisecond timestamp written at snapshot time (absent in old data).
    #[serde(default)]
    pub created_at_ms: Option<i64>,
    /// Events counted in the current rate-limit window at snapshot time.
    #[serde(default)]
    pub events_in_window: u64,
}
1251
/// Serializable snapshot of a pipeline
///
/// Captures only durable data; the engine and channels are rebuilt from
/// `source` during recovery.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineSnapshot {
    pub id: String,
    pub name: String,
    /// Original VPL source; re-parsed and re-compiled on restore.
    pub source: String,
    pub status: PipelineStatus,
    /// Set when this pipeline was deployed as a copy of a global template.
    #[serde(default)]
    pub global_template_id: Option<String>,
}
1262
1263impl Pipeline {
1264    /// Create a serializable snapshot of this pipeline
1265    pub fn snapshot(&self) -> PipelineSnapshot {
1266        PipelineSnapshot {
1267            id: self.id.clone(),
1268            name: self.name.clone(),
1269            source: self.source.clone(),
1270            status: self.status.clone(),
1271            global_template_id: self.global_template_id.clone(),
1272        }
1273    }
1274}
1275
1276impl Tenant {
1277    /// Create a serializable snapshot of this tenant and all its pipelines
1278    pub fn snapshot(&self) -> TenantSnapshot {
1279        TenantSnapshot {
1280            id: self.id.0.clone(),
1281            name: self.name.clone(),
1282            api_key_hash: self.api_key_hash.clone(),
1283            quota: self.quota.clone(),
1284            events_processed: self.usage.events_processed,
1285            output_events_emitted: self.usage.output_events_emitted,
1286            pipelines: self.pipelines.values().map(|p| p.snapshot()).collect(),
1287            created_at_ms: Some(chrono::Utc::now().timestamp_millis()),
1288            events_in_window: self.usage.events_in_window,
1289        }
1290    }
1291}
1292
1293#[cfg(test)]
1294mod tests {
1295    use super::*;
1296
    #[test]
    fn test_tenant_id_generate() {
        // Two generated IDs must differ (backed by UUID v4 randomness).
        let id1 = TenantId::generate();
        let id2 = TenantId::generate();
        assert_ne!(id1, id2);
    }
1303
    #[test]
    fn test_tenant_id_display() {
        // Display and as_str both expose the raw inner string unchanged.
        let id = TenantId::new("test-123");
        assert_eq!(format!("{id}"), "test-123");
        assert_eq!(id.as_str(), "test-123");
    }
1310
    #[test]
    fn test_tenant_quota_tiers() {
        // Quota tiers must be strictly ordered: free < pro < enterprise.
        let free = TenantQuota::free();
        let pro = TenantQuota::pro();
        let enterprise = TenantQuota::enterprise();

        assert!(free.max_pipelines < pro.max_pipelines);
        assert!(pro.max_pipelines < enterprise.max_pipelines);
        assert!(free.max_events_per_second < pro.max_events_per_second);
        assert!(pro.max_events_per_second < enterprise.max_events_per_second);
    }
1322
    #[test]
    fn test_usage_record_event() {
        // A single event under the per-second limit is accepted and counted
        // in both the lifetime and window counters.
        let mut usage = TenantUsage::default();
        assert!(usage.record_event(100));
        assert_eq!(usage.events_processed, 1);
        assert_eq!(usage.events_in_window, 1);
    }
1330
    #[test]
    fn test_usage_rate_limit() {
        // Events beyond the max-per-second limit within one window are rejected.
        let mut usage = TenantUsage::default();
        // Allow 2 events per second
        assert!(usage.record_event(2));
        assert!(usage.record_event(2));
        // Third should be rejected
        assert!(!usage.record_event(2));
    }
1340
    #[test]
    fn test_usage_no_rate_limit() {
        // A limit of 0 disables rate limiting entirely.
        let mut usage = TenantUsage::default();
        // 0 means disabled
        for _ in 0..1000 {
            assert!(usage.record_event(0));
        }
    }
1349
    #[test]
    fn test_tenant_manager_create() {
        // Creating a tenant registers it and preserves its name.
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test Corp".into(), "key-123".into(), TenantQuota::free())
            .unwrap();

        assert_eq!(mgr.tenant_count(), 1);
        assert!(mgr.get_tenant(&id).is_some());
        assert_eq!(mgr.get_tenant(&id).unwrap().name, "Test Corp");
    }
1361
    #[test]
    fn test_tenant_manager_api_key_lookup() {
        // Lookup hashes the raw key internally; unknown keys return None.
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "my-key".into(), TenantQuota::default())
            .unwrap();

        let found = mgr.get_tenant_by_api_key("my-key");
        assert_eq!(found, Some(&id));

        assert!(mgr.get_tenant_by_api_key("wrong-key").is_none());
    }
1374
    #[test]
    fn test_tenant_manager_duplicate_api_key() {
        // A second tenant may not reuse an existing API key.
        let mut mgr = TenantManager::new();
        mgr.create_tenant("A".into(), "key-1".into(), TenantQuota::default())
            .unwrap();
        let result = mgr.create_tenant("B".into(), "key-1".into(), TenantQuota::default());
        assert!(result.is_err());
    }
1383
    #[test]
    fn test_tenant_manager_remove() {
        // Removing a tenant also clears its API-key index entry.
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
            .unwrap();

        mgr.remove_tenant(&id).unwrap();
        assert_eq!(mgr.tenant_count(), 0);
        assert!(mgr.get_tenant_by_api_key("key-1").is_none());
    }
1395
    #[tokio::test]
    async fn test_tenant_deploy_pipeline() {
        // Deploying valid VPL registers a running pipeline and bumps usage.
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl = r"
            stream Alerts = SensorReading
                .where(temperature > 100)
        ";
        let pipeline_id = tenant
            .deploy_pipeline("My Pipeline".into(), vpl.into())
            .await
            .unwrap();
        assert_eq!(tenant.pipelines.len(), 1);
        assert_eq!(tenant.usage.active_pipelines, 1);
        assert_eq!(
            tenant.pipelines[&pipeline_id].status,
            PipelineStatus::Running
        );
    }
1419
    #[tokio::test]
    async fn test_tenant_pipeline_quota() {
        // Deployments beyond max_pipelines must be rejected.
        let mut mgr = TenantManager::new();
        let quota = TenantQuota {
            max_pipelines: 1,
            max_events_per_second: 100,
            max_streams_per_pipeline: 50,
        };
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), quota)
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl = "stream A = SensorReading .where(x > 1)";
        tenant
            .deploy_pipeline("P1".into(), vpl.into())
            .await
            .unwrap();

        // Second should fail
        let result = tenant.deploy_pipeline("P2".into(), vpl.into()).await;
        assert!(result.is_err());
    }
1443
    #[tokio::test]
    async fn test_tenant_remove_pipeline() {
        // Removing a pipeline updates both the map and the usage gauge.
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl = "stream A = SensorReading .where(x > 1)";
        let pid = tenant
            .deploy_pipeline("P1".into(), vpl.into())
            .await
            .unwrap();

        tenant.remove_pipeline(&pid).unwrap();
        assert_eq!(tenant.pipelines.len(), 0);
        assert_eq!(tenant.usage.active_pipelines, 0);
    }
1462
    #[tokio::test]
    async fn test_tenant_parse_error() {
        // Invalid VPL must surface a deploy error, not a panic.
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let result = tenant
            .deploy_pipeline("Bad".into(), "this is not valid VPL {{{{".into())
            .await;
        assert!(result.is_err());
    }
1476
    #[tokio::test]
    async fn test_tenant_process_event() {
        // A processed event increments the tenant's usage counter.
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl = "stream A = SensorReading .where(temperature > 100)";
        let pid = tenant
            .deploy_pipeline("P1".into(), vpl.into())
            .await
            .unwrap();

        let event = Event::new("SensorReading").with_field("temperature", 150.0);
        tenant.process_event(&pid, event).await.unwrap();
        assert_eq!(tenant.usage.events_processed, 1);
    }
1495
    #[tokio::test]
    async fn test_tenant_rate_limit_on_process() {
        // process_event enforces the tenant's max_events_per_second quota.
        let mut mgr = TenantManager::new();
        let quota = TenantQuota {
            max_pipelines: 10,
            max_events_per_second: 2,
            max_streams_per_pipeline: 50,
        };
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), quota)
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl = "stream A = SensorReading .where(x > 1)";
        let pid = tenant
            .deploy_pipeline("P1".into(), vpl.into())
            .await
            .unwrap();

        let event = Event::new("SensorReading").with_field("x", 5);
        tenant.process_event(&pid, event.clone()).await.unwrap();
        tenant.process_event(&pid, event.clone()).await.unwrap();

        // Third should hit rate limit
        let result = tenant.process_event(&pid, event).await;
        assert!(result.is_err());
    }
1523
    #[tokio::test]
    async fn test_tenant_reload_pipeline() {
        // Reload swaps in new source under the same pipeline ID.
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl1 = "stream A = SensorReading .where(x > 1)";
        let pid = tenant
            .deploy_pipeline("P1".into(), vpl1.into())
            .await
            .unwrap();

        let vpl2 = "stream B = SensorReading .where(x > 50)";
        tenant.reload_pipeline(&pid, vpl2.into()).await.unwrap();

        assert_eq!(tenant.pipelines[&pid].source, vpl2);
    }
1543
    #[test]
    fn test_pipeline_status_display() {
        // Pin the human-readable status strings.
        assert_eq!(format!("{}", PipelineStatus::Running), "running");
        assert_eq!(format!("{}", PipelineStatus::Stopped), "stopped");
        assert_eq!(
            format!("{}", PipelineStatus::Error("oops".into())),
            "error: oops"
        );
    }
1553
    #[test]
    fn test_tenant_error_display() {
        // Error Display output must carry the relevant detail text.
        assert!(format!("{}", TenantError::NotFound("t1".into())).contains("t1"));
        assert!(format!("{}", TenantError::RateLimitExceeded).contains("rate limit"));
        let engine_err = crate::engine::error::EngineError::Compilation("bad".into());
        assert!(format!("{}", TenantError::EngineError(engine_err)).contains("bad"));
    }
1561
1562    #[test]
1563    fn test_shared_tenant_manager() {
1564        let mgr = shared_tenant_manager();
1565        assert!(Arc::strong_count(&mgr) == 1);
1566    }
1567
    #[test]
    fn test_tenant_list() {
        // list_tenants returns every registered tenant.
        let mut mgr = TenantManager::new();
        mgr.create_tenant("A".into(), "key-a".into(), TenantQuota::default())
            .unwrap();
        mgr.create_tenant("B".into(), "key-b".into(), TenantQuota::default())
            .unwrap();
        assert_eq!(mgr.list_tenants().len(), 2);
    }
1577
    #[test]
    fn test_tenant_manager_default() {
        // Default must behave like new(): an empty manager.
        let mgr = TenantManager::default();
        assert_eq!(mgr.tenant_count(), 0);
    }
1583
    #[tokio::test]
    async fn test_tenant_snapshot_roundtrip() {
        // Snapshot -> JSON -> snapshot must preserve every persisted field.
        let mut mgr = TenantManager::new();
        let id = mgr
            .create_tenant("Snap Corp".into(), "snap-key".into(), TenantQuota::pro())
            .unwrap();

        let tenant = mgr.get_tenant_mut(&id).unwrap();
        let vpl = "stream A = SensorReading .where(x > 1)";
        tenant
            .deploy_pipeline("Pipeline1".into(), vpl.into())
            .await
            .unwrap();
        tenant.usage.events_processed = 42;
        tenant.usage.output_events_emitted = 7;

        let snapshot = tenant.snapshot();
        let json = serde_json::to_vec(&snapshot).unwrap();
        let restored: TenantSnapshot = serde_json::from_slice(&json).unwrap();

        assert_eq!(restored.id, id.0);
        assert_eq!(restored.name, "Snap Corp");
        assert_eq!(restored.api_key_hash, hash_api_key("snap-key"));
        assert_eq!(restored.events_processed, 42);
        assert_eq!(restored.output_events_emitted, 7);
        assert_eq!(restored.pipelines.len(), 1);
        assert_eq!(restored.pipelines[0].name, "Pipeline1");
        assert_eq!(restored.pipelines[0].source, vpl);
        assert_eq!(restored.pipelines[0].status, PipelineStatus::Running);
        assert_eq!(
            restored.quota.max_pipelines,
            TenantQuota::pro().max_pipelines
        );
    }
1618
    #[tokio::test]
    async fn test_tenant_manager_persistence_and_recovery() {
        // End-to-end: persist a tenant, recover it in a fresh manager, and
        // verify data, pipelines, and the rebuilt API-key index.
        use crate::persistence::MemoryStore;

        let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
        let vpl = "stream A = SensorReading .where(x > 1)";

        // Create tenants and pipelines with persistence
        let (tenant_id, pipeline_name) = {
            let mut mgr = TenantManager::with_store(Arc::clone(&store));
            let id = mgr
                .create_tenant(
                    "Persisted Corp".into(),
                    "persist-key".into(),
                    TenantQuota::default(),
                )
                .unwrap();

            let tenant = mgr.get_tenant_mut(&id).unwrap();
            tenant
                .deploy_pipeline("Persistent Pipeline".into(), vpl.into())
                .await
                .unwrap();
            tenant.usage.events_processed = 100;
            tenant.usage.output_events_emitted = 5;
            mgr.persist_if_needed(&id);

            (id.0.clone(), "Persistent Pipeline".to_string())
        };

        // Recover into a new manager
        let mut mgr2 = TenantManager::with_store(Arc::clone(&store));
        let recovered = mgr2.recover().unwrap();
        assert_eq!(recovered, 1);
        assert_eq!(mgr2.tenant_count(), 1);

        // Verify tenant data
        let tid = TenantId::new(&tenant_id);
        let tenant = mgr2.get_tenant(&tid).unwrap();
        assert_eq!(tenant.name, "Persisted Corp");
        assert_eq!(tenant.api_key_hash, hash_api_key("persist-key"));
        assert_eq!(tenant.usage.events_processed, 100);
        assert_eq!(tenant.usage.output_events_emitted, 5);
        assert_eq!(tenant.pipelines.len(), 1);

        let pipeline = tenant.pipelines.values().next().unwrap();
        assert_eq!(pipeline.name, pipeline_name);
        assert_eq!(pipeline.source, vpl);
        assert_eq!(pipeline.status, PipelineStatus::Running);

        // Verify API key index was rebuilt
        assert_eq!(mgr2.get_tenant_by_api_key("persist-key"), Some(&tid));
    }
1672
    #[tokio::test]
    async fn test_persistence_survives_restart() {
        // Two tenants with different tiers and pipeline counts survive a
        // simulated restart (drop the manager, recover from the same store).
        use crate::persistence::MemoryStore;

        let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
        let vpl = "stream A = SensorReading .where(x > 1)";

        // Phase 1: Create data
        {
            let mut mgr = TenantManager::with_store(Arc::clone(&store));
            let id1 = mgr
                .create_tenant("Tenant A".into(), "key-a".into(), TenantQuota::free())
                .unwrap();
            let id2 = mgr
                .create_tenant("Tenant B".into(), "key-b".into(), TenantQuota::pro())
                .unwrap();

            let tenant_a = mgr.get_tenant_mut(&id1).unwrap();
            tenant_a
                .deploy_pipeline("P1".into(), vpl.into())
                .await
                .unwrap();
            mgr.persist_if_needed(&id1);

            let tenant_b = mgr.get_tenant_mut(&id2).unwrap();
            tenant_b
                .deploy_pipeline("P2".into(), vpl.into())
                .await
                .unwrap();
            tenant_b
                .deploy_pipeline("P3".into(), vpl.into())
                .await
                .unwrap();
            mgr.persist_if_needed(&id2);
        }

        // Phase 2: Recover (simulating restart)
        {
            let mut mgr = TenantManager::with_store(Arc::clone(&store));
            let recovered = mgr.recover().unwrap();
            assert_eq!(recovered, 2);
            assert_eq!(mgr.tenant_count(), 2);

            // Verify tenant A
            let tid_a = mgr.get_tenant_by_api_key("key-a").unwrap().clone();
            let tenant_a = mgr.get_tenant(&tid_a).unwrap();
            assert_eq!(tenant_a.name, "Tenant A");
            assert_eq!(tenant_a.pipelines.len(), 1);

            // Verify tenant B
            let tid_b = mgr.get_tenant_by_api_key("key-b").unwrap().clone();
            let tenant_b = mgr.get_tenant(&tid_b).unwrap();
            assert_eq!(tenant_b.name, "Tenant B");
            assert_eq!(tenant_b.pipelines.len(), 2);
        }
    }
1729
1730    #[test]
1731    fn test_tenant_snapshot_created_at_and_window() {
1732        let mut mgr = TenantManager::new();
1733        let id = mgr
1734            .create_tenant(
1735                "Window Corp".into(),
1736                "window-key".into(),
1737                TenantQuota::default(),
1738            )
1739            .unwrap();
1740
1741        let tenant = mgr.get_tenant_mut(&id).unwrap();
1742        tenant.usage.events_in_window = 42;
1743        tenant.usage.events_processed = 100;
1744
1745        let snapshot = tenant.snapshot();
1746
1747        // created_at_ms should be populated
1748        assert!(snapshot.created_at_ms.is_some());
1749        let ts = snapshot.created_at_ms.unwrap();
1750        // Should be a reasonable recent timestamp (after 2024-01-01)
1751        assert!(ts > 1_704_067_200_000);
1752
1753        // events_in_window should be captured
1754        assert_eq!(snapshot.events_in_window, 42);
1755
1756        // Round-trip through JSON
1757        let json = serde_json::to_vec(&snapshot).unwrap();
1758        let restored: TenantSnapshot = serde_json::from_slice(&json).unwrap();
1759        assert_eq!(restored.events_in_window, 42);
1760        assert_eq!(restored.created_at_ms, snapshot.created_at_ms);
1761
1762        // Restore tenant and verify events_in_window is carried over
1763        let restored_tenant = TenantManager::restore_tenant_from_snapshot(restored).unwrap();
1764        assert_eq!(restored_tenant.usage.events_in_window, 42);
1765        assert_eq!(restored_tenant.usage.events_processed, 100);
1766    }
1767
1768    #[test]
1769    fn test_tenant_snapshot_backwards_compat() {
1770        // Simulate old JSON without the new fields
1771        let old_json = r#"{
1772            "id": "compat-tenant",
1773            "name": "Compat Corp",
1774            "api_key": "compat-key",
1775            "quota": {
1776                "max_pipelines": 10,
1777                "max_events_per_second": 10000,
1778                "max_streams_per_pipeline": 50
1779            },
1780            "events_processed": 55,
1781            "alerts_generated": 3,
1782            "pipelines": []
1783        }"#;
1784
1785        let snapshot: TenantSnapshot = serde_json::from_str(old_json).unwrap();
1786        assert_eq!(snapshot.id, "compat-tenant");
1787        assert_eq!(snapshot.events_processed, 55);
1788        // New fields should default
1789        assert_eq!(snapshot.created_at_ms, None);
1790        assert_eq!(snapshot.events_in_window, 0);
1791
1792        // Should restore fine
1793        let tenant = TenantManager::restore_tenant_from_snapshot(snapshot).unwrap();
1794        assert_eq!(tenant.name, "Compat Corp");
1795        assert_eq!(tenant.usage.events_processed, 55);
1796        assert_eq!(tenant.usage.events_in_window, 0);
1797    }
1798
1799    #[test]
1800    fn test_recovery_with_invalid_vpl() {
1801        use crate::persistence::MemoryStore;
1802
1803        let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
1804
1805        // Manually store a tenant with invalid VPL
1806        let snapshot = TenantSnapshot {
1807            id: "bad-tenant".into(),
1808            name: "Bad Corp".into(),
1809            api_key_hash: hash_api_key("bad-key"),
1810            quota: TenantQuota::default(),
1811            events_processed: 0,
1812            output_events_emitted: 0,
1813            pipelines: vec![PipelineSnapshot {
1814                id: "bad-pipeline".into(),
1815                name: "Bad Pipeline".into(),
1816                source: "this is not valid VPL {{{{".into(),
1817                status: PipelineStatus::Running,
1818                global_template_id: None,
1819            }],
1820            created_at_ms: None,
1821            events_in_window: 0,
1822        };
1823
1824        let data = serde_json::to_vec(&snapshot).unwrap();
1825        store.put("tenant:bad-tenant", &data).unwrap();
1826        let index = serde_json::to_vec(&vec!["bad-tenant"]).unwrap();
1827        store.put("tenants:index", &index).unwrap();
1828
1829        // Recovery should succeed (tenant recovered, bad pipeline skipped)
1830        let mut mgr = TenantManager::with_store(Arc::clone(&store));
1831        let recovered = mgr.recover().unwrap();
1832        assert_eq!(recovered, 1);
1833
1834        let tid = TenantId::new("bad-tenant");
1835        let tenant = mgr.get_tenant(&tid).unwrap();
1836        assert_eq!(tenant.name, "Bad Corp");
1837        // The invalid pipeline should have been skipped
1838        assert_eq!(tenant.pipelines.len(), 0);
1839    }
1840
1841    #[test]
1842    fn test_backpressure_disabled_by_default() {
1843        let mgr = TenantManager::new();
1844        assert_eq!(mgr.max_queue_depth(), 0);
1845        assert!(mgr.check_backpressure().is_ok());
1846    }
1847
1848    #[test]
1849    fn test_backpressure_set_max_queue_depth() {
1850        let mut mgr = TenantManager::new();
1851        mgr.set_max_queue_depth(100);
1852        assert_eq!(mgr.max_queue_depth(), 100);
1853        // No pending events, should pass
1854        assert!(mgr.check_backpressure().is_ok());
1855    }
1856
1857    #[test]
1858    fn test_backpressure_exceeded() {
1859        let mut mgr = TenantManager::new();
1860        mgr.set_max_queue_depth(10);
1861
1862        // Simulate 10 pending events
1863        mgr.pending_events.store(10, Ordering::Relaxed);
1864        let result = mgr.check_backpressure();
1865        assert!(result.is_err());
1866
1867        if let Err(TenantError::BackpressureExceeded { current, max }) = result {
1868            assert_eq!(current, 10);
1869            assert_eq!(max, 10);
1870        } else {
1871            panic!("Expected BackpressureExceeded error");
1872        }
1873    }
1874
1875    #[test]
1876    fn test_backpressure_not_exceeded() {
1877        let mut mgr = TenantManager::new();
1878        mgr.set_max_queue_depth(10);
1879        mgr.pending_events.store(9, Ordering::Relaxed);
1880        assert!(mgr.check_backpressure().is_ok());
1881    }
1882
1883    #[test]
1884    fn test_backpressure_unlimited_when_zero() {
1885        let mut mgr = TenantManager::new();
1886        mgr.set_max_queue_depth(0);
1887        // Even with huge pending count, should not trigger
1888        mgr.pending_events.store(1_000_000, Ordering::Relaxed);
1889        assert!(mgr.check_backpressure().is_ok());
1890    }
1891
1892    #[test]
1893    fn test_queue_pressure_ratio() {
1894        let mut mgr = TenantManager::new();
1895        mgr.set_max_queue_depth(100);
1896        mgr.pending_events.store(50, Ordering::Relaxed);
1897        let ratio = mgr.queue_pressure_ratio();
1898        assert!((ratio - 0.5).abs() < f64::EPSILON);
1899    }
1900
1901    #[test]
1902    fn test_queue_pressure_ratio_zero_max() {
1903        let mgr = TenantManager::new();
1904        assert_eq!(mgr.queue_pressure_ratio(), 0.0);
1905    }
1906
1907    #[test]
1908    fn test_pending_events_counter() {
1909        let mgr = TenantManager::new();
1910        assert_eq!(mgr.pending_event_count(), 0);
1911        let counter = mgr.pending_events_counter();
1912        counter.fetch_add(5, Ordering::Relaxed);
1913        assert_eq!(mgr.pending_event_count(), 5);
1914    }
1915
1916    #[tokio::test]
1917    async fn test_process_event_with_backpressure_ok() {
1918        let mut mgr = TenantManager::new();
1919        mgr.set_max_queue_depth(100);
1920        let id = mgr
1921            .create_tenant("Test".into(), "key-bp".into(), TenantQuota::default())
1922            .unwrap();
1923
1924        let tenant = mgr.get_tenant_mut(&id).unwrap();
1925        let vpl = "stream A = SensorReading .where(temperature > 100)";
1926        let pid = tenant
1927            .deploy_pipeline("BP Pipeline".into(), vpl.into())
1928            .await
1929            .unwrap();
1930
1931        let event = Event::new("SensorReading").with_field("temperature", 150.0);
1932        let result = mgr.process_event_with_backpressure(&id, &pid, event).await;
1933        assert!(result.is_ok());
1934        // Pending count should be back to 0 after processing
1935        assert_eq!(mgr.pending_event_count(), 0);
1936    }
1937
1938    #[tokio::test]
1939    async fn test_process_event_with_backpressure_rejected() {
1940        let mut mgr = TenantManager::new();
1941        mgr.set_max_queue_depth(5);
1942        // Simulate queue being full
1943        mgr.pending_events.store(5, Ordering::Relaxed);
1944
1945        let id = mgr
1946            .create_tenant("Test".into(), "key-bp2".into(), TenantQuota::default())
1947            .unwrap();
1948        let tenant = mgr.get_tenant_mut(&id).unwrap();
1949        let vpl = "stream A = SensorReading .where(temperature > 100)";
1950        let pid = tenant
1951            .deploy_pipeline("BP Pipeline 2".into(), vpl.into())
1952            .await
1953            .unwrap();
1954
1955        let event = Event::new("SensorReading").with_field("temperature", 150.0);
1956        let result = mgr.process_event_with_backpressure(&id, &pid, event).await;
1957        assert!(result.is_err());
1958        match result {
1959            Err(TenantError::BackpressureExceeded { current, max }) => {
1960                assert_eq!(current, 5);
1961                assert_eq!(max, 5);
1962            }
1963            _ => panic!("Expected BackpressureExceeded"),
1964        }
1965    }
1966
1967    #[test]
1968    fn test_backpressure_error_display() {
1969        let err = TenantError::BackpressureExceeded {
1970            current: 50000,
1971            max: 50000,
1972        };
1973        let msg = format!("{err}");
1974        assert!(msg.contains("50000"));
1975        assert!(msg.contains("exceeds maximum"));
1976    }
1977
1978    #[tokio::test]
1979    async fn test_collect_pipeline_metrics_returns_event_counts() {
1980        let mut mgr = TenantManager::new();
1981        let id = mgr
1982            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
1983            .unwrap();
1984
1985        // Deploy a simple filter pipeline
1986        let vpl = "stream A = SensorReading .where(temperature > 100)";
1987        mgr.deploy_pipeline_on_tenant(&id, "test-pipeline".into(), vpl.into())
1988            .await
1989            .unwrap();
1990
1991        // Verify metrics before any events: pipeline exists with 0 events
1992        let metrics = mgr.collect_pipeline_metrics().await;
1993        assert_eq!(metrics.len(), 1, "Should have 1 pipeline");
1994        assert_eq!(metrics[0].0, "test-pipeline");
1995        assert_eq!(metrics[0].1, 0, "events_in should be 0 before processing");
1996        assert_eq!(metrics[0].2, 0, "events_out should be 0 before processing");
1997
1998        // Process events through the pipeline
1999        let tenant = mgr.get_tenant_mut(&id).unwrap();
2000        let pid = tenant.pipelines.keys().next().unwrap().clone();
2001        for i in 0..10 {
2002            let event = Event::new("SensorReading")
2003                .with_field("temperature", (i as f64).mul_add(20.0, 50.0));
2004            tenant.process_event(&pid, event).await.unwrap();
2005        }
2006
2007        // Re-collect metrics — should reflect processed events
2008        let metrics = mgr.collect_pipeline_metrics().await;
2009        assert_eq!(metrics.len(), 1);
2010        assert_eq!(metrics[0].0, "test-pipeline");
2011        assert!(
2012            metrics[0].1 > 0,
2013            "events_in should be > 0 after processing, got {}",
2014            metrics[0].1
2015        );
2016    }
2017
2018    #[tokio::test]
2019    async fn test_collect_pipeline_metrics_multiple_tenants() {
2020        let mut mgr = TenantManager::new();
2021        let id1 = mgr
2022            .create_tenant("Tenant1".into(), "key-1".into(), TenantQuota::default())
2023            .unwrap();
2024        let id2 = mgr
2025            .create_tenant("Tenant2".into(), "key-2".into(), TenantQuota::default())
2026            .unwrap();
2027
2028        let vpl = "stream A = SensorReading .where(temperature > 100)";
2029        mgr.deploy_pipeline_on_tenant(&id1, "pipeline-1".into(), vpl.into())
2030            .await
2031            .unwrap();
2032        mgr.deploy_pipeline_on_tenant(&id2, "pipeline-2".into(), vpl.into())
2033            .await
2034            .unwrap();
2035
2036        let metrics = mgr.collect_pipeline_metrics().await;
2037        assert_eq!(metrics.len(), 2, "Should have 2 pipelines across 2 tenants");
2038        let names: Vec<&str> = metrics.iter().map(|(n, _, _)| n.as_str()).collect();
2039        assert!(names.contains(&"pipeline-1"));
2040        assert!(names.contains(&"pipeline-2"));
2041    }
2042
2043    #[tokio::test]
2044    async fn test_collect_pipeline_metrics_empty_when_no_pipelines() {
2045        let mut mgr = TenantManager::new();
2046        let _id = mgr
2047            .create_tenant("Test".into(), "key-1".into(), TenantQuota::default())
2048            .unwrap();
2049
2050        let metrics = mgr.collect_pipeline_metrics().await;
2051        assert!(
2052            metrics.is_empty(),
2053            "Should be empty when no pipelines deployed"
2054        );
2055    }
2056
2057    #[tokio::test]
2058    async fn test_deploy_pipeline_on_tenant_and_collect_metrics() {
2059        // Test via TenantManager.deploy_pipeline_on_tenant → collect_pipeline_metrics
2060        // This mirrors the exact code path used in production (coordinator deploys to worker)
2061        let mut mgr = TenantManager::new();
2062        let id = mgr
2063            .create_tenant(
2064                "default".into(),
2065                "api-key-123".into(),
2066                TenantQuota::enterprise(),
2067            )
2068            .unwrap();
2069
2070        let vpl = r"
2071            event MarketTick:
2072                symbol: str
2073                price: float
2074            stream Ticks = MarketTick .where(price > 100)
2075        ";
2076        let pipeline_id = mgr
2077            .deploy_pipeline_on_tenant(&id, "financial-cep".into(), vpl.into())
2078            .await
2079            .unwrap();
2080
2081        // Verify the pipeline appears in metrics
2082        let metrics = mgr.collect_pipeline_metrics().await;
2083        assert_eq!(metrics.len(), 1);
2084        assert_eq!(metrics[0].0, "financial-cep");
2085
2086        // Process events and verify counts update
2087        for _ in 0..5 {
2088            let event = Event::new("MarketTick")
2089                .with_field("symbol", "AAPL")
2090                .with_field("price", 150.0);
2091            mgr.process_event_with_backpressure(&id, &pipeline_id, event)
2092                .await
2093                .unwrap();
2094        }
2095
2096        let metrics = mgr.collect_pipeline_metrics().await;
2097        assert_eq!(metrics[0].1, 5, "Should have processed 5 events");
2098    }
2099}