1use anyhow::{Context, Result};
2use clap::{Parser, Subcommand, ValueEnum};
3use spec_ai_core::cli::CliState;
4use spec_ai_core::spec::AgentSpec;
5use std::path::PathBuf;
6use walkdir::WalkDir;
7
8#[cfg(feature = "api")]
9use {
10 spec_ai_api::api::server::{ApiConfig, ApiServer},
11 spec_ai_config::config::AgentRegistry,
12 spec_ai_config::persistence::Persistence,
13 spec_ai_core::tools::ToolRegistry,
14 std::sync::Arc,
15};
16
17#[derive(Parser)]
18#[command(name = "spec-ai")]
19#[command(about = "SpecAI - AI agent framework with spec execution", long_about = None)]
20struct Cli {
21 #[arg(short, long, global = true)]
23 config: Option<PathBuf>,
24
25 #[arg(
27 long = "mode",
28 value_enum,
29 num_args = 0..=1,
30 default_value = "new",
31 default_missing_value = "new",
32 global = true
33 )]
34 mode: TuiMode,
35
36 #[command(subcommand)]
37 command: Option<Commands>,
38}
39
40#[derive(Subcommand)]
41enum Commands {
42 Run {
44 #[arg(value_name = "SPEC_OR_DIR")]
46 specs: Vec<PathBuf>,
47 },
48 Server {
50 #[arg(short, long, default_value = "3000")]
52 port: u16,
53 #[arg(long, default_value = "127.0.0.1")]
55 host: String,
56 #[arg(long)]
58 join: Option<String>,
59 },
60}
61
62#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
63enum TuiMode {
64 New,
65 Legacy,
66}
67
68fn collect_spec_files(path: &PathBuf) -> Result<Vec<PathBuf>> {
69 let mut specs = Vec::new();
70
71 if path.is_file() {
72 if path.extension().and_then(|s| s.to_str()) == Some("spec") {
73 specs.push(path.clone());
74 } else {
75 eprintln!(
76 "Warning: Skipping '{}' (expected .spec extension)",
77 path.display()
78 );
79 }
80 } else if path.is_dir() {
81 for entry in WalkDir::new(path)
82 .follow_links(true)
83 .into_iter()
84 .filter_map(|e| e.ok())
85 {
86 if entry.file_type().is_file() {
87 if let Some(ext) = entry.path().extension() {
88 if ext == "spec" {
89 specs.push(entry.path().to_path_buf());
90 }
91 }
92 }
93 }
94 specs.sort();
95 } else {
96 anyhow::bail!("Path '{}' does not exist", path.display());
97 }
98
99 Ok(specs)
100}
101
102async fn run_spec_file(cli: &mut CliState, spec_path: &PathBuf) -> Result<bool> {
103 if !spec_path.exists() {
104 eprintln!("Error: Spec file '{}' not found", spec_path.display());
105 return Ok(false);
106 }
107
108 let abs_path = spec_path.canonicalize().with_context(|| {
109 format!(
110 "Failed to resolve absolute path for '{}'",
111 spec_path.display()
112 )
113 })?;
114
115 println!("=== Running spec: {} ===", abs_path.display());
116
117 let spec = AgentSpec::from_file(&abs_path)?;
118 let output = cli.agent.run_spec(&spec).await?;
119 cli.maybe_speak_response(&output.response);
120
121 println!("{}", output.response);
123
124 Ok(true)
127}
128
129#[cfg(feature = "api")]
130async fn start_server(
131 config_path: Option<PathBuf>,
132 host: String,
133 port: u16,
134 join: Option<String>,
135) -> Result<()> {
136 use spec_ai_api::api::mesh::MeshClient;
137 use spec_ai_config::config::AppConfig;
138 use spec_ai_core::embeddings::EmbeddingsClient;
139 use std::net::TcpListener;
140
141 let base_filter = "tower_http=debug";
144 let filter = match std::env::var("RUST_LOG") {
145 Ok(env_filter) if !env_filter.is_empty() => format!("{},{}", env_filter, base_filter),
146 _ => format!("spec_ai=info,{}", base_filter),
147 };
148 tracing_subscriber::fmt()
149 .with_env_filter(filter)
150 .with_target(true)
151 .init();
152
153 let instance_id = MeshClient::generate_instance_id();
155 println!("Instance ID: {}", instance_id);
156
157 if let Some(ref registry_addr) = join {
159 let mut test_port = port;
161 let max_attempts = 100;
162 for _ in 0..max_attempts {
163 if TcpListener::bind(format!("{}:{}", host, test_port)).is_ok() {
164 println!("Joining mesh at {} on port {}", registry_addr, test_port);
165 return start_mesh_member(
166 config_path,
167 host,
168 test_port,
169 registry_addr.clone(),
170 instance_id,
171 )
172 .await;
173 }
174 test_port += 1;
175 }
176 anyhow::bail!(
177 "Could not find available port after {} attempts",
178 max_attempts
179 );
180 }
181
182 match TcpListener::bind(format!("{}:{}", host, port)) {
184 Ok(_listener) => {
185 println!(
187 "Starting spec-ai server as mesh leader on {}:{}",
188 host, port
189 );
190 drop(_listener); }
192 Err(_) => {
193 println!(
195 "Port {} is in use. Checking for existing mesh registry...",
196 port
197 );
198 let health_url = format!("http://{}:{}/health", host, port);
199 match reqwest::get(&health_url).await {
200 Ok(response) if response.status().is_success() => {
201 println!("Found existing spec-ai mesh registry at {}:{}", host, port);
202 let mut test_port = port + 1;
204 let max_attempts = 100;
205 for _ in 0..max_attempts {
206 if TcpListener::bind(format!("{}:{}", host, test_port)).is_ok() {
207 println!("Joining mesh on port {}", test_port);
208 let registry_url = format!("{}:{}", host, port);
209 return start_mesh_member(
210 config_path,
211 host,
212 test_port,
213 registry_url,
214 instance_id,
215 )
216 .await;
217 }
218 test_port += 1;
219 }
220 anyhow::bail!(
221 "Could not find available port after {} attempts",
222 max_attempts
223 );
224 }
225 _ => {
226 eprintln!("Error: Port {} is in use by another process", port);
227 eprintln!("Please specify a different port with --port");
228 std::process::exit(1);
229 }
230 }
231 }
232 }
233
234 let app_config = if let Some(path) = config_path {
236 AppConfig::load_from_file(&path)?
237 } else {
238 AppConfig::load()?
239 };
240
241 let persistence = Persistence::new(&app_config.database.path)?;
243
244 let embeddings = if let Some(embeddings_model) = &app_config.model.embeddings_model {
246 if let Some(api_key_source) = &app_config.model.api_key_source {
247 let api_key = if api_key_source.starts_with("ENV:") {
249 std::env::var(&api_key_source[4..]).ok()
250 } else {
251 std::fs::read_to_string(api_key_source).ok()
252 };
253 if let Some(key) = api_key {
254 Some(EmbeddingsClient::with_api_key(
255 embeddings_model.clone(),
256 key,
257 ))
258 } else {
259 Some(EmbeddingsClient::new(embeddings_model.clone()))
260 }
261 } else {
262 Some(EmbeddingsClient::new(embeddings_model.clone()))
263 }
264 } else {
265 None
266 };
267
268 let agent_registry = Arc::new(AgentRegistry::new(
270 app_config.agents.clone(),
271 persistence.clone(),
272 ));
273 let tool_registry = Arc::new(ToolRegistry::with_builtin_tools(
274 Some(Arc::new(persistence.clone())),
275 embeddings,
276 ));
277
278 let api_config = ApiConfig::new()
280 .with_host(host.clone())
281 .with_port(port)
282 .with_cors(true);
283
284 let server = ApiServer::new(
285 api_config.clone(),
286 persistence.clone(),
287 agent_registry.clone(),
288 tool_registry.clone(),
289 app_config.clone(),
290 )?;
291
292 println!(
293 "Server running at https://{} (fingerprint: {})",
294 api_config.bind_address(),
295 server.certificate_fingerprint()
296 );
297 println!("Health check: https://{}/health", api_config.bind_address());
298 println!("Press Ctrl+C to stop the server");
299
300 let mesh_registry = server.mesh_registry();
302 let self_instance = spec_ai_api::api::mesh::MeshInstance {
303 instance_id: instance_id.clone(),
304 hostname: host.clone(),
305 port,
306 capabilities: vec!["registry".to_string(), "query".to_string()],
307 is_leader: true,
308 last_heartbeat: chrono::Utc::now(),
309 created_at: chrono::Utc::now(),
310 agent_profiles: agent_registry.list(),
311 };
312 mesh_registry.register(self_instance).await;
313
314 let heartbeat_instance_id = instance_id.clone();
316 let heartbeat_registry = mesh_registry.clone();
317 let heartbeat_interval = app_config.mesh.heartbeat_interval_secs;
318 tokio::spawn(async move {
319 let mut interval =
320 tokio::time::interval(tokio::time::Duration::from_secs(heartbeat_interval));
321 loop {
322 interval.tick().await;
323 let _ = heartbeat_registry.heartbeat(&heartbeat_instance_id).await;
324 }
325 });
326
327 let cleanup_registry = mesh_registry.clone();
329 let cleanup_timeout = app_config.mesh.leader_timeout_secs;
330 tokio::spawn(async move {
331 let mut interval =
332 tokio::time::interval(tokio::time::Duration::from_secs(cleanup_timeout / 2));
333 loop {
334 interval.tick().await;
335 cleanup_registry.cleanup_stale(cleanup_timeout).await;
336 }
337 });
338
339 let shutdown_instance_id = instance_id.clone();
341 let shutdown_registry = mesh_registry.clone();
342 let shutdown = async move {
343 tokio::signal::ctrl_c()
344 .await
345 .expect("Failed to install Ctrl+C handler");
346 println!("\nShutting down server...");
347 let _ = shutdown_registry.deregister(&shutdown_instance_id).await;
349 };
350
351 server.run_with_shutdown(shutdown).await?;
353
354 println!("Server stopped");
355 Ok(())
356}
357
358#[cfg(feature = "api")]
359async fn start_mesh_member(
360 config_path: Option<PathBuf>,
361 host: String,
362 port: u16,
363 registry_url: String,
364 instance_id: String,
365) -> Result<()> {
366 use spec_ai_api::api::mesh::MeshClient;
367 use spec_ai_config::config::AppConfig;
368 use spec_ai_core::embeddings::EmbeddingsClient;
369
370 println!("Starting as mesh member on {}:{}", host, port);
371 println!("Registry at: {}", registry_url);
372
373 let app_config = if let Some(path) = config_path {
375 AppConfig::load_from_file(&path)?
376 } else {
377 AppConfig::load()?
378 };
379
380 let persistence = Persistence::new(&app_config.database.path)?;
382
383 let embeddings = if let Some(embeddings_model) = &app_config.model.embeddings_model {
385 if let Some(api_key_source) = &app_config.model.api_key_source {
386 let api_key = if api_key_source.starts_with("ENV:") {
387 std::env::var(&api_key_source[4..]).ok()
388 } else {
389 std::fs::read_to_string(api_key_source).ok()
390 };
391 if let Some(key) = api_key {
392 Some(EmbeddingsClient::with_api_key(
393 embeddings_model.clone(),
394 key,
395 ))
396 } else {
397 Some(EmbeddingsClient::new(embeddings_model.clone()))
398 }
399 } else {
400 Some(EmbeddingsClient::new(embeddings_model.clone()))
401 }
402 } else {
403 None
404 };
405
406 let agent_registry = Arc::new(AgentRegistry::new(
408 app_config.agents.clone(),
409 persistence.clone(),
410 ));
411 let tool_registry = Arc::new(ToolRegistry::with_builtin_tools(
412 Some(Arc::new(persistence.clone())),
413 embeddings,
414 ));
415
416 let agent_profiles: Vec<String> = agent_registry.list();
418
419 let mesh_client = MeshClient::new(
421 ®istry_url.split(':').next().unwrap(),
422 registry_url.split(':').nth(1).unwrap().parse()?,
423 );
424
425 let register_response = mesh_client
426 .register(
427 instance_id.clone(),
428 host.clone(),
429 port,
430 vec!["query".to_string()],
431 agent_profiles,
432 )
433 .await?;
434
435 println!("Registered with mesh:");
436 println!(" Leader: {}", register_response.is_leader);
437 println!(" Peers: {}", register_response.peers.len());
438
439 let api_config = ApiConfig::new()
441 .with_host(host.clone())
442 .with_port(port)
443 .with_cors(true);
444
445 let server = ApiServer::new(
446 api_config.clone(),
447 persistence,
448 agent_registry,
449 tool_registry,
450 app_config.clone(),
451 )?;
452
453 println!(
454 "Server running at https://{} (fingerprint: {})",
455 api_config.bind_address(),
456 server.certificate_fingerprint()
457 );
458
459 let heartbeat_instance_id = instance_id.clone();
461 let heartbeat_client = mesh_client.clone();
462 let heartbeat_interval = app_config.mesh.heartbeat_interval_secs;
463 tokio::spawn(async move {
464 let mut interval =
465 tokio::time::interval(tokio::time::Duration::from_secs(heartbeat_interval));
466 loop {
467 interval.tick().await;
468 if let Err(e) = heartbeat_client
469 .heartbeat(&heartbeat_instance_id, None)
470 .await
471 {
472 eprintln!("Heartbeat failed: {}", e);
473 }
474 }
475 });
476
477 let shutdown_instance_id = instance_id.clone();
479 let shutdown_client = mesh_client.clone();
480 let shutdown = async move {
481 tokio::signal::ctrl_c()
482 .await
483 .expect("Failed to install Ctrl+C handler");
484 println!("\nShutting down server...");
485 if let Err(e) = shutdown_client.deregister(&shutdown_instance_id).await {
487 eprintln!("Failed to deregister: {}", e);
488 }
489 };
490
491 server.run_with_shutdown(shutdown).await?;
493
494 println!("Server stopped");
495 Ok(())
496}
497
498async fn run_specs_command(config_path: Option<PathBuf>, spec_paths: Vec<PathBuf>) -> Result<i32> {
499 let specs_to_run = if spec_paths.is_empty() {
501 let default_spec = PathBuf::from("../../../examples/spec/smoke.spec");
502 if !default_spec.exists() {
503 eprintln!("Error: Default spec not found at 'examples/spec/smoke.spec'.");
504 eprintln!("Please provide explicit spec files or create the default spec.");
505 return Ok(1);
506 }
507 vec![default_spec]
508 } else {
509 let mut all_specs = Vec::new();
510 for path in &spec_paths {
511 let specs = collect_spec_files(path)?;
512 all_specs.extend(specs);
513 }
514
515 if all_specs.is_empty() {
516 eprintln!("Error: No .spec files found in provided paths.");
517 return Ok(1);
518 }
519
520 all_specs
521 };
522
523 let mut cli = match CliState::initialize_with_path(config_path) {
525 Ok(cli) => cli,
526 Err(e) => {
527 let error_chain = format!("{:#}", e);
528 if error_chain.contains("Could not set lock")
529 || error_chain.contains("Conflicting lock")
530 {
531 eprintln!("Error: Another instance of spec-ai is already running.");
532 eprintln!();
533 eprintln!("Only one instance can access the database at a time.");
534 eprintln!("Please close the other instance or wait for it to finish.");
535 eprintln!();
536 eprintln!("To run multiple instances, configure a different database path");
537 eprintln!("in your config file: [database] path = \"~/.spec-ai/other.db\"");
538 std::process::exit(1);
539 }
540 return Err(e);
541 }
542 };
543
544 let mut all_success = true;
546 for spec_path in specs_to_run {
547 match run_spec_file(&mut cli, &spec_path).await {
548 Ok(success) => {
549 if !success {
550 all_success = false;
551 }
552 }
553 Err(e) => {
554 eprintln!("Error running spec '{}': {}", spec_path.display(), e);
555 all_success = false;
556 }
557 }
558 }
559
560 Ok(if all_success { 0 } else { 1 })
561}
562
563#[tokio::main]
564pub async fn run() -> Result<()> {
565 let cli = Cli::parse();
566
567 match cli.command {
568 Some(Commands::Run { specs }) => {
569 let exit_code = run_specs_command(cli.config, specs).await?;
570 std::process::exit(exit_code);
571 }
572 #[cfg(feature = "api")]
573 Some(Commands::Server { port, host, join }) => {
574 start_server(cli.config, host, port, join).await?;
575 Ok(())
576 }
577 #[cfg(not(feature = "api"))]
578 Some(Commands::Server { .. }) => {
579 eprintln!("Error: Server functionality requires the 'api' feature");
580 eprintln!("Please rebuild with: cargo build --features api");
581 std::process::exit(1);
582 }
583 None => match cli.mode {
584 TuiMode::New => {
585 spec_ai_tui_app::run_tui(cli.config).await?;
586 Ok(())
587 }
588 TuiMode::Legacy => run_repl_with_config(cli.config).await,
589 },
590 }
591}
592
593async fn run_repl_with_config(config: Option<PathBuf>) -> Result<()> {
594 let mut cli_state = match CliState::initialize_with_path(config) {
595 Ok(cli) => cli,
596 Err(e) => {
597 let error_chain = format!("{:#}", e);
598 if error_chain.contains("Could not set lock")
599 || error_chain.contains("Conflicting lock")
600 {
601 eprintln!("Error: Another instance of spec-ai is already running.");
602 eprintln!();
603 eprintln!("Only one instance can access the database at a time.");
604 eprintln!("Please close the other instance or wait for it to finish.");
605 eprintln!();
606 eprintln!("To run multiple instances, configure a different database path");
607 eprintln!("in your config file: [database] path = \"~/.spec-ai/other.db\"");
608 std::process::exit(1);
609 }
610 return Err(e);
611 }
612 };
613
614 let log_level = cli_state.config.logging.level.to_uppercase();
616 let default_directive = format!(
617 "spec_ai={},tower_http=debug",
618 log_level.to_lowercase()
619 );
620 let env_override = std::env::var("RUST_LOG").unwrap_or_default();
621 let combined_filter = if env_override.trim().is_empty() {
622 default_directive.clone()
623 } else if env_override.contains("spec_ai") {
624 env_override
625 } else {
626 format!("{},{}", env_override, default_directive)
627 };
628
629 tracing_subscriber::fmt()
630 .with_env_filter(combined_filter)
631 .with_target(true)
632 .init();
633
634 cli_state.run_repl().await
635}