1use crate::{log_info, send_to_frontend, PluginHandler};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7#[derive(Debug, Clone)]
9pub enum StreamError {
10 SendFailed,
11 InvalidStreamId,
12 StreamNotFound,
13 StreamAlreadyEnded,
14 InvalidState,
15}
16
17impl std::fmt::Display for StreamError {
18 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19 match self {
20 StreamError::SendFailed => write!(f, "Failed to send message to frontend"),
21 StreamError::InvalidStreamId => write!(f, "Invalid stream ID"),
22 StreamError::StreamNotFound => write!(f, "Stream not found"),
23 StreamError::StreamAlreadyEnded => write!(f, "Stream already ended"),
24 StreamError::InvalidState => write!(f, "Invalid stream state"),
25 }
26 }
27}
28
29impl std::error::Error for StreamError {}
30
31#[derive(Debug, Clone, PartialEq)]
33pub enum StreamStatus {
34 Active,
35 Paused,
36 Finalizing,
37 Completed,
38 Error,
39 Cancelled,
40}
41
42#[derive(Debug, Clone)]
44pub struct StreamInfo {
45 pub id: String,
46 pub plugin_id: String,
47 pub stream_type: String,
48 pub status: StreamStatus,
49 pub created_at: u64,
50 pub metadata: Option<String>,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct StreamMessageWrapper {
56 pub r#type: String,
57 pub plugin_id: String,
58 pub data: StreamMessageData,
59 pub timestamp: u64,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(untagged)]
65pub enum StreamMessageData {
66 Start(StreamStartData),
67 Data(StreamDataData),
68 End(StreamEndData),
69 Control(StreamControlData),
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct StreamStartData {
75 pub stream_id: String,
76 pub stream_type: String,
77 pub metadata: Option<String>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct StreamDataData {
83 pub stream_id: String,
84 pub chunk: String,
85 pub is_final: bool,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct StreamEndData {
91 pub stream_id: String,
92 pub success: bool,
93 pub error: Option<String>,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct StreamControlData {
99 pub stream_id: String,
100}
101
102static STREAM_MANAGER: std::sync::LazyLock<Arc<Mutex<HashMap<String, StreamInfo>>>> =
104 std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
105
106fn generate_stream_id() -> String {
108 let timestamp = SystemTime::now()
109 .duration_since(UNIX_EPOCH)
110 .unwrap()
111 .as_nanos();
112 format!("stream_{}", timestamp)
113}
114
115fn send_stream_message_to_frontend(
117 plugin_id: &str,
118 message_type: &str,
119 data: StreamMessageData,
120) -> bool {
121 let wrapper = StreamMessageWrapper {
122 r#type: message_type.to_string(),
123 plugin_id: plugin_id.to_string(),
124 data,
125 timestamp: SystemTime::now()
126 .duration_since(UNIX_EPOCH)
127 .unwrap()
128 .as_millis() as u64,
129 };
130
131 match serde_json::to_string(&wrapper) {
132 Ok(payload) => send_to_frontend("plugin-stream", &payload),
133 Err(_) => false,
134 }
135}
136
137pub trait PluginStreamMessage {
139 fn send_message_stream_start(
141 &self,
142 stream_type: &str,
143 metadata: Option<&str>,
144 ) -> Result<String, StreamError>;
145
146 fn send_message_stream(
148 &self,
149 stream_id: &str,
150 chunk: &str,
151 is_final: bool,
152 ) -> Result<(), StreamError>;
153
154 fn send_message_stream_end(
156 &self,
157 stream_id: &str,
158 success: bool,
159 error_msg: Option<&str>,
160 ) -> Result<(), StreamError>;
161
162 fn send_message_stream_pause(&self, stream_id: &str) -> Result<(), StreamError>;
164
165 fn send_message_stream_resume(&self, stream_id: &str) -> Result<(), StreamError>;
167
168 fn send_message_stream_cancel(&self, stream_id: &str) -> Result<(), StreamError>;
170
171 fn get_stream_status(&self, stream_id: &str) -> Option<StreamStatus>;
173
174 fn list_active_streams(&self) -> Vec<String>;
176
177 fn send_message_stream_batch(
179 &self,
180 stream_id: &str,
181 chunks: &[&str],
182 ) -> Result<(), StreamError>;
183}
184
185impl<T: PluginHandler> PluginStreamMessage for T {
186 fn send_message_stream_start(
187 &self,
188 stream_type: &str,
189 metadata: Option<&str>,
190 ) -> Result<String, StreamError> {
191 let stream_id = generate_stream_id();
192 let plugin_id = self.get_metadata().id.clone();
193
194 log_info!("Starting stream: {} {}", stream_id, plugin_id);
195
196 let data = StreamMessageData::Start(StreamStartData {
197 stream_id: stream_id.clone(),
198 stream_type: stream_type.to_string(),
199 metadata: metadata.map(|s| s.to_string()),
200 });
201
202 if send_stream_message_to_frontend(&plugin_id, "stream_start", data) {
203 if let Ok(mut manager) = STREAM_MANAGER.lock() {
205 let stream_info = StreamInfo {
206 id: stream_id.clone(),
207 plugin_id: plugin_id.clone(),
208 stream_type: stream_type.to_string(),
209 status: StreamStatus::Active,
210 created_at: SystemTime::now()
211 .duration_since(UNIX_EPOCH)
212 .unwrap()
213 .as_secs(),
214 metadata: metadata.map(|s| s.to_string()),
215 };
216 manager.insert(stream_id.clone(), stream_info);
217 }
218 Ok(stream_id)
219 } else {
220 Err(StreamError::SendFailed)
221 }
222 }
223
224 fn send_message_stream(
225 &self,
226 stream_id: &str,
227 chunk: &str,
228 is_final: bool,
229 ) -> Result<(), StreamError> {
230 {
232 let manager = STREAM_MANAGER
233 .lock()
234 .map_err(|_| StreamError::InvalidState)?;
235 match manager.get(stream_id) {
236 Some(stream_info) => match stream_info.status {
237 StreamStatus::Active | StreamStatus::Finalizing => {}
238 StreamStatus::Paused => return Err(StreamError::InvalidState),
239 StreamStatus::Completed | StreamStatus::Error | StreamStatus::Cancelled => {
240 return Err(StreamError::StreamAlreadyEnded);
241 }
242 },
243 None => return Err(StreamError::StreamNotFound),
244 }
245 }
246
247 let plugin_id = self.get_metadata().id.clone();
248 let data = StreamMessageData::Data(StreamDataData {
249 stream_id: stream_id.to_string(),
250 chunk: chunk.to_string(),
251 is_final,
252 });
253
254 if send_stream_message_to_frontend(&plugin_id, "stream_data", data) {
255 if is_final {
257 if let Ok(mut manager) = STREAM_MANAGER.lock() {
258 if let Some(stream_info) = manager.get_mut(stream_id) {
259 stream_info.status = StreamStatus::Finalizing;
260 }
261 }
262 }
263 Ok(())
264 } else {
265 Err(StreamError::SendFailed)
266 }
267 }
268
269 fn send_message_stream_end(
270 &self,
271 stream_id: &str,
272 success: bool,
273 error_msg: Option<&str>,
274 ) -> Result<(), StreamError> {
275 {
277 let manager = STREAM_MANAGER
278 .lock()
279 .map_err(|_| StreamError::InvalidState)?;
280 if !manager.contains_key(stream_id) {
281 return Err(StreamError::StreamNotFound);
282 }
283 }
284
285 let plugin_id = self.get_metadata().id.clone();
286 let data = StreamMessageData::End(StreamEndData {
287 stream_id: stream_id.to_string(),
288 success,
289 error: error_msg.map(|s| s.to_string()),
290 });
291
292 if send_stream_message_to_frontend(&plugin_id, "stream_end", data) {
293 if let Ok(mut manager) = STREAM_MANAGER.lock() {
295 if let Some(stream_info) = manager.get_mut(stream_id) {
296 stream_info.status = if success {
297 StreamStatus::Completed
298 } else {
299 StreamStatus::Error
300 };
301 }
302 }
303 Ok(())
304 } else {
305 Err(StreamError::SendFailed)
306 }
307 }
308
309 fn send_message_stream_pause(&self, stream_id: &str) -> Result<(), StreamError> {
310 let mut manager = STREAM_MANAGER
311 .lock()
312 .map_err(|_| StreamError::InvalidState)?;
313 match manager.get_mut(stream_id) {
314 Some(stream_info) => {
315 if stream_info.status == StreamStatus::Active {
316 stream_info.status = StreamStatus::Paused;
317
318 let plugin_id = self.get_metadata().id.clone();
319 let data = StreamMessageData::Control(StreamControlData {
320 stream_id: stream_id.to_string(),
321 });
322
323 if send_stream_message_to_frontend(&plugin_id, "stream_pause", data) {
324 Ok(())
325 } else {
326 stream_info.status = StreamStatus::Active;
328 Err(StreamError::SendFailed)
329 }
330 } else {
331 Err(StreamError::InvalidState)
332 }
333 }
334 None => Err(StreamError::StreamNotFound),
335 }
336 }
337
338 fn send_message_stream_resume(&self, stream_id: &str) -> Result<(), StreamError> {
339 let mut manager = STREAM_MANAGER
340 .lock()
341 .map_err(|_| StreamError::InvalidState)?;
342 match manager.get_mut(stream_id) {
343 Some(stream_info) => {
344 if stream_info.status == StreamStatus::Paused {
345 stream_info.status = StreamStatus::Active;
346
347 let plugin_id = self.get_metadata().id.clone();
348 let data = StreamMessageData::Control(StreamControlData {
349 stream_id: stream_id.to_string(),
350 });
351
352 if send_stream_message_to_frontend(&plugin_id, "stream_resume", data) {
353 Ok(())
354 } else {
355 stream_info.status = StreamStatus::Paused;
357 Err(StreamError::SendFailed)
358 }
359 } else {
360 Err(StreamError::InvalidState)
361 }
362 }
363 None => Err(StreamError::StreamNotFound),
364 }
365 }
366
367 fn send_message_stream_cancel(&self, stream_id: &str) -> Result<(), StreamError> {
368 let mut manager = STREAM_MANAGER
369 .lock()
370 .map_err(|_| StreamError::InvalidState)?;
371 match manager.get_mut(stream_id) {
372 Some(stream_info) => match stream_info.status {
373 StreamStatus::Active | StreamStatus::Paused | StreamStatus::Finalizing => {
374 stream_info.status = StreamStatus::Cancelled;
375
376 let plugin_id = self.get_metadata().id.clone();
377 let data = StreamMessageData::Control(StreamControlData {
378 stream_id: stream_id.to_string(),
379 });
380
381 if send_stream_message_to_frontend(&plugin_id, "stream_cancel", data) {
382 Ok(())
383 } else {
384 Err(StreamError::SendFailed)
385 }
386 }
387 _ => Err(StreamError::InvalidState),
388 },
389 None => Err(StreamError::StreamNotFound),
390 }
391 }
392
393 fn get_stream_status(&self, stream_id: &str) -> Option<StreamStatus> {
394 if let Ok(manager) = STREAM_MANAGER.lock() {
395 manager.get(stream_id).map(|info| info.status.clone())
396 } else {
397 None
398 }
399 }
400
401 fn list_active_streams(&self) -> Vec<String> {
402 if let Ok(manager) = STREAM_MANAGER.lock() {
403 let plugin_id = self.get_metadata().id.clone();
404 manager
405 .iter()
406 .filter(|(_, info)| {
407 info.plugin_id == plugin_id
408 && matches!(
409 info.status,
410 StreamStatus::Active | StreamStatus::Paused | StreamStatus::Finalizing
411 )
412 })
413 .map(|(id, _)| id.clone())
414 .collect()
415 } else {
416 Vec::new()
417 }
418 }
419
420 fn send_message_stream_batch(
421 &self,
422 stream_id: &str,
423 chunks: &[&str],
424 ) -> Result<(), StreamError> {
425 {
427 let manager = STREAM_MANAGER
428 .lock()
429 .map_err(|_| StreamError::InvalidState)?;
430 match manager.get(stream_id) {
431 Some(stream_info) => match stream_info.status {
432 StreamStatus::Active | StreamStatus::Finalizing => {}
433 StreamStatus::Paused => return Err(StreamError::InvalidState),
434 StreamStatus::Completed | StreamStatus::Error | StreamStatus::Cancelled => {
435 return Err(StreamError::StreamAlreadyEnded);
436 }
437 },
438 None => return Err(StreamError::StreamNotFound),
439 }
440 }
441
442 let plugin_id = self.get_metadata().id.clone();
443
444 for (i, chunk) in chunks.iter().enumerate() {
445 let is_final = i == chunks.len() - 1;
446 let data = StreamMessageData::Data(StreamDataData {
447 stream_id: stream_id.to_string(),
448 chunk: chunk.to_string(),
449 is_final,
450 });
451
452 if !send_stream_message_to_frontend(&plugin_id, "stream_data", data) {
453 return Err(StreamError::SendFailed);
454 }
455 }
456
457 if !chunks.is_empty() {
459 if let Ok(mut manager) = STREAM_MANAGER.lock() {
460 if let Some(stream_info) = manager.get_mut(stream_id) {
461 stream_info.status = StreamStatus::Finalizing;
462 }
463 }
464 }
465
466 Ok(())
467 }
468}