Skip to main content

adk_cli/
launcher.rs

1//! Simple launcher for ADK agents with CLI support.
2//!
3//! Provides a one-liner to run agents with console or web server modes,
4//! similar to adk-go's launcher pattern.
5//!
6//! # Example
7//!
8//! ```ignore
9//! use adk_cli::Launcher;
10//! use std::sync::Arc;
11//!
12//! #[tokio::main]
13//! async fn main() -> adk_core::Result<()> {
14//!     let agent = /* create your agent */;
15//!
16//!     // Run with CLI support (console by default, or `serve` for web)
17//!     Launcher::new(Arc::new(agent)).run().await
18//! }
19//! ```
20//!
21//! # CLI Usage
22//!
23//! ```bash
24//! # Interactive console (default)
25//! cargo run
26//!
27//! # Web server with UI
28//! cargo run -- serve
29//! cargo run -- serve --port 3000
30//! ```
31
32use adk_artifact::ArtifactService;
33use adk_core::{
34    Agent, CacheCapable, Content, ContextCacheConfig, EventsCompactionConfig, Memory, Part, Result,
35    RunConfig, SessionId, StreamingMode, UserId,
36};
37use adk_runner::{Runner, RunnerConfig};
38use adk_server::{
39    RequestContextExtractor, SecurityConfig, ServerConfig, create_app, create_app_with_a2a,
40    shutdown_signal,
41};
42use adk_session::{CreateRequest, InMemorySessionService, SessionService};
43use axum::Router;
44use clap::{Parser, Subcommand};
45use futures::StreamExt;
46use rustyline::DefaultEditor;
47use serde_json::Value;
48use std::collections::HashMap;
49use std::io::{self, Write};
50use std::sync::Arc;
51use std::time::Duration;
52use tokio_util::sync::CancellationToken;
53use tracing::{error, warn};
54
// Top-level argument parser used by `Launcher::run`.
// NOTE: additions here use `//` rather than `///` on purpose — clap derives can
// surface doc comments as help text, which would change runtime output.
/// CLI arguments for the launcher.
#[derive(Parser)]
#[command(name = "agent")]
#[command(about = "ADK Agent", long_about = None)]
struct LauncherCli {
    // `None` when the binary is invoked without a subcommand; `Launcher::run`
    // then falls back to `LauncherCommand::Chat`.
    #[command(subcommand)]
    command: Option<LauncherCommand>,
}
63
// Subcommands understood by the launcher binary.
// NOTE: the `///` comments on the variants and on `port` are consumed by clap
// as help text, so they are user-facing strings; explanatory notes use `//`.
#[derive(Subcommand)]
enum LauncherCommand {
    /// Run interactive console (default if no command specified)
    Chat,
    /// Start web server with UI
    Serve {
        /// Server port
        #[arg(long, default_value_t = 8080)]
        port: u16,
    },
    // Only compiled in when the `optimize` cargo feature is enabled; the
    // argument struct lives in `crate::optimize`.
    /// Optimize agent instructions using an evaluation set
    #[cfg(feature = "optimize")]
    Optimize(crate::optimize::OptimizeArgs),
}
78
/// Controls how the console renders thinking/reasoning content.
///
/// The mode is consumed by [`StreamPrinter`]. In this file the printer only
/// ever tests for [`ThinkingDisplayMode::Hide`], so `Auto` and `Show`
/// currently produce the same console output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ThinkingDisplayMode {
    /// Show thinking when the model emits it. This is the default mode.
    #[default]
    Auto,
    /// Always surface emitted thinking.
    Show,
    /// Hide emitted thinking from the console output.
    Hide,
}
90
/// Selects the telemetry backend that serve mode sets up.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TelemetryConfig {
    /// In-memory ADK span exporter, registered under `service_name`; backs the
    /// debug endpoints.
    AdkExporter { service_name: String },
    /// Ship telemetry for `service_name` to the OTLP collector at `endpoint`.
    Otlp { service_name: String, endpoint: String },
    /// The launcher performs no telemetry initialization.
    None,
}

impl Default for TelemetryConfig {
    /// Defaults to the in-memory ADK exporter with service name `adk-server`.
    fn default() -> Self {
        let service_name = String::from("adk-server");
        TelemetryConfig::AdkExporter { service_name }
    }
}
107
/// Launcher for running ADK agents with CLI support.
///
/// Provides console and web server modes out of the box. Configure it with
/// the consuming builder methods, then call one of the `run*` / `build_app*`
/// methods.
///
/// # Console mode
///
/// Uses a `rustyline` REPL with history, Ctrl+C handling, and streaming
/// output that renders `<think>` blocks, tool calls, and inline data.
///
/// # Serve mode
///
/// Starts an Axum HTTP server with the ADK web UI.
///
/// # Note on `with_streaming_mode`
///
/// `with_streaming_mode` currently only affects console mode. `ServerConfig`
/// does not accept a `RunConfig`, so the setting is not forwarded to the
/// server. This will be addressed when `ServerConfig` gains that field.
pub struct Launcher {
    /// Agent driven by the console runner or served over HTTP.
    agent: Arc<dyn Agent>,
    /// Application name override; console mode falls back to the agent's name.
    app_name: Option<String>,
    /// Session store; an `InMemorySessionService` is created when unset.
    session_service: Option<Arc<dyn SessionService>>,
    /// Optional artifact store passed to the runner / server config.
    artifact_service: Option<Arc<dyn ArtifactService>>,
    /// Optional memory service passed to the runner / server config.
    memory_service: Option<Arc<dyn Memory>>,
    /// Compaction settings; only forwarded to the server config (console mode passes `None`).
    compaction_config: Option<EventsCompactionConfig>,
    /// Context cache settings; only applied in serve mode and only together with `cache_capable`.
    context_cache_config: Option<ContextCacheConfig>,
    /// Cache-capable handle paired with `context_cache_config`.
    cache_capable: Option<Arc<dyn CacheCapable>>,
    /// Optional server security settings (serve mode).
    security_config: Option<SecurityConfig>,
    /// Optional request context extractor (serve mode).
    request_context_extractor: Option<Arc<dyn RequestContextExtractor>>,
    /// When set, the HTTP app is built with A2A routes using this base URL.
    a2a_base_url: Option<String>,
    /// How telemetry is initialized when the HTTP app is built.
    telemetry_config: TelemetryConfig,
    /// Maximum graceful shutdown window for the web server (30 seconds by default).
    shutdown_grace_period: Duration,
    /// Run configuration handed to the console runner; set via `with_streaming_mode`.
    run_config: Option<RunConfig>,
    /// How thinking content is rendered in console mode.
    thinking_mode: ThinkingDisplayMode,
}
143
144impl Launcher {
145    /// Create a new launcher with the given agent.
146    pub fn new(agent: Arc<dyn Agent>) -> Self {
147        Self {
148            agent,
149            app_name: None,
150            session_service: None,
151            artifact_service: None,
152            memory_service: None,
153            compaction_config: None,
154            context_cache_config: None,
155            cache_capable: None,
156            security_config: None,
157            request_context_extractor: None,
158            a2a_base_url: None,
159            telemetry_config: TelemetryConfig::default(),
160            shutdown_grace_period: Duration::from_secs(30),
161            run_config: None,
162            thinking_mode: ThinkingDisplayMode::Auto,
163        }
164    }
165
166    /// Set a custom application name (defaults to agent name).
167    pub fn app_name(mut self, name: impl Into<String>) -> Self {
168        self.app_name = Some(name.into());
169        self
170    }
171
172    /// Set a custom artifact service.
173    pub fn with_artifact_service(mut self, service: Arc<dyn ArtifactService>) -> Self {
174        self.artifact_service = Some(service);
175        self
176    }
177
178    /// Set a custom session service.
179    pub fn with_session_service(mut self, service: Arc<dyn SessionService>) -> Self {
180        self.session_service = Some(service);
181        self
182    }
183
184    /// Set a custom memory service.
185    pub fn with_memory_service(mut self, service: Arc<dyn Memory>) -> Self {
186        self.memory_service = Some(service);
187        self
188    }
189
190    /// Enable runner-level context compaction in serve mode.
191    pub fn with_compaction(mut self, config: EventsCompactionConfig) -> Self {
192        self.compaction_config = Some(config);
193        self
194    }
195
196    /// Enable automatic prompt cache lifecycle management in serve mode.
197    pub fn with_context_cache(
198        mut self,
199        config: ContextCacheConfig,
200        cache_capable: Arc<dyn CacheCapable>,
201    ) -> Self {
202        self.context_cache_config = Some(config);
203        self.cache_capable = Some(cache_capable);
204        self
205    }
206
207    /// Set custom server security settings.
208    pub fn with_security_config(mut self, config: SecurityConfig) -> Self {
209        self.security_config = Some(config);
210        self
211    }
212
213    /// Set a request context extractor for authenticated deployments.
214    pub fn with_request_context_extractor(
215        mut self,
216        extractor: Arc<dyn RequestContextExtractor>,
217    ) -> Self {
218        self.request_context_extractor = Some(extractor);
219        self
220    }
221
222    /// Enable A2A routes when building or serving the HTTP app.
223    pub fn with_a2a_base_url(mut self, base_url: impl Into<String>) -> Self {
224        self.a2a_base_url = Some(base_url.into());
225        self
226    }
227
228    /// Configure how serve mode initializes telemetry.
229    pub fn with_telemetry(mut self, config: TelemetryConfig) -> Self {
230        self.telemetry_config = config;
231        self
232    }
233
234    /// Set the maximum graceful shutdown window for the web server.
235    pub fn with_shutdown_grace_period(mut self, grace_period: Duration) -> Self {
236        self.shutdown_grace_period = grace_period;
237        self
238    }
239
240    /// Set streaming mode for console mode.
241    ///
242    /// Note: this currently only affects console mode. The server does not
243    /// yet accept a `RunConfig`.
244    pub fn with_streaming_mode(mut self, mode: StreamingMode) -> Self {
245        self.run_config = Some(RunConfig { streaming_mode: mode, ..RunConfig::default() });
246        self
247    }
248
249    /// Control how emitted thinking content is rendered in console mode.
250    pub fn with_thinking_mode(mut self, mode: ThinkingDisplayMode) -> Self {
251        self.thinking_mode = mode;
252        self
253    }
254
255    /// Run the launcher, parsing CLI arguments.
256    ///
257    /// - No arguments or `chat`: Interactive console
258    /// - `serve [--port PORT]`: Web server with UI
259    pub async fn run(self) -> Result<()> {
260        let cli = LauncherCli::parse();
261
262        match cli.command.unwrap_or(LauncherCommand::Chat) {
263            LauncherCommand::Chat => self.run_console_directly().await,
264            LauncherCommand::Serve { port } => self.run_serve_directly(port).await,
265            #[cfg(feature = "optimize")]
266            LauncherCommand::Optimize(args) => {
267                tracing::info!(
268                    agent_path = %args.agent_path.display(),
269                    eval_set_path = %args.eval_set_path.display(),
270                    max_iterations = args.max_iterations,
271                    target = args.target,
272                    output = %args.output.display(),
273                    "starting prompt optimization"
274                );
275                // The actual optimization wiring is done by the caller
276                // who has access to the optimizer LLM and evaluator.
277                // This branch serves as the CLI entry point.
278                tracing::warn!(
279                    "adk optimize requires wiring via PromptOptimizer — use adk-eval directly"
280                );
281                Ok(())
282            }
283        }
284    }
285
    /// Run in interactive console mode without parsing CLI arguments.
    ///
    /// Use this when you already know you want console mode (e.g. from
    /// your own CLI parser). [`run`](Self::run) calls this internally.
    ///
    /// One session is created up front and reused for every message. Each
    /// message gets its own `Runner` and cancellation token; the response is
    /// streamed to stdout through a [`StreamPrinter`].
    ///
    /// # Errors
    ///
    /// Returns an error (ending the REPL) if the session cannot be created,
    /// the line editor cannot be initialized, or a runner cannot be built or
    /// started. Errors yielded by the event stream itself are only logged.
    pub async fn run_console_directly(self) -> Result<()> {
        // Fall back to the agent's own name when no app name was configured.
        let app_name = self.app_name.unwrap_or_else(|| self.agent.name().to_string());
        // Console mode always uses the fixed user id "user".
        let user_id = "user".to_string();
        let thinking_mode = self.thinking_mode;
        let agent = self.agent;
        let artifact_service = self.artifact_service;
        let memory_service = self.memory_service;
        let run_config = self.run_config;

        let session_service =
            self.session_service.unwrap_or_else(|| Arc::new(InMemorySessionService::new()));

        // A single session backs the whole REPL conversation.
        let session = session_service
            .create(CreateRequest {
                app_name: app_name.clone(),
                user_id: user_id.clone(),
                session_id: None,
                state: HashMap::new(),
            })
            .await?;

        let session_id = session.id().to_string();

        let mut rl = DefaultEditor::new()
            .map_err(|e| adk_core::AdkError::config(format!("failed to init readline: {e}")))?;

        print_banner(agent.name());

        loop {
            let readline = rl.readline("\x1b[36mYou >\x1b[0m ");
            match readline {
                Ok(line) => {
                    let trimmed = line.trim().to_string();
                    if trimmed.is_empty() {
                        continue;
                    }
                    if is_exit_command(&trimmed) {
                        println!("\nGoodbye.\n");
                        break;
                    }
                    if trimmed == "/help" {
                        print_help();
                        continue;
                    }
                    // `/clear` only prints a notice; nothing is reset here.
                    if trimmed == "/clear" {
                        println!("(conversation cleared — session state is unchanged)");
                        continue;
                    }

                    // Only messages actually sent to the agent reach the history;
                    // a failure to record history is deliberately ignored.
                    let _ = rl.add_history_entry(&line);

                    let user_content = Content::new("user").with_text(trimmed);
                    println!();

                    // Fresh token per message so Ctrl+C cancels only this run.
                    let cancellation_token = CancellationToken::new();
                    // Compaction and context caching are not enabled for the console runner.
                    let runner = Runner::new(RunnerConfig {
                        app_name: app_name.clone(),
                        agent: agent.clone(),
                        session_service: session_service.clone(),
                        artifact_service: artifact_service.clone(),
                        memory_service: memory_service.clone(),
                        plugin_manager: None,
                        run_config: run_config.clone(),
                        compaction_config: None,
                        context_cache_config: None,
                        cache_capable: None,
                        request_context: None,
                        cancellation_token: Some(cancellation_token.clone()),
                        intra_compaction_config: None,
                        intra_compaction_summarizer: None,
                    })?;
                    let mut events = runner
                        .run(
                            UserId::new(user_id.clone())?,
                            SessionId::new(session_id.clone())?,
                            user_content,
                        )
                        .await?;
                    let mut printer = StreamPrinter::new(thinking_mode);
                    // Author of the most recent non-user event.
                    let mut current_agent = String::new();
                    // Becomes true once the first agent has been seen, so only
                    // later author changes get a visible `[agent]` label.
                    let mut printed_header = false;
                    let mut interrupted = false;

                    // Drain the event stream while also listening for Ctrl+C.
                    loop {
                        tokio::select! {
                            event = events.next() => {
                                // `None` means the stream is exhausted.
                                let Some(event) = event else {
                                    break;
                                };

                                match event {
                                    Ok(evt) => {
                                        // Track agent switches in multi-agent workflows
                                        if !evt.author.is_empty()
                                            && evt.author != "user"
                                            && evt.author != current_agent
                                        {
                                            if !current_agent.is_empty() {
                                                println!();
                                            }
                                            current_agent = evt.author.clone();
                                            // Only show agent label in multi-agent scenarios
                                            if printed_header {
                                                print!("\x1b[33m[{current_agent}]\x1b[0m ");
                                                let _ = io::stdout().flush();
                                            }
                                            printed_header = true;
                                        }

                                        // Show agent transfer requests
                                        if let Some(target) = &evt.actions.transfer_to_agent {
                                            print!("\x1b[90m[transfer -> {target}]\x1b[0m ");
                                            let _ = io::stdout().flush();
                                        }

                                        if let Some(content) = &evt.llm_response.content {
                                            for part in &content.parts {
                                                printer.handle_part(part);
                                            }
                                        }
                                    }
                                    // Stream errors are logged and the loop keeps reading.
                                    Err(e) => {
                                        error!("stream error: {e}");
                                    }
                                }
                            }
                            signal = tokio::signal::ctrl_c() => {
                                match signal {
                                    // Ctrl+C: cancel the in-flight run and stop rendering it.
                                    Ok(()) => {
                                        cancellation_token.cancel();
                                        interrupted = true;
                                        break;
                                    }
                                    Err(err) => {
                                        error!("failed to listen for Ctrl+C: {err}");
                                    }
                                }
                            }
                        }
                    }

                    // Emit any think content still buffered by the printer.
                    printer.finish();
                    if interrupted {
                        println!("\nInterrupted.\n");
                        continue;
                    }

                    println!("\n");
                }
                // Ctrl+C at the prompt does not exit; the user must type an exit command.
                Err(rustyline::error::ReadlineError::Interrupted) => {
                    println!("\nInterrupted. Type exit to quit.\n");
                    continue;
                }
                // End of input (reported by rustyline as `Eof`) exits cleanly.
                Err(rustyline::error::ReadlineError::Eof) => {
                    println!("\nGoodbye.\n");
                    break;
                }
                // Any other readline failure is logged and ends the REPL with `Ok(())`.
                Err(err) => {
                    error!("readline error: {err}");
                    break;
                }
            }
        }

        Ok(())
    }
456
457    fn init_telemetry(&self) -> Option<Arc<adk_telemetry::AdkSpanExporter>> {
458        match &self.telemetry_config {
459            TelemetryConfig::AdkExporter { service_name } => {
460                match adk_telemetry::init_with_adk_exporter(service_name) {
461                    Ok(exporter) => Some(exporter),
462                    Err(e) => {
463                        warn!("failed to initialize telemetry: {e}");
464                        None
465                    }
466                }
467            }
468            TelemetryConfig::Otlp { service_name, endpoint } => {
469                #[cfg(feature = "telemetry-otlp")]
470                if let Err(e) = adk_telemetry::init_with_otlp(service_name, endpoint) {
471                    warn!("failed to initialize otlp telemetry: {e}");
472                }
473                #[cfg(not(feature = "telemetry-otlp"))]
474                warn!(
475                    service.name = %service_name,
476                    otlp.endpoint = %endpoint,
477                    "otlp telemetry requested but the telemetry-otlp feature is disabled"
478                );
479                None
480            }
481            TelemetryConfig::None => None,
482        }
483    }
484
485    fn into_server_config(
486        self,
487        span_exporter: Option<Arc<adk_telemetry::AdkSpanExporter>>,
488    ) -> ServerConfig {
489        let session_service =
490            self.session_service.unwrap_or_else(|| Arc::new(InMemorySessionService::new()));
491        let agent_loader = Arc::new(adk_core::SingleAgentLoader::new(self.agent));
492
493        let mut config = ServerConfig::new(agent_loader, session_service)
494            .with_artifact_service_opt(self.artifact_service);
495
496        if let Some(memory_service) = self.memory_service {
497            config = config.with_memory_service(memory_service);
498        }
499
500        if let Some(compaction_config) = self.compaction_config {
501            config = config.with_compaction(compaction_config);
502        }
503
504        if let (Some(context_cache_config), Some(cache_capable)) =
505            (self.context_cache_config, self.cache_capable)
506        {
507            config = config.with_context_cache(context_cache_config, cache_capable);
508        }
509
510        if let Some(security) = self.security_config {
511            config = config.with_security(security);
512        }
513
514        if let Some(extractor) = self.request_context_extractor {
515            config = config.with_request_context(extractor);
516        }
517
518        if let Some(exporter) = span_exporter {
519            config = config.with_span_exporter(exporter);
520        }
521
522        config
523    }
524
525    /// Build the Axum application without serving it.
526    ///
527    /// This is the production escape hatch for adding custom routes,
528    /// middleware, metrics, or owning the serve loop yourself.
529    pub fn build_app(self) -> Result<Router> {
530        let span_exporter = self.init_telemetry();
531        let a2a_base_url = self.a2a_base_url.clone();
532        let config = self.into_server_config(span_exporter);
533
534        Ok(match a2a_base_url {
535            Some(base_url) => create_app_with_a2a(config, Some(&base_url)),
536            None => create_app(config),
537        })
538    }
539
540    /// Build the Axum application with A2A routes enabled.
541    pub fn build_app_with_a2a(mut self, base_url: impl Into<String>) -> Result<Router> {
542        self.a2a_base_url = Some(base_url.into());
543        self.build_app()
544    }
545
546    /// Run web server without parsing CLI arguments.
547    ///
548    /// Use this when you already know you want serve mode (e.g. from
549    /// your own CLI parser). [`run`](Self::run) calls this internally.
550    pub async fn run_serve_directly(self, port: u16) -> Result<()> {
551        let app = self.build_app()?;
552
553        let addr = format!("0.0.0.0:{port}");
554        let listener = tokio::net::TcpListener::bind(&addr)
555            .await
556            .map_err(|e| adk_core::AdkError::config(format!("failed to bind {addr}: {e}")))?;
557
558        println!("ADK Server starting on http://localhost:{port}");
559        println!("Open http://localhost:{port} in your browser");
560        println!("Press Ctrl+C to stop\n");
561
562        axum::serve(listener, app)
563            .with_graceful_shutdown(shutdown_signal())
564            .await
565            .map_err(|e| adk_core::AdkError::config(format!("server error: {e}")))?;
566
567        Ok(())
568    }
569}
570
571/// Print the ADK-Rust welcome banner.
572fn print_banner(agent_name: &str) {
573    let version = env!("CARGO_PKG_VERSION");
574    let title = format!("ADK-Rust  v{version}");
575    let subtitle = "Rust Agent Development Kit";
576    // Box inner width = 49 (between the ║ chars)
577    let inner: usize = 49;
578    let pad_title = (inner.saturating_sub(title.len())) / 2;
579    let pad_subtitle = (inner.saturating_sub(subtitle.len())) / 2;
580
581    println!();
582    println!("    ╔{:═<inner$}╗", "");
583    println!(
584        "    ║{:>w$}{title}{:<r$}║",
585        "",
586        "",
587        w = pad_title,
588        r = inner - pad_title - title.len()
589    );
590    println!(
591        "    ║{:>w$}{subtitle}{:<r$}║",
592        "",
593        "",
594        w = pad_subtitle,
595        r = inner - pad_subtitle - subtitle.len()
596    );
597    println!("    ╚{:═<inner$}╝", "");
598    println!();
599    println!("  Agent    : {agent_name}");
600    println!("  Runtime  : Tokio async, streaming responses");
601    println!("  Features : tool calling, multi-provider, multi-agent, think blocks");
602    println!();
603    println!("  Type a message to chat. /help for commands.");
604    println!();
605}
606
/// Print the list of REPL commands and usage tips to stdout.
fn print_help() {
    // One entry per output line; empty entries are blank separator lines.
    const HELP_LINES: [&str; 14] = [
        "",
        "  Commands:",
        "    /help   Show this help",
        "    /clear  Clear display (session state is kept)",
        "    quit    Exit the REPL",
        "    exit    Exit the REPL",
        "    /quit   Exit the REPL",
        "    /exit   Exit the REPL",
        "",
        "  Tips:",
        "    - Up/Down arrows browse history",
        "    - Ctrl+C interrupts the current operation",
        "    - Multi-agent workflows show [agent_name] on handoff",
        "",
    ];

    for line in HELP_LINES {
        println!("{line}");
    }
}
624
/// Report whether `input` is exactly one of the REPL exit commands.
///
/// The comparison is case-sensitive and performs no trimming.
fn is_exit_command(input: &str) -> bool {
    const EXIT_COMMANDS: [&str; 4] = ["quit", "exit", "/quit", "/exit"];
    EXIT_COMMANDS.contains(&input)
}
628
/// Handles streaming output with special rendering for think blocks,
/// tool calls, function responses, and inline/file data.
///
/// The printer is stateful: it tracks whether the stream is currently
/// inside an inline `<think>` block or a run of thinking parts.
pub struct StreamPrinter {
    /// Display mode; `Hide` suppresses thinking output.
    thinking_mode: ThinkingDisplayMode,
    /// True while text chunks are inside an unterminated `<think>` block.
    in_think_block: bool,
    /// True after a `[thinking]` header was printed for consecutive thinking parts.
    in_thinking_part_stream: bool,
    /// Accumulates think content until it is flushed or cleared.
    think_buffer: String,
}
637
638impl StreamPrinter {
639    /// Create a printer with the selected thinking display mode.
640    pub fn new(thinking_mode: ThinkingDisplayMode) -> Self {
641        Self {
642            thinking_mode,
643            in_think_block: false,
644            in_thinking_part_stream: false,
645            think_buffer: String::new(),
646        }
647    }
648
649    /// Process a single response part, rendering it to stdout.
650    pub fn handle_part(&mut self, part: &Part) {
651        match part {
652            Part::Text { text } => {
653                self.flush_part_thinking_if_needed();
654                self.handle_text_chunk(text);
655            }
656            Part::Thinking { thinking, .. } => {
657                if matches!(self.thinking_mode, ThinkingDisplayMode::Hide) {
658                    return;
659                }
660                if !self.in_thinking_part_stream {
661                    print!("\n[thinking] ");
662                    let _ = io::stdout().flush();
663                    self.in_thinking_part_stream = true;
664                }
665                self.think_buffer.push_str(thinking);
666                print!("{thinking}");
667                let _ = io::stdout().flush();
668            }
669            Part::FunctionCall { name, args, .. } => {
670                self.flush_pending_thinking();
671                print!("\n[tool-call] {name} {args}\n");
672                let _ = io::stdout().flush();
673            }
674            Part::FunctionResponse { function_response, .. } => {
675                self.flush_pending_thinking();
676                self.print_tool_response(&function_response.name, &function_response.response);
677            }
678            Part::InlineData { mime_type, data } => {
679                self.flush_pending_thinking();
680                print!("\n[inline-data] mime={mime_type} bytes={}\n", data.len());
681                let _ = io::stdout().flush();
682            }
683            Part::FileData { mime_type, file_uri } => {
684                self.flush_pending_thinking();
685                print!("\n[file-data] mime={mime_type} uri={file_uri}\n");
686                let _ = io::stdout().flush();
687            }
688            Part::ServerToolCall { server_tool_call } => {
689                self.flush_pending_thinking();
690                print!("\n[server-tool-call] {server_tool_call}\n");
691                let _ = io::stdout().flush();
692            }
693            Part::ServerToolResponse { server_tool_response } => {
694                self.flush_pending_thinking();
695                print!("\n[server-tool-response] {}B\n", server_tool_response.to_string().len());
696                let _ = io::stdout().flush();
697            }
698        }
699    }
700
701    fn handle_text_chunk(&mut self, chunk: &str) {
702        if matches!(self.thinking_mode, ThinkingDisplayMode::Hide) {
703            let mut visible = String::with_capacity(chunk.len());
704            let mut remaining = chunk;
705
706            while let Some(start_idx) = remaining.find("<think>") {
707                visible.push_str(&remaining[..start_idx]);
708                let after_start = &remaining[start_idx + "<think>".len()..];
709                if let Some(end_idx) = after_start.find("</think>") {
710                    remaining = &after_start[end_idx + "</think>".len()..];
711                } else {
712                    remaining = "";
713                    break;
714                }
715            }
716
717            visible.push_str(remaining);
718            if !visible.is_empty() {
719                print!("{visible}");
720                let _ = io::stdout().flush();
721            }
722            return;
723        }
724
725        const THINK_START: &str = "<think>";
726        const THINK_END: &str = "</think>";
727
728        let mut remaining = chunk;
729
730        while !remaining.is_empty() {
731            if self.in_think_block {
732                if let Some(end_idx) = remaining.find(THINK_END) {
733                    self.think_buffer.push_str(&remaining[..end_idx]);
734                    self.flush_think();
735                    self.in_think_block = false;
736                    remaining = &remaining[end_idx + THINK_END.len()..];
737                } else {
738                    self.think_buffer.push_str(remaining);
739                    break;
740                }
741            } else if let Some(start_idx) = remaining.find(THINK_START) {
742                let visible = &remaining[..start_idx];
743                if !visible.is_empty() {
744                    print!("{visible}");
745                    let _ = io::stdout().flush();
746                }
747                self.in_think_block = true;
748                self.think_buffer.clear();
749                remaining = &remaining[start_idx + THINK_START.len()..];
750            } else {
751                print!("{remaining}");
752                let _ = io::stdout().flush();
753                break;
754            }
755        }
756    }
757
758    fn flush_think(&mut self) {
759        let content = self.think_buffer.trim();
760        if !content.is_empty() {
761            print!("\n[think] {content}\n");
762            let _ = io::stdout().flush();
763        }
764        self.think_buffer.clear();
765    }
766
    /// Flush any pending think block content.
    ///
    /// Call this once the stream has ended: it terminates an open run of
    /// thinking parts and prints whatever an unterminated `<think>` block
    /// has buffered so far.
    pub fn finish(&mut self) {
        self.flush_pending_thinking();
    }
771
772    fn print_tool_response(&self, name: &str, response: &Value) {
773        print!("\n[tool-response] {name} {response}\n");
774        let _ = io::stdout().flush();
775    }
776
777    fn flush_part_thinking_if_needed(&mut self) {
778        if self.in_thinking_part_stream {
779            println!();
780            let _ = io::stdout().flush();
781            self.think_buffer.clear();
782            self.in_thinking_part_stream = false;
783        }
784    }
785
786    fn flush_pending_thinking(&mut self) {
787        self.flush_part_thinking_if_needed();
788        if self.in_think_block {
789            self.flush_think_with_label("think");
790            self.in_think_block = false;
791        }
792    }
793
794    fn flush_think_with_label(&mut self, label: &str) {
795        let content = self.think_buffer.trim();
796        if !content.is_empty() {
797            print!("\n[{label}] {content}\n");
798            let _ = io::stdout().flush();
799        }
800        self.think_buffer.clear();
801    }
802}
803
804impl Default for StreamPrinter {
805    fn default() -> Self {
806        Self::new(ThinkingDisplayMode::Auto)
807    }
808}
809
#[cfg(test)]
mod tests {
    // Covers StreamPrinter state tracking, the exit-command helper, and the
    // routes exposed by the launcher's app builders.

    use super::*;
    use adk_core::{Agent, EventStream, InvocationContext, Result as AdkResult};
    use async_trait::async_trait;
    use axum::{
        body::{Body, to_bytes},
        http::{Request, StatusCode},
    };
    use futures::stream;
    use std::sync::Arc;
    use tower::ServiceExt;

    /// Stub agent with a fixed name/description, no sub-agents, and a `run`
    /// that yields an empty event stream.
    struct TestAgent;

    #[async_trait]
    impl Agent for TestAgent {
        fn name(&self) -> &str {
            "launcher_test_agent"
        }

        fn description(&self) -> &str {
            "launcher test agent"
        }

        fn sub_agents(&self) -> &[Arc<dyn Agent>] {
            &[]
        }

        async fn run(&self, _ctx: Arc<dyn InvocationContext>) -> AdkResult<EventStream> {
            Ok(Box::pin(stream::empty()))
        }
    }

    /// Launcher wrapping `TestAgent` with telemetry set to `TelemetryConfig::None`.
    fn test_launcher() -> Launcher {
        let agent = Arc::new(TestAgent);
        Launcher::new(agent).with_telemetry(TelemetryConfig::None)
    }

    /// Printer in `ThinkingDisplayMode::Auto`, the mode most tests exercise.
    fn auto_printer() -> StreamPrinter {
        StreamPrinter::new(ThinkingDisplayMode::Auto)
    }

    /// Thinking part carrying `text` and no signature.
    fn thinking_part(text: &'static str) -> Part {
        Part::Thinking { thinking: text.into(), signature: None }
    }

    /// Request for `uri` with an empty body.
    fn get_request(uri: &str) -> Request<Body> {
        Request::builder().uri(uri).body(Body::empty()).unwrap()
    }

    #[test]
    fn stream_printer_tracks_think_block_state() {
        let mut sp = auto_printer();
        assert!(!sp.in_think_block);

        // An opening tag puts the printer into think mode and starts buffering.
        sp.handle_text_chunk("<think>reasoning");
        assert!(sp.in_think_block);
        assert_eq!(sp.think_buffer, "reasoning");

        // The closing tag leaves think mode and empties the buffer.
        sp.handle_text_chunk(" more</think>visible");
        assert!(!sp.in_think_block);
        assert!(sp.think_buffer.is_empty());
    }

    #[test]
    fn stream_printer_handles_think_block_across_chunks() {
        let mut sp = auto_printer();

        // (chunk fed to the printer, expected in_think_block, expected think_buffer)
        let steps = [
            ("before<think>start", true, "start"),
            (" middle", true, "start middle"),
            (" end</think>after", false, ""),
        ];

        for (chunk, expect_in_block, expect_buffer) in steps {
            sp.handle_text_chunk(chunk);
            assert_eq!(sp.in_think_block, expect_in_block);
            assert_eq!(sp.think_buffer, expect_buffer);
        }
    }

    #[test]
    fn stream_printer_finish_flushes_open_think_block() {
        let mut sp = auto_printer();

        sp.handle_text_chunk("<think>unclosed reasoning");
        assert!(sp.in_think_block);

        sp.finish();
        assert!(!sp.in_think_block);
        assert!(sp.think_buffer.is_empty());
    }

    #[test]
    fn stream_printer_finish_is_noop_when_no_think_block() {
        let mut sp = auto_printer();

        sp.finish();

        assert!(!sp.in_think_block);
        assert!(sp.think_buffer.is_empty());
    }

    #[test]
    fn stream_printer_handles_multiple_think_blocks_in_one_chunk() {
        let mut sp = auto_printer();

        sp.handle_text_chunk("a<think>first</think>b<think>second</think>c");

        assert!(!sp.in_think_block);
        assert!(sp.think_buffer.is_empty());
    }

    #[test]
    fn stream_printer_handles_empty_think_block() {
        let mut sp = auto_printer();

        sp.handle_text_chunk("<think></think>after");

        assert!(!sp.in_think_block);
        assert!(sp.think_buffer.is_empty());
    }

    #[test]
    fn stream_printer_handles_all_part_types() {
        let mut sp = auto_printer();

        // Text
        sp.handle_part(&Part::Text { text: "hello".into() });
        assert!(!sp.in_think_block);

        // Thinking
        sp.handle_part(&thinking_part("reasoning"));
        assert!(sp.in_thinking_part_stream);

        // FunctionCall
        let function_call = Part::FunctionCall {
            name: "get_weather".into(),
            args: serde_json::json!({"city": "Seattle"}),
            id: None,
            thought_signature: None,
        };
        sp.handle_part(&function_call);

        // FunctionResponse
        let function_response = Part::FunctionResponse {
            function_response: adk_core::FunctionResponseData::new(
                "get_weather",
                serde_json::json!({"temp": 72}),
            ),
            id: None,
        };
        sp.handle_part(&function_response);

        // InlineData
        let inline_data = Part::InlineData { mime_type: "image/png".into(), data: vec![0u8; 100] };
        sp.handle_part(&inline_data);

        // FileData
        let file_data = Part::FileData {
            mime_type: "audio/wav".into(),
            file_uri: "gs://bucket/file.wav".into(),
        };
        sp.handle_part(&file_data);

        // Reaching this point means no panic; both thinking flags must be cleared.
        assert!(!sp.in_think_block);
        assert!(!sp.in_thinking_part_stream);
    }

    #[test]
    fn stream_printer_text_without_think_tags_leaves_state_clean() {
        let mut sp = auto_printer();

        sp.handle_text_chunk("just plain text with no tags");

        assert!(!sp.in_think_block);
        assert!(sp.think_buffer.is_empty());
    }

    #[test]
    fn stream_printer_coalesces_streamed_thinking_parts() {
        let mut sp = auto_printer();

        // Consecutive thinking parts accumulate into a single buffer.
        for fragment in ["Okay", ", the", " user"] {
            sp.handle_part(&thinking_part(fragment));
        }
        assert!(sp.in_thinking_part_stream);
        assert_eq!(sp.think_buffer, "Okay, the user");

        // A following text part ends the streamed-thinking run.
        sp.handle_part(&Part::Text { text: "hello".into() });
        assert!(!sp.in_thinking_part_stream);
        assert!(sp.think_buffer.is_empty());
    }

    #[test]
    fn stream_printer_finish_closes_streamed_thinking_state() {
        let mut sp = auto_printer();

        sp.handle_part(&thinking_part("reasoning"));
        assert!(sp.in_thinking_part_stream);

        sp.finish();

        assert!(!sp.in_thinking_part_stream);
        assert!(sp.think_buffer.is_empty());
    }

    #[test]
    fn stream_printer_hide_mode_ignores_emitted_thinking_state() {
        let mut sp = StreamPrinter::new(ThinkingDisplayMode::Hide);

        sp.handle_part(&thinking_part("secret"));

        assert!(!sp.in_thinking_part_stream);
        assert!(sp.think_buffer.is_empty());
    }

    #[test]
    fn stream_printer_hide_mode_drops_think_tags_from_text() {
        let mut sp = StreamPrinter::new(ThinkingDisplayMode::Hide);

        sp.handle_text_chunk("visible<think>hidden</think>after");

        assert!(!sp.in_think_block);
        assert!(sp.think_buffer.is_empty());
    }

    #[test]
    fn exit_command_helper_accepts_plain_and_slash_variants() {
        let accepted = ["quit", "exit", "/quit", "/exit"];
        for word in accepted {
            assert!(is_exit_command(word));
        }

        assert!(!is_exit_command("hello"));
    }

    #[tokio::test]
    async fn build_app_includes_health_route() {
        let router = test_launcher().build_app().expect("launcher app should build");

        let response = router
            .oneshot(get_request("/api/health"))
            .await
            .expect("health request should succeed");
        assert_eq!(response.status(), StatusCode::OK);

        let bytes = to_bytes(response.into_body(), usize::MAX).await.unwrap();
        let payload: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(payload["status"], "healthy");
    }

    #[tokio::test]
    async fn build_app_does_not_enable_a2a_routes_by_default() {
        let router = test_launcher().build_app().expect("launcher app should build");

        let response = router
            .oneshot(get_request("/.well-known/agent.json"))
            .await
            .expect("agent card request should complete");

        assert_eq!(response.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn build_app_with_a2a_enables_agent_card_route() {
        let router = test_launcher()
            .build_app_with_a2a("http://localhost:8080")
            .expect("launcher app with a2a should build");

        let response = router
            .oneshot(get_request("/.well-known/agent.json"))
            .await
            .expect("agent card request should complete");
        assert_eq!(response.status(), StatusCode::OK);

        let bytes = to_bytes(response.into_body(), usize::MAX).await.unwrap();
        let card: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(card["name"], "launcher_test_agent");
        assert_eq!(card["description"], "launcher test agent");
    }
}