use std::collections::HashSet;
use std::net::IpAddr;
use std::num::NonZeroUsize;
use std::sync::Mutex;
use std::time::Duration;
use crabka_authz::{AclSource, AuthorizationRequest, AuthorizationResult, Authorizer};
use crabka_metadata::{AclOperation, ResourceType};
use lru::LruCache;
use serde::{Deserialize, Serialize};
use crate::time_util::now_ms;
const OPA_HTTP_TIMEOUT: Duration = Duration::from_secs(5);
pub struct OpaAuthorizer {
super_users: HashSet<String>,
http_client: reqwest::Client,
url: String,
allow_on_error: bool,
cache: Mutex<LruCache<CacheKey, CachedDecision>>,
expire_after_ms: i64,
runtime: tokio::runtime::Handle,
}
impl std::fmt::Debug for OpaAuthorizer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpaAuthorizer")
.field("super_users", &self.super_users)
.field("url", &self.url)
.field("allow_on_error", &self.allow_on_error)
.field("expire_after_ms", &self.expire_after_ms)
.finish_non_exhaustive()
}
}
#[derive(Debug, Clone, PartialEq, Eq, std::hash::Hash)]
struct CacheKey {
principal: String,
operation: AclOperation,
resource_type: ResourceType,
resource_name: String,
host: IpAddr,
}
#[derive(Debug, Clone, Copy)]
struct CachedDecision {
decision: AuthorizationResult,
expires_at_ms: i64,
}
#[derive(Debug, Serialize)]
struct OpaRequest<'a> {
input: OpaInput<'a>,
}
#[derive(Debug, Serialize)]
struct OpaInput<'a> {
request: OpaRequestInner<'a>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct OpaRequestInner<'a> {
principal: String,
operation: &'a str,
resource: OpaResource<'a>,
host: String,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct OpaResource<'a> {
resource_type: &'a str,
name: &'a str,
pattern_type: &'a str,
}
#[derive(Debug, Deserialize)]
struct OpaResponse {
result: bool,
}
impl OpaAuthorizer {
pub fn new(
super_users: HashSet<String>,
url: String,
allow_on_error: bool,
max_cache_size: usize,
expire_after_ms: i64,
) -> Result<Self, OpaConfigError> {
let http_client = reqwest::Client::builder()
.timeout(OPA_HTTP_TIMEOUT)
.build()
.map_err(|e| OpaConfigError::Http(e.to_string()))?;
let capacity = NonZeroUsize::new(max_cache_size).ok_or(OpaConfigError::ZeroCache)?;
let cache = Mutex::new(LruCache::new(capacity));
let runtime =
tokio::runtime::Handle::try_current().map_err(|_| OpaConfigError::NoTokioRuntime)?;
Ok(Self {
super_users,
http_client,
url,
allow_on_error,
cache,
expire_after_ms,
runtime,
})
}
async fn call_opa(&self, req: &AuthorizationRequest<'_>) -> AuthorizationResult {
let body = OpaRequest {
input: OpaInput {
request: OpaRequestInner {
principal: format!("User:{}", req.principal.name),
operation: operation_str(req.operation),
resource: OpaResource {
resource_type: resource_type_str(req.resource_type),
name: req.resource_name,
pattern_type: "Literal",
},
host: req.host.ip().to_string(),
},
},
};
match self.http_client.post(&self.url).json(&body).send().await {
Ok(resp) => match resp.json::<OpaResponse>().await {
Ok(r) => {
if r.result {
AuthorizationResult::Allow
} else {
AuthorizationResult::Deny
}
}
Err(e) => {
tracing::warn!(error = %e, url = %self.url, "OPA response parse failed");
self.error_decision()
}
},
Err(e) => {
tracing::warn!(error = %e, url = %self.url, "OPA HTTP call failed");
self.error_decision()
}
}
}
fn error_decision(&self) -> AuthorizationResult {
if self.allow_on_error {
AuthorizationResult::Allow
} else {
AuthorizationResult::Deny
}
}
}
impl Authorizer for OpaAuthorizer {
fn authorize(
&self,
_source: &dyn AclSource,
req: &AuthorizationRequest<'_>,
) -> AuthorizationResult {
if self.super_users.contains(&req.principal.name) {
return AuthorizationResult::Allow;
}
let key = CacheKey {
principal: format!("User:{}", req.principal.name),
operation: req.operation,
resource_type: req.resource_type,
resource_name: req.resource_name.to_string(),
host: req.host.ip(),
};
let now = now_ms();
{
let mut cache = self.cache.lock().expect("OPA cache mutex poisoned");
if let Some(cached) = cache.get(&key)
&& cached.expires_at_ms > now
{
return cached.decision;
}
}
let decision = tokio::task::block_in_place(|| self.runtime.block_on(self.call_opa(req)));
let mut cache = self.cache.lock().expect("OPA cache mutex poisoned");
cache.put(
key,
CachedDecision {
decision,
expires_at_ms: now + self.expire_after_ms,
},
);
decision
}
}
#[derive(Debug, thiserror::Error)]
pub enum OpaConfigError {
#[error("OPA HTTP client build failed: {0}")]
Http(String),
#[error("OPA cache size must be > 0")]
ZeroCache,
#[error("OPA authorizer requires an active tokio runtime")]
NoTokioRuntime,
}
fn operation_str(op: AclOperation) -> &'static str {
match op {
AclOperation::All => "All",
AclOperation::Read => "Read",
AclOperation::Write => "Write",
AclOperation::Create => "Create",
AclOperation::Delete => "Delete",
AclOperation::Alter => "Alter",
AclOperation::Describe => "Describe",
AclOperation::ClusterAction => "ClusterAction",
AclOperation::DescribeConfigs => "DescribeConfigs",
AclOperation::AlterConfigs => "AlterConfigs",
AclOperation::IdempotentWrite => "IdempotentWrite",
}
}
fn resource_type_str(t: ResourceType) -> &'static str {
match t {
ResourceType::Topic => "Topic",
ResourceType::Group => "Group",
ResourceType::Cluster => "Cluster",
ResourceType::TransactionalId => "TransactionalId",
ResourceType::DelegationToken => "DelegationToken",
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use crabka_metadata::MetadataImage;
use crabka_security::{AuthMethod, Principal};
use std::net::SocketAddr;
use uuid::Uuid;
use wiremock::matchers::method;
use wiremock::{Mock, MockServer, ResponseTemplate};
fn test_principal(name: &str) -> Principal {
Principal {
name: name.into(),
auth_method: AuthMethod::SaslPlain,
groups: vec![],
}
}
fn img() -> MetadataImage {
MetadataImage::new(Uuid::nil())
}
fn host() -> SocketAddr {
"1.2.3.4:9092".parse().unwrap()
}
fn req<'a>(p: &'a Principal, h: &'a SocketAddr, topic: &'a str) -> AuthorizationRequest<'a> {
AuthorizationRequest {
principal: p,
host: h,
resource_type: ResourceType::Topic,
resource_name: topic,
operation: AclOperation::Read,
}
}
fn opa_url(server: &MockServer) -> String {
format!("{}/v1/data/kafka/authz/allow", server.uri())
}
fn supers(names: &[&str]) -> HashSet<String> {
names.iter().map(|s| (*s).to_string()).collect()
}
#[tokio::test(flavor = "multi_thread")]
async fn super_user_bypasses_opa_call() {
let mock = MockServer::start().await;
Mock::given(method("POST"))
.respond_with(
ResponseTemplate::new(200).set_body_json(serde_json::json!({"result": false})),
)
.expect(0)
.mount(&mock)
.await;
let auth =
OpaAuthorizer::new(supers(&["admin"]), opa_url(&mock), false, 100, 60_000).unwrap();
let image = img();
let p = test_principal("admin");
let h = host();
assert!(auth.authorize(&image, &req(&p, &h, "anything")) == AuthorizationResult::Allow);
}
#[tokio::test(flavor = "multi_thread")]
async fn cache_hit_returns_cached_decision_without_http_call() {
let mock = MockServer::start().await;
Mock::given(method("POST"))
.respond_with(
ResponseTemplate::new(200).set_body_json(serde_json::json!({"result": true})),
)
.expect(1) .mount(&mock)
.await;
let auth = OpaAuthorizer::new(HashSet::new(), opa_url(&mock), false, 100, 60_000).unwrap();
let image = img();
let p = test_principal("alice");
let h = host();
assert!(auth.authorize(&image, &req(&p, &h, "t")) == AuthorizationResult::Allow);
assert!(auth.authorize(&image, &req(&p, &h, "t")) == AuthorizationResult::Allow);
}
#[tokio::test(flavor = "multi_thread")]
async fn cache_miss_calls_opa_and_caches_result() {
let mock = MockServer::start().await;
Mock::given(method("POST"))
.respond_with(
ResponseTemplate::new(200).set_body_json(serde_json::json!({"result": true})),
)
.expect(1)
.mount(&mock)
.await;
let auth = OpaAuthorizer::new(HashSet::new(), opa_url(&mock), false, 100, 60_000).unwrap();
let image = img();
let p = test_principal("alice");
let h = host();
assert!(auth.authorize(&image, &req(&p, &h, "fresh-topic")) == AuthorizationResult::Allow);
assert!(auth.authorize(&image, &req(&p, &h, "fresh-topic")) == AuthorizationResult::Allow);
}
#[tokio::test(flavor = "multi_thread")]
async fn cache_entry_expires_after_ttl() {
let mock = MockServer::start().await;
Mock::given(method("POST"))
.respond_with(
ResponseTemplate::new(200).set_body_json(serde_json::json!({"result": true})),
)
.expect(2) .mount(&mock)
.await;
let auth = OpaAuthorizer::new(HashSet::new(), opa_url(&mock), false, 100, 10).unwrap();
let image = img();
let p = test_principal("alice");
let h = host();
assert!(auth.authorize(&image, &req(&p, &h, "t")) == AuthorizationResult::Allow);
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(auth.authorize(&image, &req(&p, &h, "t")) == AuthorizationResult::Allow);
}
#[tokio::test(flavor = "multi_thread")]
async fn http_error_with_allow_on_error_true_returns_allow() {
let mock = MockServer::start().await;
Mock::given(method("POST"))
.respond_with(ResponseTemplate::new(500))
.mount(&mock)
.await;
let auth = OpaAuthorizer::new(HashSet::new(), opa_url(&mock), true, 100, 60_000).unwrap();
let image = img();
let p = test_principal("alice");
let h = host();
assert!(auth.authorize(&image, &req(&p, &h, "t")) == AuthorizationResult::Allow);
}
#[tokio::test(flavor = "multi_thread")]
async fn http_error_with_allow_on_error_false_returns_deny() {
let mock = MockServer::start().await;
Mock::given(method("POST"))
.respond_with(ResponseTemplate::new(500))
.mount(&mock)
.await;
let auth = OpaAuthorizer::new(HashSet::new(), opa_url(&mock), false, 100, 60_000).unwrap();
let image = img();
let p = test_principal("alice");
let h = host();
assert!(auth.authorize(&image, &req(&p, &h, "t")) == AuthorizationResult::Deny);
}
#[tokio::test(flavor = "multi_thread")]
async fn json_response_parse_error_returns_per_allow_on_error_config() {
let mock = MockServer::start().await;
Mock::given(method("POST"))
.respond_with(ResponseTemplate::new(200).set_body_string("not-json-at-all"))
.mount(&mock)
.await;
let p = test_principal("alice");
let h = host();
let image = img();
let auth_open =
OpaAuthorizer::new(HashSet::new(), opa_url(&mock), true, 100, 60_000).unwrap();
assert!(auth_open.authorize(&image, &req(&p, &h, "t")) == AuthorizationResult::Allow);
let auth_closed =
OpaAuthorizer::new(HashSet::new(), opa_url(&mock), false, 100, 60_000).unwrap();
assert!(auth_closed.authorize(&image, &req(&p, &h, "t")) == AuthorizationResult::Deny);
}
}