orion-server 0.2.0

Declarative services runtime powered by dataflow-rs
use std::collections::HashMap;
use std::sync::Arc;

use datalogic_rs::{Engine as DatalogicEngine, Logic};
use tokio::sync::{RwLock, Semaphore};

use super::KeyedLimiter;
use super::build_keyed_limiter;
use super::config::ChannelConfig;
use super::routing::{RouteMatch, RouteTable};

use crate::config::{TraceStorageMode, TracingStorageConfig};
use crate::connector::ConnectorConfig;
use crate::connector::ConnectorRegistry;
use crate::connector::cache_backend::{CacheBackend, CachePool};
use crate::storage::models::Channel;

/// Trace-storage policy resolved for a single channel.
///
/// Produced by merging the global `[tracing.storage]` config with the
/// channel-level `tracing` override (`ChannelTracingConfig`). Lives on
/// `ChannelRuntimeConfig` so the request hot path looks up exactly one
/// `Arc<ChannelRuntimeConfig>` and reads pre-resolved values.
///
/// The type is `Copy`, so it can be handed around by value.
#[derive(Debug, Clone, Copy)]
pub struct EffectiveTraceConfig {
    /// Storage mode. When this is `TraceStorageMode::Off`, `should_drop`
    /// rejects every trace with reason `"off"`.
    pub mode: TraceStorageMode,
    /// Sampling rate. Values of `1.0` or more keep every trace; below that,
    /// `should_drop` rejects a trace whenever a random `f64` draw is greater
    /// than or equal to this rate.
    pub sample_rate: f64,
    /// When `true`, traces without errors are dropped.
    pub errors_only: bool,
    /// When `true`, the engine should capture per-task execution traces
    /// and persist them. No global default — only enabled per-channel
    /// because the storage cost scales with task count and payload size.
    pub task_details: bool,
}

impl EffectiveTraceConfig {
    /// Decide whether a finished trace should be discarded.
    ///
    /// Returns `Some(reason)` when the trace must be dropped and `None` when
    /// it should be persisted. The checks run in a fixed order, and the first
    /// one that applies wins:
    ///
    /// 1. `"off"` — storage is disabled for this channel.
    /// 2. `"errors_only"` — only failing traces are kept and this one has no
    ///    errors.
    /// 3. `"sampled_out"` — the trace lost the random sampling draw.
    ///
    /// Shared by the sync request path and the async-queue post-processing
    /// path so both apply the same filter semantics.
    pub fn should_drop(&self, has_errors: bool) -> Option<&'static str> {
        match self.mode {
            TraceStorageMode::Off => Some("off"),
            _ if self.errors_only && !has_errors => Some("errors_only"),
            // The random draw only happens when sampling is actually active.
            _ if self.sample_rate < 1.0 && rand::random::<f64>() >= self.sample_rate => {
                Some("sampled_out")
            }
            _ => None,
        }
    }

    /// Build the effective policy for one channel.
    ///
    /// Every field set on the channel-level override wins over the matching
    /// global value. `task_details` has no global counterpart, so it is
    /// `false` unless the channel enables it explicitly.
    pub fn resolve(
        global: &TracingStorageConfig,
        channel: Option<&super::config::ChannelTracingConfig>,
    ) -> Self {
        // Without a channel override the global settings apply unchanged.
        let Some(overrides) = channel else {
            return Self {
                mode: global.mode,
                sample_rate: global.sample_rate,
                errors_only: global.errors_only,
                task_details: false,
            };
        };

        Self {
            mode: overrides.mode.unwrap_or(global.mode),
            sample_rate: overrides.sample_rate.unwrap_or(global.sample_rate),
            errors_only: overrides.errors_only.unwrap_or(global.errors_only),
            task_details: overrides.task_details.unwrap_or(false),
        }
    }
}

/// Runtime state for a single active channel.
///
/// Instances are assembled by `ChannelRegistry::reload` and handed out behind
/// an `Arc`, so everything a request needs for one channel is resolved once
/// per reload rather than per request.
pub struct ChannelRuntimeConfig {
    /// The channel DB model (a clone of the entry passed to `reload`).
    pub channel: Channel,
    /// Parsed per-channel configuration, deserialized from the channel's
    /// `config_json`.
    pub parsed_config: ChannelConfig,
    /// Per-channel rate limiter, built from `parsed_config.rate_limit` if configured.
    pub rate_limiter: Option<Arc<KeyedLimiter>>,
    /// Pre-compiled JSONLogic expression for computing the rate limit key.
    /// `None` when no `key_logic` is configured or when it failed to compile.
    pub rate_limit_key_logic: Option<Logic>,
    /// Pre-compiled JSONLogic expression for input validation.
    /// Evaluated against request data — truthy = pass, falsy = 400 reject.
    /// `None` when no validation is configured or when it failed to compile.
    pub validation_logic: Option<Logic>,
    /// Per-channel concurrency limiter for backpressure.
    /// Limits max in-flight requests — returns 503 when exhausted.
    pub backpressure_semaphore: Option<Arc<Semaphore>>,
    /// Per-channel deduplication backend for idempotent request handling.
    /// Can be backed by in-memory DashMap or Redis, depending on channel config.
    /// `None` when deduplication is not configured for the channel.
    pub dedup_store: Option<Arc<dyn CacheBackend>>,
    /// Per-channel response cache backend.
    /// When set, sync responses are cached with a configurable TTL.
    /// `None` when the cache section is absent or not enabled.
    pub response_cache: Option<Arc<dyn CacheBackend>>,
    /// Trace storage policy after merging the global and per-channel config.
    pub trace_storage: EffectiveTraceConfig,
}

/// In-memory registry of active channels, rebuilt on engine reload.
/// Mirrors the ConnectorRegistry pattern.
///
/// Both pieces of state sit behind their own async `RwLock`; `reload`
/// replaces each of them wholesale.
pub struct ChannelRegistry {
    /// Active channels keyed by channel name.
    by_name: RwLock<HashMap<String, Arc<ChannelRuntimeConfig>>>,
    /// Route table used to match REST requests to channels.
    route_table: RwLock<RouteTable>,
}

impl Default for ChannelRegistry {
    fn default() -> Self {
        Self::new()
    }
}

impl ChannelRegistry {
    pub fn new() -> Self {
        Self {
            by_name: RwLock::new(HashMap::new()),
            route_table: RwLock::new(RouteTable::new()),
        }
    }

    /// Look up an active channel by name.
    pub async fn get_by_name(&self, name: &str) -> Option<Arc<ChannelRuntimeConfig>> {
        self.by_name.read().await.get(name).cloned()
    }

    /// Match a request (method, path) against REST channel route patterns.
    /// Path should NOT include the `/api/v1/data/` prefix.
    pub async fn match_route(&self, method: &str, path: &str) -> Option<RouteMatch> {
        self.route_table.read().await.match_route(method, path)
    }

    /// Rebuild the registry from a list of active channels.
    /// Builds per-channel rate limiters from `config_json.rate_limit` if configured.
    pub async fn reload(
        &self,
        channels: &[Channel],
        connector_registry: &ConnectorRegistry,
        cache_pool: &CachePool,
        datalogic: &DatalogicEngine,
        global_trace_storage: &TracingStorageConfig,
    ) {
        let mut new_map = HashMap::new();
        for channel in channels {
            let parsed_config: ChannelConfig =
                serde_json::from_str(&channel.config_json).unwrap_or_default();

            let rate_limiter = parsed_config.rate_limit.as_ref().map(|rl| {
                let burst = rl.burst.unwrap_or(rl.requests_per_second / 2 + 1);
                Arc::new(build_keyed_limiter(rl.requests_per_second, burst))
            });

            let rate_limit_key_logic = parsed_config
                .rate_limit
                .as_ref()
                .and_then(|rl| rl.key_logic.as_ref())
                .and_then(|logic| {
                    datalogic
                        .compile(logic)
                        .map_err(|e| {
                            tracing::warn!(
                                channel = %channel.name,
                                error = %e,
                                "Failed to compile rate limit key_logic, falling back to client_ip"
                            );
                        })
                        .ok()
                });

            let validation_logic = parsed_config.validation_logic.as_ref().and_then(|logic| {
                datalogic
                    .compile(logic)
                    .map_err(|e| {
                        tracing::warn!(
                            channel = %channel.name,
                            error = %e,
                            "Failed to compile validation_logic, skipping input validation"
                        );
                    })
                    .ok()
            });

            let backpressure_semaphore = parsed_config
                .backpressure
                .as_ref()
                .map(|bp| Arc::new(Semaphore::new(bp.max_concurrent)));

            let dedup_store: Option<Arc<dyn CacheBackend>> =
                if let Some(ref dedup) = parsed_config.deduplication {
                    if let Some(ref connector_name) = dedup.connector {
                        // Use the configured cache connector for dedup
                        match connector_registry.get(connector_name).await {
                            Some(cfg) => match cfg.as_ref() {
                                ConnectorConfig::Cache(cache_cfg) => {
                                    match cache_pool.get_backend(connector_name, cache_cfg).await {
                                        Ok(backend) => Some(backend),
                                        Err(e) => {
                                            tracing::warn!(
                                                channel = %channel.name,
                                                connector = %connector_name,
                                                error = %e,
                                                "Failed to create dedup backend from connector, \
                                                 falling back to in-memory"
                                            );
                                            Some(cache_pool.memory())
                                        }
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        channel = %channel.name,
                                        connector = %connector_name,
                                        "Dedup connector is not a cache connector, \
                                         falling back to in-memory"
                                    );
                                    Some(cache_pool.memory())
                                }
                            },
                            None => {
                                tracing::warn!(
                                    channel = %channel.name,
                                    connector = %connector_name,
                                    "Dedup connector not found, falling back to in-memory"
                                );
                                Some(cache_pool.memory())
                            }
                        }
                    } else {
                        // No connector specified — use built-in in-memory
                        Some(cache_pool.memory())
                    }
                } else {
                    None
                };

            // Resolve response cache backend (same pattern as dedup)
            let response_cache: Option<Arc<dyn CacheBackend>> = if let Some(ref cache_cfg) =
                parsed_config.cache
                && cache_cfg.enabled
            {
                if let Some(ref connector_name) = cache_cfg.connector {
                    match connector_registry.get(connector_name).await {
                        Some(cfg) => match cfg.as_ref() {
                            ConnectorConfig::Cache(cc) => {
                                match cache_pool.get_backend(connector_name, cc).await {
                                    Ok(backend) => Some(backend),
                                    Err(e) => {
                                        tracing::warn!(
                                            channel = %channel.name,
                                            connector = %connector_name,
                                            error = %e,
                                            "Failed to create cache backend from connector, \
                                             falling back to in-memory"
                                        );
                                        Some(cache_pool.memory())
                                    }
                                }
                            }
                            _ => {
                                tracing::warn!(
                                    channel = %channel.name,
                                    connector = %connector_name,
                                    "Cache connector is not a cache connector, \
                                     falling back to in-memory"
                                );
                                Some(cache_pool.memory())
                            }
                        },
                        None => {
                            tracing::warn!(
                                channel = %channel.name,
                                connector = %connector_name,
                                "Cache connector not found, falling back to in-memory"
                            );
                            Some(cache_pool.memory())
                        }
                    }
                } else {
                    Some(cache_pool.memory())
                }
            } else {
                None
            };

            let trace_storage =
                EffectiveTraceConfig::resolve(global_trace_storage, parsed_config.tracing.as_ref());
            let runtime = Arc::new(ChannelRuntimeConfig {
                channel: channel.clone(),
                parsed_config,
                rate_limiter,
                rate_limit_key_logic,
                validation_logic,
                backpressure_semaphore,
                dedup_store,
                response_cache,
                trace_storage,
            });
            new_map.insert(channel.name.clone(), runtime);
        }
        *self.by_name.write().await = new_map;

        // Rebuild the REST route table from active channels
        *self.route_table.write().await = RouteTable::build(channels);
    }

    /// Get all active channel names.
    pub async fn channel_names(&self) -> Vec<String> {
        self.by_name.read().await.keys().cloned().collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed registry knows no channels: lookups miss and
    /// the list of channel names is empty.
    #[tokio::test]
    async fn test_channel_registry_empty() {
        let empty_registry = ChannelRegistry::new();

        let lookup = empty_registry.get_by_name("nonexistent").await;
        assert!(lookup.is_none());

        let names = empty_registry.channel_names().await;
        assert!(names.is_empty());
    }
}