#![forbid(unsafe_code)]
use std::{fmt, sync::LazyLock};
use memchr::arch::all::is_equal;
use nix::{
errno::Errno,
fcntl::{OFlag, AT_FDCWD},
};
use serde::{ser::SerializeMap, Serialize, Serializer};
use crate::{
compat::{openat2, OpenHow, ResolveFlag},
hash::{SydHashMap, SydHashSet, SydIndexMap},
landlock::{
Access, AccessFs, AccessNet, CompatLevel, Compatible, CreateRulesetError, Errata, NetPort,
PathBeneath, PathFd, RestrictSelfFlags, RestrictionStatus, Ruleset, RulesetAttr,
RulesetCreatedAttr, RulesetError, Scope, ABI,
},
parsers::sandbox::{str2u32, LandlockCmd, LandlockOp, LandlockRule, PathSet},
path::{XPath, XPathBuf},
port::PortSet,
retry::retry_on_eintr,
sandbox::Sandbox,
};
/// Keyword table mapping each Landlock filesystem access name to its [`AccessFs`] rights.
///
/// Keywords naming a single right are listed first, followed by the aliases that
/// [`LandlockPolicy::access_fs_from_set`] expands. Entries are supplied to the map
/// in exactly the order written below.
pub static LANDLOCK_ACCESS_FS: LazyLock<SydIndexMap<&str, AccessFs>> = LazyLock::new(|| {
    // Keywords that name exactly one access right.
    let single_rights = [
        ("read", AccessFs::ReadFile),
        ("write", AccessFs::WriteFile),
        ("exec", AccessFs::Execute),
        ("ioctl", AccessFs::IoctlDev),
        ("create", AccessFs::MakeReg),
        ("delete", AccessFs::RemoveFile),
        ("rename", AccessFs::Refer),
        ("symlink", AccessFs::MakeSym),
        ("truncate", AccessFs::Truncate),
        ("readdir", AccessFs::ReadDir),
        ("mkdir", AccessFs::MakeDir),
        ("rmdir", AccessFs::RemoveDir),
        ("mkbdev", AccessFs::MakeBlock),
        ("mkcdev", AccessFs::MakeChar),
        ("mkfifo", AccessFs::MakeFifo),
        ("bind", AccessFs::MakeSock),
    ];
    // Aliases whose rights are computed by `access_fs_from_set`.
    let aliases = [
        "all", "all-x", "rpath", "wpath", "cpath", "dpath", "spath", "tpath", "bnet",
    ]
    .map(|set| (set, LandlockPolicy::access_fs_from_set(set)));
    SydIndexMap::from_iter(IntoIterator::into_iter(single_rights).chain(aliases))
});
/// Keyword table mapping each Landlock network access name to its [`AccessNet`] rights.
///
/// The two single-right keywords are listed first, followed by the aliases that
/// [`LandlockPolicy::access_net_from_set`] expands. Entries are supplied to the map
/// in exactly the order written below.
pub static LANDLOCK_ACCESS_NET: LazyLock<SydIndexMap<&str, AccessNet>> = LazyLock::new(|| {
    // Keywords that name exactly one access right.
    let single_rights = [
        ("bind", AccessNet::BindTcp),
        ("connect", AccessNet::ConnectTcp),
    ];
    // Aliases whose rights are computed by `access_net_from_set`.
    let aliases = ["net", "inet", "bnet", "cnet"]
        .map(|set| (set, LandlockPolicy::access_net_from_set(set)));
    SydIndexMap::from_iter(IntoIterator::into_iter(single_rights).chain(aliases))
});
/// Landlock policy: per-right path sets, per-right port sets, scoping switches
/// and the flags handed to the final `restrict_self` call.
///
/// Each `*_pathset` field holds the paths granted one filesystem access right
/// and each `*_portset` field holds the ports granted one network access right.
/// `None` means no rule was recorded for that right.
#[derive(Clone, Debug, Default)]
pub struct LandlockPolicy {
/// Compatibility level applied to the ruleset; `BestEffort` is used when `None`.
pub compat_level: Option<CompatLevel>,
/// Paths granted `AccessFs::ReadFile`.
pub read_pathset: Option<PathSet>,
/// Paths granted `AccessFs::WriteFile`.
pub write_pathset: Option<PathSet>,
/// Paths granted `AccessFs::Execute`.
pub exec_pathset: Option<PathSet>,
/// Paths granted `AccessFs::IoctlDev` (only applied when the ABI is at least V5).
pub ioctl_pathset: Option<PathSet>,
/// Paths granted `AccessFs::MakeReg`.
pub create_pathset: Option<PathSet>,
/// Paths granted `AccessFs::RemoveFile`.
pub delete_pathset: Option<PathSet>,
/// Paths granted `AccessFs::Refer` (only applied when the ABI is at least V2).
pub rename_pathset: Option<PathSet>,
/// Paths granted `AccessFs::MakeSym`.
pub symlink_pathset: Option<PathSet>,
/// Paths granted `AccessFs::Truncate` (only applied when the ABI is at least V3).
pub truncate_pathset: Option<PathSet>,
/// Paths granted `AccessFs::ReadDir`.
pub readdir_pathset: Option<PathSet>,
/// Paths granted `AccessFs::MakeDir`.
pub mkdir_pathset: Option<PathSet>,
/// Paths granted `AccessFs::RemoveDir`.
pub rmdir_pathset: Option<PathSet>,
/// Paths granted `AccessFs::MakeBlock`.
pub mkbdev_pathset: Option<PathSet>,
/// Paths granted `AccessFs::MakeChar`.
pub mkcdev_pathset: Option<PathSet>,
/// Paths granted `AccessFs::MakeFifo`.
pub mkfifo_pathset: Option<PathSet>,
/// Paths granted `AccessFs::MakeSock`.
pub bind_pathset: Option<PathSet>,
/// Ports granted `AccessNet::BindTcp` (only applied when the ABI is at least V4).
pub bind_portset: Option<PortSet>,
/// Ports granted `AccessNet::ConnectTcp` (only applied when the ABI is at least V4).
pub conn_portset: Option<PortSet>,
/// Request `Scope::AbstractUnixSocket` (only applied when the ABI is at least V6).
pub scoped_abs: bool,
/// Request `Scope::Signal` (only applied when the ABI is at least V6).
pub scoped_sig: bool,
/// Flags passed to the ruleset's `restrict_self` call.
pub restrict_self_flags: RestrictSelfFlags,
}
impl LandlockPolicy {
/// Applies every rule carried by `cmd` to this policy.
///
/// Rules are added when the command's operation is `LandlockOp::Add` and
/// removed otherwise. When a `sandbox` is supplied, filesystem patterns are
/// first passed through `Sandbox::expand_env`.
///
/// # Errors
///
/// * `EACCES` when, with a sandbox present, the command tries to remove
///   read/readdir access from `/proc` or read/write/truncate access from
///   `/dev/null`.
/// * Any error returned by environment expansion or by the `rule_*` helpers.
pub fn edit(&mut self, cmd: LandlockCmd, sandbox: Option<&Sandbox>) -> Result<(), Errno> {
    let adding = cmd.op == LandlockOp::Add;
    for rule in cmd.filter {
        match rule {
            LandlockRule::Net((access_net, ref ports)) => {
                if adding {
                    self.rule_add_net(access_net, ports)?;
                } else {
                    self.rule_del_net(access_net, ports)?;
                }
            }
            LandlockRule::Fs((access_fs, pat)) => {
                let pat = match sandbox {
                    Some(sandbox) => sandbox.expand_env(&pat)?,
                    None => pat.into(),
                };
                let pat = XPath::from_bytes(pat.as_bytes());

                if adding {
                    self.rule_add_fs(access_fs, pat)?;
                    continue;
                }

                // With a sandbox present, these two grants must never be revoked.
                if sandbox.is_some() {
                    let revokes_proc = access_fs
                        .intersects(AccessFs::ReadFile | AccessFs::ReadDir)
                        && pat.is_equal(b"/proc");
                    let revokes_null = access_fs.intersects(
                        AccessFs::ReadFile | AccessFs::WriteFile | AccessFs::Truncate,
                    ) && pat.is_equal(b"/dev/null");
                    if revokes_proc || revokes_null {
                        return Err(Errno::EACCES);
                    }
                }
                self.rule_del_fs(access_fs, pat)?;
            }
        }
    }
    Ok(())
}
pub fn rule_add_fs(&mut self, access: AccessFs, pat: &XPath) -> Result<(), Errno> {
if access.is_empty() {
return Err(Errno::EINVAL);
}
for access in access.iter() {
let set = self.get_pathset_mut(access);
if let Some(ref mut set) = set {
set.insert(pat.to_owned());
} else {
let mut new_set = SydHashSet::default();
new_set.insert(pat.to_owned());
*set = Some(new_set);
}
}
Ok(())
}
pub fn rule_del_fs(&mut self, access: AccessFs, pat: &XPath) -> Result<(), Errno> {
if access.is_empty() {
return Err(Errno::EINVAL);
}
for access in access.iter() {
let set = self.get_pathset_mut(access);
if let Some(ref mut set_ref) = set {
set_ref.remove(pat);
if set_ref.is_empty() {
*set = None;
}
}
}
Ok(())
}
pub fn rule_add_net(&mut self, access: AccessNet, ports: &PortSet) -> Result<(), Errno> {
if access.is_empty() {
return Err(Errno::EINVAL);
}
for access in access.iter() {
let set = self.get_portset_mut(access);
if let Some(ref mut set_ref) = set {
set_ref.union_with(ports);
} else {
*set = Some(ports.clone());
}
}
Ok(())
}
pub fn rule_del_net(&mut self, access: AccessNet, ports: &PortSet) -> Result<(), Errno> {
if access.is_empty() {
return Err(Errno::EINVAL);
}
for access in access.iter() {
let set = self.get_portset_mut(access);
if let Some(ref mut set_ref) = set {
set_ref.difference_with(ports);
if set_ref.is_clear() {
*set = None;
}
}
}
Ok(())
}
/// Parses a comma-separated list of Landlock errata.
///
/// Each item is either a number accepted by `str2u32` (its bits are kept
/// verbatim) or one of the names `tcp_socket_identification` and
/// `scoped_signal_same_tgid`.
///
/// # Errors
///
/// Returns `EINVAL` for an unknown item or when the resulting set is empty.
pub fn parse_errata(errata: &[u8]) -> Result<Errata, Errno> {
    let mut parsed = Errata::empty();
    for fix in errata.split(|byte| *byte == b',') {
        let flag = match str2u32(fix) {
            // Numeric form wins over the symbolic names.
            Ok(bits) => Errata::from_bits_retain(bits),
            Err(_) if is_equal(fix, b"tcp_socket_identification") => {
                Errata::TCP_SOCKET_IDENTIFICATION
            }
            Err(_) if is_equal(fix, b"scoped_signal_same_tgid") => {
                Errata::SCOPED_SIGNAL_SAME_TGID
            }
            Err(_) => return Err(Errno::EINVAL),
        };
        parsed.insert(flag);
    }
    if parsed.is_empty() {
        Err(Errno::EINVAL)
    } else {
        Ok(parsed)
    }
}
/// Parses a comma-separated list of `restrict_self` flags.
///
/// Every item may be one of the flag names below, with or without the `log_`
/// prefix. When `numeric` is true, an item that parses as a number made up of
/// known flag bits is accepted as well; anything else falls back to the names.
///
/// # Errors
///
/// Returns `EINVAL` for an unknown item or when the resulting set is empty.
pub fn parse_restrict_self_flags(
    flags: &[u8],
    numeric: bool,
) -> Result<RestrictSelfFlags, Errno> {
    // Accepted spellings and the flag each one selects.
    let known: [(&[u8], RestrictSelfFlags); 6] = [
        (b"same_exec_off", RestrictSelfFlags::LOG_SAME_EXEC_OFF),
        (b"log_same_exec_off", RestrictSelfFlags::LOG_SAME_EXEC_OFF),
        (b"new_exec_on", RestrictSelfFlags::LOG_NEW_EXEC_ON),
        (b"log_new_exec_on", RestrictSelfFlags::LOG_NEW_EXEC_ON),
        (b"subdomains_off", RestrictSelfFlags::LOG_SUBDOMAINS_OFF),
        (b"log_subdomains_off", RestrictSelfFlags::LOG_SUBDOMAINS_OFF),
    ];

    let mut parsed = RestrictSelfFlags::empty();
    for item in flags.split(|byte| *byte == b',') {
        if numeric {
            let from_number = str2u32(item)
                .ok()
                .and_then(RestrictSelfFlags::from_bits);
            if let Some(bits) = from_number {
                parsed.insert(bits);
                continue;
            }
        }
        let Some(&(_, flag)) = known.iter().find(|(name, _)| is_equal(item, name)) else {
            return Err(Errno::EINVAL);
        };
        parsed.insert(flag);
    }

    if parsed.is_empty() {
        Err(Errno::EINVAL)
    } else {
        Ok(parsed)
    }
}
/// Resolves a comma-separated list of access keywords into filesystem and
/// network access rights.
///
/// Each keyword is looked up in both `LANDLOCK_ACCESS_FS` and
/// `LANDLOCK_ACCESS_NET`; the rights found in either table are accumulated.
///
/// # Errors
///
/// Returns `EINVAL` as soon as a keyword yields no rights in either table.
pub fn access(access_str: &str) -> Result<(AccessFs, AccessNet), Errno> {
    access_str.split(',').try_fold(
        (AccessFs::EMPTY, AccessNet::EMPTY),
        |(fs_acc, net_acc), keyword| {
            let fs_rights = LANDLOCK_ACCESS_FS
                .get(keyword)
                .copied()
                .unwrap_or(AccessFs::EMPTY);
            let net_rights = LANDLOCK_ACCESS_NET
                .get(keyword)
                .copied()
                .unwrap_or(AccessNet::EMPTY);
            if fs_rights.is_empty() && net_rights.is_empty() {
                Err(Errno::EINVAL)
            } else {
                Ok((fs_acc | fs_rights, net_acc | net_rights))
            }
        },
    )
}
/// Expands a filesystem alias name into the [`AccessFs`] rights it stands for.
///
/// # Panics
///
/// Panics on any name other than `all`, `all-x`, `rpath`, `wpath`, `cpath`,
/// `dpath`, `spath`, `tpath` and `bnet`.
pub fn access_fs_from_set(set: &str) -> AccessFs {
    match set {
        "all" => AccessFs::all(),
        "all-x" => AccessFs::all() & !AccessFs::Execute,
        "rpath" => AccessFs::ReadFile | AccessFs::ReadDir,
        "wpath" => AccessFs::WriteFile | AccessFs::Truncate,
        "cpath" => AccessFs::MakeReg | AccessFs::RemoveFile | AccessFs::Refer,
        "dpath" => AccessFs::MakeBlock | AccessFs::MakeChar,
        "spath" => AccessFs::MakeFifo | AccessFs::MakeSym,
        "tpath" => AccessFs::MakeDir | AccessFs::RemoveDir,
        "bnet" => AccessFs::MakeSock,
        _ => {
            unreachable!("BUG: Invalid landlock(7) filesystem access right {set}, report a bug!")
        }
    }
}
/// Expands a network alias name into the [`AccessNet`] rights it stands for.
///
/// # Panics
///
/// Panics on any name other than `all`, `bnet`, `cnet`, `net` and `inet`.
pub fn access_net_from_set(set: &str) -> AccessNet {
    match set {
        "all" => AccessNet::all(),
        "bnet" => AccessNet::BindTcp,
        "cnet" => AccessNet::ConnectTcp,
        "net" | "inet" => AccessNet::BindTcp | AccessNet::ConnectTcp,
        _ => unreachable!("BUG: Invalid landlock(7) network access right {set}, report a bug!"),
    }
}
#[expect(clippy::cognitive_complexity)]
pub fn restrict_self(&self, abi: ABI) -> Result<RestrictionStatus, RulesetError> {
let mut ruleset = Ruleset::default().handle_access(AccessFs::from_all(abi))?;
let ruleset_ref = &mut ruleset;
let level = if let Some(compat_level) = self.compat_level {
ruleset_ref.set_compatibility(compat_level);
compat_level
} else {
CompatLevel::BestEffort
};
let mut network_rules_bind = PortSet::empty();
let mut network_rules_conn = PortSet::empty();
if abi >= ABI::V4 {
if let Some(ref port_set) = self.bind_portset {
network_rules_bind = port_set.clone();
}
if network_rules_bind.is_full() {
network_rules_bind.clear();
} else {
ruleset_ref.handle_access(AccessNet::BindTcp)?;
}
if let Some(ref port_set) = self.conn_portset {
network_rules_conn = port_set.clone();
}
if network_rules_conn.is_full() {
network_rules_conn.clear();
} else {
ruleset_ref.handle_access(AccessNet::ConnectTcp)?;
}
}
if abi >= ABI::V6 {
if self.scoped_abs {
ruleset_ref.scope(Scope::AbstractUnixSocket)?;
}
if self.scoped_sig {
ruleset_ref.scope(Scope::Signal)?;
}
}
let mut all_pathset: SydHashSet<XPathBuf> = SydHashSet::default();
if let Some(ref pathset) = self.read_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.write_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.exec_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.ioctl_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.create_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.delete_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.rename_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.symlink_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.truncate_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.readdir_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.mkdir_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.rmdir_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.mkbdev_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.mkcdev_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.mkfifo_pathset {
all_pathset.extend(pathset.iter().cloned());
}
if let Some(ref pathset) = self.bind_pathset {
all_pathset.extend(pathset.iter().cloned());
}
let mut acl: SydHashMap<AccessFs, Vec<XPathBuf>> = SydHashMap::default();
for path in all_pathset {
let mut access = AccessFs::EMPTY;
if self
.read_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::ReadFile;
}
if self
.write_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::WriteFile;
}
if self
.exec_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::Execute;
}
if abi >= ABI::V5
&& self
.ioctl_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::IoctlDev;
}
if self
.create_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::MakeReg;
}
if self
.delete_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::RemoveFile;
}
if abi >= ABI::V2
&& self
.rename_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::Refer;
}
if self
.symlink_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::MakeSym;
}
if abi >= ABI::V3
&& self
.truncate_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::Truncate;
}
if self
.readdir_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::ReadDir;
}
if self
.mkdir_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::MakeDir;
}
if self
.rmdir_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::RemoveDir;
}
if self
.mkbdev_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::MakeBlock;
}
if self
.mkcdev_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::MakeChar;
}
if self
.mkfifo_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::MakeFifo;
}
if self
.bind_pathset
.as_ref()
.map(|set| set.contains(&path))
.unwrap_or(false)
{
access |= AccessFs::MakeSock;
}
if access.is_empty() {
continue;
}
acl.entry(access).or_default().push(path);
}
let mut ruleset = ruleset.create()?;
for (access, paths) in &acl {
ruleset = ruleset.add_rules(landlock_path_beneath_rules(level, paths, *access))?;
}
#[expect(clippy::cast_possible_truncation)]
ruleset
.add_rules(network_rules_bind.ones().map(|port| {
Ok::<NetPort, RulesetError>(NetPort::new(port as u16, AccessNet::BindTcp))
}))?
.add_rules(network_rules_conn.ones().map(|port| {
Ok::<NetPort, RulesetError>(NetPort::new(port as u16, AccessNet::ConnectTcp))
}))?
.restrict_self(self.restrict_self_flags)
}
/// Returns the path-set slot that stores the paths for one filesystem right.
///
/// # Panics
///
/// Panics when `access` is not exactly one of the sixteen rights listed below.
#[inline]
fn get_pathset_mut(&mut self, access: AccessFs) -> &mut Option<PathSet> {
    match access {
        // Rights on file contents.
        AccessFs::Execute => &mut self.exec_pathset,
        AccessFs::ReadFile => &mut self.read_pathset,
        AccessFs::WriteFile => &mut self.write_pathset,
        AccessFs::Truncate => &mut self.truncate_pathset,
        AccessFs::IoctlDev => &mut self.ioctl_pathset,
        // Rights on directories.
        AccessFs::ReadDir => &mut self.readdir_pathset,
        AccessFs::MakeDir => &mut self.mkdir_pathset,
        AccessFs::RemoveDir => &mut self.rmdir_pathset,
        // Rights that create, remove or re-link entries.
        AccessFs::MakeReg => &mut self.create_pathset,
        AccessFs::RemoveFile => &mut self.delete_pathset,
        AccessFs::Refer => &mut self.rename_pathset,
        AccessFs::MakeSym => &mut self.symlink_pathset,
        // Rights that create special files.
        AccessFs::MakeBlock => &mut self.mkbdev_pathset,
        AccessFs::MakeChar => &mut self.mkcdev_pathset,
        AccessFs::MakeFifo => &mut self.mkfifo_pathset,
        AccessFs::MakeSock => &mut self.bind_pathset,
        _ => unreachable!("BUG: unhandled Landlock filesystem access right {access:?}!"),
    }
}
/// Returns the port-set slot that stores the ports for one network right.
///
/// # Panics
///
/// Panics when `access` is neither `AccessNet::BindTcp` nor
/// `AccessNet::ConnectTcp`.
#[inline]
fn get_portset_mut(&mut self, access: AccessNet) -> &mut Option<PortSet> {
    match access {
        AccessNet::ConnectTcp => &mut self.conn_portset,
        AccessNet::BindTcp => &mut self.bind_portset,
        _ => unreachable!("BUG: unhandled Landlock network access right {access:?}!"),
    }
}
}
impl fmt::Display for LandlockPolicy {
#[expect(clippy::cognitive_complexity)]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, " Landlock Policy:")?;
let level = self.compat_level.unwrap_or(CompatLevel::BestEffort);
writeln!(f, " Compat Level: {level:?}")?;
writeln!(f, " Scoped Abstract Unix Sockets: {}", self.scoped_abs)?;
writeln!(f, " Scoped Signals: {}", self.scoped_sig)?;
writeln!(f, " Restrict Self Flags: {:?}", self.restrict_self_flags)?;
macro_rules! fmt_pathset {
($field:ident, $name:expr) => {
if let Some(ref set) = self.$field {
if !set.is_empty() {
let mut paths: Vec<_> = set.iter().collect();
paths.sort();
writeln!(f, " {}: {paths:?}", $name)?;
}
}
};
}
macro_rules! fmt_portset {
($field:ident, $name:expr) => {
if let Some(ref set) = self.$field {
if set.count_ones(..) > 0 {
let mut ports: Vec<_> = set.ones().collect();
ports.sort();
writeln!(f, " {}: {ports:?}", $name)?;
}
}
};
}
fmt_pathset!(read_pathset, "Read Pathset");
fmt_pathset!(write_pathset, "Write Pathset");
fmt_pathset!(exec_pathset, "Exec Pathset");
fmt_pathset!(ioctl_pathset, "Ioctl Pathset");
fmt_pathset!(create_pathset, "Create Pathset");
fmt_pathset!(delete_pathset, "Delete Pathset");
fmt_pathset!(rename_pathset, "Rename Pathset");
fmt_pathset!(symlink_pathset, "Symlink Pathset");
fmt_pathset!(truncate_pathset, "Truncate Pathset");
fmt_pathset!(readdir_pathset, "Readdir Pathset");
fmt_pathset!(mkdir_pathset, "Mkdir Pathset");
fmt_pathset!(rmdir_pathset, "Rmdir Pathset");
fmt_pathset!(mkbdev_pathset, "Mkbdev Pathset");
fmt_pathset!(mkcdev_pathset, "Mkcdev Pathset");
fmt_pathset!(mkfifo_pathset, "Mkfifo Pathset");
fmt_pathset!(bind_pathset, "Bind Pathset");
fmt_portset!(bind_portset, " Bind Portset");
fmt_portset!(conn_portset, " Connect Portset");
Ok(())
}
}
impl Serialize for LandlockPolicy {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(None)?;
let level = match self.compat_level.unwrap_or(CompatLevel::BestEffort) {
CompatLevel::BestEffort => "best-effort",
CompatLevel::SoftRequirement => "soft-requirement",
CompatLevel::HardRequirement => "hard-requirement",
};
map.serialize_entry("compat_level", level)?;
map.serialize_entry("scoped_abs", &self.scoped_abs)?;
map.serialize_entry("scoped_sig", &self.scoped_sig)?;
let mut flags = Vec::new();
if self
.restrict_self_flags
.contains(RestrictSelfFlags::LOG_SAME_EXEC_OFF)
{
flags.push("log-same-exec-off");
}
if self
.restrict_self_flags
.contains(RestrictSelfFlags::LOG_NEW_EXEC_ON)
{
flags.push("log-new-exec-on");
}
if self
.restrict_self_flags
.contains(RestrictSelfFlags::LOG_SUBDOMAINS_OFF)
{
flags.push("log-subdomains-off");
}
map.serialize_entry("restrict_self_flags", &flags)?;
let mut ser_pathset = |name: &str, set: &Option<PathSet>| -> Result<(), S::Error> {
if let Some(ref set) = set {
if !set.is_empty() {
let mut paths: Vec<_> = set.iter().map(|p| p.to_string()).collect();
paths.sort();
map.serialize_entry(name, &paths)?;
}
}
Ok(())
};
ser_pathset("read_pathset", &self.read_pathset)?;
ser_pathset("write_pathset", &self.write_pathset)?;
ser_pathset("exec_pathset", &self.exec_pathset)?;
ser_pathset("ioctl_pathset", &self.ioctl_pathset)?;
ser_pathset("create_pathset", &self.create_pathset)?;
ser_pathset("delete_pathset", &self.delete_pathset)?;
ser_pathset("rename_pathset", &self.rename_pathset)?;
ser_pathset("symlink_pathset", &self.symlink_pathset)?;
ser_pathset("truncate_pathset", &self.truncate_pathset)?;
ser_pathset("readdir_pathset", &self.readdir_pathset)?;
ser_pathset("mkdir_pathset", &self.mkdir_pathset)?;
ser_pathset("rmdir_pathset", &self.rmdir_pathset)?;
ser_pathset("mkbdev_pathset", &self.mkbdev_pathset)?;
ser_pathset("mkcdev_pathset", &self.mkcdev_pathset)?;
ser_pathset("mkfifo_pathset", &self.mkfifo_pathset)?;
ser_pathset("bind_pathset", &self.bind_pathset)?;
if let Some(ref set) = self.bind_portset {
if !set.is_clear() {
map.serialize_entry("bind_portset", set)?;
}
}
if let Some(ref set) = self.conn_portset {
if !set.is_clear() {
map.serialize_entry("conn_portset", set)?;
}
}
map.end()
}
}
#[expect(clippy::cognitive_complexity)]
#[expect(clippy::disallowed_methods)]
pub(crate) fn landlock_path_beneath_rules<I, P>(
level: CompatLevel,
paths: I,
access: AccessFs,
) -> impl Iterator<Item = Result<PathBeneath<PathFd>, RulesetError>>
where
I: IntoIterator<Item = P>,
P: AsRef<XPath>,
{
let compat_level = match level {
CompatLevel::HardRequirement => "hard-requirement",
CompatLevel::SoftRequirement => "soft-requirement",
CompatLevel::BestEffort => "best-effort",
};
paths.into_iter().filter_map(move |p| {
let p = p.as_ref();
if p.has_parent_dot() {
crate::error!("ctx": "init", "op": "landlock_create_ruleset",
"path": p, "access": access,
"cmp": compat_level, "err": Errno::EACCES as i32,
"msg": format!("open path `{p}' for Landlock failed due to dotdot"),
"tip": "avoid using `..' components in Landlock rules");
return Some(Err(RulesetError::CreateRuleset(
CreateRulesetError::CreateRulesetCall {
source: Errno::EACCES.into(),
},
)));
}
let how = OpenHow::new()
.flags(OFlag::O_PATH | OFlag::O_CLOEXEC)
.resolve(ResolveFlag::RESOLVE_NO_MAGICLINKS);
match retry_on_eintr(|| openat2(AT_FDCWD, p, how)) {
Ok(fd) => Some(Ok(PathBeneath::new(PathFd { fd }, access))),
Err(errno @ Errno::ENOENT) if level == CompatLevel::BestEffort => {
crate::info!("ctx": "init", "op": "landlock_create_ruleset",
"path": p, "access": access,
"cmp": compat_level, "err": errno as i32,
"msg": format!("open path `{p}' for Landlock failed: {errno}"));
None
}
Err(errno) => {
let tip = if errno == Errno::ENOENT {
"use parent dir or set `default/lock:warn' to ignore file-not-found errors for Landlock"
} else if errno == Errno::ELOOP {
"avoid using magiclinks in Landlock rules"
} else {
"avoid using inaccessible paths in Landlock rules"
};
crate::error!("ctx": "init", "op": "landlock_create_ruleset",
"path": p, "access": access,
"cmp": compat_level, "err": errno as i32,
"msg": format!("open path `{p}' for Landlock failed: {errno}"),
"tip": tip);
Some(Err(RulesetError::CreateRuleset(
CreateRulesetError::CreateRulesetCall {
source: errno.into(),
},
)))
}
}
})
}
// Unit tests for the parsing and keyword-resolution helpers of `LandlockPolicy`.
#[cfg(test)]
mod tests {
use super::*;
// parse_errata: symbolic names, comma-separated lists, invalid/empty input
// and the numeric form.
#[test]
fn test_parse_errata_1() {
let e = LandlockPolicy::parse_errata(b"tcp_socket_identification").unwrap();
assert!(e.contains(Errata::TCP_SOCKET_IDENTIFICATION));
}
#[test]
fn test_parse_errata_2() {
let e = LandlockPolicy::parse_errata(b"scoped_signal_same_tgid").unwrap();
assert!(e.contains(Errata::SCOPED_SIGNAL_SAME_TGID));
}
#[test]
fn test_parse_errata_3() {
let e = LandlockPolicy::parse_errata(b"tcp_socket_identification,scoped_signal_same_tgid")
.unwrap();
assert!(e.contains(Errata::TCP_SOCKET_IDENTIFICATION));
assert!(e.contains(Errata::SCOPED_SIGNAL_SAME_TGID));
}
#[test]
fn test_parse_errata_4() {
let result = LandlockPolicy::parse_errata(b"invalid_errata");
assert_eq!(result, Err(Errno::EINVAL));
}
#[test]
fn test_parse_errata_5() {
let result = LandlockPolicy::parse_errata(b"");
assert_eq!(result, Err(Errno::EINVAL));
}
#[test]
fn test_parse_errata_6() {
let e = LandlockPolicy::parse_errata(b"1").unwrap();
assert_eq!(e.bits(), 1);
}
// parse_restrict_self_flags: short and `log_`-prefixed names, invalid/empty
// input and comma-separated lists (numeric parsing disabled).
#[test]
fn test_parse_restrict_self_flags_1() {
let f = LandlockPolicy::parse_restrict_self_flags(b"same_exec_off", false).unwrap();
assert!(f.contains(RestrictSelfFlags::LOG_SAME_EXEC_OFF));
}
#[test]
fn test_parse_restrict_self_flags_2() {
let f = LandlockPolicy::parse_restrict_self_flags(b"log_same_exec_off", false).unwrap();
assert!(f.contains(RestrictSelfFlags::LOG_SAME_EXEC_OFF));
}
#[test]
fn test_parse_restrict_self_flags_3() {
let f = LandlockPolicy::parse_restrict_self_flags(b"new_exec_on", false).unwrap();
assert!(f.contains(RestrictSelfFlags::LOG_NEW_EXEC_ON));
}
#[test]
fn test_parse_restrict_self_flags_4() {
let f = LandlockPolicy::parse_restrict_self_flags(b"subdomains_off", false).unwrap();
assert!(f.contains(RestrictSelfFlags::LOG_SUBDOMAINS_OFF));
}
#[test]
fn test_parse_restrict_self_flags_5() {
let result = LandlockPolicy::parse_restrict_self_flags(b"invalid", false);
assert_eq!(result, Err(Errno::EINVAL));
}
#[test]
fn test_parse_restrict_self_flags_6() {
let result = LandlockPolicy::parse_restrict_self_flags(b"", false);
assert_eq!(result, Err(Errno::EINVAL));
}
#[test]
fn test_parse_restrict_self_flags_7() {
let f =
LandlockPolicy::parse_restrict_self_flags(b"same_exec_off,new_exec_on", false).unwrap();
assert!(f.contains(RestrictSelfFlags::LOG_SAME_EXEC_OFF));
assert!(f.contains(RestrictSelfFlags::LOG_NEW_EXEC_ON));
}
// access: keyword lookup across the filesystem and network tables.
#[test]
fn test_access_1() {
let (fs, net) = LandlockPolicy::access("exec").unwrap();
assert!(fs.contains(AccessFs::Execute));
assert!(net.is_empty());
}
// "bind" is present in both tables, so it yields rights on both sides.
#[test]
fn test_access_2() {
let (fs, net) = LandlockPolicy::access("bind").unwrap();
assert!(fs.contains(AccessFs::MakeSock));
assert!(net.contains(AccessNet::BindTcp));
}
#[test]
fn test_access_3() {
let (_, net) = LandlockPolicy::access("connect").unwrap();
assert!(net.contains(AccessNet::ConnectTcp));
}
#[test]
fn test_access_4() {
let result = LandlockPolicy::access("nonexistent_right");
assert_eq!(result, Err(Errno::EINVAL));
}
#[test]
fn test_access_5() {
let (fs, _) = LandlockPolicy::access("read,write").unwrap();
assert!(fs.contains(AccessFs::ReadFile));
assert!(fs.contains(AccessFs::WriteFile));
}
// access_fs_from_set: expansion of every filesystem alias.
#[test]
fn test_access_fs_from_set_1() {
let fs = LandlockPolicy::access_fs_from_set("all");
assert_eq!(fs, AccessFs::all());
}
#[test]
fn test_access_fs_from_set_2() {
let fs = LandlockPolicy::access_fs_from_set("rpath");
assert!(fs.contains(AccessFs::ReadFile));
assert!(fs.contains(AccessFs::ReadDir));
}
#[test]
fn test_access_fs_from_set_3() {
let fs = LandlockPolicy::access_fs_from_set("wpath");
assert!(fs.contains(AccessFs::WriteFile));
assert!(fs.contains(AccessFs::Truncate));
}
#[test]
fn test_access_fs_from_set_4() {
let fs = LandlockPolicy::access_fs_from_set("cpath");
assert!(fs.contains(AccessFs::MakeReg));
assert!(fs.contains(AccessFs::RemoveFile));
assert!(fs.contains(AccessFs::Refer));
}
#[test]
fn test_access_fs_from_set_5() {
let fs = LandlockPolicy::access_fs_from_set("dpath");
assert!(fs.contains(AccessFs::MakeBlock));
assert!(fs.contains(AccessFs::MakeChar));
}
#[test]
fn test_access_fs_from_set_6() {
let fs = LandlockPolicy::access_fs_from_set("spath");
assert!(fs.contains(AccessFs::MakeFifo));
assert!(fs.contains(AccessFs::MakeSym));
}
#[test]
fn test_access_fs_from_set_7() {
let fs = LandlockPolicy::access_fs_from_set("tpath");
assert!(fs.contains(AccessFs::MakeDir));
assert!(fs.contains(AccessFs::RemoveDir));
}
#[test]
fn test_access_fs_from_set_8() {
let fs = LandlockPolicy::access_fs_from_set("bnet");
assert!(fs.contains(AccessFs::MakeSock));
}
#[test]
fn test_access_fs_from_set_9() {
let fs = LandlockPolicy::access_fs_from_set("all-x");
assert!(!fs.contains(AccessFs::Execute));
}
// access_net_from_set: expansion of every network alias.
#[test]
fn test_access_net_from_set_1() {
let net = LandlockPolicy::access_net_from_set("all");
assert_eq!(net, AccessNet::all());
}
#[test]
fn test_access_net_from_set_2() {
let net = LandlockPolicy::access_net_from_set("bnet");
assert_eq!(net, AccessNet::BindTcp);
}
#[test]
fn test_access_net_from_set_3() {
let net = LandlockPolicy::access_net_from_set("cnet");
assert_eq!(net, AccessNet::ConnectTcp);
}
#[test]
fn test_access_net_from_set_4() {
let net = LandlockPolicy::access_net_from_set("net");
assert!(net.contains(AccessNet::BindTcp));
assert!(net.contains(AccessNet::ConnectTcp));
}
#[test]
fn test_access_net_from_set_5() {
let net = LandlockPolicy::access_net_from_set("inet");
assert!(net.contains(AccessNet::BindTcp));
assert!(net.contains(AccessNet::ConnectTcp));
}
}