pub mod provider;
#[cfg(feature = "permission")]
pub mod rbac;
#[cfg(feature = "permission")]
pub mod advanced;
#[cfg(feature = "permission")]
pub use advanced::AdvancedRbacProvider;
use dashmap::DashMap;
use lru::LruCache;
use serde::{Deserialize, Serialize};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::sync::RwLock as AsyncRwLock;
/// Errors reported by the permission subsystem.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum PermissionError {
/// A policy cache was requested with a capacity of zero.
#[error("Cache capacity must be non-zero")]
InvalidCacheCapacity,
/// The named role has no entry in the permission configuration.
/// The payload is the role name interpolated into the message.
#[error("Role '{0}' not found in permission config")]
RoleNotFound(String),
/// Loading the permission configuration failed; the payload is the
/// human-readable reason interpolated into the message.
#[error("Failed to load permission config: {0}")]
ConfigLoadError(String),
/// A permission check was rejected because of rate limiting.
// NOTE(review): not constructed anywhere in this file; `check_table_access`
// reports a rate-limited check as a plain `false` instead.
#[error("Permission check rate limited")]
RateLimited,
}
/// The kind of table operation a permission check is about.
///
/// With `rename_all = "snake_case"` the variants (de)serialize as
/// `select`, `insert`, `update` and `delete`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PermissionAction {
/// Reading rows.
Select,
/// Adding rows.
Insert,
/// Modifying existing rows.
Update,
/// Removing rows.
Delete,
}
impl std::fmt::Display for PermissionAction {
    /// Writes the upper-case SQL keyword that corresponds to the action.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let keyword = match self {
            PermissionAction::Select => "SELECT",
            PermissionAction::Insert => "INSERT",
            PermissionAction::Update => "UPDATE",
            PermissionAction::Delete => "DELETE",
        };
        f.write_str(keyword)
    }
}
/// Per-key request limiter over a trailing time window.
///
/// `requests` sits behind an `Arc`, so clones of a limiter share the same
/// bookkeeping map.
#[derive(Debug, Clone)]
pub(crate) struct RateLimiter {
/// Maximum number of requests admitted per key within one window.
max_requests: u32,
/// Length of the trailing window in which requests are counted.
window_duration: Duration,
/// Timestamps of admitted requests, keyed by the caller-supplied key.
requests: Arc<DashMap<String, Vec<Instant>>>,
}
impl RateLimiter {
    /// Creates a limiter admitting at most `max_requests` requests per key
    /// within any trailing `window_duration`.
    pub(crate) fn new(max_requests: u32, window_duration: Duration) -> Self {
        Self {
            max_requests,
            window_duration,
            requests: Arc::new(DashMap::new()),
        }
    }

    /// Returns the start of the window that ends at `now`.
    ///
    /// `None` means the window reaches back further than the clock can
    /// represent, in which case every recorded timestamp is inside it.
    /// Using `checked_sub` avoids the panic `Instant - Duration` raises when
    /// the result is not representable (e.g. a very large configured window).
    fn window_start(&self, now: Instant) -> Option<Instant> {
        now.checked_sub(self.window_duration)
    }

    /// Tells whether timestamp `t` lies strictly after `window_start`.
    fn in_window(window_start: Option<Instant>, t: Instant) -> bool {
        window_start.map_or(true, |start| t > start)
    }

    /// Records a request for `key` if the key is still under its limit.
    ///
    /// Expired timestamps for the key are pruned first. Returns `true` when
    /// the request was admitted (and recorded), `false` when the limit for the
    /// current window has been reached.
    pub(crate) async fn check(&self, key: &str) -> bool {
        let now = Instant::now();
        let window_start = self.window_start(now);
        // The entry guard is never held across an `.await`.
        let mut timestamps = self.requests.entry(key.to_string()).or_default();
        timestamps.retain(|&t| Self::in_window(window_start, t));
        let limit = usize::try_from(self.max_requests).unwrap_or(usize::MAX);
        if timestamps.len() < limit {
            timestamps.push(now);
            true
        } else {
            false
        }
    }

    /// Returns how many more requests `key` may make in the current window.
    ///
    /// Does not record a request and does not prune stored timestamps.
    pub(crate) fn remaining(&self, key: &str) -> u32 {
        let window_start = self.window_start(Instant::now());
        let used = match self.requests.get(key) {
            Some(timestamps) => timestamps
                .iter()
                .filter(|&&t| Self::in_window(window_start, t))
                .count(),
            None => return self.max_requests,
        };
        // A count that does not fit in `u32` certainly exhausts the budget.
        let used = u32::try_from(used).unwrap_or(u32::MAX);
        self.max_requests.saturating_sub(used)
    }

    /// Forgets all recorded requests for `key`.
    pub(crate) fn reset(&self, key: &str) {
        self.requests.remove(key);
    }

    /// Drops keys that have no timestamps or whose newest timestamp is older
    /// than ten windows, returning the number of keys removed.
    ///
    /// The staleness check and the removal happen in one `retain` pass, so an
    /// entry refreshed by a concurrent `check` is not discarded on the basis
    /// of an outdated observation.
    pub(crate) fn cleanup(&self) -> usize {
        // Saturate instead of panicking when the window is extremely large.
        let cleanup_threshold = self.window_duration.saturating_mul(10);
        let now = Instant::now();
        let mut removed_count = 0usize;
        self.requests.retain(|_, timestamps| {
            let stale = match timestamps.iter().max() {
                Some(&latest) => now.saturating_duration_since(latest) > cleanup_threshold,
                None => true,
            };
            if stale {
                removed_count += 1;
            }
            !stale
        });
        if removed_count > 0 {
            tracing::info!(
                "RateLimiter cleanup: removed {} stale entries, remaining {}",
                removed_count,
                self.requests.len()
            );
        }
        removed_count
    }

    /// Number of keys currently tracked.
    pub(crate) fn len(&self) -> usize {
        self.requests.len()
    }

    /// Whether no keys are currently tracked.
    pub(crate) fn is_empty(&self) -> bool {
        self.requests.is_empty()
    }
}
impl Default for RateLimiter {
    /// Builds a limiter allowing 100 requests per key in a 60-second window.
    fn default() -> Self {
        let window = Duration::from_secs(60);
        Self::new(100, window)
    }
}
/// Alternative name for [`PermissionAction`]; both names denote the same type.
pub type Operation = PermissionAction;
/// The set of operations granted on one table (or on the `*` wildcard).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TablePermission {
/// Table name this entry applies to; `"*"` is treated by
/// `RolePolicy::allows` as "any non-system table".
pub name: String,
/// Operations granted on the table.
pub operations: Vec<PermissionAction>,
}
/// Table-level permissions attached to a single role.
///
/// The derived `Default` is a policy with no entries, which grants nothing.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RolePolicy {
/// Permission entries; `allows` grants access if any entry matches.
pub tables: Vec<TablePermission>,
}
impl RolePolicy {
    /// Names of database-internal tables and schemas that the `*` wildcard
    /// never covers. Entries must be lower-case: they are compared against a
    /// lower-cased table name.
    const SYSTEM_TABLES: &'static [&'static str] = &[
        "sqlite_master",
        "sqlite_sequence",
        "sqlite_stat1",
        "sqlite_stat2",
        "sqlite_stat3",
        "sqlite_stat4",
        "information_schema",
        "pg_catalog",
        "pg_toast",
        "mysql",
        "performance_schema",
        "sys",
    ];

    /// Tells whether `normalized_table` (already lower-cased) is a system
    /// table/schema name itself or an object qualified by one (`schema.obj`).
    ///
    /// Works on string slices only, so a permission check performs no
    /// per-entry allocations.
    fn is_system_table(normalized_table: &str) -> bool {
        Self::SYSTEM_TABLES.iter().copied().any(|system_table| {
            normalized_table
                .strip_prefix(system_table)
                .map_or(false, |rest| rest.is_empty() || rest.starts_with('.'))
        })
    }

    /// Returns whether this policy grants `operation` on `table`.
    ///
    /// * An entry named `"*"` grants its operations on every table except
    ///   system tables (matched case-insensitively).
    /// * Any other entry applies only when its name equals `table` exactly
    ///   (case-sensitive); such an explicit entry can grant access to a
    ///   system table.
    ///
    /// A denied request for a system table is logged as a warning on the
    /// `security` target.
    pub fn allows(&self, table: &str, operation: &PermissionAction) -> bool {
        let is_system_table = Self::is_system_table(&table.to_lowercase());

        let granted = self.tables.iter().any(|perm| {
            let applies = if perm.name == "*" {
                !is_system_table
            } else {
                perm.name == table
            };
            applies && perm.operations.contains(operation)
        });
        if granted {
            return true;
        }

        if is_system_table {
            tracing::warn!(
                target: "security",
                "Access denied to system table '{}'",
                table
            );
        }
        false
    }
}
/// Identity and shared state used to answer table-level permission checks
/// for one caller.
pub struct PermissionContext {
/// Role whose policy is looked up in `policy_cache`.
role: String,
/// Optional user identifier; contributes to the rate-limit key when set.
user_id: Option<String>,
/// Optional session identifier; contributes to the rate-limit key when set.
session_id: Option<String>,
/// Shared LRU cache mapping a role name to its policy.
policy_cache: Arc<AsyncRwLock<LruCache<String, RolePolicy>>>,
/// Limiter consulted by `check_table_access` before the cache lookup;
/// `None` disables rate limiting.
rate_limiter: Option<Arc<RateLimiter>>,
/// External permission provider; when set, `check_table_access` delegates
/// the decision to it. Only present with the `permission-engine` feature.
#[cfg(feature = "permission-engine")]
provider: Option<Arc<dyn crate::permission_engine::PermissionProvider + Send + Sync>>,
}
/// Policy-cache capacity (number of roles) used by `PermissionContext::default`.
const DEFAULT_CACHE_CAPACITY: usize = 256;
/// Requests admitted per rate-limit key and window by the default limiter.
const DEFAULT_RATE_LIMIT_MAX_REQUESTS: u32 = 100;
/// Length, in seconds, of the default rate-limit window.
const DEFAULT_RATE_LIMIT_WINDOW_SECS: u64 = 60;
impl Default for PermissionContext {
    /// Builds a context for the `admin` role with an empty policy cache of
    /// `DEFAULT_CACHE_CAPACITY` entries.
    ///
    /// # Panics
    ///
    /// Panics only if `DEFAULT_CACHE_CAPACITY` were zero.
    fn default() -> Self {
        let role = "admin".to_string();
        let built = Self::with_cache_size(role, DEFAULT_CACHE_CAPACITY);
        built.expect("Default cache capacity should always be valid")
    }
}
impl Clone for PermissionContext {
fn clone(&self) -> Self {
Self {
role: self.role.clone(),
user_id: self.user_id.clone(),
session_id: self.session_id.clone(),
policy_cache: self.policy_cache.clone(),
rate_limiter: self.rate_limiter.clone(),
provider: self.provider.clone(),
}
}
}
impl std::fmt::Debug for PermissionContext {
    /// Formats the identity fields verbatim and summarises the shared state:
    /// the cache is shown as a fixed placeholder, the rate limiter and (when
    /// compiled in) the provider as "is it set" booleans.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PermissionContext");
        out.field("role", &self.role)
            .field("user_id", &self.user_id)
            .field("session_id", &self.session_id)
            .field("policy_cache", &"<Arc<AsyncRwLock<LruCache>>")
            .field("rate_limiter", &self.rate_limiter.is_some());
        // The field only exists with the `permission-engine` feature, so it
        // can only be reported when that feature is enabled.
        #[cfg(feature = "permission-engine")]
        out.field("provider", &self.provider.is_some());
        out.finish()
    }
}
impl PermissionContext {
pub fn new(role: String, policy_cache: Arc<AsyncRwLock<LruCache<String, RolePolicy>>>) -> Self {
Self {
role,
user_id: None,
session_id: None,
policy_cache,
rate_limiter: Some(Arc::new(RateLimiter::new(
DEFAULT_RATE_LIMIT_MAX_REQUESTS,
Duration::from_secs(DEFAULT_RATE_LIMIT_WINDOW_SECS),
))),
provider: None,
}
}
pub fn with_provider(
role: String,
policy_cache: Arc<AsyncRwLock<LruCache<String, RolePolicy>>>,
provider: Arc<dyn crate::permission_engine::PermissionProvider + Send + Sync>,
) -> Self {
Self {
role,
user_id: None,
session_id: None,
policy_cache,
rate_limiter: Some(Arc::new(RateLimiter::new(
DEFAULT_RATE_LIMIT_MAX_REQUESTS,
Duration::from_secs(DEFAULT_RATE_LIMIT_WINDOW_SECS),
))),
provider: Some(provider),
}
}
pub fn with_provider_or_default(
role: String,
policy_cache: Arc<AsyncRwLock<LruCache<String, RolePolicy>>>,
provider: Option<Arc<dyn crate::permission_engine::PermissionProvider + Send + Sync>>,
) -> Self {
Self {
role,
user_id: None,
session_id: None,
policy_cache,
rate_limiter: Some(Arc::new(RateLimiter::new(
DEFAULT_RATE_LIMIT_MAX_REQUESTS,
Duration::from_secs(DEFAULT_RATE_LIMIT_WINDOW_SECS),
))),
provider,
}
}
pub fn with_cache_size(role: String, cache_capacity: usize) -> Result<Self, PermissionError> {
let capacity = NonZeroUsize::new(cache_capacity).ok_or(PermissionError::InvalidCacheCapacity)?;
Ok(Self {
role,
user_id: None,
session_id: None,
policy_cache: Arc::new(AsyncRwLock::new(LruCache::new(capacity))),
rate_limiter: Some(Arc::new(RateLimiter::new(
DEFAULT_RATE_LIMIT_MAX_REQUESTS,
Duration::from_secs(DEFAULT_RATE_LIMIT_WINDOW_SECS),
))),
provider: None,
})
}
pub fn with_cache_size_and_rate_limit(
role: String,
cache_capacity: usize,
max_requests: u32,
window_secs: u64,
) -> Result<Self, PermissionError> {
let capacity = NonZeroUsize::new(cache_capacity).ok_or(PermissionError::InvalidCacheCapacity)?;
Ok(Self {
role,
user_id: None,
session_id: None,
policy_cache: Arc::new(AsyncRwLock::new(LruCache::new(capacity))),
rate_limiter: Some(Arc::new(RateLimiter::new(
max_requests,
Duration::from_secs(window_secs),
))),
provider: None,
})
}
pub fn role(&self) -> &str {
&self.role
}
pub fn set_user_id(&mut self, user_id: String) {
self.user_id = Some(user_id);
}
pub fn user_id(&self) -> Option<&str> {
self.user_id.as_deref()
}
pub fn set_session_id(&mut self, session_id: String) {
self.session_id = Some(session_id);
}
pub fn session_id(&self) -> Option<&str> {
self.session_id.as_deref()
}
#[cfg(feature = "permission-engine")]
pub fn set_provider(&mut self, provider: Arc<dyn crate::permission_engine::PermissionProvider + Send + Sync>) {
self.provider = Some(provider);
}
fn get_rate_limit_key(&self) -> String {
match (&self.user_id, &self.session_id) {
(Some(user_id), Some(session_id)) => {
format!("{}:{}:{}", self.role, user_id, session_id)
}
(Some(user_id), None) => format!("{}:{}", self.role, user_id),
(None, Some(session_id)) => format!("{}:{}", self.role, session_id),
(None, None) => self.role.clone(),
}
}
pub async fn check_table_access(&self, table: &str, operation: &PermissionAction) -> bool {
#[cfg(feature = "permission-engine")]
if let Some(provider) = &self.provider {
use crate::permission_engine::{
PermissionAction as EnginePermissionAction, PermissionContext, PermissionDecision, PermissionResource,
PermissionSubject,
};
let engine_action = match operation {
PermissionAction::Select => EnginePermissionAction::Select,
PermissionAction::Insert => EnginePermissionAction::Insert,
PermissionAction::Update => EnginePermissionAction::Update,
PermissionAction::Delete => EnginePermissionAction::Delete,
};
let context = PermissionContext::new(
PermissionSubject::role(&self.role),
PermissionResource::new(table),
engine_action,
);
let decision = provider.check_permission(&context).await;
return matches!(decision, PermissionDecision::Allow);
}
if let Some(limiter) = &self.rate_limiter {
let rate_limit_key = self.get_rate_limit_key();
if !limiter.check(&rate_limit_key).await {
tracing::warn!(
target: "security",
"Rate limit exceeded for '{}' on table '{}'",
rate_limit_key,
table
);
return false;
}
}
{
let cache = self.policy_cache.read().await;
if let Some(policy) = cache.peek(self.role.as_str()) {
let allowed = policy.allows(table, operation);
tracing::trace!(
"Permission check: role='{}' table='{}' operation='{}' result={}",
self.role,
table,
operation,
allowed
);
return allowed;
}
}
tracing::debug!(
target: "security",
"Permission cache miss for role '{}' on table '{}'. Access denied by default.",
self.role,
table,
);
false
}
pub async fn invalidate_cache(&self) {
let mut cache = self.policy_cache.write().await;
cache.pop(&self.role);
tracing::info!("Invalidated permission cache for role '{}'", self.role);
}
pub async fn cache_stats(&self) -> CacheStats {
let cache = self.policy_cache.read().await;
CacheStats {
cached_roles: cache.len(),
capacity: cache.cap().get(),
}
}
}
/// Snapshot of the policy cache's occupancy, as returned by
/// `PermissionContext::cache_stats`.
#[derive(Debug, Clone)]
pub struct CacheStats {
/// Number of roles that currently have a cached policy.
pub cached_roles: usize,
/// Maximum number of roles the cache can hold.
pub capacity: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Limiter with the given per-key limit and a 60-second window.
    fn limiter_with_limit(max_requests: u32) -> RateLimiter {
        RateLimiter::new(max_requests, Duration::from_secs(60))
    }

    /// Asserts that the next `count` checks for `key` are all admitted.
    async fn assert_admitted(limiter: &RateLimiter, key: &str, count: usize) {
        for _ in 0..count {
            assert!(limiter.check(key).await);
        }
    }

    #[test]
    fn test_operation_display() {
        let expected = [
            (PermissionAction::Select, "SELECT"),
            (PermissionAction::Insert, "INSERT"),
            (PermissionAction::Update, "UPDATE"),
            (PermissionAction::Delete, "DELETE"),
        ];
        for (action, keyword) in expected.iter() {
            assert_eq!(action.to_string(), *keyword);
        }
    }

    #[test]
    fn test_role_policy_allows() {
        let users = TablePermission {
            name: "users".to_string(),
            operations: vec![PermissionAction::Select, PermissionAction::Insert],
        };
        let wildcard = TablePermission {
            name: "*".to_string(),
            operations: vec![PermissionAction::Select],
        };
        let policy = RolePolicy {
            tables: vec![users, wildcard],
        };

        // (table, operation, expected decision)
        let cases = [
            ("users", PermissionAction::Select, true),
            ("users", PermissionAction::Insert, true),
            ("users", PermissionAction::Delete, false),
            ("orders", PermissionAction::Select, true),
            ("orders", PermissionAction::Update, false),
        ];
        for (table, operation, expected) in cases.iter() {
            assert_eq!(policy.allows(table, operation), *expected);
        }
    }

    #[test]
    fn test_permission_context_creation() {
        let capacity = NonZeroUsize::new(256).unwrap();
        let cache = Arc::new(AsyncRwLock::new(LruCache::new(capacity)));
        let ctx = PermissionContext::new("admin".to_string(), cache);
        assert_eq!(ctx.role(), "admin");
    }

    #[tokio::test]
    async fn test_rate_limiter_basic() {
        let limiter = limiter_with_limit(3);
        assert_admitted(&limiter, "user1", 3).await;
        assert!(!limiter.check("user1").await);
    }

    #[tokio::test]
    async fn test_rate_limiter_different_keys() {
        let limiter = limiter_with_limit(2);
        // Each key has its own budget.
        for key in ["user1", "user2"].iter() {
            assert_admitted(&limiter, key, 2).await;
            assert!(!limiter.check(key).await);
        }
    }

    #[tokio::test]
    async fn test_rate_limiter_reset() {
        let limiter = limiter_with_limit(1);
        assert!(limiter.check("user1").await);
        assert!(!limiter.check("user1").await);
        limiter.reset("user1");
        assert!(limiter.check("user1").await);
    }

    #[tokio::test]
    async fn test_rate_limiter_remaining() {
        let limiter = limiter_with_limit(3);
        assert_eq!(limiter.remaining("user1"), 3);
        for left in (0..3u32).rev() {
            limiter.check("user1").await;
            assert_eq!(limiter.remaining("user1"), left);
        }
        // A rejected request leaves the budget untouched.
        assert!(!limiter.check("user1").await);
        assert_eq!(limiter.remaining("user1"), 0);
    }
}