#![cfg(feature = "access")]
use crate::memory_map::*;
#[cfg(test)]
use crate::testing::*;
use crate::types::*;
use crate::utils::*;
use candid::{CandidType, Principal};
use ic_cdk_macros::{query, update};
use ic_stable_structures::{DefaultMemoryImpl, StableBTreeMap, StableCell};
use rustic_macros::modifiers;
use std::cell::RefCell;
#[derive(Clone, CandidType, serde::Serialize, serde::Deserialize)]
struct AccessControl {
owner: Option<Principal>,
pending_owner: Option<Principal>,
admins: Vec<Principal>,
admins_of_role: [u32; 32],
}
thread_local! {
static ACCESS_CONTROL: RefCell<StableCell<Cbor<Option<AccessControl>>, RM>> =
#[allow(clippy::expect_used)] RefCell::new(StableCell::init(
RM::new(DefaultMemoryImpl::default(), ACCESS_CONTROL_PAGE_START..ACCESS_CONTROL_PAGE_END),
Cbor(Some(AccessControl {
owner: Some(canister_caller()),
pending_owner: None,
admins: vec![canister_caller()],
admins_of_role: Default::default(),
})),
).expect("Failed to initialize the access control cell")
);
}
pub(crate) fn access_init(owner: Principal) {
ACCESS_CONTROL.with(|c| {
let mut c = c.borrow_mut();
#[allow(clippy::unwrap_used)] let mut config = c.get().0.clone().unwrap();
config.owner = Some(owner);
config.admins = vec![owner];
#[allow(clippy::expect_used)] c.set(Cbor(Some(config)))
.expect("Access control init failed");
});
}
pub fn only_owner() -> Result<(), String> {
let caller = canister_caller();
#[allow(clippy::unwrap_used)] if ACCESS_CONTROL.with(|c| c.borrow().get().0.clone().unwrap().owner) == Some(caller) {
Ok(())
} else {
Err("Caller is not the owner".to_string())
}
}
#[query]
pub fn is_owner(owner: Principal) -> bool {
ACCESS_CONTROL.with(|c| {
#[allow(clippy::unwrap_used)] c.borrow().get().0.clone().unwrap().owner
}) == Some(owner)
}
#[update]
#[modifiers("only_owner")]
pub fn transfer_ownership(new_owner: Option<Principal>) {
if let Some(x) = new_owner {
assert_ne!(
x,
Principal::anonymous(),
"Cannot transfer ownership to the anonymous principal"
);
}
ACCESS_CONTROL.with(|c| {
let mut c = c.borrow_mut();
#[allow(clippy::unwrap_used)] let mut config = c.get().0.clone().unwrap();
config.pending_owner = new_owner;
#[allow(clippy::expect_used)] c.set(Cbor(Some(config)))
.expect("Ownership transfer failed");
});
}
#[update]
#[modifiers("only_owner")]
pub fn transfer_ownership_immediate(new_owner: Option<Principal>) {
if let Some(x) = new_owner {
assert_ne!(
x,
Principal::anonymous(),
"Cannot transfer ownership to the anonymous principal"
);
}
ACCESS_CONTROL.with(|c| {
let mut c = c.borrow_mut();
#[allow(clippy::unwrap_used)] let mut config = c.get().0.clone().unwrap();
config.pending_owner = None;
config.owner = new_owner;
#[allow(clippy::expect_used)] c.set(Cbor(Some(config)))
.expect("Ownership transfer failed");
});
}
#[update]
#[modifiers("only_owner")]
pub fn renounce_ownership() {
ACCESS_CONTROL.with(|c| {
let mut c = c.borrow_mut();
#[allow(clippy::unwrap_used)] let mut config = c.get().0.clone().unwrap();
config.owner = None;
config.pending_owner = None;
#[allow(clippy::expect_used)] c.set(Cbor(Some(config)))
.expect("Ownership transfer failed");
});
}
#[update]
pub fn accept_ownership() {
ACCESS_CONTROL.with(|c| {
let mut c = c.borrow_mut();
#[allow(clippy::unwrap_used)] let mut config = c.get().0.clone().unwrap();
#[allow(clippy::expect_used)] let new_owner = config.pending_owner.expect("No pending owner");
assert_eq!(
new_owner,
canister_caller(),
"Only pending owner can accept ownership"
);
config.owner = Some(new_owner);
config.pending_owner = None;
#[allow(clippy::expect_used)] c.set(Cbor(Some(config)))
.expect("Ownership transfer failed");
});
}
#[query]
pub fn owner() -> Option<Principal> {
#[allow(clippy::unwrap_used)] ACCESS_CONTROL.with(|c| c.borrow().get().0.clone().unwrap().owner)
}
#[query]
pub fn pending_owner() -> Option<Principal> {
#[allow(clippy::unwrap_used)] ACCESS_CONTROL.with(|c| c.borrow().get().0.clone().unwrap().pending_owner)
}
#[query]
pub fn owner_and_pending_owner() -> (Option<Principal>, Option<Principal>) {
#[allow(clippy::unwrap_used)] let config = ACCESS_CONTROL.with(|c| c.borrow().get().0.clone().unwrap());
(config.owner, config.pending_owner)
}
pub fn only_admin() -> Result<(), String> {
let caller = canister_caller();
#[allow(clippy::unwrap_used)] if ACCESS_CONTROL.with(|c| c.borrow().get().0.clone().unwrap().admins.contains(&caller)) {
Ok(())
} else {
Err("Caller is not an admin".to_string())
}
}
#[query]
pub fn is_admin(admin: Principal) -> bool {
#[allow(clippy::unwrap_used)] ACCESS_CONTROL.with(|c| c.borrow().get().0.clone().unwrap().admins.contains(&admin))
}
#[update]
#[modifiers("only_owner")]
pub fn grant_admin(new_admin: Principal) {
assert_ne!(
new_admin,
Principal::anonymous(),
"Cannot grant admin to the anonymous principal"
);
ACCESS_CONTROL.with(|c| {
let mut c = c.borrow_mut();
#[allow(clippy::unwrap_used)] let mut config = c.get().0.clone().unwrap();
if !config.admins.contains(&new_admin) {
config.admins.push(new_admin);
#[allow(clippy::expect_used)] c.set(Cbor(Some(config))).expect("Grant admin failed");
}
});
}
#[update]
#[modifiers("only_owner")]
pub fn revoke_admin(admin: Principal) {
ACCESS_CONTROL.with(|c| {
let mut c = c.borrow_mut();
#[allow(clippy::unwrap_used)] let mut config = c.get().0.clone().unwrap();
config.admins.retain(|x| x != &admin);
#[allow(clippy::expect_used)] c.set(Cbor(Some(config))).expect("Revoke admin failed");
});
}
#[update]
#[modifiers("only_admin")]
pub fn renounce_admin() {
let admin = canister_caller();
ACCESS_CONTROL.with(|c| {
let mut c = c.borrow_mut();
#[allow(clippy::unwrap_used)] let mut config = c.get().0.clone().unwrap();
config.admins.retain(|x| x != &admin);
#[allow(clippy::expect_used)] c.set(Cbor(Some(config))).expect("Revoke admin failed");
});
}
thread_local! {
static ACCESS_ROLES: RefCell<StableBTreeMap<StablePrincipal, u32, VM>> =
MEMORY_MANAGER.with(|mm| {
RefCell::new(StableBTreeMap::init(
mm.borrow().get(ACCESS_ROLES_MEM_ID)))
});
}
#[cfg(feature = "access-roles")]
fn is_role_admin(role_flag: u32, role: u8) -> bool {
assert!(role <= 31, "Role must be between 0 and 31");
ACCESS_CONTROL.with(|c| {
let c = c.borrow();
#[allow(clippy::unwrap_used)] let config = c.get().0.clone().unwrap();
config.admins_of_role[role as usize] & role_flag != 0
})
}
#[cfg(feature = "access-roles")]
#[update]
pub fn grant_roles(roles: Vec<u8>, principal: Principal) -> Vec<bool> {
let mut success = Vec::with_capacity(roles.len());
ACCESS_ROLES.with(|ar| {
let mut ar = ar.borrow_mut();
let mut principal_roles = ar.get(&principal.into()).unwrap_or(0);
let caller_roles = ar.get(&canister_caller().into()).unwrap_or(0);
for role in roles {
if role <= 31 && (is_admin(canister_caller()) || is_role_admin(caller_roles, role)) {
principal_roles |= 1 << role;
success.push(true);
} else {
success.push(false);
}
}
#[allow(clippy::expect_used)] ar.insert(principal.into(), principal_roles);
});
success
}
#[cfg(feature = "access-roles")]
#[update]
pub fn revoke_roles(roles: Vec<u8>, principal: Principal) -> Vec<bool> {
let mut success = Vec::with_capacity(roles.len());
ACCESS_ROLES.with(|c| {
let mut c = c.borrow_mut();
let mut principal_roles = c.get(&principal.into()).unwrap_or(0);
let caller_roles = c.get(&canister_caller().into()).unwrap_or(0);
for role in roles {
if role <= 31 && (is_admin(canister_caller()) || is_role_admin(caller_roles, role)) {
principal_roles &= !(1 << role);
success.push(true);
} else {
success.push(false);
}
}
#[allow(clippy::expect_used)] c.insert(principal.into(), principal_roles)
.expect("Role update failed");
});
success
}
#[cfg(feature = "access-roles")]
#[query]
pub fn get_user_roles(principal: Principal) -> u32 {
ACCESS_ROLES.with(|c| {
let c = c.borrow();
c.get(&principal.into()).unwrap_or(0)
})
}
#[cfg(feature = "access-roles")]
#[query]
pub fn user_has_role(role: u8, principal: Principal) -> bool {
ACCESS_ROLES.with(|c| {
let c = c.borrow();
let principal_roles = c.get(&principal.into()).unwrap_or(0);
principal_roles & (1 << role) != 0
})
}
#[cfg(feature = "access-roles")]
pub fn has_role(role: u8) -> Result<(), String> {
ACCESS_ROLES.with(|c| {
let c = c.borrow();
let principal_roles = c.get(&canister_caller().into()).unwrap_or(0);
if principal_roles & (1 << role) != 0 {
Ok(())
} else {
Err("Unauthorized".to_string())
}
})
}
#[cfg(feature = "access-roles")]
#[query]
pub fn user_has_roles_all(roles: Vec<u8>, principal: Principal) -> bool {
ACCESS_ROLES.with(|c| {
let c = c.borrow();
let principal_roles = c.get(&principal.into()).unwrap_or(0);
roles.iter().all(|role| principal_roles & (1 << role) != 0)
})
}
#[cfg(feature = "access-roles")]
pub fn has_roles_all(roles: Vec<u8>) -> Result<(), String> {
ACCESS_ROLES.with(|c| {
let c = c.borrow();
let principal_roles = c.get(&canister_caller().into()).unwrap_or(0);
if roles.iter().all(|role| principal_roles & (1 << role) != 0) {
Ok(())
} else {
Err("Unauthorized".to_string())
}
})
}
#[cfg(feature = "access-roles")]
#[query]
pub fn user_has_roles_any(roles: Vec<u8>, principal: Principal) -> bool {
ACCESS_ROLES.with(|c| {
let c = c.borrow();
let principal_roles = c.get(&principal.into()).unwrap_or(0);
roles.iter().any(|role| principal_roles & (1 << role) != 0)
})
}
#[cfg(feature = "access-roles")]
pub fn has_roles_any(roles: Vec<u8>) -> Result<(), String> {
ACCESS_ROLES.with(|c| {
let c = c.borrow();
let principal_roles = c.get(&canister_caller().into()).unwrap_or(0);
if roles.iter().any(|role| principal_roles & (1 << role) != 0) {
Ok(())
} else {
Err("Unauthorized".to_string())
}
})
}
#[cfg(feature = "access-roles")]
#[update]
#[modifiers("only_admin")]
pub fn set_role_admins(role: u8, admins: Vec<u8>) {
let admin_bit_flags = admins.iter().fold(0, |acc, x| acc | (1 << x));
ACCESS_CONTROL.with(|c| {
let mut c = c.borrow_mut();
#[allow(clippy::unwrap_used)] let mut config = c.get().0.clone().unwrap();
if let Some(x) = config.admins_of_role.get_mut(role as usize) {
*x |= admin_bit_flags;
}
#[allow(clippy::expect_used)] c.set(Cbor(Some(config))).expect("Set role admin failed");
});
}
#[cfg(feature = "access-roles")]
#[update]
#[modifiers("only_admin")]
pub fn revoke_role_admins(role: u8, admins: Vec<u8>) {
let admin_bit_flags = admins.iter().fold(0, |acc, x| acc | (1 << x));
ACCESS_CONTROL.with(|c| {
let mut c = c.borrow_mut();
#[allow(clippy::unwrap_used)] let mut config = c.get().0.clone().unwrap();
if let Some(x) = config.admins_of_role.get_mut(role as usize) {
*x &= !admin_bit_flags;
}
#[allow(clippy::expect_used)] c.set(Cbor(Some(config))).expect("Revoke role admin failed");
});
}
#[cfg(test)]
mod access_tests {
use super::*;
#[test]
fn test_ownable() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
assert!(only_owner().is_ok());
assert!(owner() == Principal::from_text(MOCK_USER_0).ok());
set_mock_caller(Principal::from_text(MOCK_USER_1).unwrap());
assert!(only_owner().is_err());
}
#[test]
fn test_ownable_transfer_ownership() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
assert!(only_owner().is_ok());
assert!(owner() == Principal::from_text(MOCK_USER_0).ok());
transfer_ownership(Some(Principal::from_text(MOCK_USER_1).unwrap()));
assert!(owner() == Principal::from_text(MOCK_USER_0).ok());
set_mock_caller(Principal::from_text(MOCK_USER_1).unwrap());
assert!(only_owner().is_err());
accept_ownership();
assert!(only_owner().is_ok());
assert!(owner() == Principal::from_text(MOCK_USER_1).ok());
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
assert!(only_owner().is_err());
}
#[test]
#[should_panic(expected = "No pending owner")]
fn test_ownable_transfer_ownership_to_none() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
assert!(only_owner().is_ok());
transfer_ownership(None);
set_mock_caller(Principal::from_text(MOCK_USER_1).unwrap());
accept_ownership();
}
#[test]
#[should_panic(expected = "msg_reject should only be called inside canisters")]
fn test_ownable_unauth_transfer() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
assert!(only_owner().is_ok());
set_mock_caller(Principal::from_text(MOCK_USER_1).unwrap());
transfer_ownership(Some(Principal::from_text(MOCK_USER_1).unwrap()));
}
#[test]
fn test_ownable_transfer_ownership_to_self() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
assert!(only_owner().is_ok());
transfer_ownership(Some(Principal::from_text(MOCK_USER_0).unwrap()));
assert!(only_owner().is_ok());
accept_ownership();
assert!(only_owner().is_ok());
}
#[test]
fn test_admin() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
assert!(only_admin().is_ok());
assert!(is_admin(Principal::from_text(MOCK_USER_0).unwrap()));
assert!(!is_admin(Principal::from_text(MOCK_USER_1).unwrap()));
set_mock_caller(Principal::from_text(MOCK_USER_1).unwrap());
assert!(only_admin().is_err());
}
#[test]
fn test_grant_admin() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
assert!(!is_admin(Principal::from_text(MOCK_USER_1).unwrap()));
grant_admin(Principal::from_text(MOCK_USER_1).unwrap());
assert!(is_admin(Principal::from_text(MOCK_USER_1).unwrap()));
set_mock_caller(Principal::from_text(MOCK_USER_1).unwrap());
assert!(only_admin().is_ok());
}
#[test]
fn test_revoke_admin() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
assert!(is_admin(Principal::from_text(MOCK_USER_0).unwrap()));
revoke_admin(Principal::from_text(MOCK_USER_0).unwrap());
assert!(!is_admin(Principal::from_text(MOCK_USER_0).unwrap()));
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
assert!(only_admin().is_err());
}
#[test]
fn test_renounce_admin() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
grant_admin(Principal::from_text(MOCK_USER_1).unwrap());
assert!(is_admin(Principal::from_text(MOCK_USER_1).unwrap()));
set_mock_caller(Principal::from_text(MOCK_USER_1).unwrap());
assert!(only_admin().is_ok());
renounce_admin();
assert!(!is_admin(Principal::from_text(MOCK_USER_1).unwrap()));
}
#[test]
#[should_panic(expected = "msg_reject should only be called inside canisters")]
fn grant_admin_unauth() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
set_mock_caller(Principal::from_text(MOCK_USER_1).unwrap());
grant_admin(Principal::from_text(MOCK_USER_1).unwrap());
}
#[test]
#[should_panic(expected = "msg_reject should only be called inside canisters")]
fn revoke_admin_unauth() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
set_mock_caller(Principal::from_text(MOCK_USER_1).unwrap());
revoke_admin(Principal::from_text(MOCK_USER_1).unwrap());
}
}
#[cfg(test)]
#[cfg(feature = "access-roles")]
mod access_role_tests {
use super::*;
#[repr(u8)]
enum Role {
R0 = 0,
R1 = 1,
R2 = 2,
R3 = 3,
}
impl From<Role> for u8 {
fn from(role: Role) -> Self {
role as u8
}
}
const ROLE0: u8 = 0;
#[allow(unused)]
const ROLE1: u8 = 1;
#[allow(unused)]
const ROLE2: u8 = 2;
#[allow(unused)]
const ROLE3: u8 = 3;
#[test]
#[should_panic(expected = "msg_reject should only be called inside canisters")]
#[modifiers("has_role@Role::R0.into()")]
fn test_syntax_role_modifiers_enum() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
}
#[test]
#[should_panic(expected = "msg_reject should only be called inside canisters")]
#[modifiers("has_role@ROLE0")]
fn test_syntax_role_modifiers_const() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
}
#[test]
fn test_grant_roles() {
set_mock_caller(Principal::from_text(MOCK_USER_0).unwrap());
access_init(canister_caller());
assert!(!user_has_role(
Role::R0.into(),
Principal::from_text(MOCK_USER_1).unwrap()
));
assert!(!user_has_role(
Role::R1.into(),
Principal::from_text(MOCK_USER_1).unwrap()
));
assert!(!user_has_role(
Role::R2.into(),
Principal::from_text(MOCK_USER_1).unwrap()
));
assert!(!user_has_role(
Role::R3.into(),
Principal::from_text(MOCK_USER_1).unwrap()
));
assert!(is_admin(canister_caller()));
grant_roles(
vec![Role::R0.into(), Role::R1.into()],
Principal::from_text(MOCK_USER_1).unwrap(),
);
assert!(user_has_role(
Role::R0.into(),
Principal::from_text(MOCK_USER_1).unwrap()
));
assert!(user_has_role(
Role::R1.into(),
Principal::from_text(MOCK_USER_1).unwrap()
));
assert!(!user_has_role(
Role::R2.into(),
Principal::from_text(MOCK_USER_1).unwrap()
));
assert!(!user_has_role(
Role::R3.into(),
Principal::from_text(MOCK_USER_1).unwrap()
));
}
}