use crate::{log_info, PluginHandler};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
/// Failures reported by the plugin streaming API.
#[derive(Debug, Clone)]
pub enum StreamError {
    /// The message could not be delivered to the frontend.
    SendFailed,
    /// The supplied stream ID is not valid (not produced by the code visible in this file).
    InvalidStreamId,
    /// No stream is registered under the supplied ID.
    StreamNotFound,
    /// The stream has already reached a terminal state.
    StreamAlreadyEnded,
    /// The stream, or the stream registry lock, is not in a state that allows the operation.
    InvalidState,
}

impl std::fmt::Display for StreamError {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every message is a constant, so pick the text first and emit it once.
        let description = match self {
            Self::SendFailed => "Failed to send message to frontend",
            Self::InvalidStreamId => "Invalid stream ID",
            Self::StreamNotFound => "Stream not found",
            Self::StreamAlreadyEnded => "Stream already ended",
            Self::InvalidState => "Invalid stream state",
        };
        f.write_str(description)
    }
}

impl std::error::Error for StreamError {}
/// Lifecycle state of a stream tracked in the stream registry.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamStatus {
/// Assigned when the stream is registered by `send_message_stream_start`,
/// and restored by a successful resume.
Active,
/// Assigned by a successful `send_message_stream_pause`; data chunks are rejected in this state.
Paused,
/// Assigned after a chunk flagged `is_final` (or a non-empty batch) has been sent.
Finalizing,
/// Assigned by `send_message_stream_end` when called with `success == true`.
Completed,
/// Assigned by `send_message_stream_end` when called with `success == false`.
Error,
/// Assigned by `send_message_stream_cancel`.
Cancelled,
}
/// Registry entry describing one stream.
#[derive(Debug, Clone)]
pub struct StreamInfo {
/// Stream identifier; the same value is used as the registry key.
pub id: String,
/// `id` field of the plugin metadata that was current when the stream was started.
pub plugin_id: String,
/// Kind of stream; the registration code in this file always stores `"plugin_stream"`.
pub message_type: String,
/// Current lifecycle state.
pub status: StreamStatus,
/// Registration time in whole seconds since the Unix epoch.
pub created_at: u64,
}
/// Envelope that is serialized to JSON and sent to the frontend on the
/// `"plugin-stream"` channel for every stream message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamMessageWrapper {
/// Message kind, e.g. `"stream_start"`, `"stream_data"`, `"stream_end"`,
/// `"stream_pause"`, `"stream_resume"` or `"stream_cancel"`.
pub r#type: String,
/// Plugin identifier taken from the plugin metadata.
pub plugin_id: String,
/// Instance identifier; callers in this file fall back to the plugin ID when none is set.
pub instance_id: String,
/// Message-specific payload.
pub data: StreamMessageData,
/// Send time in milliseconds since the Unix epoch.
pub timestamp: u64,
}
/// Payload carried inside a [`StreamMessageWrapper`].
///
/// The enum is `#[serde(untagged)]`, so no variant name is written; the
/// wrapper's `type` string is what identifies the kind of message.
// NOTE(review): with an untagged enum, deserialization tries variants in
// declaration order — keep the order in mind before reordering variants.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum StreamMessageData {
/// Payload of a `"stream_start"` message.
Start(StreamStartData),
/// Payload of a `"stream_data"` message.
Data(StreamDataData),
/// Payload of a `"stream_end"` message.
End(StreamEndData),
/// Payload of pause / resume / cancel messages.
Control(StreamControlData),
}
/// Payload announcing a newly opened stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamStartData {
/// Identifier of the stream being opened.
pub stream_id: String,
/// Message kind; the sender in this file always writes `"stream_start"`.
pub message_type: String,
}
/// Payload carrying one chunk of stream content.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamDataData {
/// Identifier of the stream the chunk belongs to.
pub stream_id: String,
/// Chunk content.
pub chunk: String,
/// `true` when the sender marks this chunk as the last one.
pub is_final: bool,
}
/// Payload announcing that a stream has ended.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamEndData {
/// Identifier of the stream being ended.
pub stream_id: String,
/// Whether the stream ended successfully.
pub success: bool,
/// Optional error description supplied by the caller.
pub error: Option<String>,
}
/// Payload of pause, resume and cancel messages; the operation itself is
/// conveyed by the wrapper's `type` string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamControlData {
/// Identifier of the stream the control message targets.
pub stream_id: String,
}
/// Process-wide registry of streams, keyed by stream ID and created lazily
/// on first access.
// NOTE(review): nothing in this file removes entries, so streams that have
// reached a terminal status stay in the map — confirm whether cleanup happens
// elsewhere.
static STREAM_MANAGER: std::sync::LazyLock<Arc<Mutex<HashMap<String, StreamInfo>>>> =
std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
/// Generates a process-unique identifier for a new stream.
///
/// The ID has the form `stream_<nanos>_<seq>`, where `<nanos>` is the
/// wall-clock time in nanoseconds since the Unix epoch and `<seq>` is a
/// process-wide sequence number.
///
/// The timestamp alone is not unique: two calls that observe the same clock
/// reading (coarse clock resolution, concurrent callers) would otherwise
/// produce the same ID and overwrite each other in the stream registry. The
/// sequence number guarantees distinct IDs within one process.
///
/// If the system clock reports a time before the Unix epoch the timestamp
/// part falls back to `0` instead of panicking.
fn generate_stream_id() -> String {
    // Atomic read-modify-write hands every caller a distinct value; no
    // ordering with other memory operations is required for uniqueness.
    static NEXT_SEQUENCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    let sequence = NEXT_SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    format!("stream_{}_{}", nanos, sequence)
}
/// Wraps `data` in a [`StreamMessageWrapper`], serializes it to JSON and
/// hands it to the frontend on the `"plugin-stream"` channel.
///
/// Returns the result of `plugin_ctx.send_to_frontend`, or `false` when the
/// envelope cannot be serialized.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn send_stream_message_to_frontend(
    plugin_id: &str,
    instance_id: &str,
    message_type: &str,
    data: StreamMessageData,
    plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> bool {
    let sent_at_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64;

    let envelope = StreamMessageWrapper {
        r#type: message_type.to_owned(),
        plugin_id: plugin_id.to_owned(),
        instance_id: instance_id.to_owned(),
        data,
        timestamp: sent_at_ms,
    };

    // A serialization failure is reported the same way as a failed send.
    serde_json::to_string(&envelope)
        .map(|payload| plugin_ctx.send_to_frontend("plugin-stream", &payload))
        .unwrap_or(false)
}
/// Streaming operations available to plugins.
///
/// This file provides a blanket implementation for every `PluginHandler`;
/// the method descriptions below reflect that implementation.
pub trait PluginStreamMessage {
/// Opens a new stream, announces it to the frontend and registers it as
/// `Active`. Returns the generated stream ID, or `StreamError::SendFailed`
/// when the announcement cannot be delivered.
fn send_message_stream_start(
&self,
plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<String, StreamError>;
/// Sends one chunk on an `Active` or `Finalizing` stream. When `is_final`
/// is `true` and the send succeeds, the stream moves to `Finalizing`.
fn send_message_stream(
&self,
stream_id: &str,
chunk: &str,
is_final: bool,
plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<(), StreamError>;
/// Announces the end of a stream and records `Completed` (when `success`
/// is `true`) or `Error` (otherwise). `error_msg` is forwarded to the frontend.
fn send_message_stream_end(
&self,
stream_id: &str,
success: bool,
error_msg: Option<&str>,
plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<(), StreamError>;
/// Pauses an `Active` stream and notifies the frontend.
fn send_message_stream_pause(
&self,
stream_id: &str,
plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<(), StreamError>;
/// Resumes a `Paused` stream and notifies the frontend.
fn send_message_stream_resume(
&self,
stream_id: &str,
plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<(), StreamError>;
/// Cancels a stream that is `Active`, `Paused` or `Finalizing` and
/// notifies the frontend.
fn send_message_stream_cancel(
&self,
stream_id: &str,
plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<(), StreamError>;
/// Returns the recorded status of `stream_id`, or `None` when the stream
/// is unknown or the registry lock cannot be acquired.
fn get_stream_status(&self, stream_id: &str) -> Option<StreamStatus>;
/// Lists the IDs of registered streams that have not reached a terminal
/// status and belong to the plugin resolved from `plugin_ctx`.
fn list_active_streams(
&self,
plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Vec<String>;
/// Sends every entry of `chunks` in order as a data message, flagging the
/// last one as final; a non-empty batch leaves the stream `Finalizing`.
fn send_message_stream_batch(
&self,
stream_id: &str,
chunks: &[&str],
plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<(), StreamError>;
}
impl<T: PluginHandler> PluginStreamMessage for T {
/// Opens a new stream for the plugin resolved from `plugin_ctx`.
///
/// A fresh stream ID is generated, a `"stream_start"` message is sent to the
/// frontend and, once that succeeds, the stream is recorded in the registry
/// with status `Active`.
///
/// # Errors
///
/// Returns `StreamError::SendFailed` when the start message cannot be
/// delivered; nothing is registered in that case.
fn send_message_stream_start(
    &self,
    plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<String, StreamError> {
    log_info!("Starting stream with context: {:?}", plugin_ctx);

    let stream_id = generate_stream_id();
    let metadata = self.get_metadata(plugin_ctx);
    let plugin_id = &metadata.id;
    // Fall back to the plugin ID when the metadata carries no instance ID.
    let instance_id = metadata.instance_id.as_ref().unwrap_or(&metadata.id);
    log_info!(
        "Starting stream: {} {} {}",
        stream_id,
        plugin_id,
        instance_id
    );

    let start_payload = StreamMessageData::Start(StreamStartData {
        stream_id: stream_id.clone(),
        message_type: "stream_start".to_string(),
    });
    let delivered = send_stream_message_to_frontend(
        plugin_id,
        instance_id,
        "stream_start",
        start_payload,
        plugin_ctx,
    );
    if !delivered {
        return Err(StreamError::SendFailed);
    }
    log_info!("Stream started successfully");

    // Registration is best-effort: if the registry lock cannot be taken the
    // ID is still returned to the caller.
    if let Ok(mut registry) = STREAM_MANAGER.lock() {
        let created_at = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let entry = StreamInfo {
            id: stream_id.clone(),
            plugin_id: plugin_id.clone(),
            message_type: "plugin_stream".to_string(),
            status: StreamStatus::Active,
            created_at,
        };
        registry.insert(stream_id.clone(), entry);
    }

    Ok(stream_id)
}
/// Sends one chunk of data on an existing stream.
///
/// The stream must be `Active` or `Finalizing`. When `is_final` is `true`
/// and the chunk is delivered, the stream is moved to `Finalizing`.
///
/// # Errors
///
/// * `StreamError::InvalidState` — the registry lock cannot be acquired, or
///   the stream is `Paused`.
/// * `StreamError::StreamNotFound` — no stream is registered under `stream_id`.
/// * `StreamError::StreamAlreadyEnded` — the stream is `Completed`, `Error`
///   or `Cancelled`.
/// * `StreamError::SendFailed` — the chunk cannot be delivered.
fn send_message_stream(
    &self,
    stream_id: &str,
    chunk: &str,
    is_final: bool,
    plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<(), StreamError> {
    // Validate inside a short scope so the lock is released before sending.
    {
        let registry = STREAM_MANAGER
            .lock()
            .map_err(|_| StreamError::InvalidState)?;
        let Some(entry) = registry.get(stream_id) else {
            return Err(StreamError::StreamNotFound);
        };
        match entry.status {
            StreamStatus::Paused => return Err(StreamError::InvalidState),
            StreamStatus::Completed | StreamStatus::Error | StreamStatus::Cancelled => {
                return Err(StreamError::StreamAlreadyEnded);
            }
            StreamStatus::Active | StreamStatus::Finalizing => {}
        }
    }

    let metadata = self.get_metadata(plugin_ctx);
    let plugin_id = &metadata.id;
    let instance_id = metadata.instance_id.as_ref().unwrap_or(&metadata.id);

    let payload = StreamMessageData::Data(StreamDataData {
        stream_id: stream_id.to_owned(),
        chunk: chunk.to_owned(),
        is_final,
    });
    if !send_stream_message_to_frontend(plugin_id, instance_id, "stream_data", payload, plugin_ctx)
    {
        return Err(StreamError::SendFailed);
    }

    // Only a delivered final chunk changes the recorded status.
    if is_final {
        if let Ok(mut registry) = STREAM_MANAGER.lock() {
            if let Some(entry) = registry.get_mut(stream_id) {
                entry.status = StreamStatus::Finalizing;
            }
        }
    }
    Ok(())
}
fn send_message_stream_end(
&self,
stream_id: &str,
success: bool,
error_msg: Option<&str>,
plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<(), StreamError> {
{
let manager = STREAM_MANAGER
.lock()
.map_err(|_| StreamError::InvalidState)?;
if !manager.contains_key(stream_id) {
return Err(StreamError::StreamNotFound);
}
}
let plugin_metadata = self.get_metadata(plugin_ctx);
let plugin_id = &plugin_metadata.id;
let instance_id = plugin_metadata
.instance_id
.as_ref()
.unwrap_or(&plugin_metadata.id);
let data = StreamMessageData::End(StreamEndData {
stream_id: stream_id.to_string(),
success,
error: error_msg.map(|s| s.to_string()),
});
if send_stream_message_to_frontend(plugin_id, instance_id, "stream_end", data, plugin_ctx) {
if let Ok(mut manager) = STREAM_MANAGER.lock() {
if let Some(stream_info) = manager.get_mut(stream_id) {
stream_info.status = if success {
StreamStatus::Completed
} else {
StreamStatus::Error
};
}
}
Ok(())
} else {
Err(StreamError::SendFailed)
}
}
/// Pauses an `Active` stream and sends a `"stream_pause"` message to the
/// frontend.
///
/// The registry lock is held for the whole call. The status is switched to
/// `Paused` before sending and switched back to `Active` if the send fails.
///
/// # Errors
///
/// * `StreamError::InvalidState` — the registry lock cannot be acquired, or
///   the stream is not `Active`.
/// * `StreamError::StreamNotFound` — no stream is registered under `stream_id`.
/// * `StreamError::SendFailed` — the pause message cannot be delivered.
fn send_message_stream_pause(
    &self,
    stream_id: &str,
    plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<(), StreamError> {
    let mut registry = STREAM_MANAGER
        .lock()
        .map_err(|_| StreamError::InvalidState)?;
    let Some(entry) = registry.get_mut(stream_id) else {
        return Err(StreamError::StreamNotFound);
    };
    if entry.status != StreamStatus::Active {
        return Err(StreamError::InvalidState);
    }

    entry.status = StreamStatus::Paused;

    let metadata = self.get_metadata(plugin_ctx);
    let plugin_id = &metadata.id;
    let instance_id = metadata.instance_id.as_ref().unwrap_or(&metadata.id);
    let payload = StreamMessageData::Control(StreamControlData {
        stream_id: stream_id.to_owned(),
    });

    if send_stream_message_to_frontend(plugin_id, instance_id, "stream_pause", payload, plugin_ctx)
    {
        return Ok(());
    }

    // The frontend never heard about the pause, so undo the status change.
    entry.status = StreamStatus::Active;
    Err(StreamError::SendFailed)
}
/// Resumes a `Paused` stream and sends a `"stream_resume"` message to the
/// frontend.
///
/// The registry lock is held for the whole call. The status is switched to
/// `Active` before sending and switched back to `Paused` if the send fails.
///
/// # Errors
///
/// * `StreamError::InvalidState` — the registry lock cannot be acquired, or
///   the stream is not `Paused`.
/// * `StreamError::StreamNotFound` — no stream is registered under `stream_id`.
/// * `StreamError::SendFailed` — the resume message cannot be delivered.
fn send_message_stream_resume(
    &self,
    stream_id: &str,
    plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<(), StreamError> {
    let mut registry = STREAM_MANAGER
        .lock()
        .map_err(|_| StreamError::InvalidState)?;
    let Some(entry) = registry.get_mut(stream_id) else {
        return Err(StreamError::StreamNotFound);
    };
    if entry.status != StreamStatus::Paused {
        return Err(StreamError::InvalidState);
    }

    entry.status = StreamStatus::Active;

    let metadata = self.get_metadata(plugin_ctx);
    let plugin_id = &metadata.id;
    let instance_id = metadata.instance_id.as_ref().unwrap_or(&metadata.id);
    let payload = StreamMessageData::Control(StreamControlData {
        stream_id: stream_id.to_owned(),
    });

    if send_stream_message_to_frontend(plugin_id, instance_id, "stream_resume", payload, plugin_ctx)
    {
        return Ok(());
    }

    // The frontend never heard about the resume, so undo the status change.
    entry.status = StreamStatus::Paused;
    Err(StreamError::SendFailed)
}
/// Cancels a stream that is `Active`, `Paused` or `Finalizing` and sends a
/// `"stream_cancel"` message to the frontend.
///
/// The registry lock is held for the whole call. The status is set to
/// `Cancelled` before sending and — unlike pause/resume — is NOT rolled back
/// when the send fails.
///
/// # Errors
///
/// * `StreamError::InvalidState` — the registry lock cannot be acquired, or
///   the stream is already in a terminal state.
/// * `StreamError::StreamNotFound` — no stream is registered under `stream_id`.
/// * `StreamError::SendFailed` — the cancel message cannot be delivered.
fn send_message_stream_cancel(
    &self,
    stream_id: &str,
    plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<(), StreamError> {
    let mut registry = STREAM_MANAGER
        .lock()
        .map_err(|_| StreamError::InvalidState)?;
    let Some(entry) = registry.get_mut(stream_id) else {
        return Err(StreamError::StreamNotFound);
    };

    let cancellable = matches!(
        entry.status,
        StreamStatus::Active | StreamStatus::Paused | StreamStatus::Finalizing
    );
    if !cancellable {
        return Err(StreamError::InvalidState);
    }

    entry.status = StreamStatus::Cancelled;

    let metadata = self.get_metadata(plugin_ctx);
    let plugin_id = &metadata.id;
    let instance_id = metadata.instance_id.as_ref().unwrap_or(&metadata.id);
    let payload = StreamMessageData::Control(StreamControlData {
        stream_id: stream_id.to_owned(),
    });

    let delivered = send_stream_message_to_frontend(
        plugin_id,
        instance_id,
        "stream_cancel",
        payload,
        plugin_ctx,
    );
    if delivered {
        Ok(())
    } else {
        Err(StreamError::SendFailed)
    }
}
/// Returns a copy of the recorded status of `stream_id`.
///
/// Yields `None` when no such stream is registered or when the registry
/// lock cannot be acquired.
fn get_stream_status(&self, stream_id: &str) -> Option<StreamStatus> {
    let registry = STREAM_MANAGER.lock().ok()?;
    registry.get(stream_id).map(|entry| entry.status.clone())
}
/// Lists the IDs of this plugin's streams that have not reached a terminal
/// status (`Active`, `Paused` or `Finalizing`).
///
/// Streams are matched on the plugin metadata's `id`, which is the value
/// stored in `StreamInfo::plugin_id` when a stream is registered. Matching
/// on the instance ID instead would never find a stream whenever an
/// instance ID is set and differs from the plugin ID.
///
/// Returns an empty list when the registry lock cannot be acquired. The
/// order of the returned IDs is unspecified.
fn list_active_streams(
    &self,
    plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Vec<String> {
    // Resolve the owner first so the registry lock is not held while
    // calling into the plugin handler.
    let metadata = self.get_metadata(plugin_ctx);

    let Ok(registry) = STREAM_MANAGER.lock() else {
        return Vec::new();
    };

    registry
        .iter()
        .filter(|(_, info)| {
            info.plugin_id == metadata.id
                && matches!(
                    info.status,
                    StreamStatus::Active | StreamStatus::Paused | StreamStatus::Finalizing
                )
        })
        .map(|(id, _)| id.clone())
        .collect()
}
/// Sends every entry of `chunks`, in order, as a `"stream_data"` message.
///
/// The last chunk is flagged `is_final`. After a non-empty batch has been
/// delivered completely the stream is moved to `Finalizing`; an empty batch
/// sends nothing and leaves the status untouched. Sending stops at the
/// first chunk that cannot be delivered.
///
/// # Errors
///
/// * `StreamError::InvalidState` — the registry lock cannot be acquired, or
///   the stream is `Paused`.
/// * `StreamError::StreamNotFound` — no stream is registered under `stream_id`.
/// * `StreamError::StreamAlreadyEnded` — the stream is `Completed`, `Error`
///   or `Cancelled`.
/// * `StreamError::SendFailed` — a chunk cannot be delivered; earlier chunks
///   have already been sent.
fn send_message_stream_batch(
    &self,
    stream_id: &str,
    chunks: &[&str],
    plugin_ctx: &crate::metadata::PluginInstanceContext,
) -> Result<(), StreamError> {
    // Validate inside a short scope so the lock is released before sending.
    {
        let registry = STREAM_MANAGER
            .lock()
            .map_err(|_| StreamError::InvalidState)?;
        let Some(entry) = registry.get(stream_id) else {
            return Err(StreamError::StreamNotFound);
        };
        match entry.status {
            StreamStatus::Paused => return Err(StreamError::InvalidState),
            StreamStatus::Completed | StreamStatus::Error | StreamStatus::Cancelled => {
                return Err(StreamError::StreamAlreadyEnded);
            }
            StreamStatus::Active | StreamStatus::Finalizing => {}
        }
    }

    let metadata = self.get_metadata(plugin_ctx);
    let plugin_id = &metadata.id;
    let instance_id = metadata.instance_id.as_ref().unwrap_or(&metadata.id);

    let total = chunks.len();
    for (position, text) in chunks.iter().copied().enumerate() {
        let payload = StreamMessageData::Data(StreamDataData {
            stream_id: stream_id.to_owned(),
            chunk: text.to_owned(),
            // Only the last chunk of the batch is marked final.
            is_final: position + 1 == total,
        });
        let delivered = send_stream_message_to_frontend(
            plugin_id,
            instance_id,
            "stream_data",
            payload,
            plugin_ctx,
        );
        if !delivered {
            return Err(StreamError::SendFailed);
        }
    }

    // Nothing was sent for an empty batch, so there is no status to update.
    if chunks.is_empty() {
        return Ok(());
    }

    if let Ok(mut registry) = STREAM_MANAGER.lock() {
        if let Some(entry) = registry.get_mut(stream_id) {
            entry.status = StreamStatus::Finalizing;
        }
    }
    Ok(())
}
}