mechanics-core 0.1.0

mechanics automation framework (core)
Documentation
use crate::{
    error::MechanicsError, http::EndpointHttpClient, job::MechanicsExecutionLimits,
    runtime::RuntimeInternal,
};
use crossbeam_channel::{Receiver, Sender, bounded, select};
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{
    collections::HashMap,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicUsize, Ordering},
    },
    thread,
    time::Instant,
};

use super::{
    config::MechanicsPoolConfig,
    restart_guard::RestartGuard,
    worker::{PoolMessage, WorkerExit, WorkerHandle},
};

#[derive(Debug)]
pub(crate) struct MechanicsPoolShared {
    tx: Sender<PoolMessage>,
    rx: Receiver<PoolMessage>,
    exit_tx: Sender<WorkerExit>,
    exit_rx: Receiver<WorkerExit>,
    workers: RwLock<HashMap<usize, WorkerHandle>>,
    next_worker_id: AtomicUsize,
    desired_worker_count: usize,
    closed: AtomicBool,
    restart_blocked: AtomicBool,
    restart_guard: Mutex<RestartGuard>,
    execution_limits: MechanicsExecutionLimits,
    default_http_timeout_ms: Option<u64>,
    default_http_response_max_bytes: Option<usize>,
    endpoint_http_client: Arc<dyn EndpointHttpClient>,
    #[cfg(test)]
    pub(crate) force_worker_runtime_init_failure: bool,
}

impl MechanicsPoolShared {
    pub(crate) fn new(
        config: &MechanicsPoolConfig,
        endpoint_http_client: Arc<dyn EndpointHttpClient>,
        tx: Sender<PoolMessage>,
        rx: Receiver<PoolMessage>,
        exit_tx: Sender<WorkerExit>,
        exit_rx: Receiver<WorkerExit>,
    ) -> Self {
        Self {
            tx,
            rx,
            exit_tx,
            exit_rx,
            workers: parking_lot::RwLock::new(HashMap::new()),
            next_worker_id: AtomicUsize::new(0),
            desired_worker_count: config.worker_count(),
            closed: AtomicBool::new(false),
            restart_blocked: AtomicBool::new(false),
            restart_guard: parking_lot::Mutex::new(RestartGuard::new(
                config.restart_window(),
                config.max_restarts_in_window(),
            )),
            execution_limits: config.execution_limits(),
            default_http_timeout_ms: config.default_http_timeout_ms(),
            default_http_response_max_bytes: config.default_http_response_max_bytes(),
            endpoint_http_client,
            #[cfg(test)]
            force_worker_runtime_init_failure: config.force_worker_runtime_init_failure,
        }
    }

    pub(crate) fn workers_read(&self) -> RwLockReadGuard<'_, HashMap<usize, WorkerHandle>> {
        self.workers.read()
    }

    pub(crate) fn workers_write(&self) -> RwLockWriteGuard<'_, HashMap<usize, WorkerHandle>> {
        self.workers.write()
    }

    pub(crate) fn restart_guard_snapshot(&self) -> (usize, usize) {
        let guard = self.restart_guard.lock();
        (
            guard.restart_attempts_in_window(),
            guard.max_restarts_in_window(),
        )
    }

    pub(crate) fn mark_closed(&self) {
        self.closed.store(true, Ordering::Release);
    }

    pub(crate) fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Acquire)
    }

    pub(crate) fn is_restart_blocked(&self) -> bool {
        self.restart_blocked.load(Ordering::Acquire)
    }

    pub(crate) fn set_restart_blocked(&self, blocked: bool) {
        self.restart_blocked.store(blocked, Ordering::Release);
    }

    pub(crate) fn desired_worker_count(&self) -> usize {
        self.desired_worker_count
    }

    pub(crate) fn queue_depth(&self) -> usize {
        self.rx.len()
    }

    pub(crate) fn queue_capacity(&self) -> Option<usize> {
        self.rx.capacity()
    }

    pub(crate) fn job_sender(&self) -> &Sender<PoolMessage> {
        &self.tx
    }

    pub(crate) fn job_receiver(&self) -> &Receiver<PoolMessage> {
        &self.rx
    }

    pub(crate) fn worker_exit_receiver(&self) -> &Receiver<WorkerExit> {
        &self.exit_rx
    }

    pub(crate) fn record_restart_attempt(&self, now: Instant) -> bool {
        let mut guard = self.restart_guard.lock();
        guard.allow_restart(now)
    }

    fn remove_worker_handle(&self, worker_id: usize) -> Option<WorkerHandle> {
        let mut workers = self.workers_write();
        workers.remove(&worker_id)
    }

    fn reap_finished_workers(&self) {
        let finished_ids: Vec<usize> = {
            let workers = self.workers_read();
            workers
                .iter()
                .filter_map(|(id, handle)| handle.is_finished().then_some(*id))
                .collect()
        };
        if finished_ids.is_empty() {
            return;
        }

        let mut finished_handles = Vec::with_capacity(finished_ids.len());
        {
            let mut workers = self.workers_write();
            for id in finished_ids {
                if let Some(handle) = workers.remove(&id) {
                    finished_handles.push(handle);
                }
            }
        }
        for handle in finished_handles {
            handle.join();
        }
    }

    pub(crate) fn spawn_worker(shared: &Arc<Self>) -> Result<usize, MechanicsError> {
        let worker_id = shared.next_worker_id.fetch_add(1, Ordering::Relaxed);

        let rx = shared.rx.clone();
        let exit_tx = shared.exit_tx.clone();
        let (ready_tx, ready_rx) = bounded::<Result<(), MechanicsError>>(1);
        let (shutdown_tx, shutdown_rx) = bounded::<()>(1);
        let endpoint_http_client = Arc::clone(&shared.endpoint_http_client);
        let execution_limits = shared.execution_limits;
        let default_http_timeout_ms = shared.default_http_timeout_ms;
        let default_http_response_max_bytes = shared.default_http_response_max_bytes;
        #[cfg(test)]
        let force_runtime_init_failure = shared.force_worker_runtime_init_failure;

        let handle = thread::Builder::new()
            .name(format!("mechanics-worker-{worker_id}"))
            .spawn(move || {
                let run = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    #[cfg(test)]
                    if force_runtime_init_failure {
                        let _ = ready_tx.send(Err(MechanicsError::runtime_pool(
                            "forced runtime initialization failure for tests",
                        )));
                        return;
                    }

                    let mut runtime = match RuntimeInternal::new_with_endpoint_http_client(
                        endpoint_http_client,
                    ) {
                        Ok(runtime) => {
                            let _ = ready_tx.send(Ok(()));
                            runtime
                        }
                        Err(err) => {
                            let _ = ready_tx.send(Err(err));
                            return;
                        }
                    };
                    runtime.set_execution_limits(execution_limits);
                    runtime.set_default_endpoint_timeout_ms(default_http_timeout_ms);
                    runtime.set_default_endpoint_response_max_bytes(default_http_response_max_bytes);

                    loop {
                        select! {
                            recv(shutdown_rx) -> _ => {
                                break;
                            }
                            recv(rx) -> msg => {
                                match msg {
                                    Ok(PoolMessage::Run(pool_job)) => {
                                        if pool_job.is_canceled() {
                                            pool_job.send_result(Err(MechanicsError::canceled(
                                                "job timed out before execution",
                                            )));
                                            continue;
                                        }
                                        let reply = pool_job.reply_sender();
                                        let job = pool_job.into_job();
                                        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                                            runtime.run_source(job)
                                        }));
                                        match result {
                                            Ok(result) => {
                                                let _ = reply.send(result);
                                            }
                                            Err(_) => {
                                                let _ = reply.send(Err(MechanicsError::worker_panic(
                                                    "worker panicked while running job",
                                                )));
                                                break;
                                            }
                                        }
                                    }
                                    Err(_) => break,
                                }
                            }
                        }
                    }
                }));

                if run.is_err() {
                    let _ = ready_tx.send(Err(MechanicsError::worker_panic("worker panicked during startup")));
                    let _ = exit_tx.send(WorkerExit::new(worker_id));
                    return;
                }

                let _ = exit_tx.send(WorkerExit::new(worker_id));
            })
            .map_err(|e| {
                MechanicsError::runtime_pool(format!("failed to spawn worker thread: {e}"))
            })?;

        {
            let mut workers = shared.workers_write();
            workers.insert(
                worker_id,
                WorkerHandle::new(handle, shutdown_tx),
            );
        }

        match ready_rx.recv() {
            Ok(Ok(())) => {
                shared.set_restart_blocked(false);
                Ok(worker_id)
            }
            Ok(Err(err)) => {
                if let Some(handle) = shared.remove_worker_handle(worker_id) {
                    handle.join();
                }
                Err(err)
            }
            Err(_) => {
                if let Some(handle) = shared.remove_worker_handle(worker_id) {
                    handle.join();
                }
                Err(MechanicsError::runtime_pool(
                    "worker exited before startup completed",
                ))
            }
        }
    }

    pub(crate) fn live_workers(&self) -> usize {
        self.reap_finished_workers();
        self.workers_read().len()
    }

    pub(crate) fn reconcile_workers(shared: &Arc<Self>) {
        if shared.is_closed() {
            return;
        }

        let live = shared.live_workers();
        let missing = shared.desired_worker_count().saturating_sub(live);
        if missing == 0 {
            shared.set_restart_blocked(false);
            return;
        }

        for _ in 0..missing {
            let now = Instant::now();
            let can_restart = shared.record_restart_attempt(now);
            if !can_restart {
                shared.set_restart_blocked(true);
                return;
            }

            if MechanicsPoolShared::spawn_worker(shared).is_err() {
                shared.set_restart_blocked(true);
                return;
            }
        }
    }
}