Skip to main content

adk_cli/
launcher.rs

1//! Simple launcher for ADK agents with CLI support.
2//!
3//! Provides a one-liner to run agents with console or web server modes,
4//! similar to adk-go's launcher pattern.
5//!
6//! # Example
7//!
8//! ```ignore
9//! use adk_cli::Launcher;
10//! use std::sync::Arc;
11//!
12//! #[tokio::main]
13//! async fn main() -> adk_core::Result<()> {
14//!     let agent = /* create your agent */;
15//!
16//!     // Run with CLI support (console by default, or `serve` for web)
17//!     Launcher::new(Arc::new(agent)).run().await
18//! }
19//! ```
20//!
21//! # CLI Usage
22//!
23//! ```bash
24//! # Interactive console (default)
25//! cargo run
26//!
27//! # Web server with UI
28//! cargo run -- serve
29//! cargo run -- serve --port 3000
30//! ```
31
32use adk_artifact::ArtifactService;
33use adk_core::{
34    Agent, CacheCapable, Content, ContextCacheConfig, EventsCompactionConfig, Memory, Part, Result,
35    RunConfig, SessionId, StreamingMode, UserId,
36};
37use adk_runner::Runner;
38use adk_server::{
39    RequestContextExtractor, SecurityConfig, ServerConfig, create_app, create_app_with_a2a,
40    shutdown_signal,
41};
42use adk_session::{CreateRequest, InMemorySessionService, SessionService};
43use axum::Router;
44use clap::{Parser, Subcommand};
45use futures::StreamExt;
46use rustyline::DefaultEditor;
47use serde_json::Value;
48use std::collections::HashMap;
49use std::io::{self, Write};
50use std::sync::Arc;
51use std::time::Duration;
52use tokio_util::sync::CancellationToken;
53use tracing::{error, warn};
54
55/// CLI arguments for the launcher.
56#[derive(Parser)]
57#[command(name = "agent")]
58#[command(about = "ADK Agent", long_about = None)]
59struct LauncherCli {
60    #[command(subcommand)]
61    command: Option<LauncherCommand>,
62}
63
// Subcommands understood by the launcher binary.
//
// NOTE: the `///` comments on the variants and on `port` are consumed by
// clap's derive as user-facing help text, so they are left untouched here;
// maintainer notes use plain `//` comments instead.
#[derive(Subcommand)]
enum LauncherCommand {
    /// Run interactive console (default if no command specified)
    Chat,
    /// Start web server with UI
    Serve {
        /// Server port
        #[arg(long, default_value_t = 8080)]
        port: u16,
    },
    // Only compiled when the `optimize` feature is enabled; the arguments are
    // declared in `crate::optimize`.
    /// Optimize agent instructions using an evaluation set
    #[cfg(feature = "optimize")]
    Optimize(crate::optimize::OptimizeArgs),
}
78
/// Selects how thinking/reasoning output is rendered by the console printer.
///
/// [`ThinkingDisplayMode::Auto`] is the default.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ThinkingDisplayMode {
    /// Render thinking content whenever the model emits it.
    #[default]
    Auto,
    /// Always render emitted thinking content.
    Show,
    /// Suppress emitted thinking content in the console output.
    Hide,
}
90
/// Selects the telemetry backend that serve mode initializes.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TelemetryConfig {
    /// Use the in-memory ADK span exporter for debug endpoints.
    AdkExporter {
        /// Service name passed to the exporter initializer.
        service_name: String,
    },
    /// Export telemetry to an OTLP collector.
    Otlp {
        /// Service name passed to the OTLP initializer.
        service_name: String,
        /// Collector endpoint passed to the OTLP initializer.
        endpoint: String,
    },
    /// Skip telemetry initialization in the launcher entirely.
    None,
}

impl Default for TelemetryConfig {
    /// Defaults to the ADK exporter with the service name `adk-server`.
    fn default() -> Self {
        let service_name = String::from("adk-server");
        Self::AdkExporter { service_name }
    }
}
107
/// Launcher for running ADK agents with CLI support.
///
/// Provides console and web server modes out of the box.
///
/// # Console mode
///
/// Uses a `rustyline` REPL with history, Ctrl+C handling, and streaming
/// output that renders `<think>` blocks, tool calls, and inline data.
///
/// # Serve mode
///
/// Starts an Axum HTTP server with the ADK web UI.
///
/// # Note on `with_streaming_mode`
///
/// `with_streaming_mode` currently only affects console mode. `ServerConfig`
/// does not accept a `RunConfig`, so the setting is not forwarded to the
/// server. This will be addressed when `ServerConfig` gains that field.
pub struct Launcher {
    /// Agent that both console and serve mode run.
    agent: Arc<dyn Agent>,
    /// Optional application name; console mode falls back to the agent's name.
    app_name: Option<String>,
    /// Session backend; an `InMemorySessionService` is created when unset.
    session_service: Option<Arc<dyn SessionService>>,
    /// Optional artifact backend forwarded to the runner / server config.
    artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory backend forwarded to the runner / server config.
    memory_service: Option<Arc<dyn Memory>>,
    /// Optional compaction settings; forwarded when building the server config.
    compaction_config: Option<EventsCompactionConfig>,
    /// Optional context-cache settings; only applied together with `cache_capable`.
    context_cache_config: Option<ContextCacheConfig>,
    /// Cache-capable handle paired with `context_cache_config`.
    cache_capable: Option<Arc<dyn CacheCapable>>,
    /// Optional server security settings.
    security_config: Option<SecurityConfig>,
    /// Optional request-context extractor for the server.
    request_context_extractor: Option<Arc<dyn RequestContextExtractor>>,
    /// When set, `build_app` creates the app with A2A routes using this base URL.
    a2a_base_url: Option<String>,
    /// Telemetry backend selection used by `build_app`.
    telemetry_config: TelemetryConfig,
    /// Maximum graceful shutdown window for the web server (30 seconds by default).
    shutdown_grace_period: Duration,
    /// Run configuration applied to the console runner when present.
    run_config: Option<RunConfig>,
    /// How the console printer renders thinking content.
    thinking_mode: ThinkingDisplayMode,
}
143
144impl Launcher {
145    /// Create a new launcher with the given agent.
146    pub fn new(agent: Arc<dyn Agent>) -> Self {
147        Self {
148            agent,
149            app_name: None,
150            session_service: None,
151            artifact_service: None,
152            memory_service: None,
153            compaction_config: None,
154            context_cache_config: None,
155            cache_capable: None,
156            security_config: None,
157            request_context_extractor: None,
158            a2a_base_url: None,
159            telemetry_config: TelemetryConfig::default(),
160            shutdown_grace_period: Duration::from_secs(30),
161            run_config: None,
162            thinking_mode: ThinkingDisplayMode::Auto,
163        }
164    }
165
166    /// Set a custom application name (defaults to agent name).
167    pub fn app_name(mut self, name: impl Into<String>) -> Self {
168        self.app_name = Some(name.into());
169        self
170    }
171
172    /// Set a custom artifact service.
173    pub fn with_artifact_service(mut self, service: Arc<dyn ArtifactService>) -> Self {
174        self.artifact_service = Some(service);
175        self
176    }
177
178    /// Set a custom session service.
179    pub fn with_session_service(mut self, service: Arc<dyn SessionService>) -> Self {
180        self.session_service = Some(service);
181        self
182    }
183
184    /// Set a custom memory service.
185    pub fn with_memory_service(mut self, service: Arc<dyn Memory>) -> Self {
186        self.memory_service = Some(service);
187        self
188    }
189
190    /// Enable runner-level context compaction in serve mode.
191    pub fn with_compaction(mut self, config: EventsCompactionConfig) -> Self {
192        self.compaction_config = Some(config);
193        self
194    }
195
196    /// Enable automatic prompt cache lifecycle management in serve mode.
197    pub fn with_context_cache(
198        mut self,
199        config: ContextCacheConfig,
200        cache_capable: Arc<dyn CacheCapable>,
201    ) -> Self {
202        self.context_cache_config = Some(config);
203        self.cache_capable = Some(cache_capable);
204        self
205    }
206
207    /// Set custom server security settings.
208    pub fn with_security_config(mut self, config: SecurityConfig) -> Self {
209        self.security_config = Some(config);
210        self
211    }
212
213    /// Set a request context extractor for authenticated deployments.
214    pub fn with_request_context_extractor(
215        mut self,
216        extractor: Arc<dyn RequestContextExtractor>,
217    ) -> Self {
218        self.request_context_extractor = Some(extractor);
219        self
220    }
221
222    /// Enable A2A routes when building or serving the HTTP app.
223    pub fn with_a2a_base_url(mut self, base_url: impl Into<String>) -> Self {
224        self.a2a_base_url = Some(base_url.into());
225        self
226    }
227
228    /// Configure how serve mode initializes telemetry.
229    pub fn with_telemetry(mut self, config: TelemetryConfig) -> Self {
230        self.telemetry_config = config;
231        self
232    }
233
234    /// Set the maximum graceful shutdown window for the web server.
235    pub fn with_shutdown_grace_period(mut self, grace_period: Duration) -> Self {
236        self.shutdown_grace_period = grace_period;
237        self
238    }
239
240    /// Set streaming mode for console mode.
241    ///
242    /// Note: this currently only affects console mode. The server does not
243    /// yet accept a `RunConfig`.
244    pub fn with_streaming_mode(mut self, mode: StreamingMode) -> Self {
245        self.run_config = Some(RunConfig::builder().streaming_mode(mode).build());
246        self
247    }
248
249    /// Control how emitted thinking content is rendered in console mode.
250    pub fn with_thinking_mode(mut self, mode: ThinkingDisplayMode) -> Self {
251        self.thinking_mode = mode;
252        self
253    }
254
255    /// Run the launcher, parsing CLI arguments.
256    ///
257    /// - No arguments or `chat`: Interactive console
258    /// - `serve [--port PORT]`: Web server with UI
259    pub async fn run(self) -> Result<()> {
260        let cli = LauncherCli::parse();
261
262        match cli.command.unwrap_or(LauncherCommand::Chat) {
263            LauncherCommand::Chat => self.run_console_directly().await,
264            LauncherCommand::Serve { port } => self.run_serve_directly(port).await,
265            #[cfg(feature = "optimize")]
266            LauncherCommand::Optimize(args) => {
267                tracing::info!(
268                    agent_path = %args.agent_path.display(),
269                    eval_set_path = %args.eval_set_path.display(),
270                    max_iterations = args.max_iterations,
271                    target = args.target,
272                    output = %args.output.display(),
273                    "starting prompt optimization"
274                );
275                // The actual optimization wiring is done by the caller
276                // who has access to the optimizer LLM and evaluator.
277                // This branch serves as the CLI entry point.
278                tracing::warn!(
279                    "adk optimize requires wiring via PromptOptimizer — use adk-eval directly"
280                );
281                Ok(())
282            }
283        }
284    }
285
286    /// Run in interactive console mode without parsing CLI arguments.
287    ///
288    /// Use this when you already know you want console mode (e.g. from
289    /// your own CLI parser). [`run`](Self::run) calls this internally.
290    pub async fn run_console_directly(self) -> Result<()> {
291        let app_name = self.app_name.unwrap_or_else(|| self.agent.name().to_string());
292        let user_id = "user".to_string();
293        let thinking_mode = self.thinking_mode;
294        let agent = self.agent;
295        let artifact_service = self.artifact_service;
296        let memory_service = self.memory_service;
297        let run_config = self.run_config;
298
299        let session_service =
300            self.session_service.unwrap_or_else(|| Arc::new(InMemorySessionService::new()));
301
302        let session = session_service
303            .create(CreateRequest {
304                app_name: app_name.clone(),
305                user_id: user_id.clone(),
306                session_id: None,
307                state: HashMap::new(),
308            })
309            .await?;
310
311        let session_id = session.id().to_string();
312
313        let mut rl = DefaultEditor::new()
314            .map_err(|e| adk_core::AdkError::config(format!("failed to init readline: {e}")))?;
315
316        print_banner(agent.name());
317
318        loop {
319            let readline = rl.readline("\x1b[36mYou >\x1b[0m ");
320            match readline {
321                Ok(line) => {
322                    let trimmed = line.trim().to_string();
323                    if trimmed.is_empty() {
324                        continue;
325                    }
326                    if is_exit_command(&trimmed) {
327                        println!("\nGoodbye.\n");
328                        break;
329                    }
330                    if trimmed == "/help" {
331                        print_help();
332                        continue;
333                    }
334                    if trimmed == "/clear" {
335                        println!("(conversation cleared — session state is unchanged)");
336                        continue;
337                    }
338
339                    let _ = rl.add_history_entry(&line);
340
341                    let user_content = Content::new("user").with_text(trimmed);
342                    println!();
343
344                    let cancellation_token = CancellationToken::new();
345                    let mut runner_builder = Runner::builder()
346                        .app_name(app_name.clone())
347                        .agent(agent.clone())
348                        .session_service(session_service.clone())
349                        .cancellation_token(cancellation_token.clone());
350                    if let Some(ref artifact_service) = artifact_service {
351                        runner_builder = runner_builder.artifact_service(artifact_service.clone());
352                    }
353                    if let Some(ref memory_service) = memory_service {
354                        runner_builder = runner_builder.memory_service(memory_service.clone());
355                    }
356                    if let Some(ref run_config) = run_config {
357                        runner_builder = runner_builder.run_config(run_config.clone());
358                    }
359                    let runner = runner_builder.build()?;
360                    let mut events = runner
361                        .run(
362                            UserId::new(user_id.clone())?,
363                            SessionId::new(session_id.clone())?,
364                            user_content,
365                        )
366                        .await?;
367                    let mut printer = StreamPrinter::new(thinking_mode);
368                    let mut current_agent = String::new();
369                    let mut printed_header = false;
370                    let mut interrupted = false;
371
372                    loop {
373                        tokio::select! {
374                            event = events.next() => {
375                                let Some(event) = event else {
376                                    break;
377                                };
378
379                                match event {
380                                    Ok(evt) => {
381                                        // Track agent switches in multi-agent workflows
382                                        if !evt.author.is_empty()
383                                            && evt.author != "user"
384                                            && evt.author != current_agent
385                                        {
386                                            if !current_agent.is_empty() {
387                                                println!();
388                                            }
389                                            current_agent = evt.author.clone();
390                                            // Only show agent label in multi-agent scenarios
391                                            if printed_header {
392                                                print!("\x1b[33m[{current_agent}]\x1b[0m ");
393                                                let _ = io::stdout().flush();
394                                            }
395                                            printed_header = true;
396                                        }
397
398                                        // Show agent transfer requests
399                                        if let Some(target) = &evt.actions.transfer_to_agent {
400                                            print!("\x1b[90m[transfer -> {target}]\x1b[0m ");
401                                            let _ = io::stdout().flush();
402                                        }
403
404                                        if let Some(content) = &evt.llm_response.content {
405                                            for part in &content.parts {
406                                                printer.handle_part(part);
407                                            }
408                                        }
409                                    }
410                                    Err(e) => {
411                                        error!("stream error: {e}");
412                                    }
413                                }
414                            }
415                            signal = tokio::signal::ctrl_c() => {
416                                match signal {
417                                    Ok(()) => {
418                                        cancellation_token.cancel();
419                                        interrupted = true;
420                                        break;
421                                    }
422                                    Err(err) => {
423                                        error!("failed to listen for Ctrl+C: {err}");
424                                    }
425                                }
426                            }
427                        }
428                    }
429
430                    printer.finish();
431                    if interrupted {
432                        println!("\nInterrupted.\n");
433                        continue;
434                    }
435
436                    println!("\n");
437                }
438                Err(rustyline::error::ReadlineError::Interrupted) => {
439                    println!("\nInterrupted. Type exit to quit.\n");
440                    continue;
441                }
442                Err(rustyline::error::ReadlineError::Eof) => {
443                    println!("\nGoodbye.\n");
444                    break;
445                }
446                Err(err) => {
447                    error!("readline error: {err}");
448                    break;
449                }
450            }
451        }
452
453        Ok(())
454    }
455
456    fn init_telemetry(&self) -> Option<Arc<adk_telemetry::AdkSpanExporter>> {
457        match &self.telemetry_config {
458            TelemetryConfig::AdkExporter { service_name } => {
459                match adk_telemetry::init_with_adk_exporter(service_name) {
460                    Ok(exporter) => Some(exporter),
461                    Err(e) => {
462                        warn!("failed to initialize telemetry: {e}");
463                        None
464                    }
465                }
466            }
467            TelemetryConfig::Otlp { service_name, endpoint } => {
468                #[cfg(feature = "telemetry-otlp")]
469                if let Err(e) = adk_telemetry::init_with_otlp(service_name, endpoint) {
470                    warn!("failed to initialize otlp telemetry: {e}");
471                }
472                #[cfg(not(feature = "telemetry-otlp"))]
473                warn!(
474                    service.name = %service_name,
475                    otlp.endpoint = %endpoint,
476                    "otlp telemetry requested but the telemetry-otlp feature is disabled"
477                );
478                None
479            }
480            TelemetryConfig::None => None,
481        }
482    }
483
484    fn into_server_config(
485        self,
486        span_exporter: Option<Arc<adk_telemetry::AdkSpanExporter>>,
487    ) -> ServerConfig {
488        let session_service =
489            self.session_service.unwrap_or_else(|| Arc::new(InMemorySessionService::new()));
490        let agent_loader = Arc::new(adk_core::SingleAgentLoader::new(self.agent));
491
492        let mut config = ServerConfig::new(agent_loader, session_service)
493            .with_artifact_service_opt(self.artifact_service);
494
495        if let Some(memory_service) = self.memory_service {
496            config = config.with_memory_service(memory_service);
497        }
498
499        if let Some(compaction_config) = self.compaction_config {
500            config = config.with_compaction(compaction_config);
501        }
502
503        if let (Some(context_cache_config), Some(cache_capable)) =
504            (self.context_cache_config, self.cache_capable)
505        {
506            config = config.with_context_cache(context_cache_config, cache_capable);
507        }
508
509        if let Some(security) = self.security_config {
510            config = config.with_security(security);
511        }
512
513        if let Some(extractor) = self.request_context_extractor {
514            config = config.with_request_context(extractor);
515        }
516
517        if let Some(exporter) = span_exporter {
518            config = config.with_span_exporter(exporter);
519        }
520
521        config
522    }
523
524    /// Build the Axum application without serving it.
525    ///
526    /// This is the production escape hatch for adding custom routes,
527    /// middleware, metrics, or owning the serve loop yourself.
528    pub fn build_app(self) -> Result<Router> {
529        let span_exporter = self.init_telemetry();
530        let a2a_base_url = self.a2a_base_url.clone();
531        let config = self.into_server_config(span_exporter);
532
533        Ok(match a2a_base_url {
534            Some(base_url) => create_app_with_a2a(config, Some(&base_url)),
535            None => create_app(config),
536        })
537    }
538
539    /// Build the Axum application with A2A routes enabled.
540    pub fn build_app_with_a2a(mut self, base_url: impl Into<String>) -> Result<Router> {
541        self.a2a_base_url = Some(base_url.into());
542        self.build_app()
543    }
544
545    /// Run web server without parsing CLI arguments.
546    ///
547    /// Use this when you already know you want serve mode (e.g. from
548    /// your own CLI parser). [`run`](Self::run) calls this internally.
549    pub async fn run_serve_directly(self, port: u16) -> Result<()> {
550        let app = self.build_app()?;
551
552        let addr = format!("0.0.0.0:{port}");
553        let listener = tokio::net::TcpListener::bind(&addr)
554            .await
555            .map_err(|e| adk_core::AdkError::config(format!("failed to bind {addr}: {e}")))?;
556
557        println!("ADK Server starting on http://localhost:{port}");
558        println!("Open http://localhost:{port} in your browser");
559        println!("Press Ctrl+C to stop\n");
560
561        axum::serve(listener, app)
562            .with_graceful_shutdown(shutdown_signal())
563            .await
564            .map_err(|e| adk_core::AdkError::config(format!("server error: {e}")))?;
565
566        Ok(())
567    }
568}
569
570/// Print the ADK-Rust welcome banner.
571fn print_banner(agent_name: &str) {
572    let version = env!("CARGO_PKG_VERSION");
573    let title = format!("ADK-Rust  v{version}");
574    let subtitle = "Rust Agent Development Kit";
575    // Box inner width = 49 (between the ║ chars)
576    let inner: usize = 49;
577    let pad_title = (inner.saturating_sub(title.len())) / 2;
578    let pad_subtitle = (inner.saturating_sub(subtitle.len())) / 2;
579
580    println!();
581    println!("    ╔{:═<inner$}╗", "");
582    println!(
583        "    ║{:>w$}{title}{:<r$}║",
584        "",
585        "",
586        w = pad_title,
587        r = inner - pad_title - title.len()
588    );
589    println!(
590        "    ║{:>w$}{subtitle}{:<r$}║",
591        "",
592        "",
593        w = pad_subtitle,
594        r = inner - pad_subtitle - subtitle.len()
595    );
596    println!("    ╚{:═<inner$}╝", "");
597    println!();
598    println!("  Agent    : {agent_name}");
599    println!("  Runtime  : Tokio async, streaming responses");
600    println!("  Features : tool calling, multi-provider, multi-agent, think blocks");
601    println!();
602    println!("  Type a message to chat. /help for commands.");
603    println!();
604}
605
/// Print the list of REPL commands and usage tips to stdout.
fn print_help() {
    // One entry per output line; empty entries are blank separator lines.
    const HELP_LINES: &[&str] = &[
        "",
        "  Commands:",
        "    /help   Show this help",
        "    /clear  Clear display (session state is kept)",
        "    quit    Exit the REPL",
        "    exit    Exit the REPL",
        "    /quit   Exit the REPL",
        "    /exit   Exit the REPL",
        "",
        "  Tips:",
        "    - Up/Down arrows browse history",
        "    - Ctrl+C interrupts the current operation",
        "    - Multi-agent workflows show [agent_name] on handoff",
        "",
    ];

    for line in HELP_LINES {
        println!("{line}");
    }
}
623
/// Report whether `input` is exactly one of the REPL exit commands.
///
/// The comparison is case-sensitive and does not trim whitespace.
fn is_exit_command(input: &str) -> bool {
    const EXIT_COMMANDS: [&str; 4] = ["quit", "exit", "/quit", "/exit"];
    EXIT_COMMANDS.contains(&input)
}
627
/// Handles streaming output with special rendering for think blocks,
/// tool calls, function responses, and inline/file data.
pub struct StreamPrinter {
    /// Display mode selected at construction time.
    thinking_mode: ThinkingDisplayMode,
    /// True while text chunks are inside an open `<think>` … `</think>` span.
    in_think_block: bool,
    /// True after the `[thinking]` header was printed for thinking parts and
    /// until that run of thinking output is flushed.
    in_thinking_part_stream: bool,
    /// Accumulates thinking text until it is flushed or discarded.
    think_buffer: String,
}
636
637impl StreamPrinter {
638    /// Create a printer with the selected thinking display mode.
639    pub fn new(thinking_mode: ThinkingDisplayMode) -> Self {
640        Self {
641            thinking_mode,
642            in_think_block: false,
643            in_thinking_part_stream: false,
644            think_buffer: String::new(),
645        }
646    }
647
648    /// Process a single response part, rendering it to stdout.
649    pub fn handle_part(&mut self, part: &Part) {
650        match part {
651            Part::Text { text } => {
652                self.flush_part_thinking_if_needed();
653                self.handle_text_chunk(text);
654            }
655            Part::Thinking { thinking, .. } => {
656                if matches!(self.thinking_mode, ThinkingDisplayMode::Hide) {
657                    return;
658                }
659                if !self.in_thinking_part_stream {
660                    print!("\n[thinking] ");
661                    let _ = io::stdout().flush();
662                    self.in_thinking_part_stream = true;
663                }
664                self.think_buffer.push_str(thinking);
665                print!("{thinking}");
666                let _ = io::stdout().flush();
667            }
668            Part::FunctionCall { name, args, .. } => {
669                self.flush_pending_thinking();
670                print!("\n[tool-call] {name} {args}\n");
671                let _ = io::stdout().flush();
672            }
673            Part::FunctionResponse { function_response, .. } => {
674                self.flush_pending_thinking();
675                self.print_tool_response(&function_response.name, &function_response.response);
676            }
677            Part::InlineData { mime_type, data } => {
678                self.flush_pending_thinking();
679                print!("\n[inline-data] mime={mime_type} bytes={}\n", data.len());
680                let _ = io::stdout().flush();
681            }
682            Part::FileData { mime_type, file_uri } => {
683                self.flush_pending_thinking();
684                print!("\n[file-data] mime={mime_type} uri={file_uri}\n");
685                let _ = io::stdout().flush();
686            }
687            Part::ServerToolCall { server_tool_call } => {
688                self.flush_pending_thinking();
689                print!("\n[server-tool-call] {server_tool_call}\n");
690                let _ = io::stdout().flush();
691            }
692            Part::ServerToolResponse { server_tool_response } => {
693                self.flush_pending_thinking();
694                print!("\n[server-tool-response] {}B\n", server_tool_response.to_string().len());
695                let _ = io::stdout().flush();
696            }
697        }
698    }
699
700    fn handle_text_chunk(&mut self, chunk: &str) {
701        if matches!(self.thinking_mode, ThinkingDisplayMode::Hide) {
702            let mut visible = String::with_capacity(chunk.len());
703            let mut remaining = chunk;
704
705            while let Some(start_idx) = remaining.find("<think>") {
706                visible.push_str(&remaining[..start_idx]);
707                let after_start = &remaining[start_idx + "<think>".len()..];
708                if let Some(end_idx) = after_start.find("</think>") {
709                    remaining = &after_start[end_idx + "</think>".len()..];
710                } else {
711                    remaining = "";
712                    break;
713                }
714            }
715
716            visible.push_str(remaining);
717            if !visible.is_empty() {
718                print!("{visible}");
719                let _ = io::stdout().flush();
720            }
721            return;
722        }
723
724        const THINK_START: &str = "<think>";
725        const THINK_END: &str = "</think>";
726
727        let mut remaining = chunk;
728
729        while !remaining.is_empty() {
730            if self.in_think_block {
731                if let Some(end_idx) = remaining.find(THINK_END) {
732                    self.think_buffer.push_str(&remaining[..end_idx]);
733                    self.flush_think();
734                    self.in_think_block = false;
735                    remaining = &remaining[end_idx + THINK_END.len()..];
736                } else {
737                    self.think_buffer.push_str(remaining);
738                    break;
739                }
740            } else if let Some(start_idx) = remaining.find(THINK_START) {
741                let visible = &remaining[..start_idx];
742                if !visible.is_empty() {
743                    print!("{visible}");
744                    let _ = io::stdout().flush();
745                }
746                self.in_think_block = true;
747                self.think_buffer.clear();
748                remaining = &remaining[start_idx + THINK_START.len()..];
749            } else {
750                print!("{remaining}");
751                let _ = io::stdout().flush();
752                break;
753            }
754        }
755    }
756
757    fn flush_think(&mut self) {
758        let content = self.think_buffer.trim();
759        if !content.is_empty() {
760            print!("\n[think] {content}\n");
761            let _ = io::stdout().flush();
762        }
763        self.think_buffer.clear();
764    }
765
766    /// Flush any pending think block content.
767    pub fn finish(&mut self) {
768        self.flush_pending_thinking();
769    }
770
771    fn print_tool_response(&self, name: &str, response: &Value) {
772        print!("\n[tool-response] {name} {response}\n");
773        let _ = io::stdout().flush();
774    }
775
776    fn flush_part_thinking_if_needed(&mut self) {
777        if self.in_thinking_part_stream {
778            println!();
779            let _ = io::stdout().flush();
780            self.think_buffer.clear();
781            self.in_thinking_part_stream = false;
782        }
783    }
784
785    fn flush_pending_thinking(&mut self) {
786        self.flush_part_thinking_if_needed();
787        if self.in_think_block {
788            self.flush_think_with_label("think");
789            self.in_think_block = false;
790        }
791    }
792
793    fn flush_think_with_label(&mut self, label: &str) {
794        let content = self.think_buffer.trim();
795        if !content.is_empty() {
796            print!("\n[{label}] {content}\n");
797            let _ = io::stdout().flush();
798        }
799        self.think_buffer.clear();
800    }
801}
802
803impl Default for StreamPrinter {
804    fn default() -> Self {
805        Self::new(ThinkingDisplayMode::Auto)
806    }
807}
808
809#[cfg(test)]
810mod tests {
811    use super::*;
812    use adk_core::{Agent, EventStream, InvocationContext, Result as AdkResult};
813    use async_trait::async_trait;
814    use axum::{
815        body::{Body, to_bytes},
816        http::{Request, StatusCode},
817    };
818    use futures::stream;
819    use std::sync::Arc;
820    use tower::ServiceExt;
821
822    struct TestAgent;
823
824    #[async_trait]
825    impl Agent for TestAgent {
826        fn name(&self) -> &str {
827            "launcher_test_agent"
828        }
829
830        fn description(&self) -> &str {
831            "launcher test agent"
832        }
833
834        fn sub_agents(&self) -> &[Arc<dyn Agent>] {
835            &[]
836        }
837
838        async fn run(&self, _ctx: Arc<dyn InvocationContext>) -> AdkResult<EventStream> {
839            Ok(Box::pin(stream::empty()))
840        }
841    }
842
843    fn test_launcher() -> Launcher {
844        Launcher::new(Arc::new(TestAgent)).with_telemetry(TelemetryConfig::None)
845    }
846
847    #[test]
848    fn stream_printer_tracks_think_block_state() {
849        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
850        assert!(!printer.in_think_block);
851
852        // Opening a think block sets the flag
853        printer.handle_text_chunk("<think>reasoning");
854        assert!(printer.in_think_block);
855        assert_eq!(printer.think_buffer, "reasoning");
856
857        // Closing the think block clears the flag and buffer
858        printer.handle_text_chunk(" more</think>visible");
859        assert!(!printer.in_think_block);
860        assert!(printer.think_buffer.is_empty());
861    }
862
863    #[test]
864    fn stream_printer_handles_think_block_across_chunks() {
865        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
866
867        printer.handle_text_chunk("before<think>start");
868        assert!(printer.in_think_block);
869        assert_eq!(printer.think_buffer, "start");
870
871        printer.handle_text_chunk(" middle");
872        assert!(printer.in_think_block);
873        assert_eq!(printer.think_buffer, "start middle");
874
875        printer.handle_text_chunk(" end</think>after");
876        assert!(!printer.in_think_block);
877        assert!(printer.think_buffer.is_empty());
878    }
879
880    #[test]
881    fn stream_printer_finish_flushes_open_think_block() {
882        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
883
884        printer.handle_text_chunk("<think>unclosed reasoning");
885        assert!(printer.in_think_block);
886
887        printer.finish();
888        assert!(!printer.in_think_block);
889        assert!(printer.think_buffer.is_empty());
890    }
891
892    #[test]
893    fn stream_printer_finish_is_noop_when_no_think_block() {
894        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
895        printer.finish();
896        assert!(!printer.in_think_block);
897        assert!(printer.think_buffer.is_empty());
898    }
899
900    #[test]
901    fn stream_printer_handles_multiple_think_blocks_in_one_chunk() {
902        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
903
904        printer.handle_text_chunk("a<think>first</think>b<think>second</think>c");
905        assert!(!printer.in_think_block);
906        assert!(printer.think_buffer.is_empty());
907    }
908
909    #[test]
910    fn stream_printer_handles_empty_think_block() {
911        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
912
913        printer.handle_text_chunk("<think></think>after");
914        assert!(!printer.in_think_block);
915        assert!(printer.think_buffer.is_empty());
916    }
917
918    #[test]
919    fn stream_printer_handles_all_part_types() {
920        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
921
922        // Text
923        printer.handle_part(&Part::Text { text: "hello".into() });
924        assert!(!printer.in_think_block);
925
926        // Thinking
927        printer.handle_part(&Part::Thinking { thinking: "reasoning".into(), signature: None });
928        assert!(printer.in_thinking_part_stream);
929
930        // FunctionCall
931        printer.handle_part(&Part::FunctionCall {
932            name: "get_weather".into(),
933            args: serde_json::json!({"city": "Seattle"}),
934            id: None,
935            thought_signature: None,
936        });
937
938        // FunctionResponse
939        printer.handle_part(&Part::FunctionResponse {
940            function_response: adk_core::FunctionResponseData::new(
941                "get_weather",
942                serde_json::json!({"temp": 72}),
943            ),
944            id: None,
945        });
946
947        // InlineData
948        printer
949            .handle_part(&Part::InlineData { mime_type: "image/png".into(), data: vec![0u8; 100] });
950
951        // FileData
952        printer.handle_part(&Part::FileData {
953            mime_type: "audio/wav".into(),
954            file_uri: "gs://bucket/file.wav".into(),
955        });
956
957        // No panics, no state corruption
958        assert!(!printer.in_think_block);
959        assert!(!printer.in_thinking_part_stream);
960    }
961
962    #[test]
963    fn stream_printer_text_without_think_tags_leaves_state_clean() {
964        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
965        printer.handle_text_chunk("just plain text with no tags");
966        assert!(!printer.in_think_block);
967        assert!(printer.think_buffer.is_empty());
968    }
969
970    #[test]
971    fn stream_printer_coalesces_streamed_thinking_parts() {
972        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
973
974        printer.handle_part(&Part::Thinking { thinking: "Okay".into(), signature: None });
975        printer.handle_part(&Part::Thinking { thinking: ", the".into(), signature: None });
976        printer.handle_part(&Part::Thinking { thinking: " user".into(), signature: None });
977
978        assert!(printer.in_thinking_part_stream);
979        assert_eq!(printer.think_buffer, "Okay, the user");
980
981        printer.handle_part(&Part::Text { text: "hello".into() });
982
983        assert!(!printer.in_thinking_part_stream);
984        assert!(printer.think_buffer.is_empty());
985    }
986
987    #[test]
988    fn stream_printer_finish_closes_streamed_thinking_state() {
989        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
990
991        printer.handle_part(&Part::Thinking { thinking: "reasoning".into(), signature: None });
992        assert!(printer.in_thinking_part_stream);
993
994        printer.finish();
995
996        assert!(!printer.in_thinking_part_stream);
997        assert!(printer.think_buffer.is_empty());
998    }
999
1000    #[test]
1001    fn stream_printer_hide_mode_ignores_emitted_thinking_state() {
1002        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Hide);
1003
1004        printer.handle_part(&Part::Thinking { thinking: "secret".into(), signature: None });
1005
1006        assert!(!printer.in_thinking_part_stream);
1007        assert!(printer.think_buffer.is_empty());
1008    }
1009
1010    #[test]
1011    fn stream_printer_hide_mode_drops_think_tags_from_text() {
1012        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Hide);
1013
1014        printer.handle_text_chunk("visible<think>hidden</think>after");
1015
1016        assert!(!printer.in_think_block);
1017        assert!(printer.think_buffer.is_empty());
1018    }
1019
1020    #[test]
1021    fn exit_command_helper_accepts_plain_and_slash_variants() {
1022        for command in ["quit", "exit", "/quit", "/exit"] {
1023            assert!(is_exit_command(command));
1024        }
1025
1026        assert!(!is_exit_command("hello"));
1027    }
1028
1029    #[tokio::test]
1030    async fn build_app_includes_health_route() {
1031        let app = test_launcher().build_app().expect("launcher app should build");
1032
1033        let response = app
1034            .oneshot(Request::builder().uri("/api/health").body(Body::empty()).unwrap())
1035            .await
1036            .expect("health request should succeed");
1037
1038        assert_eq!(response.status(), StatusCode::OK);
1039
1040        let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
1041        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1042        assert_eq!(json["status"], "healthy");
1043    }
1044
1045    #[tokio::test]
1046    async fn build_app_does_not_enable_a2a_routes_by_default() {
1047        let app = test_launcher().build_app().expect("launcher app should build");
1048
1049        let response = app
1050            .oneshot(Request::builder().uri("/.well-known/agent.json").body(Body::empty()).unwrap())
1051            .await
1052            .expect("agent card request should complete");
1053
1054        assert_eq!(response.status(), StatusCode::NOT_FOUND);
1055    }
1056
1057    #[tokio::test]
1058    async fn build_app_with_a2a_enables_agent_card_route() {
1059        let app = test_launcher()
1060            .build_app_with_a2a("http://localhost:8080")
1061            .expect("launcher app with a2a should build");
1062
1063        let response = app
1064            .oneshot(Request::builder().uri("/.well-known/agent.json").body(Body::empty()).unwrap())
1065            .await
1066            .expect("agent card request should complete");
1067
1068        assert_eq!(response.status(), StatusCode::OK);
1069
1070        let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
1071        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1072        assert_eq!(json["name"], "launcher_test_agent");
1073        assert_eq!(json["description"], "launcher test agent");
1074    }
1075}