spec_ai_cli/
lib.rs

1use anyhow::{Context, Result};
2use clap::{Parser, Subcommand, ValueEnum};
3use spec_ai_core::cli::CliState;
4use spec_ai_core::spec::AgentSpec;
5use std::path::PathBuf;
6use walkdir::WalkDir;
7
8#[cfg(feature = "api")]
9use {
10    spec_ai_api::api::server::{ApiConfig, ApiServer},
11    spec_ai_config::config::AgentRegistry,
12    spec_ai_config::persistence::Persistence,
13    spec_ai_core::tools::ToolRegistry,
14    std::sync::Arc,
15};
16
// Command-line definition for the `spec-ai` binary, parsed with clap's derive API.
//
// Reviewer notes in this struct are plain `//` comments on purpose: clap's derive
// turns `///` doc comments into help text, so adding `///` lines would change the
// CLI output.
#[derive(Parser)]
#[command(name = "spec-ai")]
#[command(about = "SpecAI - AI agent framework with spec execution", long_about = None)]
struct Cli {
    /// Path to config file
    // Declared `global`; `run` forwards this value to whichever entry point it
    // dispatches to (spec runner, server, TUI or REPL).
    #[arg(short, long, global = true)]
    config: Option<PathBuf>,

    /// Launch mode. Defaults to the new TUI; use `--mode legacy` for the legacy REPL.
    // `num_args = 0..=1` together with `default_missing_value` means the flag may be
    // given without a value, in which case it takes the value "new".
    // Only consulted by `run` when no subcommand was given.
    #[arg(
        long = "mode",
        value_enum,
        num_args = 0..=1,
        default_value = "new",
        default_missing_value = "new",
        global = true
    )]
    mode: TuiMode,

    // Optional subcommand; `None` makes `run` start the interactive mode chosen by `mode`.
    #[command(subcommand)]
    command: Option<Commands>,
}
39
// Subcommands of the `spec-ai` binary.
//
// Reviewer notes are plain `//` comments so that the clap-generated help text
// (built from the `///` comments) stays unchanged.
#[derive(Subcommand)]
enum Commands {
    /// Run one or more spec files
    // Handled by `run_specs_command`; the process exits with the code it returns.
    Run {
        /// Spec files or directories to run. If not provided, uses examples/spec/smoke.spec
        // The fallback spec is chosen in `run_specs_command`, not by clap: an empty
        // list is what triggers it.
        #[arg(value_name = "SPEC_OR_DIR")]
        specs: Vec<PathBuf>,
    },
    /// Start the API server for agent mesh functionality
    // The variant always exists, but `run` only starts a server when the crate is
    // built with the `api` feature; otherwise it prints an error and exits with 1.
    Server {
        /// Port to bind the server to
        #[arg(short, long, default_value = "3000")]
        port: u16,
        /// Host address to bind to
        #[arg(long, default_value = "127.0.0.1")]
        host: String,
        /// Join existing mesh at specified address
        // Passed to `start_mesh_member` as the registry address, which splits it on
        // ':' into host and port.
        #[arg(long)]
        join: Option<String>,
    },
}
61
// Interactive front end selected with `--mode` when no subcommand is given.
// (Plain `//` comments: `///` on `ValueEnum` variants would become CLI help text.)
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
enum TuiMode {
    // `run` starts `spec_ai_tui_app::run_tui` for this mode; it is the default ("new").
    New,
    // `run` starts the REPL via `run_repl_with_config` for this mode ("legacy").
    Legacy,
}
67
68fn collect_spec_files(path: &PathBuf) -> Result<Vec<PathBuf>> {
69    let mut specs = Vec::new();
70
71    if path.is_file() {
72        if path.extension().and_then(|s| s.to_str()) == Some("spec") {
73            specs.push(path.clone());
74        } else {
75            eprintln!(
76                "Warning: Skipping '{}' (expected .spec extension)",
77                path.display()
78            );
79        }
80    } else if path.is_dir() {
81        for entry in WalkDir::new(path)
82            .follow_links(true)
83            .into_iter()
84            .filter_map(|e| e.ok())
85        {
86            if entry.file_type().is_file() {
87                if let Some(ext) = entry.path().extension() {
88                    if ext == "spec" {
89                        specs.push(entry.path().to_path_buf());
90                    }
91                }
92            }
93        }
94        specs.sort();
95    } else {
96        anyhow::bail!("Path '{}' does not exist", path.display());
97    }
98
99    Ok(specs)
100}
101
102async fn run_spec_file(cli: &mut CliState, spec_path: &PathBuf) -> Result<bool> {
103    if !spec_path.exists() {
104        eprintln!("Error: Spec file '{}' not found", spec_path.display());
105        return Ok(false);
106    }
107
108    let abs_path = spec_path.canonicalize().with_context(|| {
109        format!(
110            "Failed to resolve absolute path for '{}'",
111            spec_path.display()
112        )
113    })?;
114
115    println!("=== Running spec: {} ===", abs_path.display());
116
117    let spec = AgentSpec::from_file(&abs_path)?;
118    let output = cli.agent.run_spec(&spec).await?;
119    cli.maybe_speak_response(&output.response);
120
121    // Print the response
122    println!("{}", output.response);
123
124    // If execution completes without throwing an error, consider it successful
125    // The agent will handle reporting any issues in the response
126    Ok(true)
127}
128
/// Returns the first TCP port, starting at `first_port`, on which a listener can
/// be bound on `host`.
///
/// At most `max_attempts` consecutive ports are probed and the scan stops at
/// `u16::MAX`, so the port number can never overflow. The probe listener is
/// dropped right away, so the port is only known to have been free at the time
/// of the call.
// Only called from the `api`-gated server code.
#[cfg_attr(not(feature = "api"), allow(dead_code))]
fn find_available_port(host: &str, first_port: u16, max_attempts: usize) -> Option<u16> {
    (first_port..=u16::MAX)
        .take(max_attempts)
        .find(|candidate| std::net::TcpListener::bind(format!("{}:{}", host, candidate)).is_ok())
}

/// Starts the API server, either as mesh leader or as a member of an existing mesh.
///
/// * With `join` set, a free port is searched starting at `port` and the process
///   continues as a mesh member of the given registry (`start_mesh_member`).
/// * Otherwise, if `host:port` can be bound, this instance becomes the mesh
///   leader: it registers itself in its own mesh registry, keeps its heartbeat
///   fresh, periodically removes stale instances and serves until Ctrl+C.
/// * If `host:port` is taken and answers `/health` successfully, the process
///   joins that mesh on the next free port; if it does not, the process prints
///   an error and exits with status 1.
///
/// # Errors
///
/// Fails when no free port is found, when configuration, persistence or the API
/// server cannot be set up, or when the server terminates with an error.
#[cfg(feature = "api")]
async fn start_server(
    config_path: Option<PathBuf>,
    host: String,
    port: u16,
    join: Option<String>,
) -> Result<()> {
    use spec_ai_api::api::mesh::MeshClient;
    use spec_ai_config::config::AppConfig;
    use spec_ai_core::embeddings::EmbeddingsClient;
    use std::net::TcpListener;

    /// Number of consecutive ports probed when looking for a free one.
    const MAX_PORT_ATTEMPTS: usize = 100;

    // Initialize tracing subscriber for HTTP request logging.
    // Always include tower_http=debug for request logging, merge with RUST_LOG if set.
    let base_filter = "tower_http=debug";
    let filter = match std::env::var("RUST_LOG") {
        Ok(env_filter) if !env_filter.is_empty() => format!("{},{}", env_filter, base_filter),
        _ => format!("spec_ai=info,{}", base_filter),
    };
    tracing_subscriber::fmt()
        .with_env_filter(filter)
        .with_target(true)
        .init();

    // Generate unique instance ID
    let instance_id = MeshClient::generate_instance_id();
    println!("Instance ID: {}", instance_id);

    // Explicit join: find an available port for ourselves and become a member.
    if let Some(registry_addr) = join {
        let member_port = find_available_port(&host, port, MAX_PORT_ATTEMPTS).ok_or_else(|| {
            anyhow::anyhow!(
                "Could not find available port after {} attempts",
                MAX_PORT_ATTEMPTS
            )
        })?;
        println!("Joining mesh at {} on port {}", registry_addr, member_port);
        return start_mesh_member(config_path, host, member_port, registry_addr, instance_id)
            .await;
    }

    // Check if the requested port is available.
    match TcpListener::bind(format!("{}:{}", host, port)) {
        Ok(probe) => {
            // Port is available, we'll be the mesh leader/registry.
            println!(
                "Starting spec-ai server as mesh leader on {}:{}",
                host, port
            );
            drop(probe); // Release the port before starting the actual server
        }
        Err(_) => {
            // Port is in use - try to detect and join an existing mesh.
            println!(
                "Port {} is in use. Checking for existing mesh registry...",
                port
            );
            // NOTE(review): the probe uses plain http while the startup banner below
            // advertises https - confirm that the registry answers on http.
            let health_url = format!("http://{}:{}/health", host, port);
            match reqwest::get(&health_url).await {
                Ok(response) if response.status().is_success() => {
                    println!("Found existing spec-ai mesh registry at {}:{}", host, port);
                    // Start the search just above the registry's port; `checked_add`
                    // covers the case where the registry sits on port 65535.
                    let member_port = port
                        .checked_add(1)
                        .and_then(|first| find_available_port(&host, first, MAX_PORT_ATTEMPTS))
                        .ok_or_else(|| {
                            anyhow::anyhow!(
                                "Could not find available port after {} attempts",
                                MAX_PORT_ATTEMPTS
                            )
                        })?;
                    println!("Joining mesh on port {}", member_port);
                    let registry_url = format!("{}:{}", host, port);
                    return start_mesh_member(
                        config_path,
                        host,
                        member_port,
                        registry_url,
                        instance_id,
                    )
                    .await;
                }
                _ => {
                    eprintln!("Error: Port {} is in use by another process", port);
                    eprintln!("Please specify a different port with --port");
                    std::process::exit(1);
                }
            }
        }
    }

    // Load configuration
    let app_config = match config_path {
        Some(path) => AppConfig::load_from_file(&path)?,
        None => AppConfig::load()?,
    };

    // Initialize persistence
    let persistence = Persistence::new(&app_config.database.path)?;

    // Initialize embeddings client if an embeddings model is configured.
    let embeddings = app_config.model.embeddings_model.as_ref().map(|model| {
        // The key source is either `ENV:<variable name>` or a path to a file holding the key.
        let api_key = app_config
            .model
            .api_key_source
            .as_ref()
            .and_then(|source| match source.strip_prefix("ENV:") {
                Some(variable) => std::env::var(variable).ok(),
                // Key files usually end with a newline, which must not become part of the key.
                None => std::fs::read_to_string(source)
                    .ok()
                    .map(|contents| contents.trim().to_string()),
            });
        match api_key {
            Some(key) => EmbeddingsClient::with_api_key(model.clone(), key),
            // No usable key: fall back to a client without an explicit key.
            None => EmbeddingsClient::new(model.clone()),
        }
    });

    // Create registries
    let agent_registry = Arc::new(AgentRegistry::new(
        app_config.agents.clone(),
        persistence.clone(),
    ));
    let tool_registry = Arc::new(ToolRegistry::with_builtin_tools(
        Some(Arc::new(persistence.clone())),
        embeddings,
    ));

    // Configure and start API server
    let api_config = ApiConfig::new()
        .with_host(host.clone())
        .with_port(port)
        .with_cors(true);

    let server = ApiServer::new(
        api_config.clone(),
        persistence,
        agent_registry.clone(),
        tool_registry,
        app_config.clone(),
    )?;

    println!(
        "Server running at https://{} (fingerprint: {})",
        api_config.bind_address(),
        server.certificate_fingerprint()
    );
    println!("Health check: https://{}/health", api_config.bind_address());
    println!("Press Ctrl+C to stop the server");

    // Self-register as leader in the mesh registry
    let mesh_registry = server.mesh_registry();
    let self_instance = spec_ai_api::api::mesh::MeshInstance {
        instance_id: instance_id.clone(),
        hostname: host.clone(),
        port,
        capabilities: vec!["registry".to_string(), "query".to_string()],
        is_leader: true,
        last_heartbeat: chrono::Utc::now(),
        created_at: chrono::Utc::now(),
        agent_profiles: agent_registry.list(),
    };
    mesh_registry.register(self_instance).await;

    // Start background heartbeat for self (keeps our own timestamp fresh).
    // `tokio::time::interval` panics on a zero period, so clamp to at least one second.
    let heartbeat_instance_id = instance_id.clone();
    let heartbeat_registry = mesh_registry.clone();
    let heartbeat_secs = app_config.mesh.heartbeat_interval_secs.max(1);
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(heartbeat_secs));
        loop {
            interval.tick().await;
            // Best effort: a missed self-heartbeat is retried on the next tick.
            let _ = heartbeat_registry.heartbeat(&heartbeat_instance_id).await;
        }
    });

    // Start stale instance cleanup task, running twice per timeout window
    // (again clamped so that a timeout below two seconds cannot yield a zero period).
    let cleanup_registry = mesh_registry.clone();
    let cleanup_timeout = app_config.mesh.leader_timeout_secs;
    let cleanup_period_secs = (cleanup_timeout / 2).max(1);
    tokio::spawn(async move {
        let mut interval =
            tokio::time::interval(tokio::time::Duration::from_secs(cleanup_period_secs));
        loop {
            interval.tick().await;
            cleanup_registry.cleanup_stale(cleanup_timeout).await;
        }
    });

    // Setup shutdown signal: on Ctrl+C, deregister ourselves before the server stops.
    let shutdown_instance_id = instance_id.clone();
    let shutdown_registry = mesh_registry.clone();
    let shutdown = async move {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
        println!("\nShutting down server...");
        // Deregister from mesh
        let _ = shutdown_registry.deregister(&shutdown_instance_id).await;
    };

    // Run server with graceful shutdown
    server.run_with_shutdown(shutdown).await?;

    println!("Server stopped");
    Ok(())
}
357
/// Splits a registry address of the form `HOST:PORT` at its last colon.
///
/// Returns `None` when there is no colon or when either part is empty. The port
/// part is returned as text; the caller parses it.
// Only called from the `api`-gated server code.
#[cfg_attr(not(feature = "api"), allow(dead_code))]
fn split_registry_address(address: &str) -> Option<(&str, &str)> {
    address
        .rsplit_once(':')
        .filter(|(host, port)| !host.is_empty() && !port.is_empty())
}

/// Starts the API server as a member of an existing mesh.
///
/// Registers this instance (`instance_id`, reachable at `host:port`) with the
/// registry at `registry_url` (`HOST:PORT`), starts the local API server, sends
/// periodic heartbeats to the registry and deregisters on Ctrl+C.
///
/// # Errors
///
/// Fails when `registry_url` is not a valid `HOST:PORT` pair, when
/// configuration, persistence or the API server cannot be set up, when
/// registration with the mesh fails, or when the server terminates with an error.
#[cfg(feature = "api")]
async fn start_mesh_member(
    config_path: Option<PathBuf>,
    host: String,
    port: u16,
    registry_url: String,
    instance_id: String,
) -> Result<()> {
    use spec_ai_api::api::mesh::MeshClient;
    use spec_ai_config::config::AppConfig;
    use spec_ai_core::embeddings::EmbeddingsClient;

    println!("Starting as mesh member on {}:{}", host, port);
    println!("Registry at: {}", registry_url);

    // Validate the registry address up front: it may come straight from `--join`,
    // so a malformed value must produce an error rather than a panic.
    let (registry_host, port_text) = split_registry_address(&registry_url).ok_or_else(|| {
        anyhow::anyhow!(
            "Invalid registry address '{}': expected HOST:PORT",
            registry_url
        )
    })?;
    let registry_port = port_text.parse().with_context(|| {
        format!(
            "Invalid registry port '{}' in '{}'",
            port_text, registry_url
        )
    })?;

    // Load configuration
    let app_config = match config_path {
        Some(path) => AppConfig::load_from_file(&path)?,
        None => AppConfig::load()?,
    };

    // Initialize persistence
    let persistence = Persistence::new(&app_config.database.path)?;

    // Initialize embeddings client if an embeddings model is configured.
    let embeddings = app_config.model.embeddings_model.as_ref().map(|model| {
        // The key source is either `ENV:<variable name>` or a path to a file holding the key.
        let api_key = app_config
            .model
            .api_key_source
            .as_ref()
            .and_then(|source| match source.strip_prefix("ENV:") {
                Some(variable) => std::env::var(variable).ok(),
                // Key files usually end with a newline, which must not become part of the key.
                None => std::fs::read_to_string(source)
                    .ok()
                    .map(|contents| contents.trim().to_string()),
            });
        match api_key {
            Some(key) => EmbeddingsClient::with_api_key(model.clone(), key),
            // No usable key: fall back to a client without an explicit key.
            None => EmbeddingsClient::new(model.clone()),
        }
    });

    // Create registries
    let agent_registry = Arc::new(AgentRegistry::new(
        app_config.agents.clone(),
        persistence.clone(),
    ));
    let tool_registry = Arc::new(ToolRegistry::with_builtin_tools(
        Some(Arc::new(persistence.clone())),
        embeddings,
    ));

    // Get agent profiles for registration
    let agent_profiles: Vec<String> = agent_registry.list();

    // Register with the mesh
    let mesh_client = MeshClient::new(registry_host, registry_port);

    let register_response = mesh_client
        .register(
            instance_id.clone(),
            host.clone(),
            port,
            vec!["query".to_string()],
            agent_profiles,
        )
        .await?;

    println!("Registered with mesh:");
    println!("  Leader: {}", register_response.is_leader);
    println!("  Peers: {}", register_response.peers.len());

    // Start our API server
    let api_config = ApiConfig::new()
        .with_host(host.clone())
        .with_port(port)
        .with_cors(true);

    let server = ApiServer::new(
        api_config.clone(),
        persistence,
        agent_registry,
        tool_registry,
        app_config.clone(),
    )?;

    println!(
        "Server running at https://{} (fingerprint: {})",
        api_config.bind_address(),
        server.certificate_fingerprint()
    );

    // Start background heartbeat to registry.
    // `tokio::time::interval` panics on a zero period, so clamp to at least one second.
    let heartbeat_instance_id = instance_id.clone();
    let heartbeat_client = mesh_client.clone();
    let heartbeat_secs = app_config.mesh.heartbeat_interval_secs.max(1);
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(heartbeat_secs));
        loop {
            interval.tick().await;
            if let Err(e) = heartbeat_client
                .heartbeat(&heartbeat_instance_id, None)
                .await
            {
                // Keep ticking: the next heartbeat may succeed again.
                eprintln!("Heartbeat failed: {}", e);
            }
        }
    });

    // Setup shutdown signal with deregistration
    let shutdown_instance_id = instance_id.clone();
    let shutdown_client = mesh_client.clone();
    let shutdown = async move {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
        println!("\nShutting down server...");
        // Deregister from mesh
        if let Err(e) = shutdown_client.deregister(&shutdown_instance_id).await {
            eprintln!("Failed to deregister: {}", e);
        }
    };

    // Run server with graceful shutdown
    server.run_with_shutdown(shutdown).await?;

    println!("Server stopped");
    Ok(())
}
497
498async fn run_specs_command(config_path: Option<PathBuf>, spec_paths: Vec<PathBuf>) -> Result<i32> {
499    // Determine which spec to run
500    let specs_to_run = if spec_paths.is_empty() {
501        let default_spec = PathBuf::from("../../../examples/spec/smoke.spec");
502        if !default_spec.exists() {
503            eprintln!("Error: Default spec not found at 'examples/spec/smoke.spec'.");
504            eprintln!("Please provide explicit spec files or create the default spec.");
505            return Ok(1);
506        }
507        vec![default_spec]
508    } else {
509        let mut all_specs = Vec::new();
510        for path in &spec_paths {
511            let specs = collect_spec_files(path)?;
512            all_specs.extend(specs);
513        }
514
515        if all_specs.is_empty() {
516            eprintln!("Error: No .spec files found in provided paths.");
517            return Ok(1);
518        }
519
520        all_specs
521    };
522
523    // Initialize CLI state
524    let mut cli = match CliState::initialize_with_path(config_path) {
525        Ok(cli) => cli,
526        Err(e) => {
527            let error_chain = format!("{:#}", e);
528            if error_chain.contains("Could not set lock")
529                || error_chain.contains("Conflicting lock")
530            {
531                eprintln!("Error: Another instance of spec-ai is already running.");
532                eprintln!();
533                eprintln!("Only one instance can access the database at a time.");
534                eprintln!("Please close the other instance or wait for it to finish.");
535                eprintln!();
536                eprintln!("To run multiple instances, configure a different database path");
537                eprintln!("in your config file: [database] path = \"~/.spec-ai/other.db\"");
538                std::process::exit(1);
539            }
540            return Err(e);
541        }
542    };
543
544    // Run each spec file
545    let mut all_success = true;
546    for spec_path in specs_to_run {
547        match run_spec_file(&mut cli, &spec_path).await {
548            Ok(success) => {
549                if !success {
550                    all_success = false;
551                }
552            }
553            Err(e) => {
554                eprintln!("Error running spec '{}': {}", spec_path.display(), e);
555                all_success = false;
556            }
557        }
558    }
559
560    Ok(if all_success { 0 } else { 1 })
561}
562
563#[tokio::main]
564pub async fn run() -> Result<()> {
565    let cli = Cli::parse();
566
567    match cli.command {
568        Some(Commands::Run { specs }) => {
569            let exit_code = run_specs_command(cli.config, specs).await?;
570            std::process::exit(exit_code);
571        }
572        #[cfg(feature = "api")]
573        Some(Commands::Server { port, host, join }) => {
574            start_server(cli.config, host, port, join).await?;
575            Ok(())
576        }
577        #[cfg(not(feature = "api"))]
578        Some(Commands::Server { .. }) => {
579            eprintln!("Error: Server functionality requires the 'api' feature");
580            eprintln!("Please rebuild with: cargo build --features api");
581            std::process::exit(1);
582        }
583        None => match cli.mode {
584            TuiMode::New => {
585                spec_ai_tui_app::run_tui(cli.config).await?;
586                Ok(())
587            }
588            TuiMode::Legacy => run_repl_with_config(cli.config).await,
589        },
590    }
591}
592
593async fn run_repl_with_config(config: Option<PathBuf>) -> Result<()> {
594    let mut cli_state = match CliState::initialize_with_path(config) {
595        Ok(cli) => cli,
596        Err(e) => {
597            let error_chain = format!("{:#}", e);
598            if error_chain.contains("Could not set lock")
599                || error_chain.contains("Conflicting lock")
600            {
601                eprintln!("Error: Another instance of spec-ai is already running.");
602                eprintln!();
603                eprintln!("Only one instance can access the database at a time.");
604                eprintln!("Please close the other instance or wait for it to finish.");
605                eprintln!();
606                eprintln!("To run multiple instances, configure a different database path");
607                eprintln!("in your config file: [database] path = \"~/.spec-ai/other.db\"");
608                std::process::exit(1);
609            }
610            return Err(e);
611        }
612    };
613
614    // Initialize logging based on config
615    let log_level = cli_state.config.logging.level.to_uppercase();
616    let default_directive = format!(
617        "spec_ai={},tower_http=debug",
618        log_level.to_lowercase()
619    );
620    let env_override = std::env::var("RUST_LOG").unwrap_or_default();
621    let combined_filter = if env_override.trim().is_empty() {
622        default_directive.clone()
623    } else if env_override.contains("spec_ai") {
624        env_override
625    } else {
626        format!("{},{}", env_override, default_directive)
627    };
628
629    tracing_subscriber::fmt()
630        .with_env_filter(combined_filter)
631        .with_target(true)
632        .init();
633
634    cli_state.run_repl().await
635}