Skip to main content

adk_cli/
launcher.rs

1//! Simple launcher for ADK agents with CLI support.
2//!
3//! Provides a one-liner to run agents with console or web server modes,
4//! similar to adk-go's launcher pattern.
5//!
6//! # Example
7//!
8//! ```ignore
9//! use adk_cli::Launcher;
10//! use std::sync::Arc;
11//!
12//! #[tokio::main]
13//! async fn main() -> adk_core::Result<()> {
14//!     let agent = /* create your agent */;
15//!
16//!     // Run with CLI support (console by default, or `serve` for web)
17//!     Launcher::new(Arc::new(agent)).run().await
18//! }
19//! ```
20//!
21//! # CLI Usage
22//!
23//! ```bash
24//! # Interactive console (default)
25//! cargo run
26//!
27//! # Web server with UI
28//! cargo run -- serve
29//! cargo run -- serve --port 3000
30//! ```
31
32use adk_artifact::ArtifactService;
33use adk_core::{
34    Agent, CacheCapable, Content, ContextCacheConfig, EventsCompactionConfig, Memory, Part, Result,
35    RunConfig, SessionId, StreamingMode, UserId,
36};
37use adk_runner::{Runner, RunnerConfig};
38use adk_server::{
39    RequestContextExtractor, SecurityConfig, ServerConfig, create_app, create_app_with_a2a,
40    shutdown_signal,
41};
42use adk_session::{CreateRequest, InMemorySessionService, SessionService};
43use axum::Router;
44use clap::{Parser, Subcommand};
45use futures::StreamExt;
46use rustyline::DefaultEditor;
47use serde_json::Value;
48use std::collections::HashMap;
49use std::io::{self, Write};
50use std::sync::Arc;
51use std::time::Duration;
52use tokio_util::sync::CancellationToken;
53use tracing::{error, warn};
54
/// CLI arguments for the launcher.
// Parsed in `Launcher::run` via `LauncherCli::parse()`.
// NOTE(review): clap's derive macros read `///` comments, so additional notes
// here deliberately use plain `//` comments to avoid touching CLI help text.
#[derive(Parser)]
#[command(name = "agent")]
#[command(about = "ADK Agent", long_about = None)]
struct LauncherCli {
    // `None` when the binary is started without a subcommand; `Launcher::run`
    // then falls back to `LauncherCommand::Chat`.
    #[command(subcommand)]
    command: Option<LauncherCommand>,
}
63
// Subcommands accepted by the launcher CLI.
//
// NOTE(review): the `///` lines on the variants and fields below are read by
// clap's derive macros (presumably surfacing as `--help` text), so they are
// left untouched; extra notes use plain `//` comments.
#[derive(Subcommand)]
enum LauncherCommand {
    // Routed to `Launcher::run_console_directly`; also the fallback when no
    // subcommand is given.
    /// Run interactive console (default if no command specified)
    Chat,
    // Routed to `Launcher::run_serve_directly` with the parsed port.
    /// Start web server with UI
    Serve {
        /// Server port
        #[arg(long, default_value_t = 8080)]
        port: u16,
    },
    // Only compiled with the `optimize` feature. `Launcher::run` merely logs
    // the arguments and a warning for this variant.
    /// Optimize agent instructions using an evaluation set
    #[cfg(feature = "optimize")]
    Optimize(crate::optimize::OptimizeArgs),
}
78
/// Controls how the console renders thinking/reasoning content.
///
/// In this file only [`ThinkingDisplayMode::Hide`] is checked explicitly by
/// `StreamPrinter`; `Auto` and `Show` are therefore rendered identically.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ThinkingDisplayMode {
    /// Show thinking when the model emits it. This is the default variant.
    #[default]
    Auto,
    /// Always surface emitted thinking.
    Show,
    /// Hide emitted thinking from the console output.
    Hide,
}
90
/// Selects which telemetry backend serve mode initializes.
///
/// The default is [`TelemetryConfig::AdkExporter`] with the service name
/// `adk-server`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TelemetryConfig {
    /// Use the in-memory ADK span exporter for debug endpoints.
    AdkExporter { service_name: String },
    /// Export telemetry to an OTLP collector.
    Otlp { service_name: String, endpoint: String },
    /// Disable telemetry initialization in the launcher.
    None,
}

impl Default for TelemetryConfig {
    /// Returns the ADK exporter configuration named `adk-server`.
    fn default() -> Self {
        let service_name = String::from("adk-server");
        Self::AdkExporter { service_name }
    }
}
107
/// Launcher for running ADK agents with CLI support.
///
/// Provides console and web server modes out of the box.
///
/// # Console mode
///
/// Uses a `rustyline` REPL with history, Ctrl+C handling, and streaming
/// output that renders `<think>` blocks, tool calls, and inline data.
///
/// # Serve mode
///
/// Starts an Axum HTTP server with the ADK web UI.
///
/// # Note on `with_streaming_mode`
///
/// `with_streaming_mode` currently only affects console mode. `ServerConfig`
/// does not accept a `RunConfig`, so the setting is not forwarded to the
/// server. This will be addressed when `ServerConfig` gains that field.
pub struct Launcher {
    /// The agent to run; used by the console runner and the server's agent loader.
    agent: Arc<dyn Agent>,
    /// Optional application name; console mode falls back to the agent's name.
    app_name: Option<String>,
    /// Session store; both modes fall back to `InMemorySessionService` when unset.
    session_service: Option<Arc<dyn SessionService>>,
    /// Optional artifact service, forwarded to the runner and the server config.
    artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory service, forwarded to the runner and the server config.
    memory_service: Option<Arc<dyn Memory>>,
    /// Compaction settings; only forwarded to the server config (console passes `None`).
    compaction_config: Option<EventsCompactionConfig>,
    /// Context cache settings; applied in serve mode only when `cache_capable` is also set.
    context_cache_config: Option<ContextCacheConfig>,
    /// Cache-capable backend paired with `context_cache_config`.
    cache_capable: Option<Arc<dyn CacheCapable>>,
    /// Optional server security settings (serve mode).
    security_config: Option<SecurityConfig>,
    /// Optional request context extractor (serve mode).
    request_context_extractor: Option<Arc<dyn RequestContextExtractor>>,
    /// When set, `build_app` creates the router with A2A routes for this base URL.
    a2a_base_url: Option<String>,
    /// Telemetry backend initialized by `build_app`.
    telemetry_config: TelemetryConfig,
    /// Value set by `with_shutdown_grace_period`; `new` initializes it to 30 seconds.
    shutdown_grace_period: Duration,
    /// Run configuration handed to the console runner; set via `with_streaming_mode`.
    run_config: Option<RunConfig>,
    /// Thinking display mode passed to the console `StreamPrinter`.
    thinking_mode: ThinkingDisplayMode,
}
143
144impl Launcher {
145    /// Create a new launcher with the given agent.
146    pub fn new(agent: Arc<dyn Agent>) -> Self {
147        Self {
148            agent,
149            app_name: None,
150            session_service: None,
151            artifact_service: None,
152            memory_service: None,
153            compaction_config: None,
154            context_cache_config: None,
155            cache_capable: None,
156            security_config: None,
157            request_context_extractor: None,
158            a2a_base_url: None,
159            telemetry_config: TelemetryConfig::default(),
160            shutdown_grace_period: Duration::from_secs(30),
161            run_config: None,
162            thinking_mode: ThinkingDisplayMode::Auto,
163        }
164    }
165
166    /// Set a custom application name (defaults to agent name).
167    pub fn app_name(mut self, name: impl Into<String>) -> Self {
168        self.app_name = Some(name.into());
169        self
170    }
171
172    /// Set a custom artifact service.
173    pub fn with_artifact_service(mut self, service: Arc<dyn ArtifactService>) -> Self {
174        self.artifact_service = Some(service);
175        self
176    }
177
178    /// Set a custom session service.
179    pub fn with_session_service(mut self, service: Arc<dyn SessionService>) -> Self {
180        self.session_service = Some(service);
181        self
182    }
183
184    /// Set a custom memory service.
185    pub fn with_memory_service(mut self, service: Arc<dyn Memory>) -> Self {
186        self.memory_service = Some(service);
187        self
188    }
189
190    /// Enable runner-level context compaction in serve mode.
191    pub fn with_compaction(mut self, config: EventsCompactionConfig) -> Self {
192        self.compaction_config = Some(config);
193        self
194    }
195
196    /// Enable automatic prompt cache lifecycle management in serve mode.
197    pub fn with_context_cache(
198        mut self,
199        config: ContextCacheConfig,
200        cache_capable: Arc<dyn CacheCapable>,
201    ) -> Self {
202        self.context_cache_config = Some(config);
203        self.cache_capable = Some(cache_capable);
204        self
205    }
206
207    /// Set custom server security settings.
208    pub fn with_security_config(mut self, config: SecurityConfig) -> Self {
209        self.security_config = Some(config);
210        self
211    }
212
213    /// Set a request context extractor for authenticated deployments.
214    pub fn with_request_context_extractor(
215        mut self,
216        extractor: Arc<dyn RequestContextExtractor>,
217    ) -> Self {
218        self.request_context_extractor = Some(extractor);
219        self
220    }
221
222    /// Enable A2A routes when building or serving the HTTP app.
223    pub fn with_a2a_base_url(mut self, base_url: impl Into<String>) -> Self {
224        self.a2a_base_url = Some(base_url.into());
225        self
226    }
227
228    /// Configure how serve mode initializes telemetry.
229    pub fn with_telemetry(mut self, config: TelemetryConfig) -> Self {
230        self.telemetry_config = config;
231        self
232    }
233
234    /// Set the maximum graceful shutdown window for the web server.
235    pub fn with_shutdown_grace_period(mut self, grace_period: Duration) -> Self {
236        self.shutdown_grace_period = grace_period;
237        self
238    }
239
240    /// Set streaming mode for console mode.
241    ///
242    /// Note: this currently only affects console mode. The server does not
243    /// yet accept a `RunConfig`.
244    pub fn with_streaming_mode(mut self, mode: StreamingMode) -> Self {
245        self.run_config = Some(RunConfig { streaming_mode: mode, ..RunConfig::default() });
246        self
247    }
248
249    /// Control how emitted thinking content is rendered in console mode.
250    pub fn with_thinking_mode(mut self, mode: ThinkingDisplayMode) -> Self {
251        self.thinking_mode = mode;
252        self
253    }
254
255    /// Run the launcher, parsing CLI arguments.
256    ///
257    /// - No arguments or `chat`: Interactive console
258    /// - `serve [--port PORT]`: Web server with UI
259    pub async fn run(self) -> Result<()> {
260        let cli = LauncherCli::parse();
261
262        match cli.command.unwrap_or(LauncherCommand::Chat) {
263            LauncherCommand::Chat => self.run_console_directly().await,
264            LauncherCommand::Serve { port } => self.run_serve_directly(port).await,
265            #[cfg(feature = "optimize")]
266            LauncherCommand::Optimize(args) => {
267                tracing::info!(
268                    agent_path = %args.agent_path.display(),
269                    eval_set_path = %args.eval_set_path.display(),
270                    max_iterations = args.max_iterations,
271                    target = args.target,
272                    output = %args.output.display(),
273                    "starting prompt optimization"
274                );
275                // The actual optimization wiring is done by the caller
276                // who has access to the optimizer LLM and evaluator.
277                // This branch serves as the CLI entry point.
278                tracing::warn!(
279                    "adk optimize requires wiring via PromptOptimizer — use adk-eval directly"
280                );
281                Ok(())
282            }
283        }
284    }
285
286    /// Run in interactive console mode without parsing CLI arguments.
287    ///
288    /// Use this when you already know you want console mode (e.g. from
289    /// your own CLI parser). [`run`](Self::run) calls this internally.
290    pub async fn run_console_directly(self) -> Result<()> {
291        let app_name = self.app_name.unwrap_or_else(|| self.agent.name().to_string());
292        let user_id = "user".to_string();
293        let thinking_mode = self.thinking_mode;
294        let agent = self.agent;
295        let artifact_service = self.artifact_service;
296        let memory_service = self.memory_service;
297        let run_config = self.run_config;
298
299        let session_service =
300            self.session_service.unwrap_or_else(|| Arc::new(InMemorySessionService::new()));
301
302        let session = session_service
303            .create(CreateRequest {
304                app_name: app_name.clone(),
305                user_id: user_id.clone(),
306                session_id: None,
307                state: HashMap::new(),
308            })
309            .await?;
310
311        let session_id = session.id().to_string();
312
313        let mut rl = DefaultEditor::new()
314            .map_err(|e| adk_core::AdkError::config(format!("failed to init readline: {e}")))?;
315
316        print_banner(agent.name());
317
318        loop {
319            let readline = rl.readline("\x1b[36mYou >\x1b[0m ");
320            match readline {
321                Ok(line) => {
322                    let trimmed = line.trim().to_string();
323                    if trimmed.is_empty() {
324                        continue;
325                    }
326                    if is_exit_command(&trimmed) {
327                        println!("\nGoodbye.\n");
328                        break;
329                    }
330                    if trimmed == "/help" {
331                        print_help();
332                        continue;
333                    }
334                    if trimmed == "/clear" {
335                        println!("(conversation cleared — session state is unchanged)");
336                        continue;
337                    }
338
339                    let _ = rl.add_history_entry(&line);
340
341                    let user_content = Content::new("user").with_text(trimmed);
342                    println!();
343
344                    let cancellation_token = CancellationToken::new();
345                    let runner = Runner::new(RunnerConfig {
346                        app_name: app_name.clone(),
347                        agent: agent.clone(),
348                        session_service: session_service.clone(),
349                        artifact_service: artifact_service.clone(),
350                        memory_service: memory_service.clone(),
351                        plugin_manager: None,
352                        run_config: run_config.clone(),
353                        compaction_config: None,
354                        context_cache_config: None,
355                        cache_capable: None,
356                        request_context: None,
357                        cancellation_token: Some(cancellation_token.clone()),
358                        intra_compaction_config: None,
359                        intra_compaction_summarizer: None,
360                    })?;
361                    let mut events = runner
362                        .run(
363                            UserId::new(user_id.clone())?,
364                            SessionId::new(session_id.clone())?,
365                            user_content,
366                        )
367                        .await?;
368                    let mut printer = StreamPrinter::new(thinking_mode);
369                    let mut current_agent = String::new();
370                    let mut printed_header = false;
371                    let mut interrupted = false;
372
373                    loop {
374                        tokio::select! {
375                            event = events.next() => {
376                                let Some(event) = event else {
377                                    break;
378                                };
379
380                                match event {
381                                    Ok(evt) => {
382                                        // Track agent switches in multi-agent workflows
383                                        if !evt.author.is_empty()
384                                            && evt.author != "user"
385                                            && evt.author != current_agent
386                                        {
387                                            if !current_agent.is_empty() {
388                                                println!();
389                                            }
390                                            current_agent = evt.author.clone();
391                                            // Only show agent label in multi-agent scenarios
392                                            if printed_header {
393                                                print!("\x1b[33m[{current_agent}]\x1b[0m ");
394                                                let _ = io::stdout().flush();
395                                            }
396                                            printed_header = true;
397                                        }
398
399                                        // Show agent transfer requests
400                                        if let Some(target) = &evt.actions.transfer_to_agent {
401                                            print!("\x1b[90m[transfer -> {target}]\x1b[0m ");
402                                            let _ = io::stdout().flush();
403                                        }
404
405                                        if let Some(content) = &evt.llm_response.content {
406                                            for part in &content.parts {
407                                                printer.handle_part(part);
408                                            }
409                                        }
410                                    }
411                                    Err(e) => {
412                                        error!("stream error: {e}");
413                                    }
414                                }
415                            }
416                            signal = tokio::signal::ctrl_c() => {
417                                match signal {
418                                    Ok(()) => {
419                                        cancellation_token.cancel();
420                                        interrupted = true;
421                                        break;
422                                    }
423                                    Err(err) => {
424                                        error!("failed to listen for Ctrl+C: {err}");
425                                    }
426                                }
427                            }
428                        }
429                    }
430
431                    printer.finish();
432                    if interrupted {
433                        println!("\nInterrupted.\n");
434                        continue;
435                    }
436
437                    println!("\n");
438                }
439                Err(rustyline::error::ReadlineError::Interrupted) => {
440                    println!("\nInterrupted. Type exit to quit.\n");
441                    continue;
442                }
443                Err(rustyline::error::ReadlineError::Eof) => {
444                    println!("\nGoodbye.\n");
445                    break;
446                }
447                Err(err) => {
448                    error!("readline error: {err}");
449                    break;
450                }
451            }
452        }
453
454        Ok(())
455    }
456
457    fn init_telemetry(&self) -> Option<Arc<adk_telemetry::AdkSpanExporter>> {
458        match &self.telemetry_config {
459            TelemetryConfig::AdkExporter { service_name } => {
460                match adk_telemetry::init_with_adk_exporter(service_name) {
461                    Ok(exporter) => Some(exporter),
462                    Err(e) => {
463                        warn!("failed to initialize telemetry: {e}");
464                        None
465                    }
466                }
467            }
468            TelemetryConfig::Otlp { service_name, endpoint } => {
469                if let Err(e) = adk_telemetry::init_with_otlp(service_name, endpoint) {
470                    warn!("failed to initialize otlp telemetry: {e}");
471                }
472                None
473            }
474            TelemetryConfig::None => None,
475        }
476    }
477
478    fn into_server_config(
479        self,
480        span_exporter: Option<Arc<adk_telemetry::AdkSpanExporter>>,
481    ) -> ServerConfig {
482        let session_service =
483            self.session_service.unwrap_or_else(|| Arc::new(InMemorySessionService::new()));
484        let agent_loader = Arc::new(adk_core::SingleAgentLoader::new(self.agent));
485
486        let mut config = ServerConfig::new(agent_loader, session_service)
487            .with_artifact_service_opt(self.artifact_service);
488
489        if let Some(memory_service) = self.memory_service {
490            config = config.with_memory_service(memory_service);
491        }
492
493        if let Some(compaction_config) = self.compaction_config {
494            config = config.with_compaction(compaction_config);
495        }
496
497        if let (Some(context_cache_config), Some(cache_capable)) =
498            (self.context_cache_config, self.cache_capable)
499        {
500            config = config.with_context_cache(context_cache_config, cache_capable);
501        }
502
503        if let Some(security) = self.security_config {
504            config = config.with_security(security);
505        }
506
507        if let Some(extractor) = self.request_context_extractor {
508            config = config.with_request_context(extractor);
509        }
510
511        if let Some(exporter) = span_exporter {
512            config = config.with_span_exporter(exporter);
513        }
514
515        config
516    }
517
518    /// Build the Axum application without serving it.
519    ///
520    /// This is the production escape hatch for adding custom routes,
521    /// middleware, metrics, or owning the serve loop yourself.
522    pub fn build_app(self) -> Result<Router> {
523        let span_exporter = self.init_telemetry();
524        let a2a_base_url = self.a2a_base_url.clone();
525        let config = self.into_server_config(span_exporter);
526
527        Ok(match a2a_base_url {
528            Some(base_url) => create_app_with_a2a(config, Some(&base_url)),
529            None => create_app(config),
530        })
531    }
532
533    /// Build the Axum application with A2A routes enabled.
534    pub fn build_app_with_a2a(mut self, base_url: impl Into<String>) -> Result<Router> {
535        self.a2a_base_url = Some(base_url.into());
536        self.build_app()
537    }
538
539    /// Run web server without parsing CLI arguments.
540    ///
541    /// Use this when you already know you want serve mode (e.g. from
542    /// your own CLI parser). [`run`](Self::run) calls this internally.
543    pub async fn run_serve_directly(self, port: u16) -> Result<()> {
544        let app = self.build_app()?;
545
546        let addr = format!("0.0.0.0:{port}");
547        let listener = tokio::net::TcpListener::bind(&addr)
548            .await
549            .map_err(|e| adk_core::AdkError::config(format!("failed to bind {addr}: {e}")))?;
550
551        println!("ADK Server starting on http://localhost:{port}");
552        println!("Open http://localhost:{port} in your browser");
553        println!("Press Ctrl+C to stop\n");
554
555        axum::serve(listener, app)
556            .with_graceful_shutdown(shutdown_signal())
557            .await
558            .map_err(|e| adk_core::AdkError::config(format!("server error: {e}")))?;
559
560        Ok(())
561    }
562}
563
564/// Print the ADK-Rust welcome banner.
565fn print_banner(agent_name: &str) {
566    let version = env!("CARGO_PKG_VERSION");
567    let title = format!("ADK-Rust  v{version}");
568    let subtitle = "Rust Agent Development Kit";
569    // Box inner width = 49 (between the ║ chars)
570    let inner: usize = 49;
571    let pad_title = (inner.saturating_sub(title.len())) / 2;
572    let pad_subtitle = (inner.saturating_sub(subtitle.len())) / 2;
573
574    println!();
575    println!("    ╔{:═<inner$}╗", "");
576    println!(
577        "    ║{:>w$}{title}{:<r$}║",
578        "",
579        "",
580        w = pad_title,
581        r = inner - pad_title - title.len()
582    );
583    println!(
584        "    ║{:>w$}{subtitle}{:<r$}║",
585        "",
586        "",
587        w = pad_subtitle,
588        r = inner - pad_subtitle - subtitle.len()
589    );
590    println!("    ╚{:═<inner$}╝", "");
591    println!();
592    println!("  Agent    : {agent_name}");
593    println!("  Runtime  : Tokio async, streaming responses");
594    println!("  Features : tool calling, multi-provider, multi-agent, think blocks");
595    println!();
596    println!("  Type a message to chat. /help for commands.");
597    println!();
598}
599
/// Print the list of REPL commands and usage tips to stdout.
fn print_help() {
    // One entry per output line; empty entries produce blank lines.
    const HELP_LINES: &[&str] = &[
        "",
        "  Commands:",
        "    /help   Show this help",
        "    /clear  Clear display (session state is kept)",
        "    quit    Exit the REPL",
        "    exit    Exit the REPL",
        "    /quit   Exit the REPL",
        "    /exit   Exit the REPL",
        "",
        "  Tips:",
        "    - Up/Down arrows browse history",
        "    - Ctrl+C interrupts the current operation",
        "    - Multi-agent workflows show [agent_name] on handoff",
        "",
    ];

    for line in HELP_LINES {
        println!("{line}");
    }
}
617
/// Report whether `input` is one of the REPL's exit commands.
///
/// The comparison is exact (case-sensitive, no trimming): only `quit`,
/// `exit`, `/quit`, and `/exit` match.
fn is_exit_command(input: &str) -> bool {
    const EXIT_COMMANDS: [&str; 4] = ["quit", "exit", "/quit", "/exit"];
    EXIT_COMMANDS.contains(&input)
}
621
/// Handles streaming output with special rendering for think blocks,
/// tool calls, function responses, and inline/file data.
///
/// The printer is stateful: it remembers whether the stream is currently
/// inside an inline `<think>` block or a run of thinking parts so that output
/// split across several chunks is rendered coherently.
pub struct StreamPrinter {
    /// How thinking content is rendered; only `Hide` is checked explicitly.
    thinking_mode: ThinkingDisplayMode,
    /// `true` while text chunks are inside an unclosed `<think>` tag.
    in_think_block: bool,
    /// `true` after a thinking part was printed and its line is still open.
    in_thinking_part_stream: bool,
    /// Accumulates think-block text (and thinking-part text) until it is flushed.
    think_buffer: String,
}
630
631impl StreamPrinter {
632    /// Create a printer with the selected thinking display mode.
633    pub fn new(thinking_mode: ThinkingDisplayMode) -> Self {
634        Self {
635            thinking_mode,
636            in_think_block: false,
637            in_thinking_part_stream: false,
638            think_buffer: String::new(),
639        }
640    }
641
642    /// Process a single response part, rendering it to stdout.
643    pub fn handle_part(&mut self, part: &Part) {
644        match part {
645            Part::Text { text } => {
646                self.flush_part_thinking_if_needed();
647                self.handle_text_chunk(text);
648            }
649            Part::Thinking { thinking, .. } => {
650                if matches!(self.thinking_mode, ThinkingDisplayMode::Hide) {
651                    return;
652                }
653                if !self.in_thinking_part_stream {
654                    print!("\n[thinking] ");
655                    let _ = io::stdout().flush();
656                    self.in_thinking_part_stream = true;
657                }
658                self.think_buffer.push_str(thinking);
659                print!("{thinking}");
660                let _ = io::stdout().flush();
661            }
662            Part::FunctionCall { name, args, .. } => {
663                self.flush_pending_thinking();
664                print!("\n[tool-call] {name} {args}\n");
665                let _ = io::stdout().flush();
666            }
667            Part::FunctionResponse { function_response, .. } => {
668                self.flush_pending_thinking();
669                self.print_tool_response(&function_response.name, &function_response.response);
670            }
671            Part::InlineData { mime_type, data } => {
672                self.flush_pending_thinking();
673                print!("\n[inline-data] mime={mime_type} bytes={}\n", data.len());
674                let _ = io::stdout().flush();
675            }
676            Part::FileData { mime_type, file_uri } => {
677                self.flush_pending_thinking();
678                print!("\n[file-data] mime={mime_type} uri={file_uri}\n");
679                let _ = io::stdout().flush();
680            }
681            Part::ServerToolCall { server_tool_call } => {
682                self.flush_pending_thinking();
683                print!("\n[server-tool-call] {server_tool_call}\n");
684                let _ = io::stdout().flush();
685            }
686            Part::ServerToolResponse { server_tool_response } => {
687                self.flush_pending_thinking();
688                print!("\n[server-tool-response] {}B\n", server_tool_response.to_string().len());
689                let _ = io::stdout().flush();
690            }
691        }
692    }
693
694    fn handle_text_chunk(&mut self, chunk: &str) {
695        if matches!(self.thinking_mode, ThinkingDisplayMode::Hide) {
696            let mut visible = String::with_capacity(chunk.len());
697            let mut remaining = chunk;
698
699            while let Some(start_idx) = remaining.find("<think>") {
700                visible.push_str(&remaining[..start_idx]);
701                let after_start = &remaining[start_idx + "<think>".len()..];
702                if let Some(end_idx) = after_start.find("</think>") {
703                    remaining = &after_start[end_idx + "</think>".len()..];
704                } else {
705                    remaining = "";
706                    break;
707                }
708            }
709
710            visible.push_str(remaining);
711            if !visible.is_empty() {
712                print!("{visible}");
713                let _ = io::stdout().flush();
714            }
715            return;
716        }
717
718        const THINK_START: &str = "<think>";
719        const THINK_END: &str = "</think>";
720
721        let mut remaining = chunk;
722
723        while !remaining.is_empty() {
724            if self.in_think_block {
725                if let Some(end_idx) = remaining.find(THINK_END) {
726                    self.think_buffer.push_str(&remaining[..end_idx]);
727                    self.flush_think();
728                    self.in_think_block = false;
729                    remaining = &remaining[end_idx + THINK_END.len()..];
730                } else {
731                    self.think_buffer.push_str(remaining);
732                    break;
733                }
734            } else if let Some(start_idx) = remaining.find(THINK_START) {
735                let visible = &remaining[..start_idx];
736                if !visible.is_empty() {
737                    print!("{visible}");
738                    let _ = io::stdout().flush();
739                }
740                self.in_think_block = true;
741                self.think_buffer.clear();
742                remaining = &remaining[start_idx + THINK_START.len()..];
743            } else {
744                print!("{remaining}");
745                let _ = io::stdout().flush();
746                break;
747            }
748        }
749    }
750
751    fn flush_think(&mut self) {
752        let content = self.think_buffer.trim();
753        if !content.is_empty() {
754            print!("\n[think] {content}\n");
755            let _ = io::stdout().flush();
756        }
757        self.think_buffer.clear();
758    }
759
760    /// Flush any pending think block content.
761    pub fn finish(&mut self) {
762        self.flush_pending_thinking();
763    }
764
765    fn print_tool_response(&self, name: &str, response: &Value) {
766        print!("\n[tool-response] {name} {response}\n");
767        let _ = io::stdout().flush();
768    }
769
770    fn flush_part_thinking_if_needed(&mut self) {
771        if self.in_thinking_part_stream {
772            println!();
773            let _ = io::stdout().flush();
774            self.think_buffer.clear();
775            self.in_thinking_part_stream = false;
776        }
777    }
778
779    fn flush_pending_thinking(&mut self) {
780        self.flush_part_thinking_if_needed();
781        if self.in_think_block {
782            self.flush_think_with_label("think");
783            self.in_think_block = false;
784        }
785    }
786
787    fn flush_think_with_label(&mut self, label: &str) {
788        let content = self.think_buffer.trim();
789        if !content.is_empty() {
790            print!("\n[{label}] {content}\n");
791            let _ = io::stdout().flush();
792        }
793        self.think_buffer.clear();
794    }
795}
796
797impl Default for StreamPrinter {
798    fn default() -> Self {
799        Self::new(ThinkingDisplayMode::Auto)
800    }
801}
802
803#[cfg(test)]
804mod tests {
805    use super::*;
806    use adk_core::{Agent, EventStream, InvocationContext, Result as AdkResult};
807    use async_trait::async_trait;
808    use axum::{
809        body::{Body, to_bytes},
810        http::{Request, StatusCode},
811    };
812    use futures::stream;
813    use std::sync::Arc;
814    use tower::ServiceExt;
815
    /// Minimal [`Agent`] stub for launcher tests: fixed name and description,
    /// no sub-agents, and a `run` that yields an empty event stream.
    struct TestAgent;

    #[async_trait]
    impl Agent for TestAgent {
        fn name(&self) -> &str {
            "launcher_test_agent"
        }

        fn description(&self) -> &str {
            "launcher test agent"
        }

        fn sub_agents(&self) -> &[Arc<dyn Agent>] {
            &[]
        }

        // Produces no events, so tests never depend on model output.
        async fn run(&self, _ctx: Arc<dyn InvocationContext>) -> AdkResult<EventStream> {
            Ok(Box::pin(stream::empty()))
        }
    }
836
837    fn test_launcher() -> Launcher {
838        Launcher::new(Arc::new(TestAgent)).with_telemetry(TelemetryConfig::None)
839    }
840
/// An opening `<think>` tag must set the think-block flag and buffer the text
/// that follows it; the closing tag must reset both.
#[test]
fn stream_printer_tracks_think_block_state() {
    let mut sp = StreamPrinter::new(ThinkingDisplayMode::Auto);
    assert!(!sp.in_think_block);

    // Feed the opening tag plus some reasoning text.
    sp.handle_text_chunk("<think>reasoning");
    assert!(sp.in_think_block);
    assert_eq!(sp.think_buffer, "reasoning");

    // Feed the rest, including the closing tag and trailing visible text.
    sp.handle_text_chunk(" more</think>visible");
    assert!(!sp.in_think_block);
    assert_eq!(sp.think_buffer, "");
}
856
/// A think block split over several chunks must accumulate in the buffer until
/// the closing tag arrives.
#[test]
fn stream_printer_handles_think_block_across_chunks() {
    let mut sp = StreamPrinter::new(ThinkingDisplayMode::Auto);

    // (chunk fed in, expected `in_think_block`, expected `think_buffer`)
    let steps = [
        ("before<think>start", true, "start"),
        (" middle", true, "start middle"),
        (" end</think>after", false, ""),
    ];

    for (chunk, expect_open, expect_buffer) in steps {
        sp.handle_text_chunk(chunk);
        assert_eq!(sp.in_think_block, expect_open);
        assert_eq!(sp.think_buffer, expect_buffer);
    }
}
873
/// `finish` must close a think block whose closing tag never arrived.
#[test]
fn stream_printer_finish_flushes_open_think_block() {
    let mut sp = StreamPrinter::new(ThinkingDisplayMode::Auto);

    // Open a block and never close it.
    sp.handle_text_chunk("<think>unclosed reasoning");
    assert!(sp.in_think_block);

    sp.finish();
    assert!(!sp.in_think_block);
    assert_eq!(sp.think_buffer, "");
}
885
/// Calling `finish` on a fresh printer must leave the think state untouched.
#[test]
fn stream_printer_finish_is_noop_when_no_think_block() {
    let mut sp = StreamPrinter::new(ThinkingDisplayMode::Auto);

    sp.finish();

    assert!(!sp.in_think_block);
    assert_eq!(sp.think_buffer, "");
}
893
/// Two complete think blocks inside a single chunk must leave no open block
/// and no buffered text behind.
#[test]
fn stream_printer_handles_multiple_think_blocks_in_one_chunk() {
    let mut sp = StreamPrinter::new(ThinkingDisplayMode::Auto);
    let chunk = "a<think>first</think>b<think>second</think>c";

    sp.handle_text_chunk(chunk);

    assert!(!sp.in_think_block);
    assert_eq!(sp.think_buffer, "");
}
902
/// A think block with no content must still be closed cleanly.
#[test]
fn stream_printer_handles_empty_think_block() {
    let mut sp = StreamPrinter::new(ThinkingDisplayMode::Auto);
    let chunk = "<think></think>after";

    sp.handle_text_chunk(chunk);

    assert!(!sp.in_think_block);
    assert_eq!(sp.think_buffer, "");
}
911
/// Feeds one part of every kind through `handle_part` and checks that the
/// printer neither panics nor ends up with stale thinking state.
#[test]
fn stream_printer_handles_all_part_types() {
    let mut sp = StreamPrinter::new(ThinkingDisplayMode::Auto);

    // Plain text must not open a think block.
    let text = Part::Text { text: "hello".into() };
    sp.handle_part(&text);
    assert!(!sp.in_think_block);

    // A thinking part switches the printer into streamed-thinking mode.
    let thinking = Part::Thinking { thinking: "reasoning".into(), signature: None };
    sp.handle_part(&thinking);
    assert!(sp.in_thinking_part_stream);

    // Remaining kinds, in order: function call, function response, inline data, file data.
    let remaining = [
        Part::FunctionCall {
            name: "get_weather".into(),
            args: serde_json::json!({"city": "Seattle"}),
            id: None,
            thought_signature: None,
        },
        Part::FunctionResponse {
            function_response: adk_core::FunctionResponseData::new(
                "get_weather",
                serde_json::json!({"temp": 72}),
            ),
            id: None,
        },
        Part::InlineData { mime_type: "image/png".into(), data: vec![0u8; 100] },
        Part::FileData {
            mime_type: "audio/wav".into(),
            file_uri: "gs://bucket/file.wav".into(),
        },
    ];
    for part in &remaining {
        sp.handle_part(part);
    }

    // Reaching this point means nothing panicked; both flags must be clear again.
    assert!(!sp.in_think_block);
    assert!(!sp.in_thinking_part_stream);
}
955
/// Text without any think tags must not touch the think-block state.
#[test]
fn stream_printer_text_without_think_tags_leaves_state_clean() {
    let mut sp = StreamPrinter::new(ThinkingDisplayMode::Auto);

    sp.handle_text_chunk("just plain text with no tags");

    assert!(!sp.in_think_block);
    assert_eq!(sp.think_buffer, "");
}
963
/// Consecutive thinking parts must be joined in the buffer, and a following
/// text part must end the streamed-thinking run.
#[test]
fn stream_printer_coalesces_streamed_thinking_parts() {
    let mut sp = StreamPrinter::new(ThinkingDisplayMode::Auto);

    // Three fragments arriving as separate thinking parts.
    for fragment in ["Okay", ", the", " user"] {
        sp.handle_part(&Part::Thinking { thinking: fragment.into(), signature: None });
    }
    assert!(sp.in_thinking_part_stream);
    assert_eq!(sp.think_buffer, "Okay, the user");

    // A text part ends the run and empties the buffer.
    let text = Part::Text { text: "hello".into() };
    sp.handle_part(&text);
    assert!(!sp.in_thinking_part_stream);
    assert_eq!(sp.think_buffer, "");
}
980
/// `finish` must end a streamed-thinking run that is still in progress.
#[test]
fn stream_printer_finish_closes_streamed_thinking_state() {
    let mut sp = StreamPrinter::new(ThinkingDisplayMode::Auto);

    let thinking = Part::Thinking { thinking: "reasoning".into(), signature: None };
    sp.handle_part(&thinking);
    assert!(sp.in_thinking_part_stream);

    sp.finish();
    assert!(!sp.in_thinking_part_stream);
    assert_eq!(sp.think_buffer, "");
}
993
/// In `Hide` mode a thinking part must leave the streaming flag unset and the
/// buffer empty.
#[test]
fn stream_printer_hide_mode_ignores_emitted_thinking_state() {
    let mut sp = StreamPrinter::new(ThinkingDisplayMode::Hide);
    let thinking = Part::Thinking { thinking: "secret".into(), signature: None };

    sp.handle_part(&thinking);

    assert!(!sp.in_thinking_part_stream);
    assert_eq!(sp.think_buffer, "");
}
1003
/// In `Hide` mode a complete think block embedded in text must leave no open
/// block and no buffered text behind.
#[test]
fn stream_printer_hide_mode_drops_think_tags_from_text() {
    let mut sp = StreamPrinter::new(ThinkingDisplayMode::Hide);
    let chunk = "visible<think>hidden</think>after";

    sp.handle_text_chunk(chunk);

    assert!(!sp.in_think_block);
    assert_eq!(sp.think_buffer, "");
}
1013
/// `is_exit_command` must accept `quit`/`exit` with and without a leading
/// slash, and must reject ordinary input.
#[test]
fn exit_command_helper_accepts_plain_and_slash_variants() {
    for command in ["quit", "exit", "/quit", "/exit"] {
        // Name the input so a failure inside the loop says which variant was rejected.
        assert!(
            is_exit_command(command),
            "expected {command:?} to be recognized as an exit command"
        );
    }

    assert!(!is_exit_command("hello"), "ordinary input must not be treated as an exit command");
}
1022
1023    #[tokio::test]
1024    async fn build_app_includes_health_route() {
1025        let app = test_launcher().build_app().expect("launcher app should build");
1026
1027        let response = app
1028            .oneshot(Request::builder().uri("/api/health").body(Body::empty()).unwrap())
1029            .await
1030            .expect("health request should succeed");
1031
1032        assert_eq!(response.status(), StatusCode::OK);
1033
1034        let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
1035        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1036        assert_eq!(json["status"], "healthy");
1037    }
1038
1039    #[tokio::test]
1040    async fn build_app_does_not_enable_a2a_routes_by_default() {
1041        let app = test_launcher().build_app().expect("launcher app should build");
1042
1043        let response = app
1044            .oneshot(Request::builder().uri("/.well-known/agent.json").body(Body::empty()).unwrap())
1045            .await
1046            .expect("agent card request should complete");
1047
1048        assert_eq!(response.status(), StatusCode::NOT_FOUND);
1049    }
1050
1051    #[tokio::test]
1052    async fn build_app_with_a2a_enables_agent_card_route() {
1053        let app = test_launcher()
1054            .build_app_with_a2a("http://localhost:8080")
1055            .expect("launcher app with a2a should build");
1056
1057        let response = app
1058            .oneshot(Request::builder().uri("/.well-known/agent.json").body(Body::empty()).unwrap())
1059            .await
1060            .expect("agent card request should complete");
1061
1062        assert_eq!(response.status(), StatusCode::OK);
1063
1064        let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
1065        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1066        assert_eq!(json["name"], "launcher_test_agent");
1067        assert_eq!(json["description"], "launcher test agent");
1068    }
1069}