use std::path::PathBuf;
use tokio::process::Command;
use tracing::warn;
use landlock::{
path_beneath_rules, Access, AccessFs, BitFlags, Ruleset, RulesetAttr, RulesetCreatedAttr,
RulesetStatus, ABI,
};
/// Access mask used for locations the sandboxed process may inspect but not modify.
///
/// The mask combines reading files, listing directories and executing files with
/// the `Refer` right; none of the write/create/remove rights are included.
fn read_access() -> BitFlags<AccessFs> {
    let inspect = AccessFs::ReadFile | AccessFs::ReadDir;
    let run_and_refer = AccessFs::Execute | AccessFs::Refer;
    inspect | run_and_refer
}
/// Access mask containing every filesystem right defined up to Landlock ABI v5.
///
/// This is both the set of rights the ruleset handles and the mask granted to
/// fully writable locations.
fn full_access() -> BitFlags<AccessFs> {
    let targeted_abi = ABI::V5;
    AccessFs::from_all(targeted_abi)
}
/// Builds a [`Command`] for `program` that sandboxes itself with Landlock before exec.
///
/// When [`landlock_available`] reports no kernel support, a warning is logged and
/// a plain, unsandboxed command is returned. Otherwise a `pre_exec` hook is
/// registered that calls [`apply_landlock`] in the child process; any error from
/// it is surfaced as an `io::Error` of kind `PermissionDenied`, which makes the
/// spawn fail instead of running the program unprotected.
///
/// * `program`  - executable to launch.
/// * `data_dir` - directory whose `data/` sub-directory and `config.toml` receive
///   the restricted rule set inside [`apply_landlock`].
/// * `profile`  - extra allowed / blocked paths; cloned into the hook.
///
/// The home directory is taken from `$HOME`, falling back to `/tmp` when the
/// variable is unset or not valid Unicode.
pub(crate) fn protected_command(
    program: &str,
    data_dir: &std::path::Path,
    profile: &crate::SandboxProfile,
) -> Command {
    let mut command = Command::new(program);

    if landlock_available() {
        // Everything the hook needs is moved into it as owned data, because the
        // closure must be `'static` and runs in the forked child.
        let home_root = match std::env::var("HOME") {
            Ok(value) => value,
            Err(_) => "/tmp".to_string(),
        };
        let data_root = data_dir.to_path_buf();
        let sandbox_profile = profile.clone();

        let install_sandbox = move || match apply_landlock(&home_root, &data_root, &sandbox_profile)
        {
            Ok(()) => Ok(()),
            Err(err) => Err(std::io::Error::new(
                std::io::ErrorKind::PermissionDenied,
                err.to_string(),
            )),
        };

        // SAFETY: the hook only uses values it owns and restricts the child
        // process itself; it does not touch state shared with the parent.
        // NOTE(review): `apply_landlock` allocates and may log, which is not
        // async-signal-safe after `fork` in a multi-threaded parent — confirm
        // this is acceptable for the runtime this is used from.
        unsafe {
            command.pre_exec(install_sandbox);
        }
    } else {
        warn!("landlock: not supported by this kernel; falling back to code-level protection");
    }

    command
}
/// Reports whether the running kernel appears to have Landlock enabled.
///
/// Two securityfs probes are tried, in order:
///
/// 1. the presence of `/sys/kernel/security/landlock/abi_version` (the original
///    probe, kept for backward compatibility);
/// 2. a `landlock` entry in the comma-separated list of active LSMs in
///    `/sys/kernel/security/lsm`.
///
/// NOTE(review): mainline Landlock does not appear to expose the `abi_version`
/// file in securityfs, so relying on probe 1 alone would make this function
/// return `false` on kernels that do support Landlock — confirm against the
/// kernels you target. Probe 2 is the fallback for that case.
///
/// Returns `false` when neither probe succeeds, including when securityfs is
/// not mounted or not readable.
fn landlock_available() -> bool {
    const ABI_VERSION_FILE: &str = "/sys/kernel/security/landlock/abi_version";
    const ACTIVE_LSM_FILE: &str = "/sys/kernel/security/lsm";

    if std::path::Path::new(ABI_VERSION_FILE).exists() {
        return true;
    }

    // The file holds a single line such as "capability,landlock,yama"; a read
    // failure is treated as "not available" rather than as an error.
    std::fs::read_to_string(ACTIVE_LSM_FILE)
        .map(|active| active.split(',').any(|name| name.trim() == "landlock"))
        .unwrap_or(false)
}
fn refer_only() -> BitFlags<AccessFs> {
AccessFs::Refer.into()
}
/// Builds the Landlock ruleset for the current process and enforces it.
///
/// Rules are added in this order:
///
/// 1. `/` with [`read_access`];
/// 2. `home` and `/tmp` with [`full_access`];
/// 3. each of `/var/tmp`, `/opt`, `/srv`, `/run`, `/media`, `/mnt` that exists,
///    with [`full_access`];
/// 4. each existing entry of `profile.allowed_paths`, with [`full_access`];
/// 5. `<data_dir>/data` (created on a best-effort basis first) and
///    `<data_dir>/config.toml`, when they exist, with [`refer_only`];
/// 6. each existing entry of `profile.blocked_paths`, with [`refer_only`].
///
/// After `restrict_self`, a warning is logged if the ruleset was not fully
/// enforced.
///
/// NOTE(review): Landlock rules only ever grant rights, and the kernel
/// documentation describes the effective rights of a path as the union of the
/// rules on it and on its ancestors. The `Refer`-only rules in steps 5 and 6
/// therefore may not revoke rights already granted by a broader rule (for
/// example `/` or `home`) — confirm that these paths are actually protected.
///
/// # Errors
///
/// Returns an error if the ruleset cannot be created, a rule cannot be added,
/// or the restriction cannot be applied to the process.
fn apply_landlock(
    home: &str,
    data_dir: &std::path::Path,
    profile: &crate::SandboxProfile,
) -> Result<(), anyhow::Error> {
    // Baseline: the whole tree is readable, home and /tmp are fully usable.
    let created = Ruleset::default().handle_access(full_access())?.create()?;
    let mut rules = created.add_rules(path_beneath_rules(&[PathBuf::from("/")], read_access()))?;
    rules = rules.add_rules(path_beneath_rules(&[PathBuf::from(home)], full_access()))?;
    rules = rules.add_rules(path_beneath_rules(&[PathBuf::from("/tmp")], full_access()))?;

    // Well-known scratch / mount locations are granted only when present.
    let well_known = ["/var/tmp", "/opt", "/srv", "/run", "/media", "/mnt"];
    for dir in well_known.iter().map(PathBuf::from).filter(|dir| dir.exists()) {
        rules = rules.add_rules(path_beneath_rules(&[dir], full_access()))?;
    }

    // Profile-specific writable locations.
    for granted in &profile.allowed_paths {
        if !granted.exists() {
            continue;
        }
        rules = rules.add_rules(path_beneath_rules(&[granted.clone()], full_access()))?;
    }

    // The application's own data directory; creation failures are ignored and
    // simply result in no rule being added for it.
    let data_subdir = data_dir.join("data");
    let _ = std::fs::create_dir_all(&data_subdir);
    if data_subdir.exists() {
        rules = rules.add_rules(path_beneath_rules(&[data_subdir], refer_only()))?;
    }

    let config_path = data_dir.join("config.toml");
    if config_path.exists() {
        rules = rules.add_rules(path_beneath_rules(&[config_path], refer_only()))?;
    }

    // Profile-specific protected locations.
    for protected in &profile.blocked_paths {
        if !protected.exists() {
            continue;
        }
        rules = rules.add_rules(path_beneath_rules(&[protected.clone()], refer_only()))?;
    }

    let outcome = rules.restrict_self()?;
    if !matches!(outcome.ruleset, RulesetStatus::FullyEnforced) {
        warn!(
            "landlock: not all restrictions enforced (kernel may lack full support); \
             best-effort protection active"
        );
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The read mask must include every read-style right.
    #[test]
    fn test_read_access_flags() {
        let granted = read_access();
        for right in [AccessFs::ReadFile, AccessFs::ReadDir, AccessFs::Execute] {
            assert!(granted.contains(right));
        }
    }

    /// The full mask must include both write-style and read-style rights.
    #[test]
    fn test_full_access_contains_writes() {
        let granted = full_access();
        for right in [AccessFs::WriteFile, AccessFs::ReadFile, AccessFs::MakeDir] {
            assert!(granted.contains(right));
        }
    }

    /// The refer-only mask carries `Refer` and neither file read nor file write.
    #[test]
    fn test_refer_only_blocks_reads_and_writes() {
        let granted = refer_only();
        assert!(granted.contains(AccessFs::Refer));
        assert!(!granted.intersects(AccessFs::ReadFile | AccessFs::WriteFile));
    }

    /// The returned command must target the requested program.
    #[test]
    fn test_command_structure() {
        let profile = crate::SandboxProfile::default();
        let data_dir = PathBuf::from("/tmp/ws");
        let command = protected_command("claude", &data_dir, &profile);
        let program = command.as_std().get_program().to_string_lossy().into_owned();
        assert_eq!(program, "claude");
    }
}