fast-able 1.20.2

The world's martial arts are fast and unbreakable; 天下武功 唯快不破
Documentation
use std::{sync::Arc, thread};

use core_affinity::CoreId;
use crossbeam::atomic::AtomicCell;

/// Channel type definition based on features
/// Use crossbeam::channel when crossbeam_channel feature is enabled, otherwise use std::sync::mpsc
/// 基于 features 的 channel 类型定义
/// 启用 crossbeam_channel feature 时使用 crossbeam::channel,否则使用 std::sync::mpsc

#[cfg(feature = "crossbeam_channel")]
pub use crossbeam::channel::{bounded, unbounded, Receiver, Sender, TryRecvError};

#[cfg(not(feature = "crossbeam_channel"))]
pub use std::sync::mpsc::{
    channel as unbounded, sync_channel as bounded, Receiver, TryRecvError,
};

// 为 std::sync::mpsc 定义统一的 Sender 类型
#[cfg(all(not(feature = "crossbeam_channel"), not(feature = "thread_task_bounded")))]
pub use std::sync::mpsc::Sender;

#[cfg(all(not(feature = "crossbeam_channel"), feature = "thread_task_bounded"))]
pub use std::sync::mpsc::SyncSender as Sender;

/// Get default bounded channel capacity
/// Returns 4 times the number of CPU cores, at least 64, at most 1024
/// 获取默认的有界 channel 容量
/// 返回 CPU 核心数的 4 倍,至少为 64,最多为 1024
fn get_default_bounded_capacity() -> usize {
    let cpu_count = num_cpus::get();
    let capacity = (cpu_count * 100).max(128).min(4096);
    capacity
}

/// Unified thread pool executor, supports switching channel implementation via features
/// 统一的线程池执行器,支持通过 features 切换 channel 实现
pub struct TaskExecutor {
    jobs: Sender<Box<dyn FnOnce(&usize) + Send + 'static>>,
    _handle: thread::JoinHandle<()>,
    pub count: Arc<AtomicCell<i64>>,
    core: usize,
}

impl std::fmt::Debug for TaskExecutor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TaskExecutor")
            .field("_handle", &self._handle)
            .field("count", &self.count)
            .field("core", &self.core)
            .finish()
    }
}

impl TaskExecutor {
    /// 创建新的任务执行器
    /// realtime: 实时内核优先级(1-99,越高越优先),输入 -1 时不开启
    pub fn new(core: CoreId, realtime: i32) -> TaskExecutor {
        // 根据 thread_task_bounded feature 选择使用有界或无界 channel
        #[cfg(feature = "thread_task_bounded")]
        let (tx, rx) = {
            let capacity = get_default_bounded_capacity();
            bounded::<Box<dyn FnOnce(&usize) + Send + 'static>>(capacity)
        };

        #[cfg(not(feature = "thread_task_bounded"))]
        let (tx, rx) = unbounded::<Box<dyn FnOnce(&usize) + Send + 'static>>();

        let count = Arc::new(AtomicCell::new(0_i64));
        let task_count = count.clone();

        let _handle = thread::spawn(move || {
            // 绑核和开启实时内核
            super::set_core_affinity_and_realtime(core.id, realtime);
            let core_id = core.id;

            // 在worker线程启动前设置线程级别的panic hook
            let old_hook = std::panic::take_hook();
            std::panic::set_hook(Box::new(move |panic_info| {
                let thread = std::thread::current();
                let thread_name = thread.name().unwrap_or("unnamed");

                // 获取panic消息
                let panic_message = if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
                    s.to_string()
                } else if let Some(s) = panic_info.payload().downcast_ref::<String>() {
                    s.clone()
                } else {
                    format!(
                        "Unknown panic payload type: {:?}",
                        panic_info.payload().type_id()
                    )
                };

                // 获取panic位置信息
                let location_info = if let Some(location) = panic_info.location() {
                    format!(
                        "file: '{}', line: {}, column: {}",
                        location.file(),
                        location.line(),
                        location.column()
                    )
                } else {
                    "unknown location".to_string()
                };

                // 输出详细的panic信息
                error!(
                    "PANIC in TaskExecutor worker thread!\n\
                     ┌─ Thread Info ─────────────────────────────────────┐\n\
                     │ Thread Name: {}\n\
                     │ Core ID: {}\n\
                     │ Thread ID: {:?}\n\
                     ├─ Panic Details ──────────────────────────────────┤\n\
                     │ Message: {}\n\
                     │ Location: {}\n\
                     └──────────────────────────────────────────────────┘",
                    thread_name,
                    core_id,
                    thread.id(),
                    panic_message,
                    location_info
                );

                // 调用原来的hook以保持默认行为
                old_hook(panic_info);
            }));

            Self::run_worker_loop(rx, task_count, core_id);
        });

        TaskExecutor {
            jobs: tx,
            _handle,
            count,
            core: core.id,
        }
    }

    /// 创建带自定义容量的任务执行器(仅在启用 thread_task_bounded feature 时有效)
    /// capacity: 有界 channel 的容量
    /// realtime: 实时内核优先级(1-99,越高越优先),输入 -1 时不开启
    #[cfg(feature = "thread_task_bounded")]
    pub fn new_with_capacity(core: CoreId, capacity: usize, realtime: i32) -> TaskExecutor {
        let (tx, rx) = bounded::<Box<dyn FnOnce(&usize) + Send + 'static>>(capacity);
        let count = Arc::new(AtomicCell::new(0_i64));
        let task_count = count.clone();

        let _handle = thread::spawn(move || {
            // 绑核和开启实时内核
            super::set_core_affinity_and_realtime(core.id, realtime);
            let core_id = core.id;

            // 在worker线程启动前设置线程级别的panic hook
            let old_hook = std::panic::take_hook();
            std::panic::set_hook(Box::new(move |panic_info| {
                let thread = std::thread::current();
                let thread_name = thread.name().unwrap_or("unnamed");

                // 获取panic消息
                let panic_message = if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
                    s.to_string()
                } else if let Some(s) = panic_info.payload().downcast_ref::<String>() {
                    s.clone()
                } else {
                    format!(
                        "Unknown panic payload type: {:?}",
                        panic_info.payload().type_id()
                    )
                };

                // 获取panic位置信息
                let location_info = if let Some(location) = panic_info.location() {
                    format!(
                        "file: '{}', line: {}, column: {}",
                        location.file(),
                        location.line(),
                        location.column()
                    )
                } else {
                    "unknown location".to_string()
                };

                // 输出详细的panic信息
                error!(
                    "PANIC in TaskExecutor worker thread!\n\
                     ┌─ Thread Info ─────────────────────────────────────┐\n\
                     │ Thread Name: {}\n\
                     │ Core ID: {}\n\
                     │ Thread ID: {:?}\n\
                     ├─ Panic Details ──────────────────────────────────┤\n\
                     │ Message: {}\n\
                     │ Location: {}\n\
                     └──────────────────────────────────────────────────┘",
                    thread_name,
                    core_id,
                    thread.id(),
                    panic_message,
                    location_info
                );

                // 调用原来的hook以保持默认行为
                old_hook(panic_info);
            }));

            Self::run_worker_loop(rx, task_count, core_id);
        });

        TaskExecutor {
            jobs: tx,
            _handle,
            count,
            core: core.id,
        }
    }

    /// 工作线程主循环
    fn run_worker_loop(
        rx: Receiver<Box<dyn FnOnce(&usize) + Send + 'static>>,
        task_count: Arc<AtomicCell<i64>>,
        core_id: usize,
    ) {
        #[cfg(feature = "thread_dispatch")]
        {
            let mut empty_count = 0;
            loop {
                match rx.try_recv() {
                    Ok(job) => {
                        job(&core_id);
                        task_count.fetch_sub(1);
                        empty_count = 0;
                    }
                    Err(TryRecvError::Empty) => {
                        empty_count += 1;
                        if empty_count > 1000 {
                            empty_count = 0;
                            // 空闲次数过多时,阻塞等待任务
                            if let Ok(job) = rx.recv() {
                                job(&core_id);
                                task_count.fetch_sub(1);
                            }
                        }
                    }
                    Err(TryRecvError::Disconnected) => {
                        error!("TaskExecutor disconnected: {}", core_id);
                        break;
                    }
                }
            }
        }

        #[cfg(not(feature = "thread_dispatch"))]
        loop {
            if let Ok(job) = rx.try_recv() {
                job(&core_id);
                task_count.fetch_sub(1);
            }
        }
    }

    /// 提交任务到线程池
    #[inline(always)]
    pub fn spawn<F>(&self, f: F)
    where
        F: FnOnce(&usize) + Send + 'static,
    {
        self.count.fetch_add(1);
        
        if let Err(e) = self.jobs.send(Box::new(f)) {
            error!("TaskExecutor send error: {:?}", e);
            // 如果发送失败,直接在当前线程执行
            e.0(&0);
            self.count.fetch_sub(1);
        }
    }

    /// 尝试提交任务到线程池(非阻塞)
    /// 仅在启用 thread_task_bounded feature 时提供此方法
    /// 返回 true 表示成功提交,false 表示队列已满
    #[cfg(all(feature = "thread_task_bounded", feature = "crossbeam_channel"))]
    #[inline(always)]
    pub fn try_spawn<F>(&self, f: F) -> bool
    where
        F: FnOnce(&usize) + Send + 'static,
    {
        match self.jobs.try_send(Box::new(f)) {
            Ok(_) => {
                self.count.fetch_add(1);
                true
            }
            Err(_) => false, // 队列已满或通道已关闭
        }
    }

    /// 尝试提交任务到线程池(非阻塞)- std::sync::mpsc 版本
    /// 仅在启用 thread_task_bounded feature 且使用 std mpsc 时提供此方法
    /// 返回 true 表示成功提交,false 表示队列已满
    #[cfg(all(feature = "thread_task_bounded", not(feature = "crossbeam_channel")))]
    #[inline(always)]
    pub fn try_spawn<F>(&self, f: F) -> bool
    where
        F: FnOnce(&usize) + Send + 'static,
    {
        match self.jobs.try_send(Box::new(f)) {
            Ok(_) => {
                self.count.fetch_add(1);
                true
            }
            Err(_) => false, // 队列已满或通道已关闭
        }
    }
}