use std::time::Instant;
use crate::ai::provider::{
AiProvider, CompletionRequest, MessageRole, ProviderMessage,
};
use crate::errors::{NoosError, NoosResult};
use crate::kernel::events::{EventBus, PipelineDoneEvent, PipelineStepEvent};
/// A single AI primitive in a composition: one prompt/model invocation.
///
/// Derives `Debug`/`Clone` for parity with `StepResult`/`PipelineResult`,
/// so configs can be logged and reused across pipeline runs.
#[derive(Debug, Clone)]
pub struct PrimitiveStep {
    // Stable identifier for this primitive; echoed in step events and results.
    pub id: String,
    // Human-readable display name.
    pub name: String,
    // System prompt sent with this step's completion request.
    pub system_prompt: String,
    // Model identifier passed through to the provider.
    pub model: String,
    pub max_tokens: u32,
    pub temperature: f32,
}

/// An ordered list of primitives executed as one pipeline, identified by
/// `composition_id` (used to tag events and errors).
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    pub composition_id: String,
    pub steps: Vec<PrimitiveStep>,
}
/// The outcome of one pipeline step: the primitive's text output plus
/// bookkeeping for observability.
#[derive(Debug, Clone)]
pub struct StepResult {
    // Zero-based position of the step within the pipeline.
    pub step_index: usize,
    // `PrimitiveStep::id` of the step that produced this result.
    pub primitive_id: String,
    // Completion text returned by the provider for this step.
    pub output: String,
    // Model name as reported by the provider's response (may differ from the requested model).
    pub model: String,
    // Wall-clock time spent in this step's completion call.
    pub duration_ms: u64,
}
/// The aggregate outcome of a full pipeline run.
#[derive(Debug, Clone)]
pub struct PipelineResult {
    // `PipelineConfig::composition_id` of the pipeline that ran.
    pub composition_id: String,
    // Per-step results, in execution order.
    pub steps: Vec<StepResult>,
    // Output of the last step (or the original input when there were no steps).
    pub final_output: String,
    // Wall-clock time for the whole pipeline, including event emission.
    pub total_duration_ms: u64,
}
/// Runs the steps of `config` sequentially, feeding each step's output into
/// the next.
///
/// The first step receives `input` verbatim; every later step receives a
/// prompt combining the original input with the previous step's output. When
/// an `EventBus` is provided, a `PipelineStepEvent` is emitted after each
/// step and a `PipelineDoneEvent` after the whole run. An empty `steps` list
/// yields a result whose `final_output` is `input` unchanged.
///
/// # Errors
///
/// Returns `NoosError::Pipeline` — tagged with the composition id and the
/// zero-based step index — when the provider fails for any step; steps after
/// the failing one are not executed.
pub async fn execute_pipeline(
    config: &PipelineConfig,
    input: &str,
    ai: &dyn AiProvider,
    events: Option<&EventBus>,
) -> NoosResult<PipelineResult> {
    let pipeline_start = Instant::now();
    // Final length is known up front — reserve once.
    let mut step_results = Vec::with_capacity(config.steps.len());
    let mut current_input = input.to_string();
    for (i, step) in config.steps.iter().enumerate() {
        let step_start = Instant::now();
        let user_content = if i == 0 {
            // Move instead of clone: `current_input` is unconditionally
            // reassigned at the bottom of this iteration.
            std::mem::take(&mut current_input)
        } else {
            // Later steps see both the original input and the previous output,
            // so context is not lost as the chain deepens.
            format!(
                "Original input: {}\n\nPrevious analysis:\n{}",
                input, current_input
            )
        };
        let request = CompletionRequest {
            model: step.model.clone(),
            messages: vec![ProviderMessage {
                role: MessageRole::User,
                content: user_content,
            }],
            system_prompt: Some(step.system_prompt.clone()),
            max_tokens: step.max_tokens,
            temperature: step.temperature,
            stream: false,
        };
        // Wrap provider failures with enough context to identify the failing step.
        let response = ai.complete(request).await.map_err(|e| NoosError::Pipeline {
            composition_id: config.composition_id.clone(),
            step: i,
            message: e.to_string(),
        })?;
        let duration_ms = step_start.elapsed().as_millis() as u64;
        if let Some(bus) = events {
            bus.emit(PipelineStepEvent {
                composition_id: config.composition_id.clone(),
                step: i,
                primitive: step.id.clone(),
                model: step.model.clone(),
                duration_ms,
            });
        }
        step_results.push(StepResult {
            step_index: i,
            primitive_id: step.id.clone(),
            output: response.text.clone(),
            model: response.model,
            duration_ms,
        });
        // The step's output becomes the next step's input.
        current_input = response.text;
    }
    let total_duration_ms = pipeline_start.elapsed().as_millis() as u64;
    if let Some(bus) = events {
        bus.emit(PipelineDoneEvent {
            composition_id: config.composition_id.clone(),
            total_steps: config.steps.len(),
            duration_ms: total_duration_ms,
        });
    }
    Ok(PipelineResult {
        composition_id: config.composition_id.clone(),
        final_output: current_input,
        steps: step_results,
        total_duration_ms,
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use crate::ai::provider::{AiProviderType, CompletionResponse, StreamChunk, TokenUsage};
    use async_trait::async_trait;

    /// Provider stub that returns the same canned text for every request.
    struct MockAi {
        response: String,
    }

    #[async_trait]
    impl AiProvider for MockAi {
        fn provider_type(&self) -> AiProviderType {
            AiProviderType::Local
        }
        async fn complete(&self, _request: CompletionRequest) -> NoosResult<CompletionResponse> {
            Ok(CompletionResponse {
                text: self.response.clone(),
                usage: TokenUsage::default(),
                model: "mock".into(),
            })
        }
        async fn stream(
            &self,
            _request: CompletionRequest,
            sender: tokio::sync::mpsc::Sender<StreamChunk>,
        ) -> NoosResult<()> {
            let _ = sender.send(StreamChunk::TextDelta(self.response.clone())).await;
            let _ = sender.send(StreamChunk::Done).await;
            Ok(())
        }
    }

    /// Provider stub that echoes the user message wrapped in brackets, so a
    /// test can observe exactly which prompt each step received.
    struct EchoAi;

    #[async_trait]
    impl AiProvider for EchoAi {
        fn provider_type(&self) -> AiProviderType {
            AiProviderType::Local
        }
        async fn complete(&self, request: CompletionRequest) -> NoosResult<CompletionResponse> {
            Ok(CompletionResponse {
                text: format!("[{}]", request.messages[0].content),
                usage: TokenUsage::default(),
                model: "echo".into(),
            })
        }
        async fn stream(
            &self,
            request: CompletionRequest,
            sender: tokio::sync::mpsc::Sender<StreamChunk>,
        ) -> NoosResult<()> {
            let _ = sender
                .send(StreamChunk::TextDelta(format!("[{}]", request.messages[0].content)))
                .await;
            let _ = sender.send(StreamChunk::Done).await;
            Ok(())
        }
    }

    /// Builds a minimal step with the given primitive id.
    fn step(id: &str) -> PrimitiveStep {
        PrimitiveStep {
            id: id.into(),
            name: id.to_uppercase(),
            system_prompt: String::new(),
            model: "mock".into(),
            max_tokens: 1024,
            temperature: 0.7,
        }
    }

    #[tokio::test]
    async fn single_step_pipeline() {
        let ai = MockAi {
            response: "analysis result".into(),
        };
        let config = PipelineConfig {
            composition_id: "test".into(),
            steps: vec![PrimitiveStep {
                id: "superpose".into(),
                name: "Superpose".into(),
                system_prompt: "Analyze this".into(),
                model: "mock".into(),
                max_tokens: 1024,
                temperature: 0.7,
            }],
        };
        let result = execute_pipeline(&config, "input text", &ai, None).await.unwrap();
        assert_eq!(result.final_output, "analysis result");
        assert_eq!(result.steps.len(), 1);
        assert_eq!(result.composition_id, "test");
    }

    #[tokio::test]
    async fn multi_step_pipeline_chains_output() {
        let ai = MockAi {
            response: "step output".into(),
        };
        let config = PipelineConfig {
            composition_id: "dissolve".into(),
            steps: vec![
                PrimitiveStep {
                    id: "superpose".into(),
                    name: "S".into(),
                    system_prompt: "Step 1".into(),
                    model: "mock".into(),
                    max_tokens: 1024,
                    temperature: 0.7,
                },
                PrimitiveStep {
                    id: "interfere".into(),
                    name: "I".into(),
                    system_prompt: "Step 2".into(),
                    model: "mock".into(),
                    max_tokens: 1024,
                    temperature: 0.7,
                },
            ],
        };
        let result = execute_pipeline(&config, "input", &ai, None).await.unwrap();
        assert_eq!(result.steps.len(), 2);
        assert_eq!(result.steps[0].primitive_id, "superpose");
        assert_eq!(result.steps[1].primitive_id, "interfere");
    }

    // Uses EchoAi because a constant-response mock cannot reveal whether step
    // outputs are actually threaded into subsequent prompts.
    #[tokio::test]
    async fn multi_step_pipeline_feeds_previous_output_forward() {
        let ai = EchoAi;
        let config = PipelineConfig {
            composition_id: "chain".into(),
            steps: vec![step("a"), step("b")],
        };
        let result = execute_pipeline(&config, "seed", &ai, None).await.unwrap();
        // Step 1 wraps the raw input.
        assert_eq!(result.steps[0].output, "[seed]");
        // Step 2's prompt must embed both the original input and step 1's output.
        assert!(result.steps[1].output.contains("Original input: seed"));
        assert!(result.steps[1].output.contains("[seed]"));
        // The pipeline's final output is the last step's output.
        assert_eq!(result.final_output, result.steps[1].output);
    }

    #[tokio::test]
    async fn pipeline_emits_events() {
        let ai = MockAi {
            response: "ok".into(),
        };
        let bus = EventBus::new();
        let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let c = count.clone();
        bus.on::<PipelineStepEvent>(move |_| {
            c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        });
        let config = PipelineConfig {
            composition_id: "test".into(),
            steps: vec![PrimitiveStep {
                id: "s".into(),
                name: "S".into(),
                system_prompt: "".into(),
                model: "m".into(),
                max_tokens: 100,
                temperature: 0.5,
            }],
        };
        execute_pipeline(&config, "x", &ai, Some(&bus)).await.unwrap();
        assert_eq!(count.load(std::sync::atomic::Ordering::SeqCst), 1);
    }
}