#![cfg(not(target_arch = "wasm32"))]
pub mod feishu;
pub mod telegram;
pub mod wechat;
use anyhow::{anyhow, Result};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use tracing::{debug, error, info, warn};
use crate::core::context::{WorkflowContext, WorkflowEvent};
use crate::core::executor::WorkflowExecutor;
use crate::core::parser::GraphParser;
use crate::services::config::JuglansConfig;
use crate::services::local_runtime::LocalRuntime;
use crate::services::prompt_loader::PromptRegistry;
pub struct PlatformMessage {
pub event_type: String,
pub event_data: Value,
pub platform_user_id: String,
pub platform_chat_id: String,
pub text: String,
pub username: Option<String>,
pub platform: String,
}
pub struct BotReply {
pub text: String,
}
#[async_trait::async_trait]
pub trait ToolExecutor: Send + Sync {
async fn execute(&self, tool_name: &str, args: Value) -> Result<String>;
}
pub async fn run_agent_for_message(
config: &JuglansConfig,
project_root: &Path,
agent_slug: &str,
message: &PlatformMessage,
tool_executor: Option<&dyn ToolExecutor>,
) -> Result<BotReply> {
let wf_path = {
let jg_pattern = project_root
.join(format!("**/{}.jg", agent_slug))
.to_string_lossy()
.to_string();
glob::glob(&jg_pattern)
.ok()
.and_then(|mut paths| paths.find_map(|p| p.ok()))
.ok_or_else(|| anyhow!("Workflow '{}' not found in workspace", agent_slug))?
};
let wf_content = fs::read_to_string(&wf_path)
.map_err(|e| anyhow!("Workflow File Error: {} (tried {:?})", e, wf_path))?;
let runtime: Arc<LocalRuntime> = Arc::new(LocalRuntime::new_with_config(&config.ai));
let mut prompt_registry = PromptRegistry::new();
let _ = prompt_registry.load_from_paths(&[
project_root.join("**/*.jgx").to_string_lossy().to_string(),
project_root
.join("**/*.jgprompt")
.to_string_lossy()
.to_string(),
]);
let mut executor =
WorkflowExecutor::new_with_debug(Arc::new(prompt_registry), runtime, config.debug.clone())
.await;
{
use crate::core::tool_loader::ToolLoader;
use crate::services::tool_registry::ToolRegistry;
let tool_pattern = project_root.join("**/*.json").to_string_lossy().to_string();
if let Ok(tools) = ToolLoader::load_from_glob(&tool_pattern, project_root) {
if !tools.is_empty() {
let mut registry = ToolRegistry::new();
registry.register_all(tools);
executor.set_tool_registry(Arc::new(registry));
}
}
}
let mut wf_graph = GraphParser::parse(&wf_content)?;
crate::core::macro_expand::expand_decorators(&mut wf_graph)?;
let parsed_workflow = Some(Arc::new(wf_graph));
if let Some(ref wf) = parsed_workflow {
executor.load_tools(wf).await;
executor.apply_limits(&config.limits);
if let Err(e) = executor.init_python_runtime(wf, config.limits.python_workers) {
warn!("Failed to initialize Python runtime: {}", e);
}
}
let executor = Arc::new(executor);
executor
.get_registry()
.set_executor(Arc::downgrade(&executor));
if let Err(e) = crate::services::history::init_global(&config.history) {
warn!("[history] init_global failed: {}", e);
}
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<WorkflowEvent>();
let ctx = WorkflowContext::with_sender(tx.clone());
let derived_chat_id = format!(
"{}:{}:{}",
message.platform, message.platform_chat_id, agent_slug
);
ctx.set("input.platform".into(), json!(message.platform))
.ok();
ctx.set("input.event_type".into(), json!(message.event_type))
.ok();
ctx.set("input.event_data".into(), message.event_data.clone())
.ok();
ctx.set("input.user_id".into(), json!(message.platform_user_id))
.ok();
ctx.set("input.chat_id".into(), json!(derived_chat_id)).ok();
ctx.set("input.text".into(), json!(message.text)).ok();
ctx.set("input.message".into(), json!(message.text)).ok(); ctx.set(
"input.platform_chat_id".into(),
json!(message.platform_chat_id),
)
.ok();
ctx.set(
"input.platform_user_id".into(),
json!(message.platform_user_id),
)
.ok();
if let Some(ref username) = message.username {
ctx.set("input.username".into(), json!(username)).ok();
}
if let Ok(config_value) = serde_json::to_value(config) {
ctx.set("config".to_string(), config_value).ok();
}
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&message.text) {
if let Some(obj) = parsed.as_object() {
for (k, v) in obj {
ctx.set(format!("input.{}", k), v.clone()).ok();
}
}
}
let executor_clone = executor.clone();
let agent_slug_owned = agent_slug.to_string();
let exec_handle = tokio::spawn(async move {
let result = if let Some(workflow) = parsed_workflow {
executor_clone.execute_graph(workflow, &ctx).await
} else {
let mut params = HashMap::new();
params.insert("agent".to_string(), agent_slug_owned);
params.insert("message".to_string(), "$input.message".to_string());
executor_clone
.execute_tool_internal("chat", ¶ms, &ctx)
.await
.map(|_| ())
};
if let Err(e) = result {
error!("Bot execution error: {}", e);
let _ = tx.send(WorkflowEvent::Error(e.to_string()));
}
});
let mut reply_text = String::new();
while let Some(event) = rx.recv().await {
match event {
WorkflowEvent::Token(t) => {
reply_text.push_str(&t);
}
WorkflowEvent::Status(s) => {
debug!("[Bot Status] {}", s);
}
WorkflowEvent::Error(e) => {
if reply_text.is_empty() {
reply_text = format!("Error: {}", e);
}
}
WorkflowEvent::ToolCall {
tools, result_tx, ..
} => {
if let Some(executor) = tool_executor {
let mut results = vec![];
for tool in &tools {
let tool_name = tool["name"]
.as_str()
.or_else(|| tool.pointer("/function/name").and_then(|v| v.as_str()))
.unwrap_or("unknown");
let args_str = tool["arguments"]
.as_str()
.or_else(|| {
tool.pointer("/function/arguments").and_then(|v| v.as_str())
})
.unwrap_or("{}");
let tool_call_id = tool["id"].as_str().unwrap_or("").to_string();
let args: Value = serde_json::from_str(args_str).unwrap_or(json!({}));
info!("🔧 [Bot] Executing tool: {}({})", tool_name, args_str);
let content = match executor.execute(tool_name, args).await {
Ok(result) => result,
Err(e) => {
error!("🔧 [Bot] Tool {} failed: {}", tool_name, e);
format!("Error: {}", e)
}
};
results.push(crate::core::context::ToolResultPayload {
tool_call_id,
content,
});
}
let _ = result_tx.send((results, None));
} else {
warn!("[Bot] Client tool call received but no executor available, skipping");
let _ = result_tx.send((vec![], None));
}
}
WorkflowEvent::Meta(_)
| WorkflowEvent::Yield(_)
| WorkflowEvent::ToolStart(_)
| WorkflowEvent::ToolComplete(_)
| WorkflowEvent::NodeStart(_)
| WorkflowEvent::NodeComplete(_) => {
}
}
}
let _ = exec_handle.await;
if reply_text.is_empty() {
if let Ok(Some(val)) = WorkflowContext::new().resolve_path("reply.output") {
if let Some(s) = val.as_str() {
reply_text = s.to_string();
}
}
}
if reply_text.is_empty() {
reply_text = "(No response)".to_string();
}
Ok(BotReply { text: reply_text })
}