use crate::{
CrabTalk,
hooks::{Memory, delegate},
storage::FsStorage,
system::{SharedRuntime, hook::CompositeHook},
system::{event, host::SystemEnv},
};
use anyhow::Result;
use crabllm_core::Provider;
use crabllm_provider::{ProviderRegistry, RemoteProvider};
use mcp::McpHandler;
use runtime::{Hook, Runtime};
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
sync::{Arc, OnceLock},
};
use tokio::sync::{RwLock, broadcast};
use wcore::{LlmConfig, ResolvedDirs, model::Model, resolve_dirs, storage::Storage};
pub type DefaultProvider = crate::provider::Retrying<ProviderRegistry<RemoteProvider>>;
pub type BuildProvider<P> =
Arc<dyn Fn(&wcore::Config, &[String]) -> Result<wcore::model::Model<P>> + Send + Sync>;
pub fn build_default_provider(
config: &wcore::Config,
models: &[String],
) -> Result<Model<DefaultProvider>> {
build_providers(config, models)
}
impl<P: Provider + 'static> CrabTalk<P> {
pub(crate) async fn build(
config: &wcore::Config,
config_dir: &Path,
build_provider: BuildProvider<P>,
) -> Result<Self> {
let runtime_once: Arc<OnceLock<SharedRuntime<P>>> = Arc::new(OnceLock::new());
let node_hook = CompositeHook::new(Arc::new(parking_lot::RwLock::new(BTreeMap::new())));
let (runtime, mcp, node_hook, tool_hook) = Self::build_all(
config,
config_dir,
&build_provider,
runtime_once.clone(),
node_hook,
)
.await?;
let shared_runtime: SharedRuntime<P> = Arc::new(RwLock::new(Arc::new(runtime)));
runtime_once
.set(shared_runtime.clone())
.unwrap_or_else(|_| panic!("runtime already initialized"));
{
let rebuild_runtime = shared_runtime.clone();
tokio::spawn(async move {
let rt = rebuild_runtime.read().await.clone();
if let Err(e) = rt.rebuild_session_index().await {
tracing::warn!("session index rebuild failed: {e}");
}
});
}
let fire_runtime = shared_runtime.clone();
let fire: event::FireCallback = Arc::new(move |sub, payload| {
let runtime = fire_runtime.clone();
let target_agent = sub.target_agent.clone();
let source = sub.source.clone();
let payload = payload.to_owned();
tokio::spawn(async move {
let rt = runtime.read().await.clone();
let sender = format!("event:{source}");
let conversation_id = match rt
.get_or_create_conversation(&target_agent, &sender)
.await
{
Ok(id) => id,
Err(e) => {
tracing::warn!(
"event fire: get_or_create_conversation(agent='{target_agent}'): {e}"
);
return;
}
};
if let Err(e) = rt.send_to(conversation_id, &payload, &sender, None).await {
tracing::warn!("event fire: send_to(agent='{target_agent}'): {e}");
}
});
});
let event_bus = event::EventBus::load(config_dir.to_path_buf(), fire);
let events = Arc::new(parking_lot::Mutex::new(event_bus));
{
let events_for_sink = events.clone();
let sink: crate::system::hook::EventSink =
Arc::new(move |source: &str, payload: &str| {
events_for_sink.lock().publish(source, payload);
});
node_hook.set_event_sink(sink);
}
Ok(Self {
runtime: shared_runtime,
hook: node_hook,
config_dir: config_dir.to_path_buf(),
started_at: std::time::Instant::now(),
events,
build_provider,
mcp,
tool_hook,
})
}
pub async fn reload(&self) -> Result<()> {
let config = wcore::Config::load(&self.config_dir.join(wcore::paths::CONFIG_FILE))?;
let runtime_once: Arc<OnceLock<SharedRuntime<P>>> = Arc::new(OnceLock::new());
runtime_once
.set(self.runtime.clone())
.unwrap_or_else(|_| panic!("runtime_once already set"));
let node_hook = CompositeHook::new(self.hook.scopes.clone());
let (mut new_runtime, _mcp, new_hook, _) = Self::build_all(
&config,
&self.config_dir,
&self.build_provider,
runtime_once,
node_hook,
)
.await?;
{
let old_runtime = self.runtime.read().await;
(**old_runtime).transfer_to(&mut new_runtime).await;
}
{
let events_for_sink = self.events.clone();
let sink: crate::system::hook::EventSink =
Arc::new(move |source: &str, payload: &str| {
events_for_sink.lock().publish(source, payload);
});
new_hook.set_event_sink(sink);
}
*self.runtime.write().await = Arc::new(new_runtime);
tracing::info!("configuration reloaded");
Ok(())
}
async fn build_all(
config: &wcore::Config,
config_dir: &Path,
build_provider: &BuildProvider<P>,
runtime_once: Arc<OnceLock<SharedRuntime<P>>>,
mut node_hook: CompositeHook,
) -> Result<(
Runtime<crate::system::SystemCfg<P>>,
Arc<McpHandler>,
Arc<CompositeHook>,
Arc<crate::hooks::tool::ToolHook>,
)> {
let dirs = resolve_dirs(config_dir);
let storage = Self::build_storage(config_dir, &dirs);
crate::storage::fs::migrate::migrate_settings(storage.as_ref()).await?;
let models = fetch_models(&config.llm).await;
let default_model = models.first().cloned().unwrap_or_default();
storage.scaffold(&default_model).await?;
let model = build_provider(config, &models)?;
let mcp_handler: Arc<McpHandler> = Arc::new(McpHandler::empty());
let (tool_hook, shared_memory) = Self::register_tools(
&mut node_hook,
storage.clone(),
config_dir,
mcp_handler.clone(),
config.env.clone(),
runtime_once,
)
.await?;
let node_hook = Arc::new(node_hook);
let (events_tx, _) = broadcast::channel(256);
let env = Arc::new(SystemEnv {
events_tx,
hook: node_hook.clone(),
});
let mut tools = wcore::ToolRegistry::new();
for schema in Hook::schema(node_hook.as_ref()) {
tools.insert(schema);
}
let runtime = Runtime::new(model, env, storage, shared_memory, tools);
runtime.set_models(models);
let mut runtime = runtime;
Self::register_agents(&mut runtime, &dirs).await?;
Ok((runtime, mcp_handler, node_hook, tool_hook))
}
fn build_storage(config_dir: &Path, dirs: &ResolvedDirs) -> Arc<FsStorage> {
let skill_roots: Vec<PathBuf> = dirs
.skill_dirs
.iter()
.filter(|dir| dir.exists())
.cloned()
.collect();
Arc::new(FsStorage::new(
config_dir.to_path_buf(),
config_dir.join("sessions"),
skill_roots,
))
}
async fn register_tools(
node_hook: &mut CompositeHook,
storage: Arc<FsStorage>,
config_dir: &Path,
mcp_handler: Arc<McpHandler>,
env_overlay: BTreeMap<String, String>,
runtime_once: Arc<OnceLock<SharedRuntime<P>>>,
) -> Result<(Arc<crate::hooks::tool::ToolHook>, runtime::SharedMemory)> {
let memory_wrapper = Memory::open(config_dir.join("memory.db"))?;
let shared_memory = memory_wrapper.shared();
let memory = Arc::new(memory_wrapper);
let scopes = node_hook.scopes.clone();
let skills = storage.list_skills().await.unwrap_or_default();
let tool_hook = Arc::new(crate::hooks::tool::ToolHook::new());
node_hook.register_hook("tool", tool_hook.clone());
node_hook.register_hook(
"memory",
Arc::new(crate::hooks::memory::MemoryHook::new(memory)),
);
node_hook.register_hook(
"sessions",
Arc::new(crate::hooks::sessions::SessionsHook::<P>::new(
runtime_once.clone(),
)),
);
node_hook.register_hook(
"skill",
Arc::new(crate::hooks::skill::handler::SkillHook::new(
skills,
scopes.clone(),
)),
);
node_hook.register_hook(
"delegate",
Arc::new(delegate::DelegateHook::<P>::new(runtime_once)),
);
node_hook.register_hook(
"mcp",
Arc::new(crate::hooks::mcp::McpHook::new(mcp_handler, env_overlay)),
);
Ok((tool_hook, shared_memory))
}
async fn register_agents(
runtime: &mut Runtime<crate::system::SystemCfg<P>>,
dirs: &ResolvedDirs,
) -> Result<()> {
let stored_agents = runtime.storage().list_agents().await?;
let stored_names: std::collections::BTreeSet<String> =
stored_agents.iter().map(|a| a.name.clone()).collect();
for agent in stored_agents {
if agent.system_prompt.is_empty() {
tracing::warn!(name = %agent.name, "stored agent has no prompt — skipping");
continue;
}
if agent.model.is_empty() {
tracing::warn!(name = %agent.name, "stored agent has no model — skipping");
continue;
}
runtime.add_agent(agent);
}
for (name, agent) in &dirs.package_agents {
if stored_names.contains(name) {
continue;
}
let agent = agent.clone();
if agent.system_prompt.is_empty() {
tracing::warn!(name = %name, "package agent has no prompt — skipping");
continue;
}
if agent.model.is_empty() {
tracing::warn!(name = %name, "package agent has no model — skipping");
continue;
}
runtime.add_agent(agent);
}
Ok(())
}
}
fn build_providers(config: &wcore::Config, models: &[String]) -> Result<Model<DefaultProvider>> {
let llm = &config.llm;
let provider_cfg = crabllm_core::ProviderConfig {
kind: crabllm_core::ProviderKind::Anthropic,
base_url: (!llm.base_url.is_empty()).then(|| llm.base_url.clone()),
api_key: (!llm.api_key.is_empty()).then(|| llm.api_key.clone()),
models: models.to_vec(),
..Default::default()
};
let mut providers = std::collections::HashMap::new();
providers.insert("llm".to_owned(), provider_cfg);
let registry = ProviderRegistry::from_provider_configs(
&providers,
&std::collections::HashMap::new(),
|r| r,
)?;
let retrying = crate::provider::Retrying::new(registry);
tracing::info!(
"llm endpoint registered — {} models from {}",
models.len(),
llm.base_url
);
Ok(Model::new(retrying))
}
async fn fetch_models(llm: &LlmConfig) -> Vec<String> {
if llm.base_url.is_empty() {
tracing::warn!("no llm.base_url configured in config.toml — model list is empty");
return Vec::new();
}
let url = format!("{}/models", llm.base_url.trim_end_matches('/'));
let mut req = reqwest::Client::new().get(&url);
if !llm.api_key.is_empty() {
req = req.bearer_auth(&llm.api_key);
}
match fetch_models_inner(req).await {
Ok(models) => models,
Err(e) => {
tracing::warn!("failed to fetch {url}: {e}");
Vec::new()
}
}
}
async fn fetch_models_inner(req: reqwest::RequestBuilder) -> Result<Vec<String>> {
let body: serde_json::Value = req.send().await?.error_for_status()?.json().await?;
Ok(body
.get("data")
.and_then(|d| d.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.get("id").and_then(|i| i.as_str()).map(String::from))
.collect()
})
.unwrap_or_default())
}