#![allow(deprecated)]
use crate::{Permission, PermissionContext};
use async_trait::async_trait;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::RwLock;
type PermissionMap = Arc<RwLock<HashMap<String, Vec<String>>>>;
pub struct DjangoModelPermissions {
user_permissions: PermissionMap,
model_name: Option<String>,
}
impl DjangoModelPermissions {
pub fn new() -> Self {
Self {
user_permissions: Arc::new(RwLock::new(HashMap::new())),
model_name: None,
}
}
pub fn with_model_name(model_name: &str) -> Self {
let Some((app_label, model)) = model_name.split_once('.') else {
panic!(
"model_name must be in `app_label.model` format, got: {:?}",
model_name
);
};
assert!(
!app_label.is_empty() && !model.is_empty(),
"both app_label and model must be non-empty in `app_label.model` format, got: {:?}",
model_name
);
Self {
user_permissions: Arc::new(RwLock::new(HashMap::new())),
model_name: Some(model_name.to_string()),
}
}
fn get_required_permissions(&self, method: &str) -> Vec<String> {
let Some(ref model_name) = self.model_name else {
return Vec::new();
};
let Some((app_label, model)) = model_name.split_once('.') else {
return Vec::new();
};
let actions = match method {
"POST" => vec!["add"],
"PUT" | "PATCH" => vec!["change"],
"DELETE" => vec!["delete"],
"GET" | "HEAD" | "OPTIONS" => vec!["view"],
_ => return Vec::new(),
};
actions
.into_iter()
.map(|action| format!("{}.{}_{}", app_label, action, model))
.collect()
}
pub fn add_user_permission(&mut self, username: &str, permission: &str) {
let user_perms = Arc::clone(&self.user_permissions);
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
let mut perms = user_perms.write().await;
perms
.entry(username.to_string())
.or_default()
.push(permission.to_string());
})
});
}
pub async fn user_has_permission(&self, username: &str, permission: &str) -> bool {
let perms = self.user_permissions.read().await;
if let Some(user_perms) = perms.get(username) {
return user_perms.iter().any(|p| p == permission);
}
false
}
}
impl Default for DjangoModelPermissions {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Permission for DjangoModelPermissions {
async fn has_permission(&self, context: &PermissionContext<'_>) -> bool {
if !context.is_authenticated {
return false;
}
let required_perms = self.get_required_permissions(context.request.method.as_str());
if required_perms.is_empty() {
return context.is_admin;
}
if let Some(user) = &context.user {
let perms = self.user_permissions.read().await;
if let Some(user_perms) = perms.get(user.username()) {
return required_perms
.iter()
.all(|required| user_perms.iter().any(|p| p == required));
}
}
false
}
}
pub struct DjangoModelPermissionsOrAnonReadOnly {
base: DjangoModelPermissions,
}
impl DjangoModelPermissionsOrAnonReadOnly {
pub fn new() -> Self {
Self {
base: DjangoModelPermissions::new(),
}
}
pub fn add_user_permission(&mut self, username: &str, permission: &str) {
self.base.add_user_permission(username, permission);
}
pub async fn user_has_permission(&self, username: &str, permission: &str) -> bool {
self.base.user_has_permission(username, permission).await
}
}
impl Default for DjangoModelPermissionsOrAnonReadOnly {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Permission for DjangoModelPermissionsOrAnonReadOnly {
async fn has_permission(&self, context: &PermissionContext<'_>) -> bool {
if !context.is_authenticated {
return matches!(context.request.method.as_str(), "GET" | "HEAD" | "OPTIONS");
}
self.base.has_permission(context).await
}
}
pub struct ModelPermission<T> {
operation: String,
_phantom: PhantomData<T>,
}
impl<T> ModelPermission<T> {
pub fn new(operation: impl Into<String>) -> Self {
Self {
operation: operation.into(),
_phantom: PhantomData,
}
}
pub fn operation(&self) -> &str {
&self.operation
}
}
#[async_trait]
impl<T: Send + Sync> Permission for ModelPermission<T> {
async fn has_permission(&self, context: &PermissionContext<'_>) -> bool {
context.is_authenticated
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use hyper::Method;
use reinhardt_http::Request;
#[derive(Debug)]
struct TestModel;
#[test]
fn test_model_permission_creation() {
let perm = ModelPermission::<TestModel>::new("create");
assert_eq!(perm.operation(), "create");
}
#[test]
fn test_model_permission_operations() {
let create = ModelPermission::<TestModel>::new("create");
let read = ModelPermission::<TestModel>::new("read");
let update = ModelPermission::<TestModel>::new("update");
let delete = ModelPermission::<TestModel>::new("delete");
assert_eq!(create.operation(), "create");
assert_eq!(read.operation(), "read");
assert_eq!(update.operation(), "update");
assert_eq!(delete.operation(), "delete");
}
#[tokio::test]
async fn test_model_permission_authenticated() {
let perm = ModelPermission::<TestModel>::new("read");
let request = Request::builder()
.method(Method::GET)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: false,
is_active: true,
user: None,
};
assert!(perm.has_permission(&context).await);
}
#[tokio::test]
async fn test_model_permission_unauthenticated() {
let perm = ModelPermission::<TestModel>::new("create");
let request = Request::builder()
.method(Method::POST)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: false,
is_admin: false,
is_active: false,
user: None,
};
assert!(!perm.has_permission(&context).await);
}
#[derive(Debug)]
struct Article;
#[derive(Debug)]
struct Comment;
#[tokio::test]
async fn test_different_model_types() {
let article_perm = ModelPermission::<Article>::new("update");
let comment_perm = ModelPermission::<Comment>::new("delete");
let request = Request::builder()
.method(Method::PUT)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: false,
is_active: true,
user: None,
};
assert!(article_perm.has_permission(&context).await);
assert!(comment_perm.has_permission(&context).await);
}
#[tokio::test]
async fn test_django_model_permissions_creation() {
let perm = DjangoModelPermissions::new();
assert!(!perm.user_has_permission("alice", "blog.add_article").await);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_django_model_permissions_add_permission() {
let mut perm = DjangoModelPermissions::new();
perm.add_user_permission("alice", "blog.add_article");
perm.add_user_permission("alice", "blog.change_article");
assert!(perm.user_has_permission("alice", "blog.add_article").await);
assert!(
perm.user_has_permission("alice", "blog.change_article")
.await
);
assert!(
!perm
.user_has_permission("alice", "blog.delete_article")
.await
);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_django_model_permissions_different_users() {
let mut perm = DjangoModelPermissions::new();
perm.add_user_permission("alice", "blog.add_article");
perm.add_user_permission("bob", "blog.view_article");
assert!(perm.user_has_permission("alice", "blog.add_article").await);
assert!(!perm.user_has_permission("alice", "blog.view_article").await);
assert!(perm.user_has_permission("bob", "blog.view_article").await);
assert!(!perm.user_has_permission("bob", "blog.add_article").await);
}
#[tokio::test]
async fn test_django_model_permissions_trait_authenticated_admin() {
let perm = DjangoModelPermissions::new();
let request = Request::builder()
.method(Method::POST)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: true,
is_active: true,
user: None,
};
assert!(perm.has_permission(&context).await);
}
#[tokio::test]
async fn test_django_model_permissions_trait_authenticated_not_admin() {
let perm = DjangoModelPermissions::new();
let request = Request::builder()
.method(Method::POST)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: false,
is_active: true,
user: None,
};
assert!(!perm.has_permission(&context).await);
}
#[tokio::test]
async fn test_django_model_permissions_trait_unauthenticated() {
let perm = DjangoModelPermissions::new();
let request = Request::builder()
.method(Method::GET)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: false,
is_admin: false,
is_active: false,
user: None,
};
assert!(!perm.has_permission(&context).await);
}
#[tokio::test]
async fn test_django_model_permissions_or_anon_read_only_get() {
let perm = DjangoModelPermissionsOrAnonReadOnly::new();
let request = Request::builder()
.method(Method::GET)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: false,
is_admin: false,
is_active: false,
user: None,
};
assert!(perm.has_permission(&context).await);
}
#[tokio::test]
async fn test_django_model_permissions_or_anon_read_only_post() {
let perm = DjangoModelPermissionsOrAnonReadOnly::new();
let request = Request::builder()
.method(Method::POST)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: false,
is_admin: false,
is_active: false,
user: None,
};
assert!(!perm.has_permission(&context).await);
}
#[tokio::test]
async fn test_django_model_permissions_or_anon_read_only_authenticated() {
let perm = DjangoModelPermissionsOrAnonReadOnly::new();
let request = Request::builder()
.method(Method::POST)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: true,
is_active: true,
user: None,
};
assert!(perm.has_permission(&context).await);
}
fn make_user(username: &str) -> Box<dyn crate::User> {
Box::new(crate::SimpleUser {
id: uuid::Uuid::now_v7(),
username: username.to_string(),
email: format!("{}@example.com", username),
is_active: true,
is_admin: false,
is_staff: false,
is_superuser: false,
})
}
#[tokio::test(flavor = "multi_thread")]
async fn test_django_model_permissions_non_admin_with_matching_permissions() {
let mut perm = DjangoModelPermissions::with_model_name("blog.article");
perm.add_user_permission("alice", "blog.add_article");
let request = Request::builder()
.method(Method::POST)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: false,
is_active: true,
user: Some(make_user("alice")),
};
assert!(perm.has_permission(&context).await);
}
#[rstest::rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_django_model_permissions_non_admin_wrong_permissions() {
let mut perm = DjangoModelPermissions::with_model_name("blog.article");
perm.add_user_permission("alice", "blog.add_article");
let request = Request::builder()
.method(Method::DELETE)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: false,
is_active: true,
user: Some(make_user("alice")),
};
assert!(!perm.has_permission(&context).await);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_django_model_permissions_non_admin_empty_permissions() {
let mut perm = DjangoModelPermissions::with_model_name("blog.article");
let request = Request::builder()
.method(Method::POST)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: false,
is_active: true,
user: Some(make_user("alice")),
};
assert!(!perm.has_permission(&context).await);
}
#[rstest::rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_django_model_permissions_no_model_name_non_admin_denied() {
let mut perm = DjangoModelPermissions::new();
perm.add_user_permission("alice", "blog.add_article");
let request = Request::builder()
.method(Method::POST)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: false,
is_active: true,
user: Some(make_user("alice")),
};
assert!(!perm.has_permission(&context).await);
}
#[rstest::rstest]
#[tokio::test]
async fn test_django_model_permissions_no_model_name_admin_allowed() {
let perm = DjangoModelPermissions::new();
let request = Request::builder()
.method(Method::POST)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: true,
is_active: true,
user: Some(make_user("admin")),
};
assert!(perm.has_permission(&context).await);
}
#[rstest::rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_django_model_permissions_method_to_permission_mapping() {
let mut perm = DjangoModelPermissions::with_model_name("blog.article");
perm.add_user_permission("alice", "blog.view_article");
perm.add_user_permission("alice", "blog.add_article");
perm.add_user_permission("alice", "blog.change_article");
let user = make_user("alice");
let request = Request::builder()
.method(Method::GET)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: false,
is_active: true,
user: Some(make_user("alice")),
};
assert!(perm.has_permission(&context).await);
let request = Request::builder()
.method(Method::POST)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: false,
is_active: true,
user: Some(make_user("alice")),
};
assert!(perm.has_permission(&context).await);
let request = Request::builder()
.method(Method::PUT)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: false,
is_active: true,
user: Some(make_user("alice")),
};
assert!(perm.has_permission(&context).await);
let request = Request::builder()
.method(Method::DELETE)
.uri("/")
.body(Bytes::new())
.build()
.unwrap();
let context = PermissionContext {
request: &request,
is_authenticated: true,
is_admin: false,
is_active: true,
user: Some(make_user("alice")),
};
assert!(!perm.has_permission(&context).await);
}
}