// indymilter 0.3.0
//
// Asynchronous milter library
// Documentation
// indymilter – asynchronous milter library
// Copyright © 2021–2024 David Bürgin <dbuergin@gluet.ch>
//
// This program is free software: you can redistribute it and/or modify it under
// the terms of the GNU General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later
// version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
// details.
//
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <https://www.gnu.org/licenses/>.

//! A library for writing asynchronous milter applications.
//!
//! This library provides an API for creating milters that can be integrated
//! with MTAs using the sendmail mail filter protocol, also known as
//! *libmilter*.
//!
//! # Usage
//!
//! The function [`run`] is the main entry point to the indymilter API. This
//! function returns a future that can be awaited in order to execute a milter
//! application.
//!
//! The API presented for implementing a milter is modeled after the sendmail
//! milter (libmilter) API. Refer to the sendmail milter documentation for
//! details and background. The documentation included in indymilter is less
//! complete.
//!
//! # Trace logging
//!
//! This library uses the [tracing] crate for internal trace logging. For
//! insight into library operation, use a [tracing
//! subscriber][tracing-subscriber] and enable logging at `trace` level.
//!
//! A word of warning: Trace logging is very noisy, and can be misleading. Even
//! during normal operation it is common to see ‘errors’ such as inability to
//! parse empty macro definitions from the MTA. However, this is expected,
//! historically grown behaviour (no different from libmilter) and does not
//! impact operation.
//!
//! [tracing]: https://crates.io/crates/tracing
//! [tracing-subscriber]: https://crates.io/crates/tracing-subscriber

mod callbacks;
mod config;
mod connection;
mod context;
mod ffi_util;
mod listener;
mod macros;
pub mod message;
mod proto_util;
mod session;

pub use crate::{
    callbacks::{CallbackFuture, Callbacks, Status},
    config::Config,
    context::{
        ActionError, Context, ContextActions, EomActions, EomContext, NegotiateContext,
        SetErrorReply, SmtpReply, SmtpReplyError,
    },
    ffi_util::IntoCString,
    listener::{EitherListener, EitherStream, Listener},
    macros::{MacroStage, Macros},
    proto_util::{Actions, ProtoOpts, SocketInfo},
};
use std::{future::Future, io, sync::Arc};
use tokio::{
    select,
    sync::{watch, Semaphore},
};
use tracing::{error, trace};

// Logging policy: When this library encounters an unanticipated failure
// condition (programming error) it panics. No error logging is done in such a
// case.
//
// For all other error conditions, the general principle is not to log about
// library operation above `trace` level. The only exceptions are:
// - `error`: The milter library fails to provide service, eg when no new
//   connections can be accepted due to an I/O problem.
// - `warn`: The user-provided milter implementation misbehaves (user error), eg
//   when `Noreply` status is not used even though it was negotiated beforehand.

/// Runs a milter that handles MTA connections until it is shut down.
///
/// While the future returned by `run` is awaited, it perpetually accepts new
/// MTA connections and spawns a session task for each connection. This
/// procedure continues and the future will not complete until the supplied
/// `shutdown` future completes.
///
/// # Termination
///
/// For graceful termination, the milter task should be shut down by letting the
/// `shutdown` future complete. If instead the future returned by `run` is
/// simply dropped, currently active, spawned sessions may continue to execute.
///
/// When the `shutdown` future completes, all sessions exit cleanly as soon as
/// possible: That is, any milter command in the act of being processed will be
/// processed to completion (including the callback call), but commands waiting
/// in the queue are dropped.
///
/// # Errors
///
/// When the listener fails to accept any new connections an error is returned.
/// Else, the task runs for ever until it is shut down.
///
/// # Examples
///
/// The following example shows the simplest possible, no-op milter.
///
/// ```
/// # async fn f() -> std::io::Result<()> {
/// use indymilter::Callbacks;
/// use std::future;
/// use tokio::net::TcpListener;
///
/// let listener = TcpListener::bind("127.0.0.1:3000").await?;
/// let callbacks = Callbacks::<()>::new();
/// let config = Default::default();
/// let shutdown = future::pending::<()>();
///
/// indymilter::run(listener, callbacks, config, shutdown).await
/// # }
/// ```
pub async fn run<T>(
    listener: impl Listener,
    callbacks: Callbacks<T>,
    config: Config,
    shutdown: impl Future,
) -> io::Result<()>
where
    T: Send + 'static,
{
    // The supplied shutdown_milter causes the main connection/session spawn in
    // the `select!` below to exit.
    // At the same time, multiple sessions may have been spawned (detached) and
    // be busy: those need to be notified of shutdown too, via the
    // `shutdown_sessions` handle, subscribed to by each session.
    let shutdown_milter = shutdown;
    let (shutdown_sessions, _) = watch::channel(false);

    trace!("milter starting");

    // The invocation of `run_milter` never returns normally. It has an infinite
    // loop that is only broken when the listener cannot accept any new
    // connections.
    // When the shutdown future completes, the `run_milter` future is simply
    // dropped in the middle of whatever it is doing. The listener is dropped at
    // the same time and no new connections are accepted.

    let result = select! {
        res = run_milter(listener, callbacks, config, &shutdown_sessions) => {
            let e = res.unwrap_err();
            error!("milter exited with error, shutting down: {e}");
            Err(e)
        }
        _ = shutdown_milter => {
            trace!("milter shutting down");
            Ok(())
        }
    };

    // Spawned, currently active sessions need to be notified of the shutdown,
    // and exit gracefully. Await session termination.
    // Note, however, that a session’s connection may still be alive after this
    // point, because we are not waiting for the `StreamHandler` actor to exit.

    let _ = shutdown_sessions.send(true);
    shutdown_sessions.closed().await;

    result
}

// Accept loop of the milter: for every connection coming in on the listener, a
// session task is spawned to handle that connection's commands. Under normal
// circumstances this never finishes.
//
// The single way out is an `Err` result. A listener that can no longer accept
// connections is not the fault of any one connection but a failure on our
// side, so that error is propagated to the caller.
//
// All arguments but the borrowed shutdown sender are owned by the `run_milter`
// future. Dropping the future therefore drops those resources along with it.
async fn run_milter<T>(
    mut listener: impl Listener,
    callbacks: Callbacks<T>,
    config: Config,
    shutdown_sender: &watch::Sender<bool>,
) -> io::Result<()>
where
    T: Send + 'static,
{
    let shared_callbacks = Arc::new(callbacks);
    let shared_config = Arc::new(config);

    // One permit per connection; the pool size is the configured maximum.
    let connection_slots = Arc::new(Semaphore::new(shared_config.max_connections));

    loop {
        // Reserve a slot first. This suspends while the maximum number of
        // connections is in flight, and the owned permit is handed over to the
        // session spawner further down.
        let slot = Arc::clone(&connection_slots)
            .acquire_owned()
            .await
            .unwrap();

        // With a slot in hand, wait for the next connection. An accept failure
        // ends the loop with that error.
        let stream = listener.accept().await?;

        // Spawning is synchronous; go straight back to waiting for a slot.
        session::spawn(
            stream,
            shutdown_sender,
            &shared_callbacks,
            &shared_config,
            slot,
        );
    }
}