plugin_interfaces/message/
stream_message.rs

1use crate::{log_info, send_to_frontend, PluginHandler};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7/// 流式传输错误类型
8#[derive(Debug, Clone)]
9pub enum StreamError {
10    SendFailed,
11    InvalidStreamId,
12    StreamNotFound,
13    StreamAlreadyEnded,
14    InvalidState,
15}
16
17impl std::fmt::Display for StreamError {
18    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19        match self {
20            StreamError::SendFailed => write!(f, "Failed to send message to frontend"),
21            StreamError::InvalidStreamId => write!(f, "Invalid stream ID"),
22            StreamError::StreamNotFound => write!(f, "Stream not found"),
23            StreamError::StreamAlreadyEnded => write!(f, "Stream already ended"),
24            StreamError::InvalidState => write!(f, "Invalid stream state"),
25        }
26    }
27}
28
29impl std::error::Error for StreamError {}
30
31/// 流状态
32#[derive(Debug, Clone, PartialEq)]
33pub enum StreamStatus {
34    Active,
35    Paused,
36    Finalizing,
37    Completed,
38    Error,
39    Cancelled,
40}
41
42/// 流信息
43#[derive(Debug, Clone)]
44pub struct StreamInfo {
45    pub id: String,
46    pub plugin_id: String,
47    pub stream_type: String,
48    pub status: StreamStatus,
49    pub created_at: u64,
50    pub metadata: Option<String>,
51}
52
53/// 流式消息基础结构
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct StreamMessageWrapper {
56    pub r#type: String,
57    pub plugin_id: String,
58    pub data: StreamMessageData,
59    pub timestamp: u64,
60}
61
62/// 流式消息数据联合体
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(untagged)]
65pub enum StreamMessageData {
66    Start(StreamStartData),
67    Data(StreamDataData),
68    End(StreamEndData),
69    Control(StreamControlData),
70}
71
72/// 流开始消息数据
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct StreamStartData {
75    pub stream_id: String,
76    pub stream_type: String,
77    pub metadata: Option<String>,
78}
79
80/// 流数据消息数据
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct StreamDataData {
83    pub stream_id: String,
84    pub chunk: String,
85    pub is_final: bool,
86}
87
88/// 流结束消息数据
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct StreamEndData {
91    pub stream_id: String,
92    pub success: bool,
93    pub error: Option<String>,
94}
95
96/// 流控制消息数据
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct StreamControlData {
99    pub stream_id: String,
100}
101
102/// 全局流管理器
103static STREAM_MANAGER: std::sync::LazyLock<Arc<Mutex<HashMap<String, StreamInfo>>>> =
104    std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
105
106/// 生成唯一的流ID
107fn generate_stream_id() -> String {
108    let timestamp = SystemTime::now()
109        .duration_since(UNIX_EPOCH)
110        .unwrap()
111        .as_nanos();
112    format!("stream_{}", timestamp)
113}
114
115/// 发送流式消息到前端
116fn send_stream_message_to_frontend(
117    plugin_id: &str,
118    message_type: &str,
119    data: StreamMessageData,
120) -> bool {
121    let wrapper = StreamMessageWrapper {
122        r#type: message_type.to_string(),
123        plugin_id: plugin_id.to_string(),
124        data,
125        timestamp: SystemTime::now()
126            .duration_since(UNIX_EPOCH)
127            .unwrap()
128            .as_millis() as u64,
129    };
130
131    match serde_json::to_string(&wrapper) {
132        Ok(payload) => send_to_frontend("plugin-stream", &payload),
133        Err(_) => false,
134    }
135}
136
137/// 插件流式消息发送器
138pub trait PluginStreamMessage {
139    /// 开始流式传输,返回流ID
140    fn send_message_stream_start(
141        &self,
142        stream_type: &str,
143        metadata: Option<&str>,
144    ) -> Result<String, StreamError>;
145
146    /// 发送流式数据块
147    fn send_message_stream(
148        &self,
149        stream_id: &str,
150        chunk: &str,
151        is_final: bool,
152    ) -> Result<(), StreamError>;
153
154    /// 结束流式传输
155    fn send_message_stream_end(
156        &self,
157        stream_id: &str,
158        success: bool,
159        error_msg: Option<&str>,
160    ) -> Result<(), StreamError>;
161
162    /// 暂停流式传输
163    fn send_message_stream_pause(&self, stream_id: &str) -> Result<(), StreamError>;
164
165    /// 恢复流式传输
166    fn send_message_stream_resume(&self, stream_id: &str) -> Result<(), StreamError>;
167
168    /// 取消流式传输
169    fn send_message_stream_cancel(&self, stream_id: &str) -> Result<(), StreamError>;
170
171    /// 获取流状态
172    fn get_stream_status(&self, stream_id: &str) -> Option<StreamStatus>;
173
174    /// 列出活跃的流
175    fn list_active_streams(&self) -> Vec<String>;
176
177    /// 批量发送流式数据
178    fn send_message_stream_batch(
179        &self,
180        stream_id: &str,
181        chunks: &[&str],
182    ) -> Result<(), StreamError>;
183}
184
185impl<T: PluginHandler> PluginStreamMessage for T {
186    fn send_message_stream_start(
187        &self,
188        stream_type: &str,
189        metadata: Option<&str>,
190    ) -> Result<String, StreamError> {
191        let stream_id = generate_stream_id();
192        let plugin_id = self.get_metadata().id.clone();
193
194        log_info!("Starting stream: {} {}", stream_id, plugin_id);
195
196        let data = StreamMessageData::Start(StreamStartData {
197            stream_id: stream_id.clone(),
198            stream_type: stream_type.to_string(),
199            metadata: metadata.map(|s| s.to_string()),
200        });
201
202        if send_stream_message_to_frontend(&plugin_id, "stream_start", data) {
203            // 记录流信息
204            if let Ok(mut manager) = STREAM_MANAGER.lock() {
205                let stream_info = StreamInfo {
206                    id: stream_id.clone(),
207                    plugin_id: plugin_id.clone(),
208                    stream_type: stream_type.to_string(),
209                    status: StreamStatus::Active,
210                    created_at: SystemTime::now()
211                        .duration_since(UNIX_EPOCH)
212                        .unwrap()
213                        .as_secs(),
214                    metadata: metadata.map(|s| s.to_string()),
215                };
216                manager.insert(stream_id.clone(), stream_info);
217            }
218            Ok(stream_id)
219        } else {
220            Err(StreamError::SendFailed)
221        }
222    }
223
224    fn send_message_stream(
225        &self,
226        stream_id: &str,
227        chunk: &str,
228        is_final: bool,
229    ) -> Result<(), StreamError> {
230        // 检查流是否存在且状态有效
231        {
232            let manager = STREAM_MANAGER
233                .lock()
234                .map_err(|_| StreamError::InvalidState)?;
235            match manager.get(stream_id) {
236                Some(stream_info) => match stream_info.status {
237                    StreamStatus::Active | StreamStatus::Finalizing => {}
238                    StreamStatus::Paused => return Err(StreamError::InvalidState),
239                    StreamStatus::Completed | StreamStatus::Error | StreamStatus::Cancelled => {
240                        return Err(StreamError::StreamAlreadyEnded);
241                    }
242                },
243                None => return Err(StreamError::StreamNotFound),
244            }
245        }
246
247        let plugin_id = self.get_metadata().id.clone();
248        let data = StreamMessageData::Data(StreamDataData {
249            stream_id: stream_id.to_string(),
250            chunk: chunk.to_string(),
251            is_final,
252        });
253
254        if send_stream_message_to_frontend(&plugin_id, "stream_data", data) {
255            // 更新流状态
256            if is_final {
257                if let Ok(mut manager) = STREAM_MANAGER.lock() {
258                    if let Some(stream_info) = manager.get_mut(stream_id) {
259                        stream_info.status = StreamStatus::Finalizing;
260                    }
261                }
262            }
263            Ok(())
264        } else {
265            Err(StreamError::SendFailed)
266        }
267    }
268
269    fn send_message_stream_end(
270        &self,
271        stream_id: &str,
272        success: bool,
273        error_msg: Option<&str>,
274    ) -> Result<(), StreamError> {
275        // 检查流是否存在
276        {
277            let manager = STREAM_MANAGER
278                .lock()
279                .map_err(|_| StreamError::InvalidState)?;
280            if !manager.contains_key(stream_id) {
281                return Err(StreamError::StreamNotFound);
282            }
283        }
284
285        let plugin_id = self.get_metadata().id.clone();
286        let data = StreamMessageData::End(StreamEndData {
287            stream_id: stream_id.to_string(),
288            success,
289            error: error_msg.map(|s| s.to_string()),
290        });
291
292        if send_stream_message_to_frontend(&plugin_id, "stream_end", data) {
293            // 更新流状态
294            if let Ok(mut manager) = STREAM_MANAGER.lock() {
295                if let Some(stream_info) = manager.get_mut(stream_id) {
296                    stream_info.status = if success {
297                        StreamStatus::Completed
298                    } else {
299                        StreamStatus::Error
300                    };
301                }
302            }
303            Ok(())
304        } else {
305            Err(StreamError::SendFailed)
306        }
307    }
308
309    fn send_message_stream_pause(&self, stream_id: &str) -> Result<(), StreamError> {
310        let mut manager = STREAM_MANAGER
311            .lock()
312            .map_err(|_| StreamError::InvalidState)?;
313        match manager.get_mut(stream_id) {
314            Some(stream_info) => {
315                if stream_info.status == StreamStatus::Active {
316                    stream_info.status = StreamStatus::Paused;
317
318                    let plugin_id = self.get_metadata().id.clone();
319                    let data = StreamMessageData::Control(StreamControlData {
320                        stream_id: stream_id.to_string(),
321                    });
322
323                    if send_stream_message_to_frontend(&plugin_id, "stream_pause", data) {
324                        Ok(())
325                    } else {
326                        // 回滚状态
327                        stream_info.status = StreamStatus::Active;
328                        Err(StreamError::SendFailed)
329                    }
330                } else {
331                    Err(StreamError::InvalidState)
332                }
333            }
334            None => Err(StreamError::StreamNotFound),
335        }
336    }
337
338    fn send_message_stream_resume(&self, stream_id: &str) -> Result<(), StreamError> {
339        let mut manager = STREAM_MANAGER
340            .lock()
341            .map_err(|_| StreamError::InvalidState)?;
342        match manager.get_mut(stream_id) {
343            Some(stream_info) => {
344                if stream_info.status == StreamStatus::Paused {
345                    stream_info.status = StreamStatus::Active;
346
347                    let plugin_id = self.get_metadata().id.clone();
348                    let data = StreamMessageData::Control(StreamControlData {
349                        stream_id: stream_id.to_string(),
350                    });
351
352                    if send_stream_message_to_frontend(&plugin_id, "stream_resume", data) {
353                        Ok(())
354                    } else {
355                        // 回滚状态
356                        stream_info.status = StreamStatus::Paused;
357                        Err(StreamError::SendFailed)
358                    }
359                } else {
360                    Err(StreamError::InvalidState)
361                }
362            }
363            None => Err(StreamError::StreamNotFound),
364        }
365    }
366
367    fn send_message_stream_cancel(&self, stream_id: &str) -> Result<(), StreamError> {
368        let mut manager = STREAM_MANAGER
369            .lock()
370            .map_err(|_| StreamError::InvalidState)?;
371        match manager.get_mut(stream_id) {
372            Some(stream_info) => match stream_info.status {
373                StreamStatus::Active | StreamStatus::Paused | StreamStatus::Finalizing => {
374                    stream_info.status = StreamStatus::Cancelled;
375
376                    let plugin_id = self.get_metadata().id.clone();
377                    let data = StreamMessageData::Control(StreamControlData {
378                        stream_id: stream_id.to_string(),
379                    });
380
381                    if send_stream_message_to_frontend(&plugin_id, "stream_cancel", data) {
382                        Ok(())
383                    } else {
384                        Err(StreamError::SendFailed)
385                    }
386                }
387                _ => Err(StreamError::InvalidState),
388            },
389            None => Err(StreamError::StreamNotFound),
390        }
391    }
392
393    fn get_stream_status(&self, stream_id: &str) -> Option<StreamStatus> {
394        if let Ok(manager) = STREAM_MANAGER.lock() {
395            manager.get(stream_id).map(|info| info.status.clone())
396        } else {
397            None
398        }
399    }
400
401    fn list_active_streams(&self) -> Vec<String> {
402        if let Ok(manager) = STREAM_MANAGER.lock() {
403            let plugin_id = self.get_metadata().id.clone();
404            manager
405                .iter()
406                .filter(|(_, info)| {
407                    info.plugin_id == plugin_id
408                        && matches!(
409                            info.status,
410                            StreamStatus::Active | StreamStatus::Paused | StreamStatus::Finalizing
411                        )
412                })
413                .map(|(id, _)| id.clone())
414                .collect()
415        } else {
416            Vec::new()
417        }
418    }
419
420    fn send_message_stream_batch(
421        &self,
422        stream_id: &str,
423        chunks: &[&str],
424    ) -> Result<(), StreamError> {
425        // 检查流是否存在且状态有效
426        {
427            let manager = STREAM_MANAGER
428                .lock()
429                .map_err(|_| StreamError::InvalidState)?;
430            match manager.get(stream_id) {
431                Some(stream_info) => match stream_info.status {
432                    StreamStatus::Active | StreamStatus::Finalizing => {}
433                    StreamStatus::Paused => return Err(StreamError::InvalidState),
434                    StreamStatus::Completed | StreamStatus::Error | StreamStatus::Cancelled => {
435                        return Err(StreamError::StreamAlreadyEnded);
436                    }
437                },
438                None => return Err(StreamError::StreamNotFound),
439            }
440        }
441
442        let plugin_id = self.get_metadata().id.clone();
443
444        for (i, chunk) in chunks.iter().enumerate() {
445            let is_final = i == chunks.len() - 1;
446            let data = StreamMessageData::Data(StreamDataData {
447                stream_id: stream_id.to_string(),
448                chunk: chunk.to_string(),
449                is_final,
450            });
451
452            if !send_stream_message_to_frontend(&plugin_id, "stream_data", data) {
453                return Err(StreamError::SendFailed);
454            }
455        }
456
457        // 更新流状态
458        if !chunks.is_empty() {
459            if let Ok(mut manager) = STREAM_MANAGER.lock() {
460                if let Some(stream_info) = manager.get_mut(stream_id) {
461                    stream_info.status = StreamStatus::Finalizing;
462                }
463            }
464        }
465
466        Ok(())
467    }
468}