use crate::state::State;
use crate::types::{Action, CapId, Capability, MemAddr, PolicyContext, Right, SecurityLevel};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuthorizationError {
CapabilityNotFound(CapId),
CapabilityRevoked(CapId),
HolderMismatch {
expected: u128,
actual: u128,
},
TargetMismatch {
expected: u128,
actual: u128,
},
InsufficientRights,
PolicyDenied,
ResourceNotLive(MemAddr),
BibaViolation {
writer_level: SecurityLevel,
target_level: SecurityLevel,
},
CapabilityNotHeld(CapId),
}
impl std::fmt::Display for AuthorizationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AuthorizationError::CapabilityNotFound(id) => {
write!(f, "Capability {id} not found")
}
AuthorizationError::CapabilityRevoked(id) => {
write!(f, "Capability {id} is revoked")
}
AuthorizationError::HolderMismatch { expected, actual } => {
write!(f, "Holder mismatch: expected {expected}, got {actual}")
}
AuthorizationError::TargetMismatch { expected, actual } => {
write!(f, "Target mismatch: expected {expected}, got {actual}")
}
AuthorizationError::InsufficientRights => {
write!(f, "Insufficient rights")
}
AuthorizationError::PolicyDenied => {
write!(f, "Policy denied the action")
}
AuthorizationError::ResourceNotLive(addr) => {
write!(f, "Resource at {addr} is not live")
}
AuthorizationError::BibaViolation {
writer_level,
target_level,
} => {
write!(
f,
"Biba violation: writer level {writer_level:?} cannot write to target level {target_level:?}"
)
}
AuthorizationError::CapabilityNotHeld(id) => {
write!(f, "Capability {id} not held by plugin")
}
}
}
}
impl std::error::Error for AuthorizationError {}
pub fn resource_integrity(state: &State, rid: MemAddr) -> SecurityLevel {
if let Some(status) = state.ghost().get_status(rid) {
if let Some(owner) = status.owner() {
state.plugin_level(owner).unwrap_or(SecurityLevel::Public)
} else {
SecurityLevel::Public
}
} else {
SecurityLevel::Public
}
}
pub fn biba_write_ok(state: &State, action: &Action) -> bool {
if !action.rights.contains(Right::Write) {
return true;
}
let writer_level = state
.plugin_level(action.subject)
.unwrap_or(SecurityLevel::Public);
let target_level = resource_integrity(state, action.target as MemAddr);
writer_level >= target_level
}
#[allow(dead_code)] pub fn cap_is_valid(state: &State, cap: &Capability) -> bool {
state.kernel().revocation().is_valid(cap.id())
}
#[allow(dead_code)] pub fn capability_check(state: &State, action: &Action, cap: &Capability) -> bool {
if cap.holder() != action.subject {
return false;
}
if cap.target() != action.target {
return false;
}
if !action.rights.is_subset_of(cap.rights()) {
return false;
}
if !cap_is_valid(state, cap) {
return false;
}
if !state.plugin_holds(action.subject, cap.id()) {
return false;
}
true
}
#[derive(Debug, Clone)]
#[must_use = "authorization tokens must be consumed to enforce access control"]
pub struct Authorized {
cap: Capability,
action: Action,
ctx: PolicyContext,
_private: (),
}
impl Authorized {
fn new_internal(cap: Capability, action: Action, ctx: PolicyContext) -> Self {
Authorized {
cap,
action,
ctx,
_private: (),
}
}
pub fn validate_atomic(
state: &State,
cap: Capability,
action: Action,
ctx: PolicyContext,
) -> Result<Authorized, AuthorizationError> {
let provisional = Self::new_internal(cap, action, ctx);
provisional.check_capability(state)?;
provisional.check_policy(state)?;
provisional.check_validity(state)?;
provisional.check_liveness(state)?;
provisional.check_confinement()?;
provisional.check_biba(state)?;
Ok(provisional)
}
pub(crate) fn validate(&self, state: &State) -> Result<(), AuthorizationError> {
self.check_capability(state)?;
self.check_policy(state)?;
self.check_validity(state)?;
self.check_liveness(state)?;
self.check_confinement()?;
self.check_biba(state)?;
Ok(())
}
pub fn cap(&self) -> &Capability {
&self.cap
}
pub fn action(&self) -> &Action {
&self.action
}
pub fn ctx(&self) -> &PolicyContext {
&self.ctx
}
fn check_capability(&self, state: &State) -> Result<(), AuthorizationError> {
if self.cap.holder() != self.action.subject {
return Err(AuthorizationError::HolderMismatch {
expected: self.action.subject,
actual: self.cap.holder(),
});
}
if self.cap.target() != self.action.target {
return Err(AuthorizationError::TargetMismatch {
expected: self.action.target,
actual: self.cap.target(),
});
}
if !self.action.rights.is_subset_of(self.cap.rights()) {
return Err(AuthorizationError::InsufficientRights);
}
if !state.plugin_holds(self.action.subject, self.cap.id()) {
return Err(AuthorizationError::CapabilityNotHeld(self.cap.id()));
}
Ok(())
}
fn check_policy(&self, state: &State) -> Result<(), AuthorizationError> {
let decision = state.kernel().policy().eval(&self.action, &self.ctx);
if decision.is_permit() {
Ok(())
} else {
Err(AuthorizationError::PolicyDenied)
}
}
fn check_validity(&self, state: &State) -> Result<(), AuthorizationError> {
let kernel_cap = state
.kernel()
.revocation()
.get(self.cap.id())
.ok_or(AuthorizationError::CapabilityNotFound(self.cap.id()))?;
if !kernel_cap.is_valid() {
return Err(AuthorizationError::CapabilityRevoked(self.cap.id()));
}
if let Some(parent_id) = self.cap.parent() {
if !state.kernel().revocation().is_valid(parent_id) {
return Err(AuthorizationError::CapabilityRevoked(parent_id));
}
}
Ok(())
}
fn check_liveness(&self, state: &State) -> Result<(), AuthorizationError> {
let addr = self.action.target as MemAddr;
if !state.ghost().is_live(addr) {
return Err(AuthorizationError::ResourceNotLive(addr));
}
Ok(())
}
fn check_confinement(&self) -> Result<(), AuthorizationError> {
if !self.action.rights.is_subset_of(self.cap.rights()) {
return Err(AuthorizationError::InsufficientRights);
}
Ok(())
}
fn check_biba(&self, state: &State) -> Result<(), AuthorizationError> {
if !biba_write_ok(state, &self.action) {
let writer_level = state
.plugin_level(self.action.subject)
.unwrap_or(SecurityLevel::Public);
let target_level = resource_integrity(state, self.action.target as MemAddr);
return Err(AuthorizationError::BibaViolation {
writer_level,
target_level,
});
}
Ok(())
}
pub fn cap_id(&self) -> CapId {
self.cap.id()
}
pub fn holder_has_cap(&self, state: &State) -> bool {
state.plugin_holds(self.action.subject, self.cap.id())
}
pub fn policy_permitted(&self, state: &State) -> bool {
state
.kernel()
.policy()
.eval(&self.action, &self.ctx)
.is_permit()
}
pub fn rights_confined(&self) -> bool {
self.action.rights.is_subset_of(self.cap.rights())
}
pub fn biba_satisfied(&self, state: &State) -> bool {
biba_write_ok(state, &self.action)
}
}
#[cfg(test)]
impl Authorized {
pub fn new_for_test(cap: Capability, action: Action, ctx: PolicyContext) -> Self {
Authorized::new_internal(cap, action, ctx)
}
}
#[allow(dead_code)] pub fn properly_mediated(state: &State, auth: &Authorized) -> bool {
auth.validate(state).is_ok()
}
#[allow(dead_code)] pub fn authorize(state: &State, cap: &Capability, action: &Action, ctx: &PolicyContext) -> bool {
let policy_ok = state.kernel().policy().eval(action, ctx).is_permit();
let cap_ok = capability_check(state, action, cap);
policy_ok && cap_ok
}
#[cfg(test)]
mod tests {
use super::*;
use crate::state::PluginState;
use crate::types::{Rights, SealedTag};
fn make_test_cap(id: CapId, holder: u128, target: u128) -> Capability {
Capability::new(
id,
holder,
target,
Rights::singleton(Right::Read),
None,
0,
SealedTag::empty(),
)
.expect("valid capability")
}
#[test]
fn test_resource_integrity_unallocated() {
let state = State::empty();
assert_eq!(resource_integrity(&state, 999), SecurityLevel::Public);
}
#[test]
fn test_biba_write_ok_no_write() {
let state = State::empty();
let action = Action {
subject: 1,
target: 100,
rights: Rights::singleton(Right::Read), kind: "read".to_string(),
};
assert!(biba_write_ok(&state, &action));
}
#[test]
fn test_cap_is_valid_not_found() {
let state = State::empty();
let cap = make_test_cap(1, 1, 100);
assert!(!cap_is_valid(&state, &cap));
}
#[test]
fn test_cap_is_valid_in_kernel() {
let mut state = State::empty();
let cap = make_test_cap(1, 1, 100);
state
.apply_cap_delegate_mut(cap.clone(), 1)
.expect("delegate should succeed");
assert!(cap_is_valid(&state, &cap));
}
#[test]
fn test_capability_check_holder_mismatch() {
let mut state = State::empty();
let _ = state.insert_plugin(1, PluginState::empty(SecurityLevel::Public, 0));
let cap = make_test_cap(1, 2, 100); let action = Action {
subject: 1, target: 100,
rights: Rights::singleton(Right::Read),
kind: "read".to_string(),
};
assert!(!capability_check(&state, &action, &cap));
}
#[test]
fn test_authorized_validate_cap_not_held() {
let mut state = State::empty();
let _ = state.insert_plugin(1, PluginState::empty(SecurityLevel::Public, 0));
let cap = make_test_cap(1, 1, 100);
let action = Action {
subject: 1,
target: 100,
rights: Rights::singleton(Right::Read),
kind: "read".to_string(),
};
let ctx = PolicyContext::default();
let result = Authorized::validate_atomic(&state, cap, action, ctx);
assert!(matches!(
result,
Err(AuthorizationError::CapabilityNotHeld(_))
));
}
#[test]
fn test_authorize_both_pass() {
let mut state = State::empty();
let mut ps = PluginState::empty(SecurityLevel::Public, 0);
ps.grant_cap_mut(1);
state.insert_plugin(1, ps).unwrap();
let cap = make_test_cap(1, 1, 100);
state
.apply_cap_delegate_mut(cap.clone(), 1)
.expect("delegate should succeed");
let mut ghost = state.ghost().clone();
ghost.alloc_mut(100, 1);
let action = Action {
subject: 1,
target: 100,
rights: Rights::singleton(Right::Read),
kind: "read".to_string(),
};
let _ctx = PolicyContext::default();
assert!(capability_check(&state, &action, &cap));
}
}