Skip to main content

mf_core/actors/
mod.rs

1//! Actor系统模块 - 基于ractor框架的实现
2//!
3//! 本模块使用ractor框架重构ModuForge的核心架构,实现Actor模式的并发设计。
4//!
5//! ## 架构设计
6//!
7//! - **TransactionProcessorActor**: 事务处理Actor,负责处理所有事务逻辑
8//! - **StateActor**: 状态管理Actor,确保状态操作的线程安全
9//! - **EventBusActor**: 事件总线Actor,处理事件的发布和订阅
10//! - **ExtensionManagerActor**: 扩展管理Actor,负责插件系统
11//! - **ForgeActorSystem**: Actor系统管理器,协调所有Actor
12//!
13//! ## 设计原则
14//!
15//! 1. **保持现有逻辑不变**: 所有业务逻辑保持与原实现完全相同
16//! 2. **消息驱动**: 所有组件间通信通过消息传递
17//! 3. **故障隔离**: Actor失败不影响其他Actor
18//! 4. **性能优化**: 利用Actor模式的并发优势
19
20pub mod event_bus;
21pub mod extension_manager;
22pub mod state_actor;
23pub mod system;
24pub mod transaction_processor;
25
26// 重新导出核心类型
27pub use transaction_processor::{TransactionProcessorActor, TransactionMessage};
28pub use state_actor::{StateActor, StateMessage};
29pub use event_bus::{EventBusActor, EventBusMessage};
30pub use extension_manager::{ExtensionManagerActor, ExtensionMessage};
31pub use system::{ForgeActorSystem, ActorSystemConfig};
32
33use ractor::{SpawnErr};
34use std::sync::Arc;
35use thiserror::Error;
36use tokio::sync::oneshot;
37
38/// Actor系统错误类型
39#[derive(Debug, Error)]
40pub enum ActorSystemError {
41    #[error("Actor启动失败: {actor_name} - {source}")]
42    ActorStartupFailed { actor_name: String, source: SpawnErr },
43
44    #[error("Actor通信失败: {message}")]
45    CommunicationFailed { message: String },
46
47    #[error("Actor系统关闭失败: {message}")]
48    ShutdownFailed { message: String },
49
50    #[error("配置错误: {message}")]
51    ConfigurationError { message: String },
52
53    #[error("超时错误: {operation}")]
54    TimeoutError { operation: String },
55
56    #[error("其他错误: {message}")]
57    Other { message: String },
58}
59
60/// Actor系统结果类型
61pub type ActorSystemResult<T> = Result<T, ActorSystemError>;
62
63/// Actor消息包装器
64///
65/// 用于在保持现有API兼容性的同时,支持ractor的消息系统
66#[derive(Debug)]
67pub struct MessageWrapper<T> {
68    pub inner: T,
69    pub reply_to: Option<oneshot::Sender<crate::ForgeResult<()>>>,
70}
71
72impl<T> MessageWrapper<T> {
73    pub fn new(inner: T) -> Self {
74        Self { inner, reply_to: None }
75    }
76
77    pub fn with_reply(
78        inner: T,
79        reply_to: oneshot::Sender<crate::ForgeResult<()>>,
80    ) -> Self {
81        Self { inner, reply_to: Some(reply_to) }
82    }
83
84    pub fn reply(
85        self,
86        result: crate::ForgeResult<()>,
87    ) {
88        if let Some(sender) = self.reply_to {
89            let _ = sender.send(result);
90        }
91    }
92}
93
94// 移除手动Message实现 - ractor 0.15.8 自动为满足条件的类型实现Message
95// 只需要确保消息类型满足: Debug + Send + 'static
96
97/// Actor管理器trait
98///
99/// 定义了Actor的生命周期管理接口
100#[async_trait::async_trait]
101pub trait ActorManager {
102    type Config;
103    type Handle;
104
105    /// 启动Actor
106    async fn start(config: Self::Config) -> ActorSystemResult<Self::Handle>;
107
108    /// 停止Actor
109    async fn stop(handle: Self::Handle) -> ActorSystemResult<()>;
110
111    /// 检查Actor是否健康
112    async fn health_check(handle: &Self::Handle) -> bool;
113}
114
115/// Actor指标收集
116pub struct ActorMetrics {
117    /// 消息处理计数
118    pub messages_processed: Arc<std::sync::atomic::AtomicU64>,
119    /// 错误计数
120    pub errors_count: Arc<std::sync::atomic::AtomicU64>,
121    /// 平均处理时间
122    pub avg_processing_time: Arc<std::sync::atomic::AtomicU64>,
123}
124
125impl Default for ActorMetrics {
126    fn default() -> Self {
127        Self {
128            messages_processed: Arc::new(std::sync::atomic::AtomicU64::new(0)),
129            errors_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
130            avg_processing_time: Arc::new(std::sync::atomic::AtomicU64::new(0)),
131        }
132    }
133}
134
135impl ActorMetrics {
136    pub fn increment_messages(&self) {
137        self.messages_processed
138            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
139    }
140
141    pub fn increment_errors(&self) {
142        self.errors_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
143    }
144
145    pub fn update_processing_time(
146        &self,
147        duration_ms: u64,
148    ) {
149        self.avg_processing_time
150            .store(duration_ms, std::sync::atomic::Ordering::Relaxed);
151    }
152}