use crate::error::{NucleusError, Result};
use landlock::{
    Access, AccessFs, CompatLevel, Compatible, PathBeneath, PathFd, Ruleset, RulesetAttr,
    RulesetCreatedAttr, RulesetError, RulesetStatus, ABI,
};
use tracing::{debug, info, warn};
/// Landlock ABI version whose access-right sets are used when building the ruleset.
const TARGET_ABI: ABI = ABI::V5;
/// Lowest Landlock ABI version that `LandlockManager::assert_minimum_abi` probes for.
const MINIMUM_PRODUCTION_ABI: ABI = ABI::V3;
/// Applies the container Landlock filesystem policy and remembers whether it has been applied.
pub struct LandlockManager {
/// Set to `true` once a policy application reported full or partial enforcement.
applied: bool,
}
impl LandlockManager {
pub fn new() -> Self {
Self { applied: false }
}
/// Applies the container policy in strict mode (not best-effort).
///
/// Delegates to `apply_container_policy_with_mode` with `best_effort = false`, so a policy
/// that cannot be enforced is reported as an error rather than as `Ok(false)`.
pub fn apply_container_policy(&mut self) -> Result<bool> {
    const BEST_EFFORT: bool = false;
    self.apply_container_policy_with_mode(BEST_EFFORT)
}
pub fn assert_minimum_abi(&self, production_mode: bool) -> Result<()> {
let min_access = AccessFs::from_all(MINIMUM_PRODUCTION_ABI);
let target_access = AccessFs::from_all(TARGET_ABI);
if min_access != target_access {
info!(
"Landlock ABI: target={:?}, minimum_production={:?}",
TARGET_ABI, MINIMUM_PRODUCTION_ABI
);
}
match Ruleset::default().handle_access(AccessFs::from_all(MINIMUM_PRODUCTION_ABI)) {
Ok(_) => {
info!("Landlock ABI >= V3 confirmed");
Ok(())
}
Err(e) => {
let msg = format!(
"Kernel Landlock ABI is below minimum required version (V3): {}",
e
);
if production_mode {
Err(ll_err(e))
} else {
warn!("{}", msg);
Ok(())
}
}
}
}
/// Applies the container Landlock policy unless this manager already applied it.
///
/// Returns `Ok(true)` when the policy is fully or partially enforced, or when it had
/// already been applied by this manager. With `best_effort = true`, a policy that is not
/// enforced or fails to build is logged and reported as `Ok(false)`.
///
/// # Errors
///
/// With `best_effort = false`, returns the build/restrict error as-is, or a
/// `NucleusError::LandlockError` when the kernel reports the ruleset as not enforced.
pub fn apply_container_policy_with_mode(&mut self, best_effort: bool) -> Result<bool> {
    if self.applied {
        debug!("Landlock policy already applied, skipping");
        return Ok(true);
    }

    info!("Applying Landlock filesystem policy");

    // Resolve build/restrict failures first so the status match below stays flat.
    let status = match self.build_and_restrict() {
        Ok(status) => status,
        Err(e) if best_effort => {
            warn!(
                "Failed to apply Landlock policy: {} (continuing without Landlock)",
                e
            );
            return Ok(false);
        }
        Err(e) => return Err(e),
    };

    match status {
        RulesetStatus::NotEnforced => {
            if best_effort {
                warn!("Landlock not enforced (kernel does not support Landlock)");
                Ok(false)
            } else {
                Err(NucleusError::LandlockError(
                    "Landlock not enforced (kernel does not support Landlock)".to_string(),
                ))
            }
        }
        RulesetStatus::FullyEnforced => {
            self.applied = true;
            info!("Landlock policy fully enforced");
            Ok(true)
        }
        RulesetStatus::PartiallyEnforced => {
            self.applied = true;
            info!("Landlock policy partially enforced (kernel lacks some access rights)");
            Ok(true)
        }
    }
}
fn build_and_restrict(&self) -> Result<RulesetStatus> {
let access_all = AccessFs::from_all(TARGET_ABI);
let access_read = AccessFs::from_read(TARGET_ABI);
let access_read_exec = access_read | AccessFs::Execute;
let mut access_tmp = access_all;
access_tmp.remove(AccessFs::Execute);
let mut ruleset = Ruleset::default()
.handle_access(access_all)
.map_err(ll_err)?
.create()
.map_err(ll_err)?;
if let Ok(fd) = PathFd::new("/") {
ruleset = ruleset
.add_rule(PathBeneath::new(fd, AccessFs::ReadDir))
.map_err(ll_err)?;
}
const MANDATORY_PATHS: &[&str] = &["/bin", "/usr", "/lib", "/etc"];
for path in MANDATORY_PATHS {
if !std::path::Path::new(path).exists() {
warn!(
"Landlock: mandatory path {} does not exist; container may not function correctly",
path
);
}
}
for path in &["/bin", "/usr", "/sbin"] {
if let Ok(fd) = PathFd::new(path) {
ruleset = ruleset
.add_rule(PathBeneath::new(fd, access_read_exec))
.map_err(ll_err)?;
}
}
for path in &["/lib", "/lib64", "/lib32"] {
if let Ok(fd) = PathFd::new(path) {
ruleset = ruleset
.add_rule(PathBeneath::new(fd, access_read))
.map_err(ll_err)?;
}
}
for path in &["/etc", "/dev", "/proc"] {
if let Ok(fd) = PathFd::new(path) {
ruleset = ruleset
.add_rule(PathBeneath::new(fd, access_read))
.map_err(ll_err)?;
}
}
if let Ok(fd) = PathFd::new("/tmp") {
ruleset = ruleset
.add_rule(PathBeneath::new(fd, access_tmp))
.map_err(ll_err)?;
}
if let Ok(fd) = PathFd::new("/nix/store") {
ruleset = ruleset
.add_rule(PathBeneath::new(fd, access_read_exec))
.map_err(ll_err)?;
}
if let Ok(fd) = PathFd::new("/run/secrets") {
ruleset = ruleset
.add_rule(PathBeneath::new(fd, access_read))
.map_err(ll_err)?;
}
if let Ok(fd) = PathFd::new("/context") {
ruleset = ruleset
.add_rule(PathBeneath::new(fd, access_read))
.map_err(ll_err)?;
}
let status = ruleset.restrict_self().map_err(ll_err)?;
Ok(status.ruleset)
}
/// Returns `true` once this manager has recorded a fully or partially enforced policy.
pub fn is_applied(&self) -> bool {
self.applied
}
}
impl Default for LandlockManager {
fn default() -> Self {
Self::new()
}
}
/// Converts a landlock `RulesetError` into `NucleusError::LandlockError`, keeping only its
/// display text.
fn ll_err(e: RulesetError) -> NucleusError {
    let message = e.to_string();
    NucleusError::LandlockError(message)
}
#[cfg(test)]
mod tests {
use super::*;
/// A freshly constructed manager must not report an applied policy.
#[test]
fn test_landlock_manager_initial_state() {
    let manager = LandlockManager::new();
    let applied = manager.is_applied();
    assert!(!applied);
}
/// Applying the policy twice in best-effort mode must succeed both times, and once a call
/// has reported enforcement the next call must short-circuit to `Ok(true)`.
#[test]
fn test_apply_idempotent() {
    let mut mgr = LandlockManager::new();
    let first = mgr.apply_container_policy_with_mode(true);
    let second = mgr.apply_container_policy_with_mode(true);
    assert!(first.is_ok());
    assert!(second.is_ok());
    // On kernels without Landlock the first call yields Ok(false); only check the
    // idempotent path when the policy was actually enforced.
    if matches!(first, Ok(true)) {
        assert!(mgr.is_applied());
        assert!(matches!(second, Ok(true)));
    }
}
/// Best-effort mode must never surface an error, whatever the kernel supports.
#[test]
fn test_best_effort_on_unsupported_kernel() {
    let outcome = LandlockManager::new().apply_container_policy_with_mode(true);
    assert!(outcome.is_ok());
}
/// Returns the slice of `source` that starts at the first occurrence of `fn_signature` and
/// ends just after the brace that closes the first `{` following it.
///
/// Braces are counted textually, so braces inside strings or comments count too. If the
/// opening brace is never balanced, the slice ends right before that opening brace.
///
/// # Panics
///
/// Panics when `fn_signature` does not occur in `source`, or when no `{` follows it.
fn extract_fn_body<'a>(source: &'a str, fn_signature: &str) -> &'a str {
    let sig_at = match source.find(fn_signature) {
        Some(pos) => pos,
        None => panic!("function '{}' not found in source", fn_signature),
    };
    let tail = &source[sig_at..];
    let brace_at = match tail.find('{') {
        Some(pos) => pos,
        None => panic!("no opening brace found for '{}'", fn_signature),
    };

    // Walk from the opening brace and stop at the brace that brings nesting back to zero.
    let mut nesting = 0u32;
    let closing = tail[brace_at..].char_indices().find_map(|(offset, c)| {
        match c {
            '{' => nesting += 1,
            '}' => nesting -= 1,
            _ => {}
        }
        if c == '}' && nesting == 0 {
            Some(brace_at + offset + 1)
        } else {
            None
        }
    });

    match closing {
        Some(end) => &tail[..end],
        None => &tail[..brace_at],
    }
}
/// Source-level guard: the policy builder must keep rules for the Nix store and for the
/// secrets mount (or for a parent directory of each).
#[test]
fn test_policy_covers_nix_store_and_secrets() {
    let fn_body = extract_fn_body(include_str!("landlock.rs"), "fn build_and_restrict");
    let mentions_any = |needles: &[&str]| needles.iter().any(|needle| fn_body.contains(needle));

    assert!(
        mentions_any(&["\"/nix/store\"", "\"/nix\""]),
        "Landlock build_and_restrict must include a rule for /nix/store or /nix"
    );
    assert!(
        mentions_any(&["\"/run/secrets\"", "\"/run\""]),
        "Landlock build_and_restrict must include a rule for /run/secrets"
    );
}
/// The access set used for the scratch directory keeps write/remove rights but drops
/// `Execute`.
#[test]
fn test_tmp_access_excludes_execute() {
    let mut tmp_rights = AccessFs::from_all(TARGET_ABI);
    tmp_rights.remove(AccessFs::Execute);

    assert!(!tmp_rights.contains(AccessFs::Execute));
    for &required in &[AccessFs::WriteFile, AccessFs::RemoveFile] {
        assert!(tmp_rights.contains(required));
    }
}
/// Source-level guard: the `NotEnforced` arm must branch on `best_effort` and return an
/// `Err` in strict mode.
///
/// The arm is isolated by brace matching so that neighbouring arms, which also mention
/// `best_effort` and `Err`, cannot satisfy the assertion by accident.
#[test]
fn test_not_enforced_returns_error_in_strict_mode() {
    let source = include_str!("landlock.rs");
    let fn_body = extract_fn_body(source, "fn apply_container_policy_with_mode");
    let not_enforced_start = fn_body
        .find("NotEnforced")
        .expect("function must handle NotEnforced status");
    // Reuse the brace matcher: it returns the text from the variant name through the
    // closing brace of the first block that follows it, i.e. exactly this match arm.
    let not_enforced_block = extract_fn_body(&fn_body[not_enforced_start..], "NotEnforced");
    assert!(
        not_enforced_block.contains("best_effort") && not_enforced_block.contains("Err"),
        "NotEnforced must return Err when best_effort=false. Block: {}",
        not_enforced_block
    );
}
}