use crate::core::tasks::NaturalLanguageTask;
use crate::prompts::{build_skill_md_prompt, generate_skills_registry};
use crate::skill_scheduler::SkillScheduler;
use crate::tasks::{self, ExecutableTask, TaskStatus};
use crate::workflow::{WorkflowCallback, WorkflowExecutionResult, WorkflowExecutor, WorkflowMode};
use crate::{
HippoxBatchResult, HippoxBoolResult, HippoxConfig, HippoxResult, HippoxStringResult,
HippoxVoidResult, IdentityInformation, IntentAnalysisResult, Pipeline, SystemPipeline,
WorkflowExecResult, get_config, i18n, needs_format_conversion, t, update_config,
};
use hippox_atomic_skills::{Executor, skill_registry};
use langhub::LLMClient;
use langhub::types::{ChatMessage, ModelProvider};
use serde_json::{Value, json};
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tracing::info;
pub static INPUT_TOKEN_COUNT: AtomicU64 = AtomicU64::new(0);
pub static OUTPUT_TOKEN_COUNT: AtomicU64 = AtomicU64::new(0);
#[derive(Clone)]
pub struct Hippox {
scheduler: SkillScheduler,
executor: Executor,
workflow_mode: WorkflowMode,
workflow_executor: WorkflowExecutor,
is_first_message: Arc<AtomicBool>,
}
impl Hippox {
pub async fn new(
provider: ModelProvider,
api_key: Option<String>,
extra_keys: Option<HashMap<String, String>>,
config: Option<HippoxConfig>,
) -> anyhow::Result<Self> {
Self::with_workflow_mode(
provider,
api_key,
extra_keys,
config,
WorkflowMode::default(),
)
.await
}
pub async fn with_workflow_mode(
provider: ModelProvider,
api_key: Option<String>,
extra_keys: Option<HashMap<String, String>>,
config: Option<HippoxConfig>,
workflow_mode: WorkflowMode,
) -> anyhow::Result<Self> {
info!(
"Initializing Hippox core with workflow mode: {}",
workflow_mode
);
update_config(|global| *global = config.unwrap_or_default())?;
let config = get_config();
i18n::set_language(&config.lang);
let llm = LLMClient::new_with_key(provider, api_key, extra_keys)?;
let scheduler = SkillScheduler::new(llm);
let executor = Executor::new();
let workflow_executor = WorkflowExecutor::new(workflow_mode);
Ok(Self {
scheduler,
executor,
workflow_mode,
workflow_executor,
is_first_message: Arc::new(AtomicBool::new(false)),
})
}
pub fn refresh_llm_skill_registry(&self) -> HippoxVoidResult {
self.is_first_message.store(false, Ordering::SeqCst);
HippoxResult::ok(())
}
pub fn refresh_llm_instances(&self) -> HippoxVoidResult {
self.is_first_message.store(false, Ordering::SeqCst);
HippoxResult::ok(())
}
pub fn get_skills_registry(&self) -> HippoxStringResult {
HippoxResult::ok(generate_skills_registry())
}
pub fn get_identity(&self) -> HippoxResult<IdentityInformation> {
HippoxResult::ok(self.get_config().identity_information)
}
pub fn update_identity<F>(&self, f: F) -> HippoxVoidResult
where
F: FnOnce(&mut IdentityInformation),
{
match self.update_config(|config| {
f(&mut config.identity_information);
}) {
Ok(_) => HippoxResult::ok(()),
Err(e) => HippoxResult::system_error(e.to_string()),
}
}
pub fn set_identity(&self, identity: IdentityInformation) -> HippoxVoidResult {
match self.update_config(|config| {
config.identity_information = identity;
}) {
Ok(_) => HippoxResult::ok(()),
Err(e) => HippoxResult::system_error(e.to_string()),
}
}
pub fn submit(
&self,
input: &str,
callback: Option<Arc<dyn WorkflowCallback>>,
) -> HippoxStringResult {
let executable = Arc::new(NaturalLanguageTask::new(
input.to_string(),
self.workflow_executor.clone(),
self.scheduler.clone(),
));
let task_id = futures::executor::block_on(tasks::create_task_with_executable(
"natural_language".to_string(),
input.to_string(),
executable,
callback,
));
info!(
"Created natural language task: {} with input: {}",
task_id, input
);
HippoxResult::ok(task_id)
}
pub fn submit_batch(
&self,
inputs: Vec<(String, Option<String>, Option<Arc<dyn WorkflowCallback>>)>,
) -> HippoxBatchResult {
let task_ids: Vec<String> = inputs
.into_iter()
.map(|(input, _session_id, callback)| {
self.submit(&input, callback).unwrap_or(String::new())
})
.collect();
HippoxResult::ok(task_ids)
}
pub async fn execute_batch(
&self,
inputs: Vec<(String, Option<Arc<dyn WorkflowCallback>>)>,
) -> HippoxBatchResult {
let mut results = Vec::new();
for (input, callback) in inputs {
results.push(
self.execute(&input, callback)
.await
.unwrap_or(String::new()),
);
}
HippoxResult::ok(results)
}
pub async fn execute(
&self,
input: &str,
callback: Option<Arc<dyn WorkflowCallback>>,
) -> HippoxStringResult {
let temp_task_id = uuid::Uuid::new_v4().to_string();
{
let mut pool = tasks::TASK_POOL.write().await;
let task = tasks::Task::new("temp".to_string(), input.to_string());
pool.tasks.insert(temp_task_id.clone(), task);
}
let pipeline = SystemPipeline::new();
let intent_result = match pipeline
.intent_analysis(&self.scheduler, input, &temp_task_id)
.await
{
Ok(result) => result,
Err(e) => {
tracing::warn!("Intent analysis failed: {}, using raw input", e);
IntentAnalysisResult {
categories: vec![],
clean_intent: input.to_string(),
}
}
};
let clean_intent = &intent_result.clean_intent;
let categories = &intent_result.categories;
let workflow_executor_with_id = self
.workflow_executor
.clone()
.with_task_id(temp_task_id.clone());
let workflow_result = if categories.is_empty() {
pipeline
.workflow_execution(
self.workflow_mode,
&workflow_executor_with_id,
&self.scheduler,
clean_intent,
callback,
)
.await
} else {
let result = workflow_executor_with_id
.execute_with_categories(&self.scheduler, clean_intent, categories)
.await;
let json_output = match result {
WorkflowExecutionResult::Completed(output) => output,
WorkflowExecutionResult::CompletedWithRaw { raw_json, .. } => raw_json,
_ => String::new(),
};
WorkflowExecResult {
json_output,
original_input: clean_intent.to_string(),
}
};
let final_output = if needs_format_conversion(input) {
let format_result = pipeline
.response_formatting(
&self.scheduler,
input,
&workflow_result.json_output,
&temp_task_id,
)
.await;
format_result.final_output
} else {
workflow_result.json_output
};
let (input_tokens, output_tokens) = if let Some(task) = tasks::get_task(&temp_task_id).await
{
(task.input_token_count, task.output_token_count)
} else {
(0, 0)
};
{
let mut pool = tasks::TASK_POOL.write().await;
pool.tasks.remove(&temp_task_id);
}
INPUT_TOKEN_COUNT.fetch_add(input_tokens, std::sync::atomic::Ordering::Relaxed);
OUTPUT_TOKEN_COUNT.fetch_add(output_tokens, std::sync::atomic::Ordering::Relaxed);
HippoxResult::ok_with_tokens(final_output, input_tokens, output_tokens)
}
pub async fn heartbeat(&self) -> HippoxStringResult {
let mut messages: Vec<ChatMessage> = Vec::new();
messages.push(ChatMessage::user("hi"));
match self.scheduler.chat_raw(messages).await {
Ok(result) => {
let usage = result.extract_usage();
let input_tokens = usage.as_ref().map(|u| u.prompt_tokens as u64).unwrap_or(0);
let output_tokens = usage
.as_ref()
.map(|u| u.completion_tokens as u64)
.unwrap_or(0);
INPUT_TOKEN_COUNT.fetch_add(input_tokens, std::sync::atomic::Ordering::Relaxed);
OUTPUT_TOKEN_COUNT.fetch_add(output_tokens, std::sync::atomic::Ordering::Relaxed);
HippoxResult::ok_with_tokens(result.text, input_tokens, output_tokens)
}
Err(e) => HippoxResult::network_error(e.to_string()),
}
}
pub fn get_task_status(&self, task_id: &str) -> HippoxResult<TaskStatus> {
match futures::executor::block_on(tasks::get_task_status(task_id)) {
Some(status) => HippoxResult::ok(status),
None => HippoxResult::system_error(format!("Task not found: {}", task_id)),
}
}
pub fn get_task(&self, task_id: &str) -> HippoxResult<tasks::Task> {
match futures::executor::block_on(tasks::get_task(task_id)) {
Some(task) => HippoxResult::ok(task),
None => HippoxResult::system_error(format!("Task not found: {}", task_id)),
}
}
pub fn cancel_task(&self, task_id: &str) -> HippoxBoolResult {
match futures::executor::block_on(tasks::cancel_task(task_id)) {
true => HippoxResult::ok(true),
false => HippoxResult::system_error(format!("Failed to cancel task: {}", task_id)),
}
}
pub fn pause_task(&self, task_id: &str) -> HippoxBoolResult {
match futures::executor::block_on(tasks::pause_task(task_id)) {
true => HippoxResult::ok(true),
false => HippoxResult::system_error(format!("Failed to pause task: {}", task_id)),
}
}
pub fn resume_task(&self, task_id: &str) -> HippoxBoolResult {
match futures::executor::block_on(tasks::resume_task(task_id)) {
true => HippoxResult::ok(true),
false => HippoxResult::system_error(format!("Failed to resume task: {}", task_id)),
}
}
pub fn retry_task(&self, task_id: &str) -> HippoxBoolResult {
match futures::executor::block_on(tasks::retry_task(task_id)) {
true => HippoxResult::ok(true),
false => HippoxResult::system_error(format!("Failed to retry task: {}", task_id)),
}
}
pub fn list_atomic_skills(&self) -> HippoxStringResult {
let skills = skill_registry::list_skills();
if skills.is_empty() {
return HippoxResult::ok(t!("skill.no_skills_available").to_string());
}
let mut result = String::new();
for name in skills {
if let Some(skill) = skill_registry::get_skill(&name) {
let emoji = match skill.category() {
"file" => "📁",
"net" => "🌐",
"math" => "🔢",
"time" => "🕐",
"system" => "💻",
"db" => "🗄️",
"devops" => "🚀",
"document" => "📄",
"message" => "💬",
"task" => "⏰",
_ => "⚙️",
};
result.push_str(&format!(
" {} - **{}**: {}\n",
emoji,
name,
skill.description()
));
}
}
HippoxResult::ok(result)
}
pub fn get_atomic_skill_names(&self) -> HippoxBatchResult {
HippoxResult::ok(skill_registry::list_skills())
}
pub fn has_atomic_skills(&self) -> HippoxBoolResult {
HippoxResult::ok(!skill_registry::list_skills().is_empty())
}
pub fn executor(&self) -> &Executor {
&self.executor
}
pub fn scheduler(&self) -> &SkillScheduler {
&self.scheduler
}
pub fn workflow_mode(&self) -> WorkflowMode {
self.workflow_mode
}
pub fn update_config<F>(&self, f: F) -> anyhow::Result<()>
where
F: FnOnce(&mut HippoxConfig),
{
crate::config::update_config(f)
}
pub fn get_config(&self) -> HippoxConfig {
crate::config::get_config()
}
pub fn get_current_input_token_count(&self) -> u64 {
INPUT_TOKEN_COUNT.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn get_current_output_token_count(&self) -> u64 {
OUTPUT_TOKEN_COUNT.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn storage_task_pool(&self, path: String) -> HippoxVoidResult {
let tasks = futures::executor::block_on(tasks::get_all_tasks(None));
let completed_tasks: Vec<tasks::Task> = tasks
.into_iter()
.filter(|task| {
matches!(
task.status,
TaskStatus::Completed
| TaskStatus::Failed
| TaskStatus::Cancelled
| TaskStatus::Timeout
)
})
.collect();
let json_data = json!({
"export_time": chrono::Local::now().to_rfc3339(),
"total_count": completed_tasks.len(),
"tasks": completed_tasks.iter().map(|task| {
json!({
"id": task.id,
"task_type": task.task_type,
"input": task.input,
"status": format!("{:?}", task.status),
"final_output": task.final_output,
"error": task.error,
"created_at": task.created_at,
"started_at": task.started_at,
"completed_at": task.completed_at,
"duration_ms": task.duration_ms,
"input_token_count": task.input_token_count,
"output_token_count": task.output_token_count,
"steps": task.steps.iter().map(|step| {
json!({
"skill_name": step.skill_name,
"status": format!("{:?}", step.status),
"output": step.output,
"error": step.error,
"duration_ms": step.duration_ms,
})
}).collect::<Vec<_>>(),
})
}).collect::<Vec<_>>(),
});
let json_string = match serde_json::to_string_pretty(&json_data) {
Ok(s) => s,
Err(e) => {
return HippoxResult::system_error(format!("Failed to serialize tasks: {}", e));
}
};
match fs::write(&path, json_string) {
Ok(_) => HippoxResult::ok(()),
Err(e) => HippoxResult::system_error(format!("Failed to write file {}: {}", path, e)),
}
}
}