use clap::Parser;
use kowalski_core::agent::Agent;
use kowalski_core::config::Config;
use kowalski_core::tools::ToolCall;
use log::info;
use serde_json::json;
use std::collections::HashMap;
use std::fs;
use std::io::{self, Write};
use std::sync::Arc;
use tokio::sync::RwLock;
use kowalski_core::memory::consolidation::{Consolidator, MemoryWeaver};
// Top-level command-line arguments, parsed in `main` via `Cli::parse()`.
// NOTE: plain `//` comments are used on purpose in this definition — clap's derive
// turns `///` doc comments into help text, which would change the `--help` output.
#[derive(Parser, Debug)]
#[clap(
author,
version,
about = "Kowalski CLI — agents, memory, and MCP operators.",
long_about = "Operators: `run`, `config check`, `db migrate`, `doctor`, `mcp ping`, `mcp tools`, `federation ping-notify` (with `--features postgres`) (see --help on each)."
)]
struct Cli {
// Subcommand to execute. When it is absent (and `interactive` is not set),
// `main` falls back to the built-in `repl`.
#[clap(subcommand)]
command: Option<Commands>,
// When set, `main` starts an `InteractiveSession` and returns before any
// subcommand is dispatched.
#[clap(short, long)]
interactive: bool,
// Path to an agent config file. When present, `main` loads it with
// `AgentManager::create_agent_from_config` before doing anything else.
#[clap(short, long)]
config: Option<String>,
}
// Subcommands of the CLI; `main` matches on these after handling the global flags.
// NOTE: plain `//` comments are used on purpose — clap's derive turns `///` doc
// comments into help text, which would change the `--help` output.
#[derive(Parser, Debug)]
enum Commands {
// Registers a new agent with the in-process `AgentManager`. When `config` is set the
// agent is loaded from that file; otherwise `name` defaults to "<agent_type>-agent".
Create {
// Positional: requested agent type, forwarded to `AgentManager::create_agent`.
agent_type: String,
// Forwarded to `AgentManager::create_agent` as the prompt argument.
#[clap(short, long)]
prompt: Option<String>,
// Forwarded to `AgentManager::create_agent` as the temperature argument.
#[clap(short, long)]
temperature: Option<f32>,
// Name to register the agent under.
#[clap(short, long)]
name: Option<String>,
// Agent config file; takes precedence over the other options in `main`.
#[clap(short, long)]
config: Option<String>,
},
// Starts `chat_loop` with an agent already registered in the manager.
// `main` only reads `agent`; `prompt`, `temperature` and `model` are parsed
// but not used by the dispatch code.
Chat {
// Positional: name of the agent to chat with.
agent: String,
#[clap(short, long)]
prompt: Option<String>,
#[clap(short, long)]
temperature: Option<f32>,
#[clap(short, long)]
model: Option<String>,
},
// Prints the built-in agent types via the free function `list_agents`.
List,
// Prints the agents currently registered in the `AgentManager`.
Agents,
// Runs the memory `Consolidator` against the default `Config`.
Consolidate {
// Passed straight to `weaver.run(delete)`; presumably requests removal of
// consolidated source memories — TODO confirm against `MemoryWeaver::run`.
#[clap(long)]
delete: bool,
},
// MCP operator commands; see `McpCommands`.
Mcp {
#[clap(subcommand)]
command: McpCommands,
},
// Configuration operator commands; see `ConfigCommands`.
Config {
#[clap(subcommand)]
command: ConfigCommands,
},
// Database operator commands; see `DbCommands`.
Db {
#[clap(subcommand)]
command: DbCommands,
},
// Calls `kowalski_cli::ops::run_doctor` with the optional URL.
Doctor {
#[clap(long)]
ollama_url: Option<String>,
},
// Calls `kowalski_cli::run_ops::run_orchestrator` with the optional config path.
Run {
#[clap(short, long)]
config: Option<String>,
},
// Federation operator commands; see `FederationCommands`.
Federation {
#[clap(subcommand)]
command: FederationCommands,
},
// Extension commands; see `ExtensionCommands`.
Extension {
#[clap(subcommand)]
command: ExtensionCommands,
},
// Agent-app commands; see `AgentAppCommands`.
AgentApp {
#[clap(subcommand)]
command: AgentAppCommands,
},
}
// Subcommands of `config`. Plain `//` comments keep clap's generated help unchanged.
#[derive(Parser, Debug)]
enum ConfigCommands {
// Passes `path` to `kowalski_cli::ops::run_config_check`.
Check {
// Positional config file path; falls back to "config.toml" when omitted.
#[clap(default_value = "config.toml")]
path: String,
},
}
// Subcommands of `db`. Plain `//` comments keep clap's generated help unchanged.
#[derive(Parser, Debug)]
enum DbCommands {
// Forwards both options, unchanged, to `kowalski_cli::ops::run_db_migrate`.
Migrate {
// Optional database URL (`--url`).
#[clap(long)]
url: Option<String>,
// Optional config file path (`-c` / `--config`).
#[clap(short, long)]
config: Option<String>,
},
}
// Subcommands of `federation`. Plain `//` comments keep clap's generated help unchanged.
#[derive(Parser, Debug)]
enum FederationCommands {
// Calls `kowalski_cli::federation_ops::run_ping_notify`; the REPL help text
// describes it as a pg_notify smoke test that needs `--features postgres`.
PingNotify {
// Optional config file path (`-c` / `--config`).
#[clap(short, long)]
config: Option<String>,
},
}
// Subcommands of `mcp`. Plain `//` comments keep clap's generated help unchanged.
#[derive(Parser, Debug)]
enum McpCommands {
// Handled by `run_mcp_ping`: prints one health line per configured MCP server.
Ping {
// Optional config file path (`-c` / `--config`).
#[clap(short, long)]
config: Option<String>,
},
// Handled by `run_mcp_tools`: prints the tools reported by each configured server.
Tools {
// Optional config file path (`-c` / `--config`).
#[clap(short, long)]
config: Option<String>,
},
}
// Subcommands of `extension`. Plain `//` comments keep clap's generated help unchanged.
#[derive(Parser, Debug)]
enum ExtensionCommands {
// Prints the names returned by `kowalski_cli::extension_ops::list_extensions`.
List,
// Calls `kowalski_cli::extension_ops::run_extension(&name, &args)`.
Run {
// Positional: extension name.
name: String,
// Everything after the name, including values that start with `-`,
// is captured verbatim and handed to the extension.
#[clap(trailing_var_arg = true, allow_hyphen_values = true)]
args: Vec<String>,
},
}
// Subcommands of `agent-app`. Each variant is forwarded by `main` to a function in
// `kowalski_cli::agent_app_ops`; the meaning of the individual options is defined there.
// Plain `//` comments keep clap's generated help unchanged.
#[derive(Parser, Debug)]
enum AgentAppCommands {
// Forwarded to `agent_app_ops::list_agents(path)`.
List {
#[clap(short, long)]
path: Option<String>,
},
// Forwarded to `agent_app_ops::validate(path)`.
Validate {
#[clap(short, long)]
path: Option<String>,
},
// Forwarded to `agent_app_ops::run(path, source, question, api)` on a blocking thread.
Run {
// Positional argument.
source: String,
#[clap(short, long)]
question: Option<String>,
#[clap(short, long)]
path: Option<String>,
#[clap(long)]
api: Option<String>,
},
// Forwarded to `agent_app_ops::federate_delegate(api, capability, source, question)`
// on a blocking thread.
Delegate {
// First positional argument.
capability: String,
// Second positional argument.
source: String,
#[clap(short, long)]
question: Option<String>,
#[clap(long)]
api: Option<String>,
},
// Forwarded to `agent_app_ops::federate_worker(path, api, agent_id, topic, role,
// capability)` on a blocking thread.
Worker {
// Positional argument.
agent_id: String,
#[clap(short, long)]
path: Option<String>,
#[clap(long)]
api: Option<String>,
#[clap(long)]
topic: Option<String>,
#[clap(long)]
role: Option<String>,
#[clap(long)]
capability: Option<String>,
},
// Forwarded to `agent_app_ops::proof_check(path, api, agent_id, capability, source,
// question)` on a blocking thread; every option is optional.
Proof {
#[clap(short, long)]
path: Option<String>,
#[clap(long)]
api: Option<String>,
#[clap(long)]
agent_id: Option<String>,
#[clap(long)]
capability: Option<String>,
#[clap(long)]
source: Option<String>,
#[clap(long)]
question: Option<String>,
},
}
/// In-process registry of live agents and the configuration stored for each of them.
///
/// Both maps are keyed by agent name and are filled together by `create_agent`.
struct AgentManager {
/// Agents by name, behind an async `RwLock` so callers can await exclusive access.
agents: Arc<RwLock<HashMap<String, Box<dyn Agent + Send + Sync>>>>,
/// Configuration recorded for each agent, stored under the same name key.
configs: Arc<RwLock<HashMap<String, Config>>>,
}
impl AgentManager {
    /// Creates a manager with no agents and no stored configurations.
    fn new() -> Self {
        Self {
            agents: Arc::new(RwLock::new(HashMap::new())),
            configs: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Loads an agent definition from `config_path` and registers the agent.
    ///
    /// Returns the agent name declared in the file.
    ///
    /// # Errors
    /// Fails when the file cannot be loaded or when `create_agent` fails.
    async fn create_agent_from_config(
        &self,
        config_path: &str,
    ) -> Result<String, Box<dyn std::error::Error>> {
        use kowalski_cli::config::AgentConfig;
        use std::path::Path;

        let agent_config = AgentConfig::load_from_file(Path::new(config_path))
            .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
        println!(
            "Loading agent '{}' of type '{}'...",
            agent_config.name, agent_config.agent_type
        );
        self.create_agent(
            agent_config.name.clone(),
            &agent_config.agent_type,
            agent_config.system_prompt.as_deref(),
            agent_config.temperature,
        )
        .await?;
        Ok(agent_config.name)
    }

    /// Builds a generic template agent and registers it under `name`.
    ///
    /// * `agent_type` – requested type; only used in the fallback system prompt, since
    ///   every agent is currently built from `DefaultTemplate`.
    /// * `prompt` – system prompt to install; when `None` the generic fallback is used.
    /// * `temperature` – sampling temperature; when `None` the template gets `0.7`.
    ///
    /// Previously both `prompt` and `temperature` were accepted but ignored, so values
    /// given via `--prompt`, `--temperature` or a config file had no effect.
    ///
    /// # Errors
    /// Propagates failures from the template builder.
    async fn create_agent(
        &self,
        name: String,
        agent_type: &str,
        prompt: Option<&str>,
        temperature: Option<f32>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        use kowalski_core::template::default::DefaultTemplate;

        /// Temperature used when the caller does not supply one.
        const DEFAULT_TEMPERATURE: f32 = 0.7;

        let config = Config::default();
        let builder = DefaultTemplate::create_agent(
            vec![],
            None,
            temperature.or(Some(DEFAULT_TEMPERATURE)),
        )
        .await?;
        let mut template_agent = builder.build().await?;

        // Honour the caller's prompt; otherwise keep the historical generic prompt.
        let system_prompt = match prompt {
            Some(custom) => custom.to_string(),
            None => format!(
                "Starting generic agent (was requested type: {})",
                agent_type
            ),
        };
        template_agent.base_mut().set_system_prompt(&system_prompt);

        let agent: Box<dyn Agent + Send + Sync> = Box::new(template_agent);
        self.agents.write().await.insert(name.clone(), agent);
        self.configs.write().await.insert(name, config);
        Ok(())
    }

    /// Returns a write guard over the whole agent map when `name` is registered.
    ///
    /// The guard covers every agent, not just `name`; callers still have to look the
    /// agent up (`get_mut` / `remove`) and hold the exclusive lock while they use it.
    async fn get_agent_mut(
        &self,
        name: &str,
    ) -> Option<tokio::sync::RwLockWriteGuard<'_, HashMap<String, Box<dyn Agent + Send + Sync>>>>
    {
        let guard = self.agents.write().await;
        if guard.contains_key(name) {
            Some(guard)
        } else {
            None
        }
    }

    /// Returns a copy of the configuration stored for `name`, if any.
    async fn get_config(&self, name: &str) -> Option<Config> {
        self.configs.read().await.get(name).cloned()
    }

    /// Prints the names of all registered agents, sorted so the output is stable
    /// (a `HashMap` iterates in arbitrary order).
    async fn list_agents(&self) -> Result<(), Box<dyn std::error::Error>> {
        let agents = self.agents.read().await;
        let mut names: Vec<&String> = agents.keys().collect();
        names.sort();
        println!("Active agents:");
        for name in names {
            println!("- {}", name);
        }
        Ok(())
    }
}
/// Pings every MCP server listed in the config file and prints one status line each.
///
/// `config_path` is resolved through `kowalski_cli::ops::mcp_config_path`. When the
/// file lists no servers a hint is printed and the function returns successfully.
///
/// # Errors
/// Fails when the config cannot be loaded, when the ping helper fails, or when
/// stdout cannot be flushed.
async fn run_mcp_ping(config_path: Option<&str>) -> Result<(), Box<dyn std::error::Error>> {
    use kowalski_cli::config::load_mcp_config_from_file;

    let cfg_file = kowalski_cli::ops::mcp_config_path(config_path);
    let mcp_cfg = load_mcp_config_from_file(&cfg_file)?;
    let server_count = mcp_cfg.servers.len();

    if server_count == 0 {
        println!(
            "No MCP servers under [mcp] in {}. Add [[mcp.servers]] entries (see comments in config.toml).",
            cfg_file.display()
        );
        return Ok(());
    }

    println!(
        "MCP ping — {} ({} server(s))\n",
        cfg_file.display(),
        server_count
    );

    for outcome in kowalski_cli::ops::mcp_ping_results(&cfg_file).await? {
        // Emit the prefix first and flush so it is visible before the verdict.
        print!(" {} <{}> [{}] ... ", outcome.name, outcome.url, outcome.transport);
        io::stdout().flush()?;

        if outcome.ok {
            println!("OK — {} tool(s)", outcome.tool_count.unwrap_or(0));
            continue;
        }

        let reason = outcome.error.as_deref().unwrap_or("");
        // Errors prefixed with "tools/list:" are reported as partial, not failed.
        match reason.starts_with("tools/list:") {
            true => println!("partial — {}", reason),
            false => println!("FAILED — {}", reason),
        }
    }
    Ok(())
}
/// Maximum number of characters of a tool description printed by `run_mcp_tools`.
const MAX_TOOL_DESC_CHARS: usize = 120;

/// Returns `desc` unchanged when it has at most `max_chars` characters, otherwise its
/// first `max_chars` characters followed by `…`.
///
/// The cut is made on a character boundary, so multi-byte UTF-8 text cannot cause the
/// panic that a raw byte slice (`&desc[..120]`) would.
fn shorten_description(desc: &str, max_chars: usize) -> String {
    match desc.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}…", &desc[..cut]),
        None => desc.to_string(),
    }
}

/// Connects to every MCP server listed in the config file and prints its tools.
///
/// `config_path` is resolved through `kowalski_cli::ops::mcp_config_path`. A server
/// that cannot be reached is reported as `FAILED` and does not abort the listing.
///
/// # Errors
/// Fails only when the config file itself cannot be loaded.
async fn run_mcp_tools(config_path: Option<&str>) -> Result<(), Box<dyn std::error::Error>> {
    use kowalski_cli::config::load_mcp_config_from_file;

    let path = kowalski_cli::ops::mcp_config_path(config_path);
    let mcp = load_mcp_config_from_file(&path)?;
    if mcp.servers.is_empty() {
        println!(
            "No MCP servers under [mcp] in {}. Add [[mcp.servers]] entries.",
            path.display()
        );
        return Ok(());
    }
    println!(
        "MCP tools — {} ({} server(s))\n",
        path.display(),
        mcp.servers.len()
    );

    for server in &mcp.servers {
        // Servers without a URL are shown by their launch command instead.
        let loc = if server.url.trim().is_empty() {
            server.command.join(" ")
        } else {
            server.url.clone()
        };
        println!(
            "[{}] {} ({})",
            server.name,
            loc,
            match server.transport {
                kowalski_core::config::McpTransport::Http => "http",
                kowalski_core::config::McpTransport::Sse => "sse",
                kowalski_core::config::McpTransport::Stdio => "stdio",
            }
        );

        // Stdio servers use a dedicated client; every other transport goes through
        // the generic `McpClient`.
        let tools_result = if matches!(server.transport, kowalski_core::config::McpTransport::Stdio)
        {
            match kowalski_core::McpStdioClient::connect(server).await {
                Ok(c) => c.list_tools().await,
                Err(e) => Err(e),
            }
        } else {
            match kowalski_core::mcp::McpClient::connect_server(server).await {
                Ok(client) => {
                    if let Some(sid) = client.session_id() {
                        println!(" Session: {}", sid);
                    }
                    client.list_tools().await
                }
                Err(e) => Err(e),
            }
        };

        match tools_result {
            Ok(tools) => {
                if tools.is_empty() {
                    println!(" (no tools reported)");
                }
                for t in &tools {
                    let short = shorten_description(t.description.trim(), MAX_TOOL_DESC_CHARS);
                    println!(" • {} — {}", t.name, short);
                }
            }
            Err(e) => println!(" FAILED: {}", e),
        }
        println!();
    }
    Ok(())
}
/// Program entry point.
///
/// Order of operations:
/// 1. initialise logging (default filter `info`) and parse the command line;
/// 2. if the global `--config` flag is given, load that agent into the manager;
/// 3. if `--interactive` is given, run an `InteractiveSession` and return;
/// 4. otherwise dispatch the subcommand, or start the REPL when none was given.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
    let cli = Cli::parse();
    let manager = AgentManager::new();

    // An agent declared through the global config becomes the interactive default.
    let active_agent_name = match &cli.config {
        None => None,
        Some(config_path) => match manager.create_agent_from_config(config_path).await {
            Ok(loaded) => Some(loaded),
            Err(e) => {
                eprintln!("Error loading config: {}", e);
                return Err(e);
            }
        },
    };

    if cli.interactive {
        println!("Starting Kowalski in interactive mode...");
        let agent_name = active_agent_name.unwrap_or_else(|| String::from("default"));

        // The probe's write guard is released at the end of this statement, so the
        // manager can be written to below.
        let missing = manager.get_agent_mut(&agent_name).await.is_none();
        if missing {
            manager
                .create_agent(agent_name.clone(), "web", None, None)
                .await?;
        }

        let mut registry = manager.get_agent_mut(&agent_name).await.unwrap();
        // The session takes ownership of the agent, so it is removed from the map.
        if let Some(agent) = registry.remove(&agent_name) {
            let mut session = kowalski_cli::interactive::InteractiveSession::new(agent, "llama3");
            session.run().await?;
            return Ok(());
        }
    }

    match cli.command {
        None => {
            println!("Kowalski CLI Interactive Mode. Type 'help' for commands.");
            repl(manager).await?;
        }

        Some(Commands::Create {
            agent_type,
            prompt,
            temperature,
            name,
            config,
        }) => match config {
            Some(config_path) => {
                manager.create_agent_from_config(&config_path).await?;
            }
            None => {
                let agent_name = name.unwrap_or_else(|| format!("{}-agent", agent_type));
                manager
                    .create_agent(agent_name, &agent_type, prompt.as_deref(), temperature)
                    .await?;
            }
        },

        Some(Commands::Chat { agent, .. }) => {
            // The write guard (if any) stays alive for the whole chat session.
            let mut registry = manager.get_agent_mut(&agent).await;
            match registry.as_mut().and_then(|agents| agents.get_mut(&agent)) {
                None => println!("Agent '{}' not found.", agent),
                Some(agent_ref) => {
                    let config = manager
                        .get_config(&agent)
                        .await
                        .unwrap_or_else(Config::default);
                    let conv_id = agent_ref.start_conversation(&config.ollama.model);
                    println!(
                        "Chat session started with agent '{}'. Type /bye to end chat.",
                        agent
                    );
                    println!("Model in use: {}", config.ollama.model);

                    let tools = agent_ref.list_tools().await;
                    if tools.is_empty() {
                        info!("No tools registered or tool listing not available.");
                    } else {
                        info!("Registered tools:");
                        for (tool_name, tool_desc) in tools {
                            info!(" - {}: {}", tool_name, tool_desc);
                        }
                    }
                    chat_loop(agent_ref, conv_id).await?;
                }
            }
        }

        Some(Commands::List) => list_agents()?,
        Some(Commands::Agents) => manager.list_agents().await?,

        Some(Commands::Mcp {
            command: McpCommands::Ping { config },
        }) => {
            run_mcp_ping(config.as_deref()).await?;
        }
        Some(Commands::Mcp {
            command: McpCommands::Tools { config },
        }) => {
            run_mcp_tools(config.as_deref()).await?;
        }

        Some(Commands::Config {
            command: ConfigCommands::Check { path },
        }) => {
            kowalski_cli::ops::run_config_check(std::path::Path::new(&path))?;
        }

        Some(Commands::Db {
            command: DbCommands::Migrate { url, config },
        }) => {
            kowalski_cli::ops::run_db_migrate(url, config).await?;
        }

        Some(Commands::Doctor { ollama_url }) => {
            kowalski_cli::ops::run_doctor(ollama_url).await?;
        }

        Some(Commands::Run { config }) => {
            kowalski_cli::run_ops::run_orchestrator(config.as_deref()).await?;
        }

        Some(Commands::Federation {
            command: FederationCommands::PingNotify { config },
        }) => {
            kowalski_cli::federation_ops::run_ping_notify(config.as_deref()).await?;
        }

        Some(Commands::Extension {
            command: ExtensionCommands::List,
        }) => {
            let found = kowalski_cli::extension_ops::list_extensions()?;
            if found.is_empty() {
                println!("No extensions found.");
                println!(
                    "Install `kowalski-ext-<name>` in PATH or add `.kowalski/extensions/<name>/run`."
                );
            } else {
                println!("Available extensions:");
                for extension in found {
                    println!("- {}", extension);
                }
            }
        }
        Some(Commands::Extension {
            command: ExtensionCommands::Run { name, args },
        }) => {
            kowalski_cli::extension_ops::run_extension(&name, &args)?;
        }

        Some(Commands::AgentApp {
            command: AgentAppCommands::List { path },
        }) => {
            kowalski_cli::agent_app_ops::list_agents(path.as_deref())?;
        }
        Some(Commands::AgentApp {
            command: AgentAppCommands::Validate { path },
        }) => {
            kowalski_cli::agent_app_ops::validate(path.as_deref())?;
        }

        // The remaining agent-app operations run on a blocking thread. Their errors
        // are turned into `String` inside the closure; the first `?` handles the join
        // error and the second the operation's own error.
        Some(Commands::AgentApp {
            command:
                AgentAppCommands::Run {
                    source,
                    question,
                    path,
                    api,
                },
        }) => {
            tokio::task::spawn_blocking(move || {
                kowalski_cli::agent_app_ops::run(
                    path.as_deref(),
                    &source,
                    question.as_deref(),
                    api.as_deref(),
                )
                .map_err(|e| e.to_string())
            })
            .await??;
        }
        Some(Commands::AgentApp {
            command:
                AgentAppCommands::Delegate {
                    capability,
                    source,
                    question,
                    api,
                },
        }) => {
            tokio::task::spawn_blocking(move || {
                kowalski_cli::agent_app_ops::federate_delegate(
                    api.as_deref(),
                    &capability,
                    &source,
                    question.as_deref(),
                )
                .map_err(|e| e.to_string())
            })
            .await??;
        }
        Some(Commands::AgentApp {
            command:
                AgentAppCommands::Worker {
                    agent_id,
                    path,
                    api,
                    topic,
                    role,
                    capability,
                },
        }) => {
            tokio::task::spawn_blocking(move || {
                kowalski_cli::agent_app_ops::federate_worker(
                    path.as_deref(),
                    api.as_deref(),
                    &agent_id,
                    topic.as_deref(),
                    role.as_deref(),
                    capability.as_deref(),
                )
                .map_err(|e| e.to_string())
            })
            .await??;
        }
        Some(Commands::AgentApp {
            command:
                AgentAppCommands::Proof {
                    path,
                    api,
                    agent_id,
                    capability,
                    source,
                    question,
                },
        }) => {
            tokio::task::spawn_blocking(move || {
                kowalski_cli::agent_app_ops::proof_check(
                    path.as_deref(),
                    api.as_deref(),
                    agent_id.as_deref(),
                    capability.as_deref(),
                    source.as_deref(),
                    question.as_deref(),
                )
                .map_err(|e| e.to_string())
            })
            .await??;
        }

        Some(Commands::Consolidate { delete }) => {
            let config = Config::default();
            let provider: Arc<dyn kowalski_core::llm::LLMProvider> = Arc::new(
                kowalski_core::llm::OllamaProvider::new(&config.ollama.host, config.ollama.port),
            );
            kowalski_core::db::run_memory_migrations_if_configured(&config).await?;
            let mut weaver =
                Consolidator::new(&config.memory, provider, &config.ollama.model).await?;
            weaver.run(delete).await?;
            println!("Memory consolidation complete.");
        }
    }
    Ok(())
}
/// Runs an interactive chat with `agent` on stdin/stdout until the user leaves.
///
/// Recognised inputs:
/// * `/bye` (case-insensitive) – end the chat;
/// * `/save <filename>` – export the conversation to `sessions/<filename>.json`;
/// * `/load <filename>` – import `sessions/<filename>.json` and switch to it;
/// * anything else – sent to the agent via `chat_with_tools`, falling back to
///   `use_regular_chat` when that fails.
///
/// End of input (Ctrl-D or a closed pipe) ends the chat like `/bye`; previously
/// `read_line` returning 0 made the loop spin forever sending empty prompts. Blank
/// lines are skipped instead of being forwarded to the model, matching `repl`.
///
/// # Errors
/// Fails on stdin/stdout I/O errors or when the fallback chat fails.
async fn chat_loop(
    agent: &mut Box<dyn Agent + Send + Sync>,
    mut conv_id: String,
) -> Result<(), Box<dyn std::error::Error>> {
    let agent_name = agent.name().to_lowercase();
    println!("Agent name: '{}'", agent_name);

    loop {
        print!("You: ");
        io::stdout().flush()?;

        let mut input = String::new();
        if io::stdin().read_line(&mut input)? == 0 {
            // Zero bytes read means EOF: finish the prompt line and leave.
            println!();
            println!("Goodbye!");
            break;
        }

        let command = input.trim();
        if command.is_empty() {
            continue;
        }
        if command.eq_ignore_ascii_case("/bye") {
            println!("Goodbye!");
            break;
        }

        if let Some(rest) = command.strip_prefix("/save") {
            let filename = rest.trim();
            if filename.is_empty() {
                println!("Usage: /save <filename>");
            } else {
                match agent.export_conversation(&conv_id) {
                    Ok(json) => {
                        // Best effort: if the directory cannot be created, the write
                        // below fails and reports the error.
                        let _ = fs::create_dir_all("sessions");
                        let path = format!("sessions/{}.json", filename);
                        if let Err(e) = fs::write(&path, json) {
                            eprintln!("Failed to write session file: {}", e);
                        } else {
                            println!("Conversation saved to {}", path);
                        }
                    }
                    Err(e) => eprintln!("Failed to save conversation: {}", e),
                }
            }
            continue;
        }

        if let Some(rest) = command.strip_prefix("/load") {
            let filename = rest.trim();
            if filename.is_empty() {
                println!("Usage: /load <filename>");
            } else {
                let path = format!("sessions/{}.json", filename);
                match fs::read_to_string(&path) {
                    Ok(json) => match agent.import_conversation(&json) {
                        Ok(new_id) => {
                            conv_id = new_id;
                            println!("Conversation loaded. Current session ID: {}", conv_id);
                        }
                        Err(e) => eprintln!("Failed to import conversation: {}", e),
                    },
                    Err(e) => eprintln!("Failed to read session file: {}", e),
                }
            }
            continue;
        }

        info!("Using tool-calling chat method");
        match chat_with_tools(agent, &conv_id, &input).await {
            Ok(_) => {
                info!("Tool-calling chat completed successfully");
            }
            Err(e) => {
                eprintln!("Tool-calling chat failed: {}", e);
                use_regular_chat(agent, &conv_id, &input).await?;
            }
        }
    }
    Ok(())
}
/// Sends `input` to the agent's tool-calling chat for conversation `conv_id`.
///
/// The value returned by the agent is discarded here; this wrapper only flushes
/// stdout afterwards.
///
/// # Errors
/// Propagates the agent's error or a stdout flush failure.
async fn chat_with_tools(
    agent: &mut Box<dyn Agent + Send + Sync>,
    conv_id: &str,
    input: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    agent.chat_with_tools(conv_id, input).await?;
    io::stdout().flush()?;
    Ok(())
}
/// Plain (non tool-calling) chat turn, used as the fallback in `chat_loop`.
///
/// Records the user message, asks the agent for a reply based on the conversation
/// history, prints the reply and records it as the assistant message.
///
/// Fix: the assistant turn used to be stored with the user's `input`, so the saved
/// history contained the question twice and never the answer.
///
/// # Errors
/// Propagates the agent's chat error or a stdout flush failure.
async fn use_regular_chat(
    agent: &mut Box<dyn Agent + Send + Sync>,
    conv_id: &str,
    input: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // NOTE(review): `chat_with_history` may also record the user turn itself —
    // confirm against the `Agent` implementation to rule out a duplicate entry.
    agent.add_message(conv_id, "user", input).await;
    let response = agent.chat_with_history(conv_id, input.trim(), None).await?;

    // Render once so the text shown to the user is exactly what gets stored.
    let reply = response.to_string();
    println!("{}", reply);
    io::stdout().flush()?;
    println!();

    agent.add_message(conv_id, "assistant", &reply).await;
    Ok(())
}
/// Prints the built-in agent types with a one-line summary of each.
///
/// Always returns `Ok(())`; the `Result` return type lets callers use `?`.
fn list_agents() -> Result<(), Box<dyn std::error::Error>> {
    // Kept as a table so the listing can be extended in one place.
    const AGENT_TYPES: [(&str, &str); 4] = [
        ("web", "Web research and information retrieval"),
        ("academic", "Academic research and paper analysis"),
        ("code", "Code analysis, refactoring, and documentation"),
        ("data", "Data analysis and processing"),
    ];

    println!("Available agent types:");
    for (kind, summary) in AGENT_TYPES.iter() {
        println!("- {}: {}", kind, summary);
    }
    Ok(())
}
/// Minimal command REPL used when the CLI is started without a subcommand.
///
/// Reads whitespace-separated commands from stdin (`help`, `create`, `chat`, `list`,
/// `agents`, `exit`/`quit`/`bye`/`/bye`) and runs them against `manager`.
///
/// Fixes:
/// * end of input (Ctrl-D or a closed pipe) now exits; previously `read_line`
///   returning 0 produced an empty line that hit `continue`, looping forever;
/// * the help text now shows `create <type> [name]`, which is what the parser
///   accepts (there is no `--name` flag inside the REPL).
///
/// # Errors
/// Fails on stdin/stdout I/O errors and on errors from agent creation or chat.
async fn repl(manager: AgentManager) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        print!("kowalski> ");
        io::stdout().flush()?;

        let mut line = String::new();
        if io::stdin().read_line(&mut line)? == 0 {
            // Zero bytes read means EOF: finish the prompt line and leave.
            println!();
            println!("Exiting Kowalski CLI.");
            break;
        }
        let input = line.trim();
        if input.is_empty() {
            continue;
        }

        let mut parts = input.split_whitespace();
        let cmd = parts.next().unwrap_or("");
        match cmd {
            "exit" | "quit" | "bye" | "/bye" => {
                println!("Exiting Kowalski CLI.");
                break;
            }
            "help" => {
                println!("Commands:");
                println!(" create <type> [name]: Create an agent");
                println!(" chat <name>: Chat with an agent");
                println!(" list: List available agent types");
                println!(" agents: List active agents");
                println!(" bye | /bye : Exit the CLI");
                println!();
                println!("Operators (run outside this REPL):");
                println!(" kowalski-cli mcp ping [-c config.toml] — health + tool count");
                println!(" kowalski-cli mcp tools [-c config.toml] — list tools per server");
                println!(" kowalski-cli config check [config.toml]");
                println!(" kowalski-cli db migrate [--url] [-c config.toml]");
                println!(" kowalski-cli doctor [--ollama-url URL]");
                println!(
                    " kowalski-cli federation ping-notify [-c config.toml] — pg_notify smoke (needs --features postgres)"
                );
                println!(" kowalski-cli extension list");
                println!(" kowalski-cli extension run <name> [-- <args...>]");
                println!(" kowalski-cli agent-app <list|validate|run> [args]");
                println!(
                    " kowalski — /api/federation/registry, /api/federation/stream (SSE), /api/federation/delegate; with --features postgres + memory.database_url, LISTEN kowalski_federation → broker"
                );
            }
            "create" => {
                let agent_type = parts.next();
                let name = parts.next();
                if let Some(agent_type) = agent_type {
                    let agent_name = match name {
                        Some(n) => n.to_string(),
                        None => format!("{}-agent", agent_type),
                    };
                    manager
                        .create_agent(agent_name.clone(), agent_type, None, None)
                        .await?;
                    println!("Agent created successfully: {}", agent_name);
                } else {
                    println!("Usage: create <type> [name]");
                }
            }
            "chat" => {
                let Some(name) = parts.next() else {
                    println!("Usage: chat <name>");
                    continue;
                };
                // The write guard (if any) is held for the whole chat session.
                let mut registry = manager.get_agent_mut(name).await;
                match registry.as_mut().and_then(|agents| agents.get_mut(name)) {
                    None => println!("Agent '{}' not found.", name),
                    Some(agent_ref) => {
                        let config = manager
                            .get_config(name)
                            .await
                            .unwrap_or_else(Config::default);
                        let conv_id = agent_ref.start_conversation(&config.ollama.model);
                        info!(
                            "Chat session started with agent '{}'. Type /bye to end chat.",
                            name
                        );
                        info!("[DEBUG] Model in use: {}", config.ollama.model);
                        let tools = agent_ref.list_tools().await;
                        if tools.is_empty() {
                            info!("[DEBUG] No tools registered or tool listing not available.");
                        } else {
                            info!("[DEBUG] Registered tools:");
                            for (tool_name, tool_desc) in tools {
                                info!(" - {}: {}", tool_name, tool_desc);
                            }
                        }
                        chat_loop(agent_ref, conv_id).await?;
                    }
                }
            }
            "list" => {
                list_agents()?;
            }
            "agents" => {
                manager.list_agents().await?;
            }
            _ => {
                println!(
                    "Unknown command: {}. Type 'help' for a list of commands.",
                    cmd
                );
            }
        }
    }
    Ok(())
}
/// Maps a few fixed phrasings of `user_input` to an `fs_tool` call without an LLM.
///
/// Rules, checked in order:
/// 1. input mentions "list" and "directory" and has a word starting with `/`
///    → `list_dir` on that word;
/// 2. input mentions "first 10 lines" and ".csv" and has a word ending in `.csv`
///    → `get_file_first_lines` on that word with `num_lines = 10`.
///
/// Keywords are matched case-insensitively, but the path is taken from the
/// original input. It used to be taken from the lowercased copy, which corrupted
/// case-sensitive paths such as `/home/Alice/Data.csv`.
///
/// Returns `None` when no rule matches.
// No caller is visible in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
fn rule_based_tool_call(user_input: &str) -> Option<ToolCall> {
    let lowered = user_input.to_lowercase();

    if lowered.contains("list")
        && lowered.contains("directory")
        && let Some(path) = user_input.split_whitespace().find(|w| w.starts_with('/'))
    {
        return Some(ToolCall {
            name: "fs_tool".to_string(),
            parameters: json!({ "task": "list_dir", "path": path }),
            reasoning: Some("Rule-based: user asked to list a directory".to_string()),
        });
    }

    if lowered.contains("first 10 lines")
        && lowered.contains(".csv")
        && let Some(path) = user_input
            .split_whitespace()
            // Compare the extension case-insensitively, but keep the original word.
            .find(|w| w.to_lowercase().ends_with(".csv"))
    {
        return Some(ToolCall {
            name: "fs_tool".to_string(),
            parameters: json!({ "task": "get_file_first_lines", "path": path, "num_lines": 10 }),
            reasoning: Some("Rule-based: user asked for first 10 lines of a CSV".to_string()),
        });
    }

    None
}