use crate::dora_adapter::error::{DoraError, DoraResult};
use crate::plugin::AgentPlugin;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::info;
/// Configuration attached to a single operator instance.
///
/// Derives serde's `Serialize`/`Deserialize`, so it can be written to and read
/// back from any serde-supported format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperatorConfig {
/// Identifier of the operator. `Default` fills this with a freshly generated
/// UUID (v7) rendered as a string.
pub operator_id: String,
/// Operator name. `Default` uses `"default_operator"`.
pub name: String,
/// String-to-string mapping for inputs. It is not read anywhere in this file;
/// presumably maps external input names to operator input names — TODO confirm.
pub input_mapping: HashMap<String, String>,
/// String-to-string mapping for outputs. It is not read anywhere in this file;
/// presumably maps operator output names to external output names — TODO confirm.
pub output_mapping: HashMap<String, String>,
/// Free-form key/value settings; not interpreted by the code in this file.
pub custom_config: HashMap<String, String>,
}
impl Default for OperatorConfig {
    /// Builds a configuration named `"default_operator"` whose id is a freshly
    /// generated UUID (v7) string and whose mappings and custom settings are empty.
    ///
    /// Each call generates a new id.
    fn default() -> Self {
        let operator_id = uuid::Uuid::now_v7().to_string();
        let name = String::from("default_operator");

        Self {
            operator_id,
            name,
            input_mapping: HashMap::new(),
            output_mapping: HashMap::new(),
            custom_config: HashMap::new(),
        }
    }
}
/// A single message delivered to an operator.
///
/// The payload is an opaque byte buffer; the `deserialize*` helpers on this type
/// decode it as bincode or JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperatorInput {
/// Identifier of this input.
pub input_id: String,
/// Raw payload bytes.
pub data: Vec<u8>,
/// String key/value annotations attached to the input; empty when built via `new`.
pub metadata: HashMap<String, String>,
}
impl OperatorInput {
    /// Creates an input that carries `data` under `input_id` with no metadata.
    pub fn new(input_id: String, data: Vec<u8>) -> Self {
        let metadata = HashMap::new();
        Self {
            input_id,
            data,
            metadata,
        }
    }

    /// Decodes the payload as a bincode-encoded `T`.
    ///
    /// # Errors
    ///
    /// Returns [`DoraError::DeserializationError`] holding the decoder's message
    /// when the bytes cannot be decoded as `T`.
    pub fn deserialize<T>(&self) -> DoraResult<T>
    where
        T: for<'de> Deserialize<'de>,
    {
        match bincode::deserialize::<T>(&self.data) {
            Ok(value) => Ok(value),
            Err(err) => Err(DoraError::DeserializationError(err.to_string())),
        }
    }

    /// Decodes the payload as a JSON document describing a `T`.
    ///
    /// # Errors
    ///
    /// Returns [`DoraError::DeserializationError`] holding the parser's message
    /// when the bytes cannot be parsed as `T`.
    pub fn deserialize_json<T>(&self) -> DoraResult<T>
    where
        T: for<'de> Deserialize<'de>,
    {
        match serde_json::from_slice::<T>(&self.data) {
            Ok(value) => Ok(value),
            Err(err) => Err(DoraError::DeserializationError(err.to_string())),
        }
    }
}
/// A single message produced by an operator.
///
/// Mirrors `OperatorInput`: an identifier, an opaque byte payload and string
/// metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperatorOutput {
/// Identifier of this output.
pub output_id: String,
/// Raw payload bytes.
pub data: Vec<u8>,
/// String key/value annotations attached to the output; empty when built via `new`.
pub metadata: HashMap<String, String>,
}
impl OperatorOutput {
    /// Creates an output that carries `data` under `output_id` with no metadata.
    pub fn new(output_id: String, data: Vec<u8>) -> Self {
        let metadata = HashMap::new();
        Self {
            output_id,
            data,
            metadata,
        }
    }

    /// Builds an output whose payload is the bincode encoding of `value`.
    ///
    /// # Errors
    ///
    /// Propagates the bincode failure, converted into the crate's error type.
    pub fn from_serializable<T>(output_id: String, value: &T) -> DoraResult<Self>
    where
        T: Serialize,
    {
        Ok(Self::new(output_id, bincode::serialize(value)?))
    }

    /// Builds an output whose payload is the JSON encoding of `value`.
    ///
    /// # Errors
    ///
    /// Propagates the `serde_json` failure, converted into the crate's error type.
    pub fn from_json<T>(output_id: String, value: &T) -> DoraResult<Self>
    where
        T: Serialize,
    {
        Ok(Self::new(output_id, serde_json::to_vec(value)?))
    }
}
/// Operator that forwards incoming data to a wrapped [`AgentPlugin`].
pub struct DoraPluginOperator {
/// Configuration supplied at construction; exposed read-only through `config()`.
config: OperatorConfig,
/// The wrapped plugin behind an async read/write lock; this file only ever
/// takes the write side.
plugin: Arc<RwLock<Box<dyn AgentPlugin>>>,
/// Set to `true` once `init` has succeeded; `on_input` refuses to run before that.
initialized: bool,
}
impl DoraPluginOperator {
    /// Wraps `plugin` in an operator described by `config`.
    ///
    /// The operator starts uninitialized; call [`init`](Self::init) before
    /// feeding it inputs.
    pub fn new(config: OperatorConfig, plugin: Box<dyn AgentPlugin>) -> Self {
        Self {
            config,
            plugin: Arc::new(RwLock::new(plugin)),
            initialized: false,
        }
    }

    /// Returns the configuration this operator was created with.
    pub fn config(&self) -> &OperatorConfig {
        &self.config
    }

    /// Initializes the wrapped plugin.
    ///
    /// Calling this again after a successful initialization is a no-op.
    ///
    /// # Errors
    ///
    /// Returns [`DoraError::OperatorError`] with the plugin's error message when
    /// the plugin fails to initialize; the operator then stays uninitialized.
    pub async fn init(&mut self) -> DoraResult<()> {
        if self.initialized {
            return Ok(());
        }
        {
            // Scope the write guard so the lock is released before the state
            // flag is updated and the log line is emitted.
            let mut plugin = self.plugin.write().await;
            plugin
                .init_plugin()
                .await
                .map_err(|e| DoraError::OperatorError(e.to_string()))?;
        }
        self.initialized = true;
        info!("DoraPluginOperator {} initialized", self.config.operator_id);
        Ok(())
    }

    /// Runs the plugin on one input and returns its result as a single output
    /// with the id `"default_output"`.
    ///
    /// The payload is handed to the plugin as text. A payload that is not valid
    /// UTF-8 is replaced by the placeholder `binary_data_<len>`, where `<len>` is
    /// the payload length in bytes.
    ///
    /// # Errors
    ///
    /// Returns [`DoraError::OperatorError`] when the operator has not been
    /// initialized or when the plugin's `execute` call fails.
    pub async fn on_input(&self, input: OperatorInput) -> DoraResult<Vec<OperatorOutput>> {
        if !self.initialized {
            return Err(DoraError::OperatorError(
                "Operator not initialized".to_string(),
            ));
        }
        // Consume the buffer instead of cloning it: on failure the error still
        // owns the original bytes, so their length is available without a copy.
        let input_str = String::from_utf8(input.data)
            .unwrap_or_else(|err| format!("binary_data_{}", err.as_bytes().len()));
        let mut plugin = self.plugin.write().await;
        let result = plugin
            .execute(input_str)
            .await
            .map_err(|e| DoraError::OperatorError(e.to_string()))?;
        let output = OperatorOutput::new("default_output".to_string(), result.into_bytes());
        Ok(vec![output])
    }

    /// Runs [`on_input`](Self::on_input) on every input in order and returns the
    /// concatenated outputs.
    ///
    /// # Errors
    ///
    /// Stops at the first failing input and returns its error; outputs collected
    /// up to that point are discarded.
    pub async fn on_inputs(&self, inputs: Vec<OperatorInput>) -> DoraResult<Vec<OperatorOutput>> {
        let mut outputs = Vec::with_capacity(inputs.len());
        for input in inputs {
            outputs.extend(self.on_input(input).await?);
        }
        Ok(outputs)
    }
}
/// Common async interface for operators that can be stored in an
/// [`OperatorChain`].
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait MoFAOperator: Send + Sync {
/// Returns the identifier of this operator.
fn operator_id(&self) -> &str;
/// Prepares the operator for use. `OperatorChain::init_all` calls this once
/// per operator, in insertion order.
async fn init_operator(&mut self) -> DoraResult<()>;
/// Processes one input and returns zero or more outputs.
async fn process(&mut self, input: OperatorInput) -> DoraResult<Vec<OperatorOutput>>;
/// Cleanup hook. The only implementation in this file is a no-op; presumably
/// meant to release operator resources — TODO confirm the intended contract.
async fn cleanup(&mut self) -> DoraResult<()>;
}
/// Adapter that exposes an [`AgentPlugin`] through the [`MoFAOperator`] trait.
pub struct PluginOperatorAdapter {
/// The wrapped plugin, owned exclusively by the adapter.
plugin: Box<dyn AgentPlugin>,
/// Identifier reported by `operator_id()`.
operator_id: String,
}
impl PluginOperatorAdapter {
    /// Creates an adapter that reports `operator_id` and delegates its work to
    /// `plugin`.
    pub fn new(operator_id: String, plugin: Box<dyn AgentPlugin>) -> Self {
        Self {
            operator_id,
            plugin,
        }
    }
}
#[async_trait::async_trait]
impl MoFAOperator for PluginOperatorAdapter {
    /// Returns the identifier given at construction.
    fn operator_id(&self) -> &str {
        self.operator_id.as_str()
    }

    /// Initializes the wrapped plugin, mapping a plugin failure to
    /// [`DoraError::OperatorError`].
    async fn init_operator(&mut self) -> DoraResult<()> {
        let outcome = self.plugin.init_plugin().await;
        outcome.map_err(|err| DoraError::OperatorError(err.to_string()))
    }

    /// Feeds the payload to the plugin as text and wraps the reply in a single
    /// output with the id `"output"`.
    ///
    /// A payload that is not valid UTF-8 is replaced by the literal text
    /// `invalid_utf8` before being handed to the plugin. A plugin failure is
    /// reported as [`DoraError::OperatorError`].
    async fn process(&mut self, input: OperatorInput) -> DoraResult<Vec<OperatorOutput>> {
        let text = match String::from_utf8(input.data) {
            Ok(text) => text,
            Err(_) => String::from("invalid_utf8"),
        };

        let reply = match self.plugin.execute(text).await {
            Ok(reply) => reply,
            Err(err) => return Err(DoraError::OperatorError(err.to_string())),
        };

        Ok(vec![OperatorOutput::new(
            String::from("output"),
            reply.into_bytes(),
        )])
    }

    /// No-op: the adapter does nothing on cleanup.
    async fn cleanup(&mut self) -> DoraResult<()> {
        Ok(())
    }
}
/// An ordered sequence of operators; `process` feeds the outputs of each
/// operator to the next one.
pub struct OperatorChain {
/// Operators in the order they were added, which is also the execution order.
operators: Vec<Box<dyn MoFAOperator>>,
}
impl OperatorChain {
pub fn new() -> Self {
Self {
operators: Vec::new(),
}
}
pub fn add_operator(&mut self, operator: Box<dyn MoFAOperator>) {
self.operators.push(operator);
}
pub async fn init_all(&mut self) -> DoraResult<()> {
for op in &mut self.operators {
op.init_operator().await?;
}
Ok(())
}
pub async fn process(&mut self, input: OperatorInput) -> DoraResult<Vec<OperatorOutput>> {
if self.operators.is_empty() {
return Ok(vec![]);
}
let mut current_outputs = vec![OperatorOutput::new(
input.input_id.clone(),
input.data.clone(),
)];
for op in &mut self.operators {
let mut next_outputs = Vec::new();
for output in current_outputs {
let input = OperatorInput::new(output.output_id, output.data);
let mut results = op.process(input).await?;
next_outputs.append(&mut results);
}
current_outputs = next_outputs;
}
Ok(current_outputs)
}
}
impl Default for OperatorChain {
    /// Returns a chain that contains no operators.
    fn default() -> Self {
        Self {
            operators: Vec::new(),
        }
    }
}
#[cfg(test)]
mod tests {
    // Bring the operator types and the `MoFAOperator` trait into scope; the
    // trait is required for the `init_operator` / `process` method calls.
    use super::*;
    use crate::plugin::LLMPlugin;

    /// The adapter initializes its plugin and yields exactly one output with
    /// the id `"output"` for a UTF-8 payload.
    #[tokio::test]
    async fn test_plugin_operator_adapter() {
        let plugin = Box::new(LLMPlugin::new("test_llm"));
        let mut adapter = PluginOperatorAdapter::new("test_op".to_string(), plugin);
        adapter.init_operator().await.unwrap();

        let input = OperatorInput::new("input".to_string(), b"Hello".to_vec());
        let outputs = adapter.process(input).await.unwrap();

        assert!(!outputs.is_empty());
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].output_id, "output");
    }

    /// A chain with a single adapter returns that adapter's single output.
    #[tokio::test]
    async fn test_operator_chain() {
        let mut chain = OperatorChain::new();
        let plugin1 = Box::new(LLMPlugin::new("llm1"));
        let adapter1 = PluginOperatorAdapter::new("op1".to_string(), plugin1);
        chain.add_operator(Box::new(adapter1));
        chain.init_all().await.unwrap();

        let input = OperatorInput::new("input".to_string(), b"Test".to_vec());
        let outputs = chain.process(input).await.unwrap();

        assert!(!outputs.is_empty());
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].output_id, "output");
    }
}