// plugin_interfaces/message/stream_message.rs

1use crate::{log_info, PluginHandler};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::{SystemTime, UNIX_EPOCH};
6
/// Errors reported by the stream-messaging API.
///
/// The type is comparable (`PartialEq`/`Eq`) so callers and tests can match
/// on a returned error with `==` / `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamError {
    /// The message could not be delivered to the frontend.
    SendFailed,
    /// The supplied stream ID is invalid.
    /// NOTE(review): no code in this file constructs this variant.
    InvalidStreamId,
    /// No stream is registered under the supplied ID.
    StreamNotFound,
    /// The stream has already reached a terminal state.
    StreamAlreadyEnded,
    /// The operation is not permitted in the stream's current state. Also
    /// returned when the global stream registry lock cannot be acquired.
    InvalidState,
}

impl std::fmt::Display for StreamError {
    /// Writes the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            StreamError::SendFailed => "Failed to send message to frontend",
            StreamError::InvalidStreamId => "Invalid stream ID",
            StreamError::StreamNotFound => "Stream not found",
            StreamError::StreamAlreadyEnded => "Stream already ended",
            StreamError::InvalidState => "Invalid stream state",
        };
        f.write_str(message)
    }
}

impl std::error::Error for StreamError {}
30
/// Lifecycle state of a stream tracked in the global stream registry.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so the status can
/// be used as a set/map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StreamStatus {
    /// Stream is open; set when a stream is started or resumed.
    Active,
    /// Stream was paused; data sends are rejected until it is resumed.
    Paused,
    /// A chunk flagged as final (or a non-empty batch) has been sent; further
    /// data is still accepted in this state.
    Finalizing,
    /// Stream was ended with `success == true`.
    Completed,
    /// Stream was ended with `success == false`.
    Error,
    /// Stream was cancelled.
    Cancelled,
}
41
42/// 流信息
43#[derive(Debug, Clone)]
44pub struct StreamInfo {
45    pub id: String,
46    pub plugin_id: String,
47    pub message_type: String,
48    pub status: StreamStatus,
49    pub created_at: u64,
50}
51
52/// 流式消息基础结构
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct StreamMessageWrapper {
55    pub r#type: String,
56    pub plugin_id: String,
57    pub instance_id: String,
58    pub data: StreamMessageData,
59    pub timestamp: u64,
60}
61
62/// 流式消息数据联合体
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(untagged)]
65pub enum StreamMessageData {
66    Start(StreamStartData),
67    Data(StreamDataData),
68    End(StreamEndData),
69    Control(StreamControlData),
70}
71
72/// 流开始消息数据
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct StreamStartData {
75    pub stream_id: String,
76    pub message_type: String,
77}
78
79/// 流数据消息数据
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct StreamDataData {
82    pub stream_id: String,
83    pub chunk: String,
84    pub is_final: bool,
85}
86
87/// 流结束消息数据
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct StreamEndData {
90    pub stream_id: String,
91    pub success: bool,
92    pub error: Option<String>,
93}
94
95/// 流控制消息数据
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct StreamControlData {
98    pub stream_id: String,
99}
100
101/// 全局流管理器
102static STREAM_MANAGER: std::sync::LazyLock<Arc<Mutex<HashMap<String, StreamInfo>>>> =
103    std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
104
/// Generates a stream ID of the form `stream_<decimal number>` that is unique
/// within this process.
///
/// The number is normally the current time in nanoseconds since the Unix
/// epoch. A bare timestamp can repeat when the clock is coarse or two threads
/// call at the same instant, so the last issued value is remembered and every
/// new ID is forced to be strictly greater than the previous one.
///
/// A system clock set before the Unix epoch no longer panics; the timestamp
/// is treated as 0 and uniqueness is still guaranteed by the counter.
fn generate_stream_id() -> String {
    static LAST_ISSUED: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    // Nanoseconds since the epoch fit in a u64 for several centuries; clamp
    // instead of silently truncating if that ever stops being true.
    let now_nanos: u64 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| {
            <u64 as std::convert::TryFrom<u128>>::try_from(elapsed.as_nanos())
                .unwrap_or(u64::MAX)
        })
        .unwrap_or(0);

    // Next value to issue given the previously issued one.
    let bump = |last: u64| now_nanos.max(last.saturating_add(1));

    // `fetch_update` hands back the value it replaced; the closure never
    // returns `None`, so both arms carry that previous value.
    let issued = match LAST_ISSUED.fetch_update(
        std::sync::atomic::Ordering::Relaxed,
        std::sync::atomic::Ordering::Relaxed,
        |last| Some(bump(last)),
    ) {
        Ok(previous) | Err(previous) => bump(previous),
    };

    format!("stream_{}", issued)
}
113
114/// 发送流式消息到前端
115fn send_stream_message_to_frontend(
116    plugin_id: &str,
117    instance_id: &str,
118    message_type: &str,
119    data: StreamMessageData,
120    plugin_ctx: &crate::metadata::PluginInstanceContext,
121) -> bool {
122    let wrapper = StreamMessageWrapper {
123        r#type: message_type.to_string(),
124        plugin_id: plugin_id.to_string(),
125        instance_id: instance_id.to_string(),
126        data,
127        timestamp: SystemTime::now()
128            .duration_since(UNIX_EPOCH)
129            .unwrap()
130            .as_millis() as u64,
131    };
132
133    match serde_json::to_string(&wrapper) {
134        Ok(payload) => plugin_ctx.send_to_frontend("plugin-stream", &payload),
135        Err(_) => false,
136    }
137}
138
139/// 插件流式消息发送器
140/// 重新设计以支持上下文传递模式
141pub trait PluginStreamMessage {
142    /// 开始流式传输,返回流ID
143    fn send_message_stream_start(
144        &self,
145        plugin_ctx: &crate::metadata::PluginInstanceContext,
146    ) -> Result<String, StreamError>;
147
148    /// 发送流式数据块
149    fn send_message_stream(
150        &self,
151        stream_id: &str,
152        chunk: &str,
153        is_final: bool,
154        plugin_ctx: &crate::metadata::PluginInstanceContext,
155    ) -> Result<(), StreamError>;
156
157    /// 结束流式传输
158    fn send_message_stream_end(
159        &self,
160        stream_id: &str,
161        success: bool,
162        error_msg: Option<&str>,
163        plugin_ctx: &crate::metadata::PluginInstanceContext,
164    ) -> Result<(), StreamError>;
165
166    /// 暂停流式传输
167    fn send_message_stream_pause(
168        &self,
169        stream_id: &str,
170        plugin_ctx: &crate::metadata::PluginInstanceContext,
171    ) -> Result<(), StreamError>;
172
173    /// 恢复流式传输
174    fn send_message_stream_resume(
175        &self,
176        stream_id: &str,
177        plugin_ctx: &crate::metadata::PluginInstanceContext,
178    ) -> Result<(), StreamError>;
179
180    /// 取消流式传输
181    fn send_message_stream_cancel(
182        &self,
183        stream_id: &str,
184        plugin_ctx: &crate::metadata::PluginInstanceContext,
185    ) -> Result<(), StreamError>;
186
187    /// 获取流状态
188    fn get_stream_status(&self, stream_id: &str) -> Option<StreamStatus>;
189
190    /// 列出活跃的流
191    fn list_active_streams(
192        &self,
193        plugin_ctx: &crate::metadata::PluginInstanceContext,
194    ) -> Vec<String>;
195
196    /// 批量发送流式数据
197    fn send_message_stream_batch(
198        &self,
199        stream_id: &str,
200        chunks: &[&str],
201        plugin_ctx: &crate::metadata::PluginInstanceContext,
202    ) -> Result<(), StreamError>;
203}
204
205impl<T: PluginHandler> PluginStreamMessage for T {
206    fn send_message_stream_start(
207        &self,
208        plugin_ctx: &crate::metadata::PluginInstanceContext,
209    ) -> Result<String, StreamError> {
210        log_info!("Starting stream with context: {:?}", plugin_ctx);
211        let stream_id = generate_stream_id();
212        let plugin_metadata = self.get_metadata(plugin_ctx);
213        let plugin_id = &plugin_metadata.id;
214        let instance_id = plugin_metadata
215            .instance_id
216            .as_ref()
217            .unwrap_or(&plugin_metadata.id);
218
219        log_info!(
220            "Starting stream: {} {} {}",
221            stream_id,
222            plugin_id,
223            instance_id
224        );
225
226        let data = StreamMessageData::Start(StreamStartData {
227            stream_id: stream_id.clone(),
228            message_type: "stream_start".to_string(),
229        });
230
231        if send_stream_message_to_frontend(plugin_id, instance_id, "stream_start", data, plugin_ctx)
232        {
233            log_info!("Stream started successfully");
234            // 记录流信息
235            if let Ok(mut manager) = STREAM_MANAGER.lock() {
236                let stream_info = StreamInfo {
237                    id: stream_id.clone(),
238                    plugin_id: plugin_id.clone(),
239                    message_type: "plugin_stream".to_string(),
240                    status: StreamStatus::Active,
241                    created_at: SystemTime::now()
242                        .duration_since(UNIX_EPOCH)
243                        .unwrap()
244                        .as_secs(),
245                };
246                manager.insert(stream_id.clone(), stream_info);
247            }
248            Ok(stream_id)
249        } else {
250            Err(StreamError::SendFailed)
251        }
252    }
253
254    fn send_message_stream(
255        &self,
256        stream_id: &str,
257        chunk: &str,
258        is_final: bool,
259        plugin_ctx: &crate::metadata::PluginInstanceContext,
260    ) -> Result<(), StreamError> {
261        // 检查流是否存在且状态有效
262        {
263            let manager = STREAM_MANAGER
264                .lock()
265                .map_err(|_| StreamError::InvalidState)?;
266            match manager.get(stream_id) {
267                Some(stream_info) => match stream_info.status {
268                    StreamStatus::Active | StreamStatus::Finalizing => {}
269                    StreamStatus::Paused => return Err(StreamError::InvalidState),
270                    StreamStatus::Completed | StreamStatus::Error | StreamStatus::Cancelled => {
271                        return Err(StreamError::StreamAlreadyEnded);
272                    }
273                },
274                None => return Err(StreamError::StreamNotFound),
275            }
276        }
277
278        let plugin_metadata = self.get_metadata(plugin_ctx);
279        let plugin_id = &plugin_metadata.id;
280        let instance_id = plugin_metadata
281            .instance_id
282            .as_ref()
283            .unwrap_or(&plugin_metadata.id);
284        let data = StreamMessageData::Data(StreamDataData {
285            stream_id: stream_id.to_string(),
286            chunk: chunk.to_string(),
287            is_final,
288        });
289
290        if send_stream_message_to_frontend(plugin_id, instance_id, "stream_data", data, plugin_ctx)
291        {
292            // 更新流状态
293            if is_final {
294                if let Ok(mut manager) = STREAM_MANAGER.lock() {
295                    if let Some(stream_info) = manager.get_mut(stream_id) {
296                        stream_info.status = StreamStatus::Finalizing;
297                    }
298                }
299            }
300            Ok(())
301        } else {
302            Err(StreamError::SendFailed)
303        }
304    }
305
306    fn send_message_stream_end(
307        &self,
308        stream_id: &str,
309        success: bool,
310        error_msg: Option<&str>,
311        plugin_ctx: &crate::metadata::PluginInstanceContext,
312    ) -> Result<(), StreamError> {
313        // 检查流是否存在
314        {
315            let manager = STREAM_MANAGER
316                .lock()
317                .map_err(|_| StreamError::InvalidState)?;
318            if !manager.contains_key(stream_id) {
319                return Err(StreamError::StreamNotFound);
320            }
321        }
322
323        let plugin_metadata = self.get_metadata(plugin_ctx);
324        let plugin_id = &plugin_metadata.id;
325        let instance_id = plugin_metadata
326            .instance_id
327            .as_ref()
328            .unwrap_or(&plugin_metadata.id);
329        let data = StreamMessageData::End(StreamEndData {
330            stream_id: stream_id.to_string(),
331            success,
332            error: error_msg.map(|s| s.to_string()),
333        });
334
335        if send_stream_message_to_frontend(plugin_id, instance_id, "stream_end", data, plugin_ctx) {
336            // 更新流状态
337            if let Ok(mut manager) = STREAM_MANAGER.lock() {
338                if let Some(stream_info) = manager.get_mut(stream_id) {
339                    stream_info.status = if success {
340                        StreamStatus::Completed
341                    } else {
342                        StreamStatus::Error
343                    };
344                }
345            }
346            Ok(())
347        } else {
348            Err(StreamError::SendFailed)
349        }
350    }
351
352    fn send_message_stream_pause(
353        &self,
354        stream_id: &str,
355        plugin_ctx: &crate::metadata::PluginInstanceContext,
356    ) -> Result<(), StreamError> {
357        let mut manager = STREAM_MANAGER
358            .lock()
359            .map_err(|_| StreamError::InvalidState)?;
360        match manager.get_mut(stream_id) {
361            Some(stream_info) => {
362                if stream_info.status == StreamStatus::Active {
363                    stream_info.status = StreamStatus::Paused;
364
365                    let plugin_metadata = self.get_metadata(plugin_ctx);
366                    let plugin_id = &plugin_metadata.id;
367                    let instance_id = plugin_metadata
368                        .instance_id
369                        .as_ref()
370                        .unwrap_or(&plugin_metadata.id);
371                    let data = StreamMessageData::Control(StreamControlData {
372                        stream_id: stream_id.to_string(),
373                    });
374
375                    if send_stream_message_to_frontend(
376                        plugin_id,
377                        instance_id,
378                        "stream_pause",
379                        data,
380                        plugin_ctx,
381                    ) {
382                        Ok(())
383                    } else {
384                        // 回滚状态
385                        stream_info.status = StreamStatus::Active;
386                        Err(StreamError::SendFailed)
387                    }
388                } else {
389                    Err(StreamError::InvalidState)
390                }
391            }
392            None => Err(StreamError::StreamNotFound),
393        }
394    }
395
396    fn send_message_stream_resume(
397        &self,
398        stream_id: &str,
399        plugin_ctx: &crate::metadata::PluginInstanceContext,
400    ) -> Result<(), StreamError> {
401        let mut manager = STREAM_MANAGER
402            .lock()
403            .map_err(|_| StreamError::InvalidState)?;
404        match manager.get_mut(stream_id) {
405            Some(stream_info) => {
406                if stream_info.status == StreamStatus::Paused {
407                    stream_info.status = StreamStatus::Active;
408
409                    let plugin_metadata = self.get_metadata(plugin_ctx);
410                    let plugin_id = &plugin_metadata.id;
411                    let instance_id = plugin_metadata
412                        .instance_id
413                        .as_ref()
414                        .unwrap_or(&plugin_metadata.id);
415                    let data = StreamMessageData::Control(StreamControlData {
416                        stream_id: stream_id.to_string(),
417                    });
418
419                    if send_stream_message_to_frontend(
420                        plugin_id,
421                        instance_id,
422                        "stream_resume",
423                        data,
424                        plugin_ctx,
425                    ) {
426                        Ok(())
427                    } else {
428                        // 回滚状态
429                        stream_info.status = StreamStatus::Paused;
430                        Err(StreamError::SendFailed)
431                    }
432                } else {
433                    Err(StreamError::InvalidState)
434                }
435            }
436            None => Err(StreamError::StreamNotFound),
437        }
438    }
439
440    fn send_message_stream_cancel(
441        &self,
442        stream_id: &str,
443        plugin_ctx: &crate::metadata::PluginInstanceContext,
444    ) -> Result<(), StreamError> {
445        let mut manager = STREAM_MANAGER
446            .lock()
447            .map_err(|_| StreamError::InvalidState)?;
448        match manager.get_mut(stream_id) {
449            Some(stream_info) => match stream_info.status {
450                StreamStatus::Active | StreamStatus::Paused | StreamStatus::Finalizing => {
451                    stream_info.status = StreamStatus::Cancelled;
452
453                    let plugin_metadata = self.get_metadata(plugin_ctx);
454                    let plugin_id = &plugin_metadata.id;
455                    let instance_id = plugin_metadata
456                        .instance_id
457                        .as_ref()
458                        .unwrap_or(&plugin_metadata.id);
459                    let data = StreamMessageData::Control(StreamControlData {
460                        stream_id: stream_id.to_string(),
461                    });
462
463                    if send_stream_message_to_frontend(
464                        plugin_id,
465                        instance_id,
466                        "stream_cancel",
467                        data,
468                        plugin_ctx,
469                    ) {
470                        Ok(())
471                    } else {
472                        Err(StreamError::SendFailed)
473                    }
474                }
475                _ => Err(StreamError::InvalidState),
476            },
477            None => Err(StreamError::StreamNotFound),
478        }
479    }
480
481    fn get_stream_status(&self, stream_id: &str) -> Option<StreamStatus> {
482        if let Ok(manager) = STREAM_MANAGER.lock() {
483            manager.get(stream_id).map(|info| info.status.clone())
484        } else {
485            None
486        }
487    }
488
489    fn list_active_streams(
490        &self,
491        plugin_ctx: &crate::metadata::PluginInstanceContext,
492    ) -> Vec<String> {
493        if let Ok(manager) = STREAM_MANAGER.lock() {
494            let plugin_metadata = self.get_metadata(plugin_ctx);
495            let plugin_id = plugin_metadata
496                .instance_id
497                .as_ref()
498                .unwrap_or(&plugin_metadata.id)
499                .clone();
500            manager
501                .iter()
502                .filter(|(_, info)| {
503                    info.plugin_id == plugin_id
504                        && matches!(
505                            info.status,
506                            StreamStatus::Active | StreamStatus::Paused | StreamStatus::Finalizing
507                        )
508                })
509                .map(|(id, _)| id.clone())
510                .collect()
511        } else {
512            Vec::new()
513        }
514    }
515
516    fn send_message_stream_batch(
517        &self,
518        stream_id: &str,
519        chunks: &[&str],
520        plugin_ctx: &crate::metadata::PluginInstanceContext,
521    ) -> Result<(), StreamError> {
522        // 检查流是否存在且状态有效
523        {
524            let manager = STREAM_MANAGER
525                .lock()
526                .map_err(|_| StreamError::InvalidState)?;
527            match manager.get(stream_id) {
528                Some(stream_info) => match stream_info.status {
529                    StreamStatus::Active | StreamStatus::Finalizing => {}
530                    StreamStatus::Paused => return Err(StreamError::InvalidState),
531                    StreamStatus::Completed | StreamStatus::Error | StreamStatus::Cancelled => {
532                        return Err(StreamError::StreamAlreadyEnded);
533                    }
534                },
535                None => return Err(StreamError::StreamNotFound),
536            }
537        }
538
539        let plugin_metadata = self.get_metadata(plugin_ctx);
540        let plugin_id = &plugin_metadata.id;
541        let instance_id = plugin_metadata
542            .instance_id
543            .as_ref()
544            .unwrap_or(&plugin_metadata.id);
545
546        for (i, chunk) in chunks.iter().enumerate() {
547            let is_final = i == chunks.len() - 1;
548            let data = StreamMessageData::Data(StreamDataData {
549                stream_id: stream_id.to_string(),
550                chunk: chunk.to_string(),
551                is_final,
552            });
553
554            if !send_stream_message_to_frontend(
555                plugin_id,
556                instance_id,
557                "stream_data",
558                data,
559                plugin_ctx,
560            ) {
561                return Err(StreamError::SendFailed);
562            }
563        }
564
565        // 更新流状态
566        if !chunks.is_empty() {
567            if let Ok(mut manager) = STREAM_MANAGER.lock() {
568                if let Some(stream_info) = manager.get_mut(stream_id) {
569                    stream_info.status = StreamStatus::Finalizing;
570                }
571            }
572        }
573
574        Ok(())
575    }
576}