use crate::core::tasks::NaturalLanguageTask;
use crate::executors::Executor;
use crate::prompts::{
build_skill_md_prompt, generate_instances_registry, generate_skills_registry,
};
use crate::skill_loader::SkillLoader;
use crate::skill_scheduler::SkillScheduler;
use crate::tasks::{self, ExecutableTask, TaskStatus};
use crate::workflow::{WorkflowCallback, WorkflowExecutionResult, WorkflowExecutor, WorkflowMode};
use crate::{
HippoxConfig, IdentityInformation, IntentAnalysisResult, Pipeline, SystemPipeline,
WorkflowExecResult, get_config, i18n, needs_format_conversion, t, update_config,
};
use langhub::LLMClient;
use langhub::types::{ChatMessage, ModelProvider};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tracing::info;
/// Top-level facade bundling the skill scheduler, the executor and the
/// workflow executor behind a single handle.
///
/// The type is `Clone`; clones share the `is_first_message` flag because it
/// lives behind an `Arc`.
#[derive(Clone)]
pub struct Hippox {
/// Scheduler constructed from the LLM client; also used to reach the LLM
/// (see `heartbeat`).
scheduler: SkillScheduler,
/// Executor created with `Executor::new()` and exposed through `executor()`.
executor: Executor,
/// Workflow mode selected at construction time.
workflow_mode: WorkflowMode,
/// Workflow executor built from `workflow_mode`.
workflow_executor: WorkflowExecutor,
/// Shared boolean flag. It is initialised to `false` and the `refresh_*`
/// methods store `false` into it.
// NOTE(review): nothing in the visible code reads this flag or ever sets it
// to `true` — confirm whether `true` was intended for "first message" state.
is_first_message: Arc<AtomicBool>,
}
impl Hippox {
pub async fn new(
provider: ModelProvider,
api_key: Option<String>,
extra_keys: Option<HashMap<String, String>>,
config: Option<HippoxConfig>,
) -> anyhow::Result<Self> {
Self::with_workflow_mode(
provider,
api_key,
extra_keys,
config,
WorkflowMode::default(),
)
.await
}
pub async fn with_workflow_mode(
provider: ModelProvider,
api_key: Option<String>,
extra_keys: Option<HashMap<String, String>>,
config: Option<HippoxConfig>,
workflow_mode: WorkflowMode,
) -> anyhow::Result<Self> {
info!(
"Initializing Hippox core with workflow mode: {}",
workflow_mode
);
update_config(|global| *global = config.unwrap_or_default())?;
let config = get_config();
i18n::set_language(&config.lang);
let llm = LLMClient::new_with_key(provider, api_key, extra_keys)?;
let scheduler = SkillScheduler::new(llm);
let executor = Executor::new();
let workflow_executor = WorkflowExecutor::new(workflow_mode);
Ok(Self {
scheduler,
executor,
workflow_mode,
workflow_executor,
is_first_message: Arc::new(AtomicBool::new(false)),
})
}
pub fn refresh_llm_skill_registry(&self) {
self.is_first_message.store(false, Ordering::SeqCst);
}
pub fn refresh_llm_instances(&self) {
self.is_first_message.store(false, Ordering::SeqCst);
}
pub fn get_skills_registry(&self) -> String {
generate_skills_registry()
}
pub fn get_instances_registry(&self) -> String {
generate_instances_registry()
}
pub fn get_identity(&self) -> IdentityInformation {
self.get_config().identity_information
}
pub fn update_identity<F>(&self, f: F) -> anyhow::Result<()>
where
F: FnOnce(&mut IdentityInformation),
{
self.update_config(|config| {
f(&mut config.identity_information);
})
}
pub fn set_identity(&self, identity: IdentityInformation) -> anyhow::Result<()> {
self.update_config(|config| {
config.identity_information = identity;
})
}
pub fn submit(&self, input: &str, callback: Option<Arc<dyn WorkflowCallback>>) -> String {
let executable = Arc::new(NaturalLanguageTask::new(
input.to_string(),
self.workflow_executor.clone(),
self.scheduler.clone(),
));
let task_id = futures::executor::block_on(tasks::create_task_with_executable(
"natural_language".to_string(),
input.to_string(),
executable,
callback,
));
info!(
"Created natural language task: {} with input: {}",
task_id, input
);
task_id
}
pub fn submit_batch(
&self,
inputs: Vec<(String, Option<String>, Option<Arc<dyn WorkflowCallback>>)>,
) -> Vec<String> {
inputs
.into_iter()
.map(|(input, _session_id, callback)| self.submit(&input, callback))
.collect()
}
pub async fn execute(
&self,
input: &str,
callback: Option<Arc<dyn WorkflowCallback>>,
) -> String {
let pipeline = SystemPipeline::new();
let intent_result = match pipeline.intent_analysis(&self.scheduler, input).await {
Ok(result) => result,
Err(e) => {
tracing::warn!("Intent analysis failed: {}, using raw input", e);
IntentAnalysisResult {
categories: vec![],
clean_intent: input.to_string(),
}
}
};
let clean_intent = &intent_result.clean_intent;
let categories = &intent_result.categories;
let workflow_result = if categories.is_empty() {
pipeline
.workflow_execution(
self.workflow_mode,
&self.workflow_executor,
&self.scheduler,
clean_intent,
callback,
)
.await
} else {
let result = self
.workflow_executor
.execute_with_categories(&self.scheduler, clean_intent, categories)
.await;
let json_output = match result {
WorkflowExecutionResult::Completed(output) => output,
WorkflowExecutionResult::CompletedWithRaw { raw_json, .. } => raw_json,
_ => String::new(),
};
WorkflowExecResult {
json_output,
original_input: clean_intent.to_string(),
}
};
if needs_format_conversion(input) {
let format_result = pipeline
.response_formatting(&self.scheduler, input, &workflow_result.json_output)
.await;
format_result.final_output
} else {
workflow_result.json_output
}
}
pub async fn execute_batch(
&self,
inputs: Vec<(String, Option<Arc<dyn WorkflowCallback>>)>,
) -> Vec<String> {
let mut results = Vec::new();
for (input, callback) in inputs {
let result = self.execute(&input, callback).await;
results.push(result);
}
results
}
pub async fn heartbeat(&self) -> String {
let mut messages: Vec<ChatMessage> = Vec::new();
messages.push(ChatMessage::user("hi"));
match self.scheduler.get_llm().chat(messages).await {
Ok(response) => response,
Err(e) => format!("Error: {}", e),
}
}
pub fn get_task_status(&self, task_id: &str) -> Option<TaskStatus> {
futures::executor::block_on(tasks::get_task_status(task_id))
}
pub fn get_task(&self, task_id: &str) -> Option<tasks::Task> {
futures::executor::block_on(tasks::get_task(task_id))
}
pub fn cancel_task(&self, task_id: &str) -> bool {
futures::executor::block_on(tasks::cancel_task(task_id))
}
pub fn pause_task(&self, task_id: &str) -> bool {
futures::executor::block_on(tasks::pause_task(task_id))
}
pub fn resume_task(&self, task_id: &str) -> bool {
futures::executor::block_on(tasks::resume_task(task_id))
}
pub fn retry_task(&self, task_id: &str) -> bool {
futures::executor::block_on(tasks::retry_task(task_id))
}
pub fn list_atomic_skills(&self) -> String {
let skills = crate::executors::registry::list_skills();
if skills.is_empty() {
return t!("skill.no_skills_available").to_string();
}
let mut result = String::new();
for name in skills {
if let Some(skill) = crate::executors::registry::get_skill(&name) {
let emoji = match skill.category() {
"file" => "📁",
"net" => "🌐",
"math" => "🔢",
"time" => "🕐",
"system" => "💻",
"db" => "🗄️",
"devops" => "🚀",
"document" => "📄",
"message" => "💬",
"task" => "⏰",
_ => "⚙️",
};
result.push_str(&format!(
" {} - **{}**: {}\n",
emoji,
name,
skill.description()
));
}
}
result
}
pub fn list_skill_md_files(&self, skills_dir: &str) -> String {
match SkillLoader::load_all(skills_dir) {
Ok(skills) => {
if skills.is_empty() {
return t!("skill.no_skill_md_available").to_string();
}
let mut result = String::new();
for skill in skills {
let emoji = skill
.metadata
.as_ref()
.and_then(|m| m.emoji.as_ref())
.map(|e| e.as_str())
.unwrap_or("📋");
result.push_str(&format!(
" {} - **{}**: {}\n",
emoji, skill.name, skill.description
));
}
result
}
Err(e) => format!("{}: {}", t!("error.list_skills_failed"), e),
}
}
pub fn get_atomic_skill_names(&self) -> Vec<String> {
crate::executors::registry::list_skills()
}
pub fn get_skill_md_names(&self, skills_dir: &str) -> Vec<String> {
match SkillLoader::load_all(skills_dir) {
Ok(skills) => skills.into_iter().map(|s| s.name).collect(),
Err(_) => Vec::new(),
}
}
pub fn has_atomic_skills(&self) -> bool {
!crate::executors::registry::list_skills().is_empty()
}
pub fn executor(&self) -> &Executor {
&self.executor
}
pub fn scheduler(&self) -> &SkillScheduler {
&self.scheduler
}
pub fn workflow_mode(&self) -> WorkflowMode {
self.workflow_mode
}
pub fn update_config<F>(&self, f: F) -> anyhow::Result<()>
where
F: FnOnce(&mut HippoxConfig),
{
crate::config::update_config(f)
}
pub fn get_config(&self) -> HippoxConfig {
crate::config::get_config()
}
}