use crate::{
api::{state::deny, Server},
dsl::{
directives::{Directive, Directives},
smtp::service,
},
rule_state::RuleState,
server_api::ServerAPI,
sub_domain_hierarchy::{Builder, DomainDirectives, Script, SubDomainHierarchy},
ExecutionStage,
};
use anyhow::Context;
use rhai::module_resolvers::ModuleResolversCollection;
use rhai::{module_resolvers::FileModuleResolver, packages::Package, Engine, Scope};
use rhai_dylib::module_resolvers::libloading::DylibModuleResolver;
use vqueue::{GenericQueueManager, QueueID};
use vsmtp_common::{status::Status, CodeID, Domain, Reply, ReplyOrCodeID, TransactionType};
use vsmtp_config::{field::FieldAppVSL, Config, DnsResolvers};
use vsmtp_mail_parser::MessageBody;
#[derive(Debug)]
pub struct RuleEngine {
pub(super) global_modules: Vec<rhai::Shared<rhai::Module>>,
pub(super) static_modules: Vec<(String, rhai::Shared<rhai::Module>)>,
pub(super) server: Server,
pub(super) rules: SubDomainHierarchy,
}
type RuleEngineInput =
either::Either<(), Box<dyn Fn(Builder<'_>) -> anyhow::Result<SubDomainHierarchy>>>;
impl RuleEngine {
pub fn new(
config: std::sync::Arc<Config>,
resolvers: std::sync::Arc<DnsResolvers>,
queue_manager: std::sync::Arc<dyn GenericQueueManager>,
) -> anyhow::Result<Self> {
Self::new_inner(either::Left(()), config, resolvers, queue_manager)
}
pub fn with_hierarchy(
config: std::sync::Arc<Config>,
input: impl Fn(Builder<'_>) -> anyhow::Result<SubDomainHierarchy> + 'static,
resolvers: std::sync::Arc<DnsResolvers>,
queue_manager: std::sync::Arc<dyn GenericQueueManager>,
) -> anyhow::Result<Self> {
Self::new_inner(
either::Right(Box::new(input)),
config,
resolvers,
queue_manager,
)
}
#[tracing::instrument(name = "building-rules", skip_all)]
fn new_inner(
input: RuleEngineInput,
config: std::sync::Arc<Config>,
resolvers: std::sync::Arc<DnsResolvers>,
queue_manager: std::sync::Arc<dyn GenericQueueManager>,
) -> anyhow::Result<Self> {
if rhai::config::hashing::get_ahash_seed().is_none() {
rhai::config::hashing::set_ahash_seed(Some([1, 2, 3, 4]))
.map_err(|_| anyhow::anyhow!("Rhai ahash seed has been set before the rule engine as been built. This is a bug, please report it at https://github.com/viridIT/vSMTP/issues."))?;
}
tracing::debug!("Building rhai engine ...");
let mut engine = Self::new_rhai_engine();
tracing::debug!("Building static modules ...");
let static_modules = Self::build_static_modules(&mut engine, &config)?;
tracing::debug!("Building global modules ...");
let global_modules = Self::build_global_modules(&mut engine)?;
engine.set_module_resolver(config.path.as_ref().and_then(|path| path.parent()).map_or_else(|| {
tracing::warn!("No configuration path found, if you receive this message in production please open an issue.");
let mut resolvers = ModuleResolversCollection::new();
resolvers.push(FileModuleResolver::new_with_extension("vsl"));
resolvers.push(DylibModuleResolver::new());
resolvers
}, |path| {
let mut resolvers = ModuleResolversCollection::new();
resolvers.push(FileModuleResolver::new_with_path_and_extension(path, "vsl"));
resolvers.push(DylibModuleResolver::with_path(path));
resolvers
}));
let rules = match input {
either::Either::Left(()) => match &config.app.vsl {
FieldAppVSL {
filter_path: Some(filter_path),
domain_dir,
} => {
tracing::info!("Analyzing vSL rules at {filter_path:?}");
SubDomainHierarchy::new(&engine, filter_path, domain_dir.as_deref())?
}
FieldAppVSL {
filter_path: None, ..
} => {
tracing::warn!(
"No 'filter.vsl' provided in the config, the server will deny any incoming transaction by default."
);
SubDomainHierarchy::new_empty(&engine)?
}
},
either::Either::Right(builder) => builder(Builder::new(&engine)?)?,
};
tracing::info!("Rule engine initialized.");
#[cfg(debug_assertions)]
{
let type_id = std::any::TypeId::of::<rhai::ImmutableString>();
tracing::debug!(?type_id);
}
Ok(Self {
global_modules,
static_modules,
server: std::sync::Arc::new(ServerAPI {
config,
resolvers,
queue_manager,
}),
rules,
})
}
#[must_use]
pub fn spawn(&self) -> std::sync::Arc<RuleState> {
self.spawn_with(vsmtp_common::Context::Empty, MessageBody::default())
}
pub fn spawn_with(
&self,
mail_context: vsmtp_common::Context,
message: MessageBody,
) -> std::sync::Arc<RuleState> {
let (mail_context, message) = (
std::sync::Arc::new(std::sync::RwLock::new(mail_context)),
std::sync::Arc::new(std::sync::RwLock::new(message)),
);
let (mail_context_cpy, server_cpy, message_cpy) =
(mail_context.clone(), self.server.clone(), message.clone());
let mut engine = rhai::Engine::new_raw();
#[allow(deprecated)]
engine
.on_var(move |name, _, _| match name {
"CTX" => Ok(Some(rhai::Dynamic::from(mail_context_cpy.clone()))),
"SRV" => Ok(Some(rhai::Dynamic::from(server_cpy.clone()))),
"MSG" => Ok(Some(rhai::Dynamic::from(message_cpy.clone()))),
_ => Ok(None),
});
#[cfg(debug_assertion)]
engine
.on_print(|msg| println!("{msg}"))
.on_debug(move |s, src, pos| {
println!("{} @ {:?} > {}", src.unwrap_or("unknown source"), pos, s);
});
self.global_modules.iter().for_each(|module| {
engine.register_global_module(module.clone());
});
self.static_modules.iter().for_each(|(namespace, module)| {
engine.register_static_module(namespace, module.clone());
});
engine
.register_custom_syntax_with_state_raw(
"rule",
Directive::parse_directive,
true,
crate::dsl::directives::rule::create,
)
.register_custom_syntax_with_state_raw(
"action",
Directive::parse_directive,
true,
crate::dsl::directives::action::create,
);
#[cfg(feature = "delegation")]
engine.register_custom_syntax_with_state_raw(
"delegate",
Directive::parse_directive,
true,
crate::dsl::directives::delegation::create,
);
engine.set_fast_operators(false);
std::sync::Arc::new(RuleState {
engine,
server: self.server.clone(),
mail_context,
message,
})
}
#[allow(clippy::cognitive_complexity)]
fn get_delegation_directive_from_header(
rule_state: &RuleState,
skipped: &mut Option<Status>,
smtp_state: ExecutionStage,
script: &Script,
) -> Result<usize, Reply> {
macro_rules! err {
($err:expr) => {{
tracing::warn!($err);
$err.parse().expect("valid")
}};
}
let delegation_header = rule_state
.message()
.read()
.expect("Mutex poisoned")
.get_header("X-VSMTP-DELEGATION");
let header = delegation_header.ok_or_else(|| err!("500 Delegation header not found"))?;
let header = vsmtp_mail_parser::get_mime_header("X-VSMTP-DELEGATION", &header);
tracing::debug!(%header, "Got header for delegation");
let (directive_name, msg_uuid) = match (
header.args.get("stage"),
header.args.get("directive"),
header.args.get("id"),
) {
(Some(stage), Some(directive_name), Some(msg_uuid)) => {
match stage.parse::<ExecutionStage>() {
Ok(stage) if stage == smtp_state => (),
_ => return Err(err!("500 Delegation stage not matching")),
};
(
directive_name,
uuid::Uuid::parse_str(msg_uuid).map_err(|_err| {
err!("500 Delegation Failed to parse delegation message id")
})?,
)
}
_ => {
return Err(err!(
"500 Delegation header `X-VSMTP-DELEGATION` exists but ill-formed"
))
}
};
tracing::debug!(%directive_name, %msg_uuid, "Got header for delegation with attributes");
let position = script
.directives
.get(&smtp_state)
.ok_or_else(|| err!("500 Delegation No rules at the stages"))?
.iter()
.position(|directive| directive.name() == directive_name)
.ok_or_else(|| err!("500 Delegation directive not found"))?;
let mut ctx = rule_state
.server
.queue_manager
.get_ctx(&QueueID::Delegated, &msg_uuid);
let mut ctx =
block_on!(&mut ctx).map_err(|_err| err!("500 Delegation Failed to get old context"))?;
tracing::debug!(
"delegation changing msg uuid from {} to {}",
ctx.mail_from.message_uuid,
msg_uuid
);
ctx.connect.skipped = None;
ctx.mail_from.message_uuid = msg_uuid;
*rule_state.context().write().unwrap() = vsmtp_common::Context::Finished(ctx);
tracing::debug!("Resuming rule '{directive_name}' after delegation.",);
*skipped = None;
Ok(position)
}
#[tracing::instrument(name = "rule", skip_all, fields(stage = %smtp_state, skipped), ret)]
pub fn run_when(
&self,
rule_state: &RuleState,
skipped: &mut Option<Status>,
smtp_state: ExecutionStage,
) -> Status {
let script = {
let context = rule_state.context();
let context = context.read().expect("Mutex poisoned");
match self.get_directives_for_smtp_state(&context, smtp_state) {
Ok(script) => script,
Err(_) => return Status::Deny(ReplyOrCodeID::Left(CodeID::Denied)),
}
};
let directive = script.directives.get(&smtp_state);
let directive = match &skipped {
#[cfg(feature = "delegation")]
Some(Status::DelegationResult) if !smtp_state.is_email_received() => {
return Status::DelegationResult;
}
#[cfg(feature = "delegation")]
Some(Status::DelegationResult) => match Self::get_delegation_directive_from_header(
rule_state, skipped, smtp_state, script,
) {
Ok(position) => match directive {
Some(directive) => &directive[position..],
None => return deny(),
},
Err(e) => {
#[cfg(not(debug_assertions))]
{
tracing::warn!(error = ?e, "Failed to get delegation directive from the delegation header. Stopping processing.");
return deny();
}
#[cfg(debug_assertions)]
return Status::Deny(either::Right(e));
}
},
Some(status) if status.is_finished() => {
tracing::debug!(?status, "The status has been skipped before.");
return status.clone();
}
Some(_) | None => {
if let Some(directive) = directive {
directive
} else {
tracing::debug!("No rules for the current state, continuing.");
return Status::Next;
}
}
};
let status = Self::execute_directives(rule_state, &script.ast, directive, smtp_state);
if status.is_finished() {
tracing::info!(
"The rule engine will skip all rules because of the result {:?}",
status
);
*skipped = Some(status.clone());
}
status
}
#[must_use]
pub fn just_run_when(
&self,
skipped: &mut Option<Status>,
state: ExecutionStage,
mail_context: vsmtp_common::Context,
mail_message: MessageBody,
) -> (vsmtp_common::Context, MessageBody, Status) {
let rule_state = self.spawn_with(mail_context, mail_message);
let result = self.run_when(&rule_state, skipped, state);
let (mail_context, mail_message) = rule_state.take();
(mail_context, mail_message, result)
}
#[tracing::instrument(skip_all, err)]
fn get_directives_for_smtp_state<'a>(
&'a self,
context: &vsmtp_common::Context,
smtp_state: ExecutionStage,
) -> anyhow::Result<&'a Script> {
match smtp_state {
ExecutionStage::Connect | ExecutionStage::Helo | ExecutionStage::Authenticate => {
Ok(&self.rules.root_filter)
}
ExecutionStage::MailFrom => {
match context.reverse_path().context("bad state")? {
Some(reverse_path) if self.is_handled_domain(reverse_path) => {
self.get_domain_directives(reverse_path.domain()).map_or_else(|| {
tracing::error!(%reverse_path, "email is supposed to be outgoing but the sender's domain was not found in your vSL scripts.");
Ok(&self.rules.fallback)
}, |domain_directives| Ok(&domain_directives.outgoing))
}
Some(_) | None => Ok(&self.rules.root_filter),
}
}
ExecutionStage::RcptTo => {
let rcpt = context
.forward_paths()
.context("rcpt not found in rcpt stage")?
.last()
.ok_or_else(|| anyhow::anyhow!("could not get the latests recipient"))?;
let transaction_type = context
.transaction_type()
.context("could not get the transaction type")?;
let reverse_path = context
.reverse_path()
.context("reverse_path not found in rcpt stage")?;
match reverse_path {
Some(reverse_path) if self.is_handled_domain(reverse_path) => {
match (
self.get_domain_directives(reverse_path.domain()),
transaction_type,
) {
(Some(rules), TransactionType::Internal) => {
tracing::debug!(%rcpt, %reverse_path, "Internal email for current recipient.");
Ok(&rules.internal)
}
(Some(rules), TransactionType::Outgoing { .. }) => {
tracing::debug!(%rcpt, %reverse_path, "Outgoing email for current recipient.");
Ok(&rules.outgoing)
}
_ => {
tracing::error!(%rcpt, %reverse_path, "email is supposed to be outgoing but the sender's domain was not found in your vSL scripts.");
Ok(&self.rules.fallback)
}
}
}
None => Ok(&self.rules.root_filter),
Some(_) => {
if let (Some(rules), TransactionType::Incoming(Some(_))) = (
self.get_domain_directives(rcpt.address.domain()),
transaction_type,
) {
tracing::debug!(%rcpt, "Incoming recipient.");
Ok(&rules.incoming)
} else {
tracing::debug!(%rcpt, "Recipient unknown in unknown sender context, running fallback script.");
Ok(&self.rules.root_filter)
}
}
}
}
ExecutionStage::PreQ | ExecutionStage::PostQ | ExecutionStage::Delivery => {
let transaction_type = context
.transaction_type()
.context("could not get the transaction type")?;
let reverse_path = context
.reverse_path()
.context("sender not found in rcpt stage")?;
match reverse_path {
Some(reverse_path) if self.is_handled_domain(reverse_path) => {
match (
self.get_domain_directives(reverse_path.domain()),
transaction_type,
) {
(Some(rules), TransactionType::Internal) => Ok(&rules.internal),
(Some(rules), TransactionType::Outgoing { .. }) => Ok(&rules.outgoing),
_ => {
tracing::error!(%reverse_path, "email is supposed to be outgoing / internal but the sender's domain was not found in your vSL scripts.");
Ok(&self.rules.fallback)
}
}
}
None => Ok(&self.rules.root_filter),
Some(_) => {
match transaction_type {
TransactionType::Incoming(Some(domain)) => {
self.get_domain_directives(domain).map_or_else(
|| Ok(&self.rules.fallback),
|rules| Ok(&rules.incoming),
)
}
TransactionType::Incoming(None) => {
tracing::info!("No recipient has a domain handled by your configuration, running root incoming script");
Ok(&self.rules.root_filter)
}
TransactionType::Outgoing { .. } | TransactionType::Internal => {
tracing::error!("email is supposed to incoming but was marked has outgoing, running fallback scripts.");
Ok(&self.rules.fallback)
}
}
}
}
}
}
}
fn get_domain_directives(&self, domain: &str) -> Option<&DomainDirectives> {
if let Some(directives) = self.rules.domains.get(domain) {
return Some(directives);
}
Domain::iter(domain).find_map(|parent| self.rules.domains.get(parent))
}
fn execute_directives(
rule_state: &RuleState,
ast: &rhai::AST,
directives: &[Directive],
smtp_state: ExecutionStage,
) -> Status {
let mut status = Status::Next;
for directive in directives {
status = directive
.execute(rule_state, ast, smtp_state)
.unwrap_or_else(|e| {
let error_status = deny();
println!("error: {e:?}");
error_status
});
if status != Status::Next {
break;
}
}
status
}
#[must_use]
pub fn new_rhai_engine() -> rhai::Engine {
let mut engine = Engine::new();
#[allow(deprecated)]
engine.on_parse_token(|token, _, _| {
match token {
rhai::Token::Reserved(s) if &*s == "is" => rhai::Token::EqualsTo,
rhai::Token::Identifier(s) if &*s == "not" => rhai::Token::NotEqualsTo,
_ => token,
}
});
#[cfg(debug_assertion)]
engine
.on_print(|msg| println!("{msg}"))
.on_debug(move |s, src, pos| {
println!("{} @ {:?} > {}", src.unwrap_or("unknown source"), pos, s);
});
engine
.disable_symbol("eval")
.register_custom_syntax_with_state_raw(
"rule",
Directive::parse_directive,
true,
crate::dsl::directives::rule::create,
)
.register_custom_syntax_with_state_raw(
"action",
Directive::parse_directive,
true,
crate::dsl::directives::action::create,
);
#[cfg(feature = "delegation")]
engine.register_custom_syntax_with_state_raw(
"delegate",
Directive::parse_directive,
true,
crate::dsl::directives::delegation::create,
);
engine.set_fast_operators(false);
engine
}
pub fn build_global_modules(
engine: &mut rhai::Engine,
) -> anyhow::Result<Vec<rhai::Shared<rhai::Module>>> {
let std_module = rhai::packages::StandardPackage::new().as_shared_module();
engine.register_global_module(std_module.clone());
let vsl_rhai_module: rhai::Shared<_> = Self::compile_api(engine)
.context("failed to compile vsl's api")?
.into();
engine.register_global_module(vsl_rhai_module.clone());
Ok(vec![std_module, vsl_rhai_module])
}
pub fn build_static_modules(
engine: &mut rhai::Engine,
config: &Config,
) -> anyhow::Result<Vec<(String, rhai::Shared<rhai::Module>)>> {
let (server_config, app_config) = (
serde_json::to_string(&config.server)
.context("failed to convert the server configuration to json")?,
serde_json::to_string(&config.app)
.context("failed to convert the app configuration to json")?,
);
let mut vsl_modules = crate::api::vsmtp_static_modules()
.into_iter()
.map(|(name, module)| (name.to_owned(), rhai::Shared::new(module)))
.collect::<Vec<_>>();
vsl_modules.push(("cfg".to_owned(), {
let mut config_module = rhai::Module::new();
config_module
.set_var("server", engine.parse_json(server_config, true)?)
.set_var("app", engine.parse_json(app_config, true)?);
rhai::Shared::new(config_module)
}));
for (name, module) in &vsl_modules {
engine.register_static_module(name, module.clone());
}
Ok(vsl_modules)
}
pub fn compile_api(engine: &rhai::Engine) -> anyhow::Result<rhai::Module> {
let ast = engine.compile_scripts_with_scope(
&rhai::Scope::new(),
[include_str!("../api/internal.rhai")],
)?;
rhai::Module::eval_ast_as_new(rhai::Scope::new(), &ast, engine)
.context("failed to create a module from vsl's api.")
}
pub(crate) fn extract_directives(
engine: &rhai::Engine,
ast: &rhai::AST,
) -> anyhow::Result<Directives> {
let mut scope = Scope::new();
scope.push_constant("CTX", ()).push_constant("SRV", ());
let raw_directives = engine
.eval_ast_with_scope::<rhai::Map>(&mut scope, ast)
.context("failed to compile your rules.")?;
let mut directives = Directives::new();
for (stage, directive_set) in raw_directives {
let Ok(stage) = ExecutionStage::try_from(stage.as_str()) else {
anyhow::bail!("the '{stage}' smtp stage does not exist.")
};
let directive_set = directive_set
.try_cast::<rhai::Array>()
.ok_or_else(|| {
anyhow::anyhow!("the stage '{stage}' must be declared using the array syntax")
})?
.into_iter()
.map(|rule| {
let map = rule.try_cast::<rhai::Map>().unwrap();
let directive_type = map
.get("type")
.ok_or_else(|| anyhow::anyhow!("a directive in stage '{stage}' does not have a valid type"))?
.to_string();
let name = map
.get("name")
.ok_or_else(|| anyhow::anyhow!("a directive in stage '{stage}' does not have a name"))?
.to_string();
let pointer = map
.get("evaluate")
.ok_or_else(|| anyhow::anyhow!("the directive '{stage}' in stage '{name}' does not have an evaluation function"))?
.clone()
.try_cast::<rhai::FnPtr>()
.ok_or_else(|| anyhow::anyhow!("the evaluation field for the directive '{stage}' in stage '{name}' must be a function pointer"))?;
let directive =
match directive_type.as_str() {
"rule" => Directive::Rule { name, pointer },
"action" => Directive::Action { name, pointer },
#[cfg(feature = "delegation")]
"delegate" => {
if !stage.is_email_received() {
anyhow::bail!("invalid delegation '{name}' in stage '{stage}': delegation directives are available from the 'postq' stage and onwards.");
}
let service = map
.get("service")
.ok_or_else(|| anyhow::anyhow!("the delegation '{name}' in stage '{stage}' does not have a service to delegate processing to"))?
.clone()
.try_cast::<std::sync::Arc<service::Smtp>>()
.ok_or_else(|| anyhow::anyhow!("the field after the 'delegate' keyword in the directive '{name}' in stage '{stage}' must be a smtp service"))?;
Directive::Delegation { name, pointer, service }
},
unknown => anyhow::bail!("unknown directive type '{unknown}' called '{name}'"),
};
Ok(directive)
})
.collect::<anyhow::Result<Vec<_>>>()?;
directives.insert(stage, directive_set);
}
let names = directives
.iter()
.flat_map(|(_, d)| d)
.map(Directive::name)
.collect::<Vec<_>>();
for (idx, name) in names.iter().enumerate() {
for other in &names[idx + 1..] {
if other == name {
anyhow::bail!("found duplicate rule '{name}': a rule must have a unique name",);
}
}
}
Ok(directives)
}
#[must_use]
pub fn is_handled_domain(&self, address: &vsmtp_common::Address) -> bool {
let domain = address.domain();
if self.rules.domains.contains_key(domain) {
true
} else {
Domain::iter(domain).any(|parent| self.rules.domains.contains_key(parent))
}
}
#[must_use]
#[cfg(feature = "delegation")]
pub fn get_delegation_directive_bound_to_address(
&self,
socket: std::net::SocketAddr,
) -> Option<&Directive> {
let per_domain_scripts = self
.rules
.domains
.iter()
.flat_map(|(_, d)| [&d.incoming, &d.internal, &d.outgoing].into_iter());
std::iter::once(&self.rules.root_filter)
.chain(per_domain_scripts)
.filter_map(|script| {
script.directives.iter().flat_map(|(_, d)| d).find(|d| {
matches!(d,
Directive::Delegation { service, .. } if service.receiver == socket)
})
})
.take(1)
.next()
}
}