Skip to main content

mofa_kernel/message/
mod.rs

1use crate::agent::AgentState;
2use serde::{Deserialize, Serialize};
3
4// 流类型枚举
5#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
6pub enum StreamType {
7    MessageStream, // 消息流 - 异步消息传递
8    DataStream,    // 数据流 - 连续数据传递
9    EventStream,   // 事件流 - 离散事件传递
10    CommandStream, // 命令流 - 控制命令传递
11}
12
13// 智能体通信消息协议
14#[derive(Debug, Serialize, Deserialize, Clone)]
15pub enum AgentMessage {
16    TaskRequest {
17        task_id: String,
18        content: String,
19    },
20    TaskResponse {
21        task_id: String,
22        result: String,
23        status: TaskStatus,
24    },
25    StateSync {
26        agent_id: String,
27        state: AgentState,
28    },
29    Event(AgentEvent),
30
31    // 流相关消息
32    StreamMessage {
33        stream_id: String,
34        message: Vec<u8>,
35        sequence: u64,
36    }, // 流消息
37
38    StreamControl {
39        stream_id: String,
40        command: StreamControlCommand,
41        metadata: std::collections::HashMap<String, String>,
42    }, // 流控制消息
43}
44
45// 任务状态枚举
46#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
47pub enum TaskStatus {
48    Success,
49    Failed,
50    Pending,
51}
52
53// Agent 可感知的事件类型
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub enum AgentEvent {
56    TaskReceived(TaskRequest), // 任务事件
57    TaskPreempted(String),     // 任务被抢占事件(参数:被抢占的任务ID)
58    Shutdown,                  // 框架关闭事件
59    Custom(String, Vec<u8>),   // 自定义事件
60
61    // 流相关事件
62    StreamMessage {
63        stream_id: String,
64        message: Vec<u8>,
65        sequence: u64,
66    }, // 流消息事件
67
68    StreamCreated {
69        stream_id: String,
70        stream_type: StreamType,
71        metadata: std::collections::HashMap<String, String>,
72    }, // 流创建事件
73
74    StreamClosed {
75        stream_id: String,
76        reason: String,
77    }, // 流关闭事件
78
79    StreamSubscription {
80        stream_id: String,
81        subscriber_id: String,
82    }, // 流订阅事件
83
84    StreamUnsubscription {
85        stream_id: String,
86        subscriber_id: String,
87    }, // 流取消订阅事件
88}
89
90// 扩展 TaskRequest,增加优先级和调度元数据
91#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
92pub struct TaskRequest {
93    pub task_id: String,
94    pub content: String,
95    pub priority: TaskPriority,
96    pub deadline: Option<std::time::Duration>, // 任务截止时间
97    pub metadata: std::collections::HashMap<String, String>,
98}
99
100// 任务优先级与调度元数据
101#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
102pub enum TaskPriority {
103    Critical = 0,
104    Highest = 1,
105    High = 2,
106    Medium = 3,
107    Normal = 4,
108    Low = 5,
109}
110
111// 实现 PartialOrd 用于优先级排序(Urgent > High > Medium > Low)
112impl PartialOrd for TaskPriority {
113    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
114        Some(self.cmp(other))
115    }
116}
117
118impl Ord for TaskPriority {
119    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
120        match (self, other) {
121            (TaskPriority::Critical, _) => std::cmp::Ordering::Greater,
122            (_, TaskPriority::Critical) => std::cmp::Ordering::Less,
123            (TaskPriority::High, TaskPriority::Medium | TaskPriority::Low) => {
124                std::cmp::Ordering::Greater
125            }
126            (TaskPriority::Medium, TaskPriority::Low) => std::cmp::Ordering::Greater,
127            (TaskPriority::Low, TaskPriority::Medium | TaskPriority::High) => {
128                std::cmp::Ordering::Less
129            }
130            (TaskPriority::Medium, TaskPriority::High) => std::cmp::Ordering::Less,
131            (a, b) if a == b => std::cmp::Ordering::Equal,
132            _ => unreachable!("All cases should be covered"),
133        }
134    }
135}
136
137// 流控制命令
138#[derive(Debug, Serialize, Deserialize, Clone)]
139pub enum StreamControlCommand {
140    Create(StreamType), // 创建流
141    Close(String),      // 关闭流(原因)
142    Subscribe,          // 订阅流
143    Unsubscribe,        // 取消订阅
144    Pause,              // 暂停流
145    Resume,             // 恢复流
146    Seek(u64),          // 跳转到指定序列位置(仅适用于可重放流)
147}
148
149// 调度状态,用于跟踪任务抢占情况
150#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
151pub enum SchedulingStatus {
152    Pending,
153    Running,
154    Preempted, // 被高优先级任务抢占
155    Completed,
156    Failed,
157}