sozu 2.1.0

sozu, a fast, reliable, hot reconfigurable HTTP reverse proxy
Documentation
use std::os::fd::AsRawFd;

use libc::pid_t;
use mio::Token;
use serde::{Deserialize, Serialize};
use sozu_command_lib::{
    config::Config,
    proto::command::{
        ResponseStatus, ReturnListenSockets, RunState, SoftStop, WorkerResponse,
        request::RequestType,
    },
    state::ConfigState,
};

use super::sessions::WorkerSession;
use crate::{
    command::{
        requests::{AuditExtras, AuditResult, audit_emit_inline},
        server::{
            ClientId, Gatherer, GatheringTask, MessageClient, Server, ServerState, SessionId,
            TaskId, Timeout, WorkerId,
        },
        sessions::{ClientSession, OptionalClient},
    },
    upgrade::{UpgradeError, fork_main_into_new_main},
    util::disable_close_on_exec,
};
use sozu_command_lib::proto::command::EventKind;

/// Stage of a worker upgrade, carried by an `UpgradeWorkerTask` and inspected
/// both when worker responses arrive and when the task finishes.
#[derive(Debug)]
enum UpgradeWorkerProgress {
    /// First stage:
    /// 1. request listeners from the old worker
    /// 2. store listeners to pass them to the new worker
    RequestingListenSockets {
        /// token of the old worker's session, used to look it up in `server.workers`
        old_worker_token: Token,
        /// id of the worker being replaced
        old_worker_id: WorkerId,
    },
    /// Second stage:
    /// 3. soft stop the old worker
    /// 4. activate the listeners of the new worker
    StopOldActivateNew {
        /// id of the worker that receives the soft stop request
        old_worker_id: WorkerId,
        /// id of the freshly launched worker that receives the activation requests
        new_worker_id: WorkerId,
    },
}

/// Gathering task that drives the upgrade of one worker and reports the
/// outcome to the client that asked for it.
#[derive(Debug)]
struct UpgradeWorkerTask {
    /// token of the client session that requested the upgrade
    pub client_token: Token,
    /// stage the upgrade is currently in
    progress: UpgradeWorkerProgress,

    /// number of worker responses received with status `Ok`
    ok: usize,
    /// number of worker responses received with status `Failure`
    errors: usize,
    /// every response received so far, paired with the id of the worker that sent it
    responses: Vec<(WorkerId, WorkerResponse)>,
    /// number of `Ok` / `Failure` responses to wait for, raised through
    /// `inc_expected_responses`
    expected_responses: usize,
}

/// Starts the upgrade of the worker identified by `old_worker_id` on behalf of `client`.
///
/// The function logs the request, emits the `worker_upgraded` audit event,
/// resolves the session token of the active worker and then asks that worker
/// to hand back its listen sockets. The remaining steps are carried out by
/// `UpgradeWorkerTask` once the worker has answered.
///
/// If no active worker has this id, the client request is finished with a
/// failure and nothing is sent to any worker.
pub fn upgrade_worker(server: &mut Server, client: &mut ClientSession, old_worker_id: WorkerId) {
    info!(
        "client[{:?}] msg wants to upgrade worker {}",
        client.token, old_worker_id
    );

    // The audit event is emitted before the worker lookup, so it is recorded
    // even when the lookup below fails.
    audit_emit_inline(
        server,
        client,
        EventKind::WorkerUpgraded,
        "worker_upgraded",
        "config.worker_upgraded",
        format!("worker:{old_worker_id}"),
        AuditResult::Ok,
        AuditExtras::default(),
    );

    let Some(old_worker_token) = server
        .get_active_worker_by_id(old_worker_id)
        .map(|session| session.token)
    else {
        client.finish_failure(format!(
            "Worker {old_worker_id} does not exist, or is stopping / stopped"
        ));
        return;
    };

    client.return_processing(format!(
        "Requesting listen sockets from worker {old_worker_id}"
    ));

    // First stage of the upgrade: only the old worker is targeted.
    let first_stage = UpgradeWorkerTask {
        client_token: client.token,
        progress: UpgradeWorkerProgress::RequestingListenSockets {
            old_worker_token,
            old_worker_id,
        },
        ok: 0,
        errors: 0,
        responses: Vec::new(),
        expected_responses: 0,
    };

    server.scatter(
        RequestType::ReturnListenSockets(ReturnListenSockets {}).into(),
        Box::new(first_stage),
        Timeout::Default,
        Some(old_worker_id),
    );
}

impl UpgradeWorkerTask {
    /// Runs once the old worker has acknowledged the request for its listen sockets.
    ///
    /// Reads the listeners from the old worker's SCM socket (switched to
    /// blocking mode first), marks the old worker as stopping, launches a new
    /// worker with those listeners, then registers a follow-up task that
    /// gathers the answers to the soft stop of the old worker and to the
    /// activation requests sent to the new one.
    ///
    /// Every failure along the way finishes the client request with a failure
    /// message and returns early.
    fn receive_listen_sockets(
        self,
        server: &mut Server,
        client: &mut OptionalClient,
        old_worker_token: Token,
        old_worker_id: WorkerId,
    ) {
        let Some(old_worker) = server.workers.get_mut(&old_worker_token) else {
            client.finish_failure(format!("Worker {old_worker_id} died while upgrading, it should be restarted automatically"));
            return;
        };
        // From here on, use the id recorded in the session itself.
        let old_worker_id = old_worker.id;

        if let Err(error) = old_worker.scm_socket.set_blocking(true) {
            client.finish_failure(format!("Could not set SCM sockets to blocking: {error:?}"));
            return;
        }

        let Ok(listeners) = old_worker.scm_socket.receive_listeners() else {
            client.finish_failure(
                "Could not upgrade worker: did not get back listeners from the old worker",
            );
            return;
        };

        old_worker.run_state = RunState::Stopping;

        // launch the new worker, handing it the listeners of the old one
        let new_worker_id = match server.launch_new_worker(Some(listeners)) {
            Ok(new_worker) => new_worker.id,
            Err(worker_err) => {
                client.finish_failure(format!("could not launch new worker: {worker_err}"));
                return;
            }
        };
        client.return_processing(format!("Launched a new worker with id {}", new_worker_id));

        // Second stage: a single task collects the answers of both workers.
        let second_stage = UpgradeWorkerTask {
            client_token: self.client_token,
            progress: UpgradeWorkerProgress::StopOldActivateNew {
                old_worker_id,
                new_worker_id,
            },
            ok: 0,
            errors: 0,
            responses: Vec::new(),
            expected_responses: 0,
        };
        let finish_task = server.new_task(Box::new(second_stage), Timeout::None);

        // soft stop the old worker (message index 0 of the task)
        client.return_processing(format!("Soft stopping worker with id {old_worker_id}"));
        server.scatter_on(
            RequestType::SoftStop(SoftStop {}).into(),
            finish_task,
            0,
            Some(old_worker_id),
        );

        // activate the new worker (message indices 1, 2, ... of the same task)
        let activate_requests = server.state.generate_activate_requests();
        for (request, message_index) in activate_requests.into_iter().zip(1_usize..) {
            server.scatter_on(request, finish_task, message_index, Some(new_worker_id));
        }
    }
}

impl GatheringTask for UpgradeWorkerTask {
    /// Token of the client that requested the upgrade; always present for this task.
    fn client_token(&self) -> Option<Token> {
        Some(self.client_token)
    }

    /// The task gathers its own responses.
    fn get_gatherer(&mut self) -> &mut dyn super::server::Gatherer {
        self
    }

    /// Called when the task is over; what happens depends on the upgrade stage.
    ///
    /// - In the second stage the client is told that the upgrade succeeded.
    /// - In the first stage the upgrade continues with
    ///   `receive_listen_sockets` if exactly one `Ok` response was counted,
    ///   otherwise the client request is finished with a failure that lists
    ///   the responses received.
    fn on_finish(
        self: Box<Self>,
        server: &mut Server,
        client: &mut OptionalClient,
        _timed_out: bool,
    ) {
        let (old_worker_token, old_worker_id) = match self.progress {
            UpgradeWorkerProgress::StopOldActivateNew {
                old_worker_id,
                new_worker_id,
            } => {
                // NOTE(review): success is reported here without looking at
                // `self.errors` — confirm that this is intended.
                client.finish_ok(
                    format!(
                        "Upgrade successful:\n- finished soft stop of worker {old_worker_id:?}\n- finished activation of new worker {new_worker_id:?}"
                    )
                );
                return;
            }
            UpgradeWorkerProgress::RequestingListenSockets {
                old_worker_token,
                old_worker_id,
            } => (old_worker_token, old_worker_id),
        };

        if self.ok != 1 {
            client.finish_failure(format!(
                "Could not get listen sockets from old worker:{:?}",
                self.responses
            ));
            return;
        }

        self.receive_listen_sockets(server, client, old_worker_token, old_worker_id);
    }
}

impl Gatherer for UpgradeWorkerTask {
    fn inc_expected_responses(&mut self, count: usize) {
        self.expected_responses += count;
    }

    fn has_finished(&self) -> bool {
        self.ok + self.errors >= self.expected_responses
    }

    fn on_message(
        &mut self,
        _server: &mut Server,
        client: &mut OptionalClient,
        worker_id: WorkerId,
        message: WorkerResponse,
    ) {
        match ResponseStatus::try_from(message.status) {
            Ok(ResponseStatus::Ok) => {
                self.ok += 1;
                match self.progress {
                    UpgradeWorkerProgress::RequestingListenSockets { .. } => {}
                    UpgradeWorkerProgress::StopOldActivateNew { .. } => {
                        client.return_processing(format!(
                            "Worker {worker_id} answered OK to {}. {}",
                            message.id, message.message
                        ))
                    }
                }
            }
            Ok(ResponseStatus::Failure) => self.errors += 1,
            Ok(ResponseStatus::Processing) => client.return_processing(format!(
                "Worker {worker_id} is processing {}. {}",
                message.id, message.message
            )),
            Err(e) => warn!("error decoding response status: {}", e),
        }
        self.responses.push((worker_id, message));
    }
}

//===============================================
// Upgrade the main process

/// Summary of a worker session, meant to be passed to a new main process
/// during an upgrade, in order to recreate the worker.
///
/// Built from a `WorkerSession` through `TryFrom<&WorkerSession>`.
#[derive(Deserialize, Serialize, Debug)]
pub struct SerializedWorkerSession {
    /// file descriptor of the UNIX channel
    pub channel_fd: i32,
    /// process id copied from the worker session
    pub pid: pid_t,
    /// worker id copied from the worker session
    pub id: WorkerId,
    /// run state copied from the worker session
    pub run_state: RunState,
    /// file descriptor of the SCM socket
    pub scm_fd: i32,
}

impl TryFrom<&WorkerSession> for SerializedWorkerSession {
    type Error = UpgradeError;

    /// Summarizes a worker session so that it can be serialized.
    ///
    /// Close-on-exec is disabled on the worker channel's file descriptor
    /// before the summary is built.
    ///
    /// # Errors
    ///
    /// Returns `UpgradeError::DisableCloexec`, naming the channel of this
    /// worker, if close-on-exec could not be disabled.
    fn try_from(worker: &WorkerSession) -> Result<Self, Self::Error> {
        if let Err(util_err) = disable_close_on_exec(worker.channel.fd()) {
            return Err(UpgradeError::DisableCloexec {
                fd_name: format!("main-to-worker-{}-channel", worker.id),
                util_err,
            });
        }

        let summary = Self {
            channel_fd: worker.channel.sock.as_raw_fd(),
            pid: worker.pid,
            id: worker.id,
            run_state: worker.run_state,
            scm_fd: worker.scm_socket.raw_fd(),
        };
        Ok(summary)
    }
}

/// Serializable data handed to the new main process when the main process is
/// upgraded: it is produced by `Server::generate_upgrade_data` and passed to
/// `fork_main_into_new_main`.
#[derive(Deserialize, Serialize, Debug)]
pub struct UpgradeData {
    /// file descriptor of the unix command socket
    pub command_socket_fd: i32,
    /// NOTE(review): presumably the configuration of the running main — confirm
    /// against `Server::generate_upgrade_data`
    pub config: Config,
    // NOTE(review): the four `next_*` counters presumably let the new main
    // continue numbering where the old one stopped — confirm against `Server`
    pub next_client_id: ClientId,
    pub next_session_id: SessionId,
    pub next_task_id: TaskId,
    pub next_worker_id: WorkerId,
    /// JSON serialized workers
    pub workers: Vec<SerializedWorkerSession>,
    /// NOTE(review): presumably the routing state of the running main — confirm
    /// against `Server::generate_upgrade_data`
    pub state: ConfigState,
    /// Boot-generation counter, bumped each time a `MAIN_UPGRADED` re-exec
    /// happens. Stamps every audit line so SOC tooling can disambiguate
    /// post-upgrade sessions from pre-upgrade ones (PIDs reset, but the
    /// audit log keeps generation+session_ulid as the durable correlation
    /// pair). `0` on first boot.
    ///
    /// Falls back to the type's default when the field is absent from the
    /// deserialized data (`#[serde(default)]`).
    #[serde(default)]
    pub boot_generation: u32,
}

/// Upgrades the main process by forking a new main that takes over.
///
/// Steps, in order:
/// 1. bump `server.boot_generation` and emit the `main_upgraded` audit event,
/// 2. disable close-on-exec through `server.disable_cloexec_before_upgrade()`,
/// 3. generate the upgrade data and fork the new main with it,
/// 4. wait for the confirmation sent by the new main on the fork channel.
///
/// On success the client request is finished with the pid of the new main,
/// `server.run_state` becomes `ServerState::Stopping` and `server.upgrading`
/// is set. Any failure finishes the client request with a failure message
/// and leaves `run_state` and `upgrading` untouched.
///
/// NOTE(review): the boot generation stays bumped and the audit event stays
/// emitted with `AuditResult::Ok` even when a later step fails — confirm that
/// this is acceptable.
pub fn upgrade_main(server: &mut Server, client: &mut ClientSession) {
    // Bump the boot generation BEFORE serialising upgrade_data so the
    // re-execed main starts at the new value. The audit line below
    // already reflects the bumped value.
    server.boot_generation = server.boot_generation.saturating_add(1);

    audit_emit_inline(
        server,
        client,
        EventKind::MainUpgraded,
        "main_upgraded",
        "config.main_upgraded",
        format!(
            "executable:{} boot_generation:{}",
            server.executable_path.as_str(),
            server.boot_generation
        ),
        AuditResult::Ok,
        AuditExtras::default(),
    );

    if let Err(err) = server.disable_cloexec_before_upgrade() {
        client.finish_failure(err.to_string());
        // Stop here: the request has already been answered with a failure,
        // so the fork below must not run and no second final response must
        // be sent to the client.
        return;
    }

    client.return_processing("Upgrading the main process...");

    let upgrade_data = server.generate_upgrade_data();

    let (new_main_pid, mut fork_confirmation_channel) =
        match fork_main_into_new_main(server.executable_path.clone(), upgrade_data) {
            Ok(tuple) => tuple,
            Err(fork_error) => {
                client.finish_failure(format!(
                    "Could not start a new main process by forking: {fork_error}"
                ));
                return;
            }
        };

    // A read error on the confirmation channel is treated like a negative answer.
    let received_ok_from_new_process = fork_confirmation_channel.read_message().unwrap_or(false);

    debug!(
        "new main process sent a fork confirmation: {:?}",
        received_ok_from_new_process
    );

    if !received_ok_from_new_process {
        client.finish_failure("Upgrade of main process failed: no feedback from the new main");
    } else {
        client.finish_ok(format!(
            "Upgrade successful, closing main process. New main process has pid {new_main_pid}"
        ));
        server.run_state = ServerState::Stopping;
        // The new master is taking over; the old master must NOT emit
        // `STOPPING=1` after `command_hub.run()` returns — that race
        // would arrive at systemd before the new master's MAINPID=
        // notify and flip the unit to inactive while sozu is still
        // serving traffic (see `bin/src/command/mod.rs` STOPPING gate
        // and `os-build/systemd/sozu.service`).
        server.upgrading = true;
    }
}