use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex, RwLock};
use tracing::warn;
use nodedb_types::config::TuningConfig;
use crate::bridge::dispatch::Dispatcher;
use crate::control::request_tracker::RequestTracker;
use crate::control::security::apikey::ApiKeyStore;
use crate::control::security::audit::AuditLog;
use crate::control::security::credential::CredentialStore;
use crate::control::security::permission::PermissionStore;
use crate::control::security::rls::RlsPolicyStore;
use crate::control::security::role::RoleStore;
use crate::control::security::tenant::{QuotaCheck, TenantIsolation, TenantQuota};
use crate::control::server::sync::dlq::{DlqConfig, SyncDlq};
use crate::types::TenantId;
use crate::wal::WalManager;
/// Process-wide shared state for the control-plane server: request
/// dispatch, security stores, tenant quotas, optional cluster wiring,
/// streaming registries, and metrics. Built once via [`SharedState::new`]
/// (ephemeral defaults) or [`SharedState::open`] (catalog-backed) and
/// handed out behind an `Arc`.
pub struct SharedState {
/// Engine request dispatcher; locked to submit work and poll responses.
pub dispatcher: Mutex<Dispatcher>,
/// Correlates engine responses with in-flight requests
/// (see `poll_and_route_responses`).
pub tracker: RequestTracker,
/// Write-ahead log manager shared with the storage layer.
pub wal: Arc<WalManager>,
/// User credential store; also owns the security catalog handle used to
/// persist audit entries and other security state.
pub credentials: Arc<CredentialStore>,
/// In-memory audit buffer; drained to the catalog by `flush_audit_log`.
pub audit: Mutex<AuditLog>,
/// Loaded from the security catalog in `open`.
pub api_keys: ApiKeyStore,
/// Loaded from the security catalog in `open`.
pub roles: RoleStore,
/// Loaded from the security catalog in `open`.
pub permissions: PermissionStore,
/// Per-tenant quota/isolation bookkeeping (memory, concurrency, rate,
/// storage) — see the `check_tenant_quota` / `tenant_request_*` helpers.
pub tenants: Mutex<TenantIsolation>,
/// Row-level-security policy store.
pub rls: RlsPolicyStore,
/// Loaded from the security catalog in `open`.
pub blacklist: crate::control::security::blacklist::store::BlacklistStore,
// The remaining security stores are created empty in both constructors;
// their persistence/loading (if any) is not visible in this file.
pub auth_users: crate::control::security::jit::auth_user::AuthUserStore,
pub orgs: crate::control::security::org::store::OrgStore,
pub scope_defs: crate::control::security::scope::store::ScopeStore,
pub scope_grants: crate::control::security::scope::grant::ScopeGrantStore,
pub rate_limiter: crate::control::security::ratelimit::limiter::RateLimiter,
pub session_handles: crate::control::security::session_handle::SessionHandleStore,
pub session_registry: crate::control::security::session_registry::SessionRegistry,
pub escalation: crate::control::security::escalation::EscalationEngine,
/// Shared usage counter consulted by `update_tenant_memory_estimates`'s
/// sibling metering paths (Arc: shared with background tasks, presumably).
pub usage_counter: std::sync::Arc<crate::control::security::metering::counter::UsageCounter>,
pub usage_store: std::sync::Arc<crate::control::security::metering::store::UsageStore>,
pub quota_manager: crate::control::security::metering::quota::QuotaManager,
pub auth_api_keys: crate::control::security::auth_apikey::AuthApiKeyStore,
pub impersonation: crate::control::security::impersonation::ImpersonationStore,
pub emergency: crate::control::security::emergency::EmergencyState,
pub auth_metrics: crate::control::security::observability::AuthMetrics,
pub ceilings: crate::control::security::ceiling::CeilingStore,
pub redaction: crate::control::security::redaction::RedactionStore,
pub risk_scorer: crate::control::security::risk::RiskScorer,
pub tls_policy: crate::control::security::tls_policy::TlsPolicy,
pub siem: crate::control::security::siem::SiemExporter,
/// Optional JWKS registry; `None` in both constructors, so it is wired
/// in elsewhere when JWT auth is configured.
pub jwks_registry:
Option<std::sync::Arc<crate::control::security::jwks::registry::JwksRegistry>>,
/// Dead-letter queue for failed sync deliveries.
pub sync_dlq: Mutex<SyncDlq>,
/// Days to keep persisted audit entries; 0 disables pruning
/// (set from the auth config in `open`, 0 in `new`).
audit_retention_days: u32,
/// Client idle timeout in seconds; 0 disables. Exposed read-only via
/// `idle_timeout_secs()`.
idle_timeout_secs: u64,
// Cluster integration: all `None`/0 on a standalone node; populated by
// the cluster bootstrap path outside this file.
pub cluster_topology: Option<Arc<RwLock<nodedb_cluster::ClusterTopology>>>,
pub cluster_routing: Option<Arc<RwLock<nodedb_cluster::RoutingTable>>>,
pub cluster_transport: Option<Arc<nodedb_cluster::NexarTransport>>,
pub node_id: u64,
pub propose_tracker: Option<Arc<crate::control::wal_replication::ProposeTracker>>,
pub raft_proposer: Option<Arc<crate::control::wal_replication::RaftProposer>>,
/// Callback producing current Raft group statuses, injected by cluster code.
pub raft_status_fn: Option<Arc<dyn Fn() -> Vec<nodedb_cluster::GroupStatus> + Send + Sync>>,
pub migration_tracker: Option<Arc<nodedb_cluster::MigrationTracker>>,
/// Active WebSocket sessions keyed by session id.
/// NOTE(review): the `u64` value's meaning (counter vs timestamp) is not
/// visible in this file — confirm against the WS handler.
pub ws_sessions: std::sync::RwLock<std::collections::HashMap<String, u64>>,
/// Pub/sub topic registry (capacity 10_000 in both constructors).
pub topic_registry: crate::control::pubsub::TopicRegistry,
pub shape_registry: crate::control::server::sync::shape::ShapeRegistry,
/// Change-data stream buffer (capacity 4096 in both constructors).
pub change_stream: crate::control::change_stream::ChangeStream,
pub connections_rejected: AtomicU64,
pub connections_accepted: AtomicU64,
pub system_metrics: Option<Arc<crate::control::metrics::SystemMetrics>>,
/// Per-key epoch counters; key semantics not visible here.
pub epoch_tracker: Mutex<std::collections::HashMap<String, u64>>,
/// Time-series partition registries keyed by name (presumably table or
/// series name — confirm at call sites). Exposed via
/// `timeseries_registries()`.
pub ts_partition_registries: Option<
Mutex<
std::collections::HashMap<
String,
crate::engine::timeseries::partition_registry::PartitionRegistry,
>,
>,
>,
pub cold_storage: Option<Arc<crate::storage::cold::ColdStorage>>,
/// Tracks node versions during rolling upgrades.
pub cluster_version_state: Mutex<crate::control::rolling_upgrade::ClusterVersionState>,
/// Runtime tuning knobs supplied by configuration.
pub tuning: TuningConfig,
}
impl SharedState {
pub fn new(dispatcher: Dispatcher, wal: Arc<WalManager>) -> Arc<Self> {
Arc::new(Self {
dispatcher: Mutex::new(dispatcher),
tracker: RequestTracker::new(),
wal,
credentials: Arc::new(CredentialStore::new()),
audit: Mutex::new(AuditLog::new(10_000)),
api_keys: ApiKeyStore::new(),
roles: RoleStore::new(),
permissions: PermissionStore::new(),
tenants: Mutex::new(TenantIsolation::new(TenantQuota::default())),
cluster_topology: None,
cluster_routing: None,
cluster_transport: None,
node_id: 0,
propose_tracker: None,
raft_proposer: None,
raft_status_fn: None,
migration_tracker: None,
rls: RlsPolicyStore::new(),
blacklist: crate::control::security::blacklist::store::BlacklistStore::new(),
auth_users: crate::control::security::jit::auth_user::AuthUserStore::new(),
orgs: crate::control::security::org::store::OrgStore::new(),
scope_defs: crate::control::security::scope::store::ScopeStore::new(),
scope_grants: crate::control::security::scope::grant::ScopeGrantStore::new(),
rate_limiter: crate::control::security::ratelimit::limiter::RateLimiter::default(),
session_handles: crate::control::security::session_handle::SessionHandleStore::default(
),
session_registry: crate::control::security::session_registry::SessionRegistry::new(),
escalation: crate::control::security::escalation::EscalationEngine::default(),
usage_counter: std::sync::Arc::new(
crate::control::security::metering::counter::UsageCounter::new(),
),
usage_store: std::sync::Arc::new(
crate::control::security::metering::store::UsageStore::default(),
),
quota_manager: crate::control::security::metering::quota::QuotaManager::new(),
auth_api_keys: crate::control::security::auth_apikey::AuthApiKeyStore::new(),
impersonation: crate::control::security::impersonation::ImpersonationStore::default(),
emergency: crate::control::security::emergency::EmergencyState::default(),
auth_metrics: crate::control::security::observability::AuthMetrics::new(),
ceilings: crate::control::security::ceiling::CeilingStore::new(),
redaction: crate::control::security::redaction::RedactionStore::new(),
risk_scorer: crate::control::security::risk::RiskScorer::default(),
tls_policy: crate::control::security::tls_policy::TlsPolicy::default(),
siem: crate::control::security::siem::SiemExporter::default(),
jwks_registry: None,
sync_dlq: Mutex::new(SyncDlq::new(DlqConfig::default())),
audit_retention_days: 0,
idle_timeout_secs: 0,
ws_sessions: std::sync::RwLock::new(std::collections::HashMap::new()),
topic_registry: crate::control::pubsub::TopicRegistry::new(10_000),
shape_registry: crate::control::server::sync::shape::ShapeRegistry::new(),
change_stream: crate::control::change_stream::ChangeStream::new(4096),
connections_rejected: AtomicU64::new(0),
connections_accepted: AtomicU64::new(0),
system_metrics: Some(Arc::new(crate::control::metrics::SystemMetrics::new())),
epoch_tracker: Mutex::new(std::collections::HashMap::new()),
ts_partition_registries: Some(Mutex::new(std::collections::HashMap::new())),
cold_storage: None,
cluster_version_state: Mutex::new(
crate::control::rolling_upgrade::ClusterVersionState::new(),
),
tuning: TuningConfig::default(),
})
}
/// Builds a `SharedState` backed by the on-disk security catalog at
/// `catalog_path`.
///
/// Opens the credential store, applies the lockout policy from
/// `auth_config`, and — when a catalog is available — loads API keys,
/// roles, permissions, and the blacklist from it, resuming the audit
/// sequence just past the highest persisted entry so the hash chain
/// continues across restarts. All other stores start with in-memory
/// defaults, and cluster fields are `None` until wired in externally.
///
/// # Errors
/// Propagates failures from opening the credential store or loading any
/// of the catalog-backed stores.
pub fn open(
dispatcher: Dispatcher,
wal: Arc<WalManager>,
catalog_path: &std::path::Path,
auth_config: &crate::config::auth::AuthConfig,
tuning: TuningConfig,
) -> crate::Result<Arc<Self>> {
let mut credentials = CredentialStore::open(catalog_path)?;
credentials.set_lockout_policy(
auth_config.max_failed_logins,
auth_config.lockout_duration_secs,
auth_config.password_expiry_days,
);
let api_keys = ApiKeyStore::new();
let roles = RoleStore::new();
let permissions = PermissionStore::new();
let blacklist = crate::control::security::blacklist::store::BlacklistStore::new();
// Default audit seq when nothing has been persisted yet.
let mut audit_start_seq = 1u64;
if let Some(catalog) = credentials.catalog() {
api_keys.load_from(catalog)?;
roles.load_from(catalog)?;
permissions.load_from(catalog)?;
blacklist.load_from(catalog)?;
let max_seq = catalog.load_audit_max_seq()?;
if max_seq > 0 {
// Resume numbering after the last persisted audit entry.
audit_start_seq = max_seq + 1;
}
}
let mut audit_log = AuditLog::new(10_000);
audit_log.set_next_seq(audit_start_seq);
Ok(Arc::new(Self {
dispatcher: Mutex::new(dispatcher),
tracker: RequestTracker::new(),
wal,
credentials: Arc::new(credentials),
audit: Mutex::new(audit_log),
api_keys,
roles,
permissions,
tenants: Mutex::new(TenantIsolation::new(TenantQuota::default())),
// Clustering stays disabled until the bootstrap path fills these in.
cluster_topology: None,
cluster_routing: None,
cluster_transport: None,
node_id: 0,
propose_tracker: None,
raft_proposer: None,
raft_status_fn: None,
migration_tracker: None,
rls: RlsPolicyStore::new(),
blacklist,
auth_users: crate::control::security::jit::auth_user::AuthUserStore::new(),
orgs: crate::control::security::org::store::OrgStore::new(),
scope_defs: crate::control::security::scope::store::ScopeStore::new(),
scope_grants: crate::control::security::scope::grant::ScopeGrantStore::new(),
rate_limiter: crate::control::security::ratelimit::limiter::RateLimiter::default(),
session_handles: crate::control::security::session_handle::SessionHandleStore::default(
),
session_registry: crate::control::security::session_registry::SessionRegistry::new(),
escalation: crate::control::security::escalation::EscalationEngine::default(),
usage_counter: std::sync::Arc::new(
crate::control::security::metering::counter::UsageCounter::new(),
),
usage_store: std::sync::Arc::new(
crate::control::security::metering::store::UsageStore::default(),
),
quota_manager: crate::control::security::metering::quota::QuotaManager::new(),
auth_api_keys: crate::control::security::auth_apikey::AuthApiKeyStore::new(),
impersonation: crate::control::security::impersonation::ImpersonationStore::default(),
emergency: crate::control::security::emergency::EmergencyState::default(),
auth_metrics: crate::control::security::observability::AuthMetrics::new(),
ceilings: crate::control::security::ceiling::CeilingStore::new(),
redaction: crate::control::security::redaction::RedactionStore::new(),
risk_scorer: crate::control::security::risk::RiskScorer::default(),
tls_policy: crate::control::security::tls_policy::TlsPolicy::default(),
siem: crate::control::security::siem::SiemExporter::default(),
jwks_registry: None,
sync_dlq: Mutex::new(SyncDlq::new(DlqConfig::default())),
// Retention and idle timeout come from configuration here, unlike `new`.
audit_retention_days: auth_config.audit_retention_days,
idle_timeout_secs: auth_config.idle_timeout_secs,
ws_sessions: std::sync::RwLock::new(std::collections::HashMap::new()),
topic_registry: crate::control::pubsub::TopicRegistry::new(10_000),
shape_registry: crate::control::server::sync::shape::ShapeRegistry::new(),
change_stream: crate::control::change_stream::ChangeStream::new(4096),
connections_rejected: AtomicU64::new(0),
connections_accepted: AtomicU64::new(0),
system_metrics: Some(Arc::new(crate::control::metrics::SystemMetrics::new())),
epoch_tracker: Mutex::new(std::collections::HashMap::new()),
ts_partition_registries: Some(Mutex::new(std::collections::HashMap::new())),
cold_storage: None,
cluster_version_state: Mutex::new(
crate::control::rolling_upgrade::ClusterVersionState::new(),
),
tuning,
}))
}
/// Seconds of client inactivity before disconnect; 0 means disabled
/// (the value set by `new`; `open` takes it from the auth config).
pub fn idle_timeout_secs(&self) -> u64 {
self.idle_timeout_secs
}
/// Borrow of the optional per-name time-series partition registry map.
/// Returns `None` only when the field was explicitly disabled (both
/// constructors in this file initialize it to `Some`).
pub fn timeseries_registries(
&self,
) -> Option<
&Mutex<
std::collections::HashMap<
String,
crate::engine::timeseries::partition_registry::PartitionRegistry,
>,
>,
> {
self.ts_partition_registries.as_ref()
}
/// Verifies that `tenant_id` is within all of its configured quotas.
///
/// Returns `Ok(())` when the tenant may proceed, otherwise maps each
/// `QuotaCheck` violation (memory, concurrency, rate, storage) to a
/// descriptive error. A poisoned tenant mutex is recovered, not
/// propagated, so a panic elsewhere cannot wedge quota checks.
pub fn check_tenant_quota(&self, tenant_id: TenantId) -> crate::Result<()> {
    let guard = self.tenants.lock().unwrap_or_else(|poisoned| {
        warn!("tenant isolation mutex poisoned, recovering");
        poisoned.into_inner()
    });
    match guard.check(tenant_id) {
        QuotaCheck::Allowed => Ok(()),
        QuotaCheck::MemoryExceeded { used, limit } => Err(crate::Error::MemoryExhausted {
            engine: format!("tenant {tenant_id}: {used}/{limit} bytes"),
        }),
        QuotaCheck::ConcurrencyExceeded { active, limit } => Err(crate::Error::BadRequest {
            detail: format!("tenant {tenant_id}: {active}/{limit} concurrent requests"),
        }),
        QuotaCheck::RateLimited { qps, limit } => Err(crate::Error::BadRequest {
            detail: format!("tenant {tenant_id}: rate limited ({qps}/{limit} qps)"),
        }),
        QuotaCheck::StorageExceeded { used, limit } => Err(crate::Error::BadRequest {
            detail: format!("tenant {tenant_id}: storage quota ({used}/{limit} bytes)"),
        }),
    }
}
/// Records the start of a request for `tenant_id` in the isolation
/// tracker; silently recovers a poisoned tenant mutex.
pub fn tenant_request_start(&self, tenant_id: TenantId) {
    self.tenants
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .request_start(tenant_id);
}
/// Records the completion of a request for `tenant_id`; counterpart of
/// `tenant_request_start`. Silently recovers a poisoned tenant mutex.
pub fn tenant_request_end(&self, tenant_id: TenantId) {
    self.tenants
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .request_end(tenant_id);
}
/// Clears all per-tenant rate counters — presumably invoked on a timer
/// to open a new rate window (caller not visible here). Silently
/// recovers a poisoned tenant mutex.
pub fn reset_tenant_rate_counters(&self) {
    self.tenants
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .reset_rate_counters();
}
/// Appends one entry to the in-memory audit log.
///
/// `source` identifies the subsystem emitting the event and `detail`
/// carries free-form context. A poisoned audit mutex is recovered (with
/// a warning) so auditing keeps working after a panic elsewhere.
pub fn audit_record(
    &self,
    event: crate::control::security::audit::AuditEvent,
    tenant_id: Option<crate::types::TenantId>,
    source: &str,
    detail: &str,
) {
    let mut log = self.audit.lock().unwrap_or_else(|poisoned| {
        warn!("audit log mutex poisoned, recovering");
        poisoned.into_inner()
    });
    log.record(event, tenant_id, source, detail);
}
/// Refreshes each tenant's estimated memory usage.
///
/// The process-wide jemalloc `allocated` figure is apportioned across
/// tenants in proportion to their cumulative request counts — a heuristic
/// attribution, not a measurement. No-op when no requests have been
/// recorded (avoids division by zero). A poisoned tenant mutex is
/// recovered silently.
pub fn update_tenant_memory_estimates(&self) {
    // Total bytes currently allocated process-wide; 0 if the stat read fails.
    let allocated = tikv_jemalloc_ctl::stats::allocated::read().unwrap_or(0) as u64;
    let mut guard = self.tenants.lock().unwrap_or_else(|p| p.into_inner());

    // One entry per distinct tenant among known users, paired with its
    // cumulative request count (0 when no usage is recorded yet).
    let users = self.credentials.list_user_details();
    let mut seen = std::collections::HashSet::new();
    let mut per_tenant: Vec<(crate::types::TenantId, u64)> = Vec::new();
    for user in &users {
        if seen.insert(user.tenant_id) {
            let requests = guard
                .usage(user.tenant_id)
                .map_or(0, |u| u.total_requests);
            per_tenant.push((user.tenant_id, requests));
        }
    }

    let grand_total: u64 = per_tenant.iter().map(|(_, n)| *n).sum();
    if grand_total == 0 {
        return;
    }
    for (tenant, requests) in &per_tenant {
        let share = *requests as f64 / grand_total as f64;
        guard.update_memory(*tenant, (allocated as f64 * share) as u64);
    }
}
/// Drains buffered audit entries and persists them to the security
/// catalog, then prunes entries older than the retention window.
///
/// On persistence failure the drained entries are re-recorded into the
/// in-memory log so they are retried on the next flush.
/// NOTE(review): re-recording goes through `AuditLog::record`, which
/// presumably assigns fresh sequence numbers — the failed batch's
/// original `seq`/`prev_hash` values are not preserved. Confirm this is
/// acceptable for hash-chain integrity.
///
/// Pruning is skipped when `audit_retention_days` is 0 (disabled).
pub fn flush_audit_log(&self) {
    let entries = match self.audit.lock() {
        Ok(mut log) => log.drain_for_persistence(),
        Err(poisoned) => {
            warn!("audit log mutex poisoned during flush, recovering");
            poisoned.into_inner().drain_for_persistence()
        }
    };
    if entries.is_empty() {
        return;
    }
    // Without a catalog (ephemeral state built via `new`) the drained
    // entries are simply discarded.
    if let Some(catalog) = self.credentials.catalog() {
        // Convert to the catalog's storable representation, flattening
        // the event enum to its Debug name.
        let stored: Vec<crate::control::security::catalog::StoredAuditEntry> = entries
            .iter()
            .map(|e| crate::control::security::catalog::StoredAuditEntry {
                seq: e.seq,
                timestamp_us: e.timestamp_us,
                event: format!("{:?}", e.event),
                tenant_id: e.tenant_id.map(|t| t.as_u32()),
                source: e.source.clone(),
                detail: e.detail.clone(),
                prev_hash: e.prev_hash.clone(),
            })
            .collect();
        if let Err(e) = catalog.append_audit_entries(&stored) {
            warn!(error = %e, count = stored.len(), "failed to persist audit entries");
            // Put the batch back so the next flush retries it (see NOTE
            // above about re-sequencing). If the lock is poisoned here
            // the batch is dropped — best effort.
            if let Ok(mut log) = self.audit.lock() {
                for entry in entries {
                    log.record(entry.event, entry.tenant_id, &entry.source, &entry.detail);
                }
            }
        } else {
            tracing::debug!(count = stored.len(), "flushed audit entries to catalog");
            if self.audit_retention_days > 0 {
                // Retention window expressed in microseconds.
                const MICROS_PER_DAY: u64 = 86_400 * 1_000_000;
                let retention_us = self.audit_retention_days as u64 * MICROS_PER_DAY;
                let now_us = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_micros() as u64;
                let cutoff = now_us.saturating_sub(retention_us);
                match catalog.prune_audit_before(cutoff) {
                    Ok(0) => {}
                    Ok(n) => tracing::info!(
                        pruned = n,
                        days = self.audit_retention_days,
                        "pruned old audit entries"
                    ),
                    Err(e) => warn!(error = %e, "failed to prune old audit entries"),
                }
            }
        }
    }
}
/// Pulls any completed responses from the dispatcher and hands each one
/// to the request tracker.
///
/// The dispatcher lock is released before routing so tracker completion
/// never runs under it. Responses the tracker does not recognize
/// (unknown or already-cancelled requests) are logged and dropped. A
/// poisoned dispatcher mutex is recovered with a warning.
pub fn poll_and_route_responses(&self) {
    let batch = {
        let mut dispatcher = self.dispatcher.lock().unwrap_or_else(|poisoned| {
            warn!("dispatcher mutex poisoned, recovering");
            poisoned.into_inner()
        });
        dispatcher.poll_responses()
    };
    for response in batch {
        if !self.tracker.complete(response) {
            warn!("response for unknown or cancelled request");
        }
    }
}
}