1use crate::{log_info, PluginHandler};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7#[derive(Debug, Clone)]
9pub enum StreamError {
10 SendFailed,
11 InvalidStreamId,
12 StreamNotFound,
13 StreamAlreadyEnded,
14 InvalidState,
15}
16
17impl std::fmt::Display for StreamError {
18 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19 match self {
20 StreamError::SendFailed => write!(f, "Failed to send message to frontend"),
21 StreamError::InvalidStreamId => write!(f, "Invalid stream ID"),
22 StreamError::StreamNotFound => write!(f, "Stream not found"),
23 StreamError::StreamAlreadyEnded => write!(f, "Stream already ended"),
24 StreamError::InvalidState => write!(f, "Invalid stream state"),
25 }
26 }
27}
28
29impl std::error::Error for StreamError {}
30
31#[derive(Debug, Clone, PartialEq)]
33pub enum StreamStatus {
34 Active,
35 Paused,
36 Finalizing,
37 Completed,
38 Error,
39 Cancelled,
40}
41
42#[derive(Debug, Clone)]
44pub struct StreamInfo {
45 pub id: String,
46 pub plugin_id: String,
47 pub message_type: String,
48 pub status: StreamStatus,
49 pub created_at: u64,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct StreamMessageWrapper {
55 pub r#type: String,
56 pub plugin_id: String,
57 pub instance_id: String,
58 pub data: StreamMessageData,
59 pub timestamp: u64,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(untagged)]
65pub enum StreamMessageData {
66 Start(StreamStartData),
67 Data(StreamDataData),
68 End(StreamEndData),
69 Control(StreamControlData),
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct StreamStartData {
75 pub stream_id: String,
76 pub message_type: String,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct StreamDataData {
82 pub stream_id: String,
83 pub chunk: String,
84 pub is_final: bool,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct StreamEndData {
90 pub stream_id: String,
91 pub success: bool,
92 pub error: Option<String>,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct StreamControlData {
98 pub stream_id: String,
99}
100
101static STREAM_MANAGER: std::sync::LazyLock<Arc<Mutex<HashMap<String, StreamInfo>>>> =
103 std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
104
105fn generate_stream_id() -> String {
107 let timestamp = SystemTime::now()
108 .duration_since(UNIX_EPOCH)
109 .unwrap()
110 .as_nanos();
111 format!("stream_{}", timestamp)
112}
113
114fn send_stream_message_to_frontend(
116 plugin_id: &str,
117 instance_id: &str,
118 message_type: &str,
119 data: StreamMessageData,
120 plugin_ctx: &crate::metadata::PluginInstanceContext,
121) -> bool {
122 let wrapper = StreamMessageWrapper {
123 r#type: message_type.to_string(),
124 plugin_id: plugin_id.to_string(),
125 instance_id: instance_id.to_string(),
126 data,
127 timestamp: SystemTime::now()
128 .duration_since(UNIX_EPOCH)
129 .unwrap()
130 .as_millis() as u64,
131 };
132
133 match serde_json::to_string(&wrapper) {
134 Ok(payload) => plugin_ctx.send_to_frontend("plugin-stream", &payload),
135 Err(_) => false,
136 }
137}
138
139pub trait PluginStreamMessage {
142 fn send_message_stream_start(
144 &self,
145 plugin_ctx: &crate::metadata::PluginInstanceContext,
146 ) -> Result<String, StreamError>;
147
148 fn send_message_stream(
150 &self,
151 stream_id: &str,
152 chunk: &str,
153 is_final: bool,
154 plugin_ctx: &crate::metadata::PluginInstanceContext,
155 ) -> Result<(), StreamError>;
156
157 fn send_message_stream_end(
159 &self,
160 stream_id: &str,
161 success: bool,
162 error_msg: Option<&str>,
163 plugin_ctx: &crate::metadata::PluginInstanceContext,
164 ) -> Result<(), StreamError>;
165
166 fn send_message_stream_pause(
168 &self,
169 stream_id: &str,
170 plugin_ctx: &crate::metadata::PluginInstanceContext,
171 ) -> Result<(), StreamError>;
172
173 fn send_message_stream_resume(
175 &self,
176 stream_id: &str,
177 plugin_ctx: &crate::metadata::PluginInstanceContext,
178 ) -> Result<(), StreamError>;
179
180 fn send_message_stream_cancel(
182 &self,
183 stream_id: &str,
184 plugin_ctx: &crate::metadata::PluginInstanceContext,
185 ) -> Result<(), StreamError>;
186
187 fn get_stream_status(&self, stream_id: &str) -> Option<StreamStatus>;
189
190 fn list_active_streams(
192 &self,
193 plugin_ctx: &crate::metadata::PluginInstanceContext,
194 ) -> Vec<String>;
195
196 fn send_message_stream_batch(
198 &self,
199 stream_id: &str,
200 chunks: &[&str],
201 plugin_ctx: &crate::metadata::PluginInstanceContext,
202 ) -> Result<(), StreamError>;
203}
204
205impl<T: PluginHandler> PluginStreamMessage for T {
206 fn send_message_stream_start(
207 &self,
208 plugin_ctx: &crate::metadata::PluginInstanceContext,
209 ) -> Result<String, StreamError> {
210 log_info!("Starting stream with context: {:?}", plugin_ctx);
211 let stream_id = generate_stream_id();
212 let plugin_metadata = self.get_metadata(plugin_ctx);
213 let plugin_id = &plugin_metadata.id;
214 let instance_id = plugin_metadata
215 .instance_id
216 .as_ref()
217 .unwrap_or(&plugin_metadata.id);
218
219 log_info!(
220 "Starting stream: {} {} {}",
221 stream_id,
222 plugin_id,
223 instance_id
224 );
225
226 let data = StreamMessageData::Start(StreamStartData {
227 stream_id: stream_id.clone(),
228 message_type: "stream_start".to_string(),
229 });
230
231 if send_stream_message_to_frontend(plugin_id, instance_id, "stream_start", data, plugin_ctx)
232 {
233 log_info!("Stream started successfully");
234 if let Ok(mut manager) = STREAM_MANAGER.lock() {
236 let stream_info = StreamInfo {
237 id: stream_id.clone(),
238 plugin_id: plugin_id.clone(),
239 message_type: "plugin_stream".to_string(),
240 status: StreamStatus::Active,
241 created_at: SystemTime::now()
242 .duration_since(UNIX_EPOCH)
243 .unwrap()
244 .as_secs(),
245 };
246 manager.insert(stream_id.clone(), stream_info);
247 }
248 Ok(stream_id)
249 } else {
250 Err(StreamError::SendFailed)
251 }
252 }
253
254 fn send_message_stream(
255 &self,
256 stream_id: &str,
257 chunk: &str,
258 is_final: bool,
259 plugin_ctx: &crate::metadata::PluginInstanceContext,
260 ) -> Result<(), StreamError> {
261 {
263 let manager = STREAM_MANAGER
264 .lock()
265 .map_err(|_| StreamError::InvalidState)?;
266 match manager.get(stream_id) {
267 Some(stream_info) => match stream_info.status {
268 StreamStatus::Active | StreamStatus::Finalizing => {}
269 StreamStatus::Paused => return Err(StreamError::InvalidState),
270 StreamStatus::Completed | StreamStatus::Error | StreamStatus::Cancelled => {
271 return Err(StreamError::StreamAlreadyEnded);
272 }
273 },
274 None => return Err(StreamError::StreamNotFound),
275 }
276 }
277
278 let plugin_metadata = self.get_metadata(plugin_ctx);
279 let plugin_id = &plugin_metadata.id;
280 let instance_id = plugin_metadata
281 .instance_id
282 .as_ref()
283 .unwrap_or(&plugin_metadata.id);
284 let data = StreamMessageData::Data(StreamDataData {
285 stream_id: stream_id.to_string(),
286 chunk: chunk.to_string(),
287 is_final,
288 });
289
290 if send_stream_message_to_frontend(plugin_id, instance_id, "stream_data", data, plugin_ctx)
291 {
292 if is_final {
294 if let Ok(mut manager) = STREAM_MANAGER.lock() {
295 if let Some(stream_info) = manager.get_mut(stream_id) {
296 stream_info.status = StreamStatus::Finalizing;
297 }
298 }
299 }
300 Ok(())
301 } else {
302 Err(StreamError::SendFailed)
303 }
304 }
305
306 fn send_message_stream_end(
307 &self,
308 stream_id: &str,
309 success: bool,
310 error_msg: Option<&str>,
311 plugin_ctx: &crate::metadata::PluginInstanceContext,
312 ) -> Result<(), StreamError> {
313 {
315 let manager = STREAM_MANAGER
316 .lock()
317 .map_err(|_| StreamError::InvalidState)?;
318 if !manager.contains_key(stream_id) {
319 return Err(StreamError::StreamNotFound);
320 }
321 }
322
323 let plugin_metadata = self.get_metadata(plugin_ctx);
324 let plugin_id = &plugin_metadata.id;
325 let instance_id = plugin_metadata
326 .instance_id
327 .as_ref()
328 .unwrap_or(&plugin_metadata.id);
329 let data = StreamMessageData::End(StreamEndData {
330 stream_id: stream_id.to_string(),
331 success,
332 error: error_msg.map(|s| s.to_string()),
333 });
334
335 if send_stream_message_to_frontend(plugin_id, instance_id, "stream_end", data, plugin_ctx) {
336 if let Ok(mut manager) = STREAM_MANAGER.lock() {
338 if let Some(stream_info) = manager.get_mut(stream_id) {
339 stream_info.status = if success {
340 StreamStatus::Completed
341 } else {
342 StreamStatus::Error
343 };
344 }
345 }
346 Ok(())
347 } else {
348 Err(StreamError::SendFailed)
349 }
350 }
351
352 fn send_message_stream_pause(
353 &self,
354 stream_id: &str,
355 plugin_ctx: &crate::metadata::PluginInstanceContext,
356 ) -> Result<(), StreamError> {
357 let mut manager = STREAM_MANAGER
358 .lock()
359 .map_err(|_| StreamError::InvalidState)?;
360 match manager.get_mut(stream_id) {
361 Some(stream_info) => {
362 if stream_info.status == StreamStatus::Active {
363 stream_info.status = StreamStatus::Paused;
364
365 let plugin_metadata = self.get_metadata(plugin_ctx);
366 let plugin_id = &plugin_metadata.id;
367 let instance_id = plugin_metadata
368 .instance_id
369 .as_ref()
370 .unwrap_or(&plugin_metadata.id);
371 let data = StreamMessageData::Control(StreamControlData {
372 stream_id: stream_id.to_string(),
373 });
374
375 if send_stream_message_to_frontend(
376 plugin_id,
377 instance_id,
378 "stream_pause",
379 data,
380 plugin_ctx,
381 ) {
382 Ok(())
383 } else {
384 stream_info.status = StreamStatus::Active;
386 Err(StreamError::SendFailed)
387 }
388 } else {
389 Err(StreamError::InvalidState)
390 }
391 }
392 None => Err(StreamError::StreamNotFound),
393 }
394 }
395
396 fn send_message_stream_resume(
397 &self,
398 stream_id: &str,
399 plugin_ctx: &crate::metadata::PluginInstanceContext,
400 ) -> Result<(), StreamError> {
401 let mut manager = STREAM_MANAGER
402 .lock()
403 .map_err(|_| StreamError::InvalidState)?;
404 match manager.get_mut(stream_id) {
405 Some(stream_info) => {
406 if stream_info.status == StreamStatus::Paused {
407 stream_info.status = StreamStatus::Active;
408
409 let plugin_metadata = self.get_metadata(plugin_ctx);
410 let plugin_id = &plugin_metadata.id;
411 let instance_id = plugin_metadata
412 .instance_id
413 .as_ref()
414 .unwrap_or(&plugin_metadata.id);
415 let data = StreamMessageData::Control(StreamControlData {
416 stream_id: stream_id.to_string(),
417 });
418
419 if send_stream_message_to_frontend(
420 plugin_id,
421 instance_id,
422 "stream_resume",
423 data,
424 plugin_ctx,
425 ) {
426 Ok(())
427 } else {
428 stream_info.status = StreamStatus::Paused;
430 Err(StreamError::SendFailed)
431 }
432 } else {
433 Err(StreamError::InvalidState)
434 }
435 }
436 None => Err(StreamError::StreamNotFound),
437 }
438 }
439
440 fn send_message_stream_cancel(
441 &self,
442 stream_id: &str,
443 plugin_ctx: &crate::metadata::PluginInstanceContext,
444 ) -> Result<(), StreamError> {
445 let mut manager = STREAM_MANAGER
446 .lock()
447 .map_err(|_| StreamError::InvalidState)?;
448 match manager.get_mut(stream_id) {
449 Some(stream_info) => match stream_info.status {
450 StreamStatus::Active | StreamStatus::Paused | StreamStatus::Finalizing => {
451 stream_info.status = StreamStatus::Cancelled;
452
453 let plugin_metadata = self.get_metadata(plugin_ctx);
454 let plugin_id = &plugin_metadata.id;
455 let instance_id = plugin_metadata
456 .instance_id
457 .as_ref()
458 .unwrap_or(&plugin_metadata.id);
459 let data = StreamMessageData::Control(StreamControlData {
460 stream_id: stream_id.to_string(),
461 });
462
463 if send_stream_message_to_frontend(
464 plugin_id,
465 instance_id,
466 "stream_cancel",
467 data,
468 plugin_ctx,
469 ) {
470 Ok(())
471 } else {
472 Err(StreamError::SendFailed)
473 }
474 }
475 _ => Err(StreamError::InvalidState),
476 },
477 None => Err(StreamError::StreamNotFound),
478 }
479 }
480
481 fn get_stream_status(&self, stream_id: &str) -> Option<StreamStatus> {
482 if let Ok(manager) = STREAM_MANAGER.lock() {
483 manager.get(stream_id).map(|info| info.status.clone())
484 } else {
485 None
486 }
487 }
488
489 fn list_active_streams(
490 &self,
491 plugin_ctx: &crate::metadata::PluginInstanceContext,
492 ) -> Vec<String> {
493 if let Ok(manager) = STREAM_MANAGER.lock() {
494 let plugin_metadata = self.get_metadata(plugin_ctx);
495 let plugin_id = plugin_metadata
496 .instance_id
497 .as_ref()
498 .unwrap_or(&plugin_metadata.id)
499 .clone();
500 manager
501 .iter()
502 .filter(|(_, info)| {
503 info.plugin_id == plugin_id
504 && matches!(
505 info.status,
506 StreamStatus::Active | StreamStatus::Paused | StreamStatus::Finalizing
507 )
508 })
509 .map(|(id, _)| id.clone())
510 .collect()
511 } else {
512 Vec::new()
513 }
514 }
515
516 fn send_message_stream_batch(
517 &self,
518 stream_id: &str,
519 chunks: &[&str],
520 plugin_ctx: &crate::metadata::PluginInstanceContext,
521 ) -> Result<(), StreamError> {
522 {
524 let manager = STREAM_MANAGER
525 .lock()
526 .map_err(|_| StreamError::InvalidState)?;
527 match manager.get(stream_id) {
528 Some(stream_info) => match stream_info.status {
529 StreamStatus::Active | StreamStatus::Finalizing => {}
530 StreamStatus::Paused => return Err(StreamError::InvalidState),
531 StreamStatus::Completed | StreamStatus::Error | StreamStatus::Cancelled => {
532 return Err(StreamError::StreamAlreadyEnded);
533 }
534 },
535 None => return Err(StreamError::StreamNotFound),
536 }
537 }
538
539 let plugin_metadata = self.get_metadata(plugin_ctx);
540 let plugin_id = &plugin_metadata.id;
541 let instance_id = plugin_metadata
542 .instance_id
543 .as_ref()
544 .unwrap_or(&plugin_metadata.id);
545
546 for (i, chunk) in chunks.iter().enumerate() {
547 let is_final = i == chunks.len() - 1;
548 let data = StreamMessageData::Data(StreamDataData {
549 stream_id: stream_id.to_string(),
550 chunk: chunk.to_string(),
551 is_final,
552 });
553
554 if !send_stream_message_to_frontend(
555 plugin_id,
556 instance_id,
557 "stream_data",
558 data,
559 plugin_ctx,
560 ) {
561 return Err(StreamError::SendFailed);
562 }
563 }
564
565 if !chunks.is_empty() {
567 if let Ok(mut manager) = STREAM_MANAGER.lock() {
568 if let Some(stream_info) = manager.get_mut(stream_id) {
569 stream_info.status = StreamStatus::Finalizing;
570 }
571 }
572 }
573
574 Ok(())
575 }
576}