use crate::{
delegate,
scheduler::{self, Emitter},
ProcessMessage,
};
use anyhow::Context;
use tokio_stream::StreamExt;
use vqueue::{GenericQueueManager, QueueID};
use vsmtp_common::{
status,
transfer::{self, error::Rule},
};
use vsmtp_rule_engine::{ExecutionStage, RuleEngine};
/// Run the working stage until `receiver` stops yielding messages.
///
/// Each message taken from the receiver's stream is processed by its own
/// spawned [`handle_one`] task; the shared handles (`rule_engine`,
/// `queue_manager`, `emitter`) are cloned for every task.
///
/// The join handles are dropped right away, so this function never waits for
/// a task to complete and never observes its result.
pub(super) async fn start<Q: GenericQueueManager + Sized + 'static>(
    rule_engine: std::sync::Arc<RuleEngine>,
    queue_manager: std::sync::Arc<Q>,
    emitter: std::sync::Arc<Emitter>,
    mut receiver: scheduler::Receiver,
) {
    let incoming = receiver.as_stream();
    tokio::pin!(incoming);

    while let Some(process_message) = incoming.next().await {
        // The handle is intentionally not awaited: the task keeps running
        // on the runtime after the handle goes out of scope.
        let _join_handle = tokio::spawn(handle_one(
            rule_engine.clone(),
            queue_manager.clone(),
            process_message,
            emitter.clone(),
        ));
    }
}
/// Run the `PostQ` rules on one queued message and route it accordingly.
///
/// The context and the message are read from `QueueID::Delegated` when
/// `process_message.is_from_delegation()` is true, from `QueueID::Working`
/// otherwise. The rule engine is then run for `ExecutionStage::PostQ`, and the
/// resulting `skipped` status selects what happens next:
///
/// * `Quarantine(path)`: the context is moved to the quarantine queue named
///   after `path` and the message is written; delivery is not notified.
/// * `Delegated(delegator)`: the context is flagged with `DelegationResult`,
///   moved to `QueueID::Delegated`, the message is written, and the message is
///   handed to `delegate`; delivery is not notified.
/// * `DelegationResult`: the message is written and delivery is notified with
///   a "delegated" process message; the context is not moved.
/// * `Deny(code)`: the transfer status of every recipient is set to failed
///   with `Rule::Denied(code)`, the message is written and the context is
///   moved to `QueueID::Dead`; delivery is not notified.
/// * no status, `Next`, or any other status: the message is written, the
///   context is moved to `QueueID::Deliver` and delivery is notified.
///
/// # Errors
///
/// * the context or the message could not be read from the queue
/// * the context returned by the rule engine is not a finished context
/// * writing the message or moving the context between queues failed
/// * `delegate` failed
/// * the delivery stage could not be notified through `emitter`
///
/// Errors are also recorded by the `tracing::instrument(err)` attribute.
// The status dispatch is kept in a single function so that the order of the
// side effects of each branch stays visible in one place.
#[allow(clippy::too_many_lines)]
#[tracing::instrument(name = "working", skip_all, err)]
pub async fn handle_one<Q: GenericQueueManager + Sized + 'static>(
    rule_engine: std::sync::Arc<RuleEngine>,
    queue_manager: std::sync::Arc<Q>,
    process_message: ProcessMessage,
    emitter: std::sync::Arc<Emitter>,
) -> anyhow::Result<()> {
    /// Follow-up actions shared by every branch of the status dispatch.
    struct Opt {
        /// Queue the context must be moved to, if any.
        move_to_queue: Option<QueueID>,
        /// Whether the delivery stage must be notified.
        send_to_delivery: bool,
        /// Whether the message still has to be written by the common code.
        write_email: bool,
        /// Whether the delivery notification is built with
        /// `ProcessMessage::delegated` instead of `ProcessMessage::new`.
        delegated: bool,
    }

    let queue = if process_message.is_from_delegation() {
        QueueID::Delegated
    } else {
        QueueID::Working
    };

    let (ctx, mail_message) = queue_manager
        .get_both(&queue, process_message.as_ref())
        .await?;

    // Work on a copy of the status stored in the context: the rule engine
    // updates it in place, and the dispatch below reads it while `ctx` is
    // mutated.
    let mut skipped = ctx.connect.skipped.clone();
    let (ctx, mail_message, _) = rule_engine.just_run_when(
        &mut skipped,
        ExecutionStage::PostQ,
        vsmtp_common::Context::Finished(ctx),
        mail_message,
    );

    let mut ctx = ctx.unwrap_finished().context("context is not finished")?;

    let Opt {
        move_to_queue,
        send_to_delivery,
        write_email,
        delegated,
    } = match &skipped {
        Some(status::Status::Quarantine(path)) => {
            queue_manager
                .move_to(&queue, &QueueID::Quarantine { name: path.into() }, &ctx)
                .await?;

            tracing::warn!(stage = %ExecutionStage::PostQ, status = "quarantine", "Rules skipped.");

            Opt {
                move_to_queue: None,
                send_to_delivery: false,
                write_email: true,
                delegated: false,
            }
        }
        Some(status @ status::Status::Delegated(delegator)) => {
            ctx.connect.skipped = Some(status::Status::DelegationResult);

            queue_manager
                .move_to(&queue, &QueueID::Delegated, &ctx)
                .await?;

            queue_manager
                .write_msg(process_message.as_ref(), &mail_message)
                .await?;

            // NOTE(review): the hand-off happens only once the context and the
            // message are persisted, presumably so that the delegation result
            // cannot be processed before they exist; confirm before reordering.
            delegate(delegator, &ctx, &mail_message)?;

            tracing::warn!(stage = %ExecutionStage::PostQ, status = status.as_ref(), "Rules skipped.");

            Opt {
                move_to_queue: None,
                send_to_delivery: false,
                // Already written above.
                write_email: false,
                delegated: true,
            }
        }
        Some(status::Status::DelegationResult) => Opt {
            move_to_queue: None,
            send_to_delivery: true,
            write_email: true,
            delegated: true,
        },
        Some(status::Status::Deny(code)) => {
            for rcpt in &mut ctx.rcpt_to.delivery.values_mut().flatten() {
                rcpt.1 = transfer::Status::failed(Rule::Denied(code.clone()));
            }

            Opt {
                move_to_queue: Some(QueueID::Dead),
                send_to_delivery: false,
                write_email: true,
                delegated: false,
            }
        }
        None | Some(status::Status::Next) => Opt {
            move_to_queue: Some(QueueID::Deliver),
            send_to_delivery: true,
            write_email: true,
            delegated: false,
        },
        Some(reason) => {
            tracing::warn!(status = ?reason, "Rules skipped.");
            Opt {
                move_to_queue: Some(QueueID::Deliver),
                send_to_delivery: true,
                write_email: true,
                delegated: false,
            }
        }
    };

    if write_email {
        queue_manager
            .write_msg(process_message.as_ref(), &mail_message)
            .await?;
    }

    if let Some(next_queue) = move_to_queue {
        queue_manager.move_to(&queue, &next_queue, &ctx).await?;
    }

    if send_to_delivery {
        emitter
            .send_to_delivery(if delegated {
                ProcessMessage::delegated
            } else {
                ProcessMessage::new
            }(*process_message.as_ref()))
            .await?;
    }

    Ok(())
}