Skip to main content

adk_cli/
launcher.rs

1//! Simple launcher for ADK agents with CLI support.
2//!
3//! Provides a one-liner to run agents with console or web server modes,
4//! similar to adk-go's launcher pattern.
5//!
6//! # Example
7//!
8//! ```ignore
9//! use adk_cli::Launcher;
10//! use std::sync::Arc;
11//!
12//! #[tokio::main]
13//! async fn main() -> adk_core::Result<()> {
14//!     let agent = /* create your agent */;
15//!
16//!     // Run with CLI support (console by default, or `serve` for web)
17//!     Launcher::new(Arc::new(agent)).run().await
18//! }
19//! ```
20//!
21//! # CLI Usage
22//!
23//! ```bash
24//! # Interactive console (default)
25//! cargo run
26//!
27//! # Web server with UI
28//! cargo run -- serve
29//! cargo run -- serve --port 3000
30//! ```
31
32use adk_artifact::ArtifactService;
33use adk_core::{
34    Agent, CacheCapable, Content, ContextCacheConfig, EventsCompactionConfig, Memory, Part, Result,
35    RunConfig, SessionId, StreamingMode, UserId,
36};
37use adk_runner::{Runner, RunnerConfig};
38use adk_server::{
39    RequestContextExtractor, SecurityConfig, ServerConfig, create_app, create_app_with_a2a,
40    shutdown_signal,
41};
42use adk_session::{CreateRequest, InMemorySessionService, SessionService};
43use axum::Router;
44use clap::{Parser, Subcommand};
45use futures::StreamExt;
46use rustyline::DefaultEditor;
47use serde_json::Value;
48use std::collections::HashMap;
49use std::io::{self, Write};
50use std::sync::Arc;
51use std::time::Duration;
52use tokio_util::sync::CancellationToken;
53use tracing::{error, warn};
54
55/// CLI arguments for the launcher.
56#[derive(Parser)]
57#[command(name = "agent")]
58#[command(about = "ADK Agent", long_about = None)]
59struct LauncherCli {
60    #[command(subcommand)]
61    command: Option<LauncherCommand>,
62}
63
64#[derive(Subcommand)]
65enum LauncherCommand {
66    /// Run interactive console (default if no command specified)
67    Chat,
68    /// Start web server with UI
69    Serve {
70        /// Server port
71        #[arg(long, default_value_t = 8080)]
72        port: u16,
73    },
74}
75
76/// Controls how the console renders thinking/reasoning content.
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
78pub enum ThinkingDisplayMode {
79    /// Show thinking when the model emits it.
80    #[default]
81    Auto,
82    /// Always surface emitted thinking.
83    Show,
84    /// Hide emitted thinking from the console output.
85    Hide,
86}
87
88/// Controls how serve mode initializes telemetry.
89#[derive(Debug, Clone, PartialEq, Eq)]
90pub enum TelemetryConfig {
91    /// Use the in-memory ADK span exporter for debug endpoints.
92    AdkExporter { service_name: String },
93    /// Export telemetry to an OTLP collector.
94    Otlp { service_name: String, endpoint: String },
95    /// Disable telemetry initialization in the launcher.
96    None,
97}
98
99impl Default for TelemetryConfig {
100    fn default() -> Self {
101        Self::AdkExporter { service_name: "adk-server".to_string() }
102    }
103}
104
105/// Launcher for running ADK agents with CLI support.
106///
107/// Provides console and web server modes out of the box.
108///
109/// # Console mode
110///
111/// Uses a `rustyline` REPL with history, Ctrl+C handling, and streaming
112/// output that renders `<think>` blocks, tool calls, and inline data.
113///
114/// # Serve mode
115///
116/// Starts an Axum HTTP server with the ADK web UI.
117///
118/// # Note on `with_streaming_mode`
119///
120/// `with_streaming_mode` currently only affects console mode. `ServerConfig`
121/// does not accept a `RunConfig`, so the setting is not forwarded to the
122/// server. This will be addressed when `ServerConfig` gains that field.
123pub struct Launcher {
124    agent: Arc<dyn Agent>,
125    app_name: Option<String>,
126    session_service: Option<Arc<dyn SessionService>>,
127    artifact_service: Option<Arc<dyn ArtifactService>>,
128    memory_service: Option<Arc<dyn Memory>>,
129    compaction_config: Option<EventsCompactionConfig>,
130    context_cache_config: Option<ContextCacheConfig>,
131    cache_capable: Option<Arc<dyn CacheCapable>>,
132    security_config: Option<SecurityConfig>,
133    request_context_extractor: Option<Arc<dyn RequestContextExtractor>>,
134    a2a_base_url: Option<String>,
135    telemetry_config: TelemetryConfig,
136    shutdown_grace_period: Duration,
137    run_config: Option<RunConfig>,
138    thinking_mode: ThinkingDisplayMode,
139}
140
141impl Launcher {
142    /// Create a new launcher with the given agent.
143    pub fn new(agent: Arc<dyn Agent>) -> Self {
144        Self {
145            agent,
146            app_name: None,
147            session_service: None,
148            artifact_service: None,
149            memory_service: None,
150            compaction_config: None,
151            context_cache_config: None,
152            cache_capable: None,
153            security_config: None,
154            request_context_extractor: None,
155            a2a_base_url: None,
156            telemetry_config: TelemetryConfig::default(),
157            shutdown_grace_period: Duration::from_secs(30),
158            run_config: None,
159            thinking_mode: ThinkingDisplayMode::Auto,
160        }
161    }
162
163    /// Set a custom application name (defaults to agent name).
164    pub fn app_name(mut self, name: impl Into<String>) -> Self {
165        self.app_name = Some(name.into());
166        self
167    }
168
169    /// Set a custom artifact service.
170    pub fn with_artifact_service(mut self, service: Arc<dyn ArtifactService>) -> Self {
171        self.artifact_service = Some(service);
172        self
173    }
174
175    /// Set a custom session service.
176    pub fn with_session_service(mut self, service: Arc<dyn SessionService>) -> Self {
177        self.session_service = Some(service);
178        self
179    }
180
181    /// Set a custom memory service.
182    pub fn with_memory_service(mut self, service: Arc<dyn Memory>) -> Self {
183        self.memory_service = Some(service);
184        self
185    }
186
187    /// Enable runner-level context compaction in serve mode.
188    pub fn with_compaction(mut self, config: EventsCompactionConfig) -> Self {
189        self.compaction_config = Some(config);
190        self
191    }
192
193    /// Enable automatic prompt cache lifecycle management in serve mode.
194    pub fn with_context_cache(
195        mut self,
196        config: ContextCacheConfig,
197        cache_capable: Arc<dyn CacheCapable>,
198    ) -> Self {
199        self.context_cache_config = Some(config);
200        self.cache_capable = Some(cache_capable);
201        self
202    }
203
204    /// Set custom server security settings.
205    pub fn with_security_config(mut self, config: SecurityConfig) -> Self {
206        self.security_config = Some(config);
207        self
208    }
209
210    /// Set a request context extractor for authenticated deployments.
211    pub fn with_request_context_extractor(
212        mut self,
213        extractor: Arc<dyn RequestContextExtractor>,
214    ) -> Self {
215        self.request_context_extractor = Some(extractor);
216        self
217    }
218
219    /// Enable A2A routes when building or serving the HTTP app.
220    pub fn with_a2a_base_url(mut self, base_url: impl Into<String>) -> Self {
221        self.a2a_base_url = Some(base_url.into());
222        self
223    }
224
225    /// Configure how serve mode initializes telemetry.
226    pub fn with_telemetry(mut self, config: TelemetryConfig) -> Self {
227        self.telemetry_config = config;
228        self
229    }
230
231    /// Set the maximum graceful shutdown window for the web server.
232    pub fn with_shutdown_grace_period(mut self, grace_period: Duration) -> Self {
233        self.shutdown_grace_period = grace_period;
234        self
235    }
236
237    /// Set streaming mode for console mode.
238    ///
239    /// Note: this currently only affects console mode. The server does not
240    /// yet accept a `RunConfig`.
241    pub fn with_streaming_mode(mut self, mode: StreamingMode) -> Self {
242        self.run_config = Some(RunConfig { streaming_mode: mode, ..RunConfig::default() });
243        self
244    }
245
246    /// Control how emitted thinking content is rendered in console mode.
247    pub fn with_thinking_mode(mut self, mode: ThinkingDisplayMode) -> Self {
248        self.thinking_mode = mode;
249        self
250    }
251
252    /// Run the launcher, parsing CLI arguments.
253    ///
254    /// - No arguments or `chat`: Interactive console
255    /// - `serve [--port PORT]`: Web server with UI
256    pub async fn run(self) -> Result<()> {
257        let cli = LauncherCli::parse();
258
259        match cli.command.unwrap_or(LauncherCommand::Chat) {
260            LauncherCommand::Chat => self.run_console_directly().await,
261            LauncherCommand::Serve { port } => self.run_serve_directly(port).await,
262        }
263    }
264
265    /// Run in interactive console mode without parsing CLI arguments.
266    ///
267    /// Use this when you already know you want console mode (e.g. from
268    /// your own CLI parser). [`run`](Self::run) calls this internally.
269    pub async fn run_console_directly(self) -> Result<()> {
270        let app_name = self.app_name.unwrap_or_else(|| self.agent.name().to_string());
271        let user_id = "user".to_string();
272        let thinking_mode = self.thinking_mode;
273        let agent = self.agent;
274        let artifact_service = self.artifact_service;
275        let memory_service = self.memory_service;
276        let run_config = self.run_config;
277
278        let session_service =
279            self.session_service.unwrap_or_else(|| Arc::new(InMemorySessionService::new()));
280
281        let session = session_service
282            .create(CreateRequest {
283                app_name: app_name.clone(),
284                user_id: user_id.clone(),
285                session_id: None,
286                state: HashMap::new(),
287            })
288            .await?;
289
290        let session_id = session.id().to_string();
291
292        let mut rl = DefaultEditor::new()
293            .map_err(|e| adk_core::AdkError::config(format!("failed to init readline: {e}")))?;
294
295        print_banner(agent.name());
296
297        loop {
298            let readline = rl.readline("\x1b[36mYou >\x1b[0m ");
299            match readline {
300                Ok(line) => {
301                    let trimmed = line.trim().to_string();
302                    if trimmed.is_empty() {
303                        continue;
304                    }
305                    if is_exit_command(&trimmed) {
306                        println!("\nGoodbye.\n");
307                        break;
308                    }
309                    if trimmed == "/help" {
310                        print_help();
311                        continue;
312                    }
313                    if trimmed == "/clear" {
314                        println!("(conversation cleared — session state is unchanged)");
315                        continue;
316                    }
317
318                    let _ = rl.add_history_entry(&line);
319
320                    let user_content = Content::new("user").with_text(trimmed);
321                    println!();
322
323                    let cancellation_token = CancellationToken::new();
324                    let runner = Runner::new(RunnerConfig {
325                        app_name: app_name.clone(),
326                        agent: agent.clone(),
327                        session_service: session_service.clone(),
328                        artifact_service: artifact_service.clone(),
329                        memory_service: memory_service.clone(),
330                        plugin_manager: None,
331                        run_config: run_config.clone(),
332                        compaction_config: None,
333                        context_cache_config: None,
334                        cache_capable: None,
335                        request_context: None,
336                        cancellation_token: Some(cancellation_token.clone()),
337                    })?;
338                    let mut events = runner
339                        .run(
340                            UserId::new(user_id.clone())?,
341                            SessionId::new(session_id.clone())?,
342                            user_content,
343                        )
344                        .await?;
345                    let mut printer = StreamPrinter::new(thinking_mode);
346                    let mut current_agent = String::new();
347                    let mut printed_header = false;
348                    let mut interrupted = false;
349
350                    loop {
351                        tokio::select! {
352                            event = events.next() => {
353                                let Some(event) = event else {
354                                    break;
355                                };
356
357                                match event {
358                                    Ok(evt) => {
359                                        // Track agent switches in multi-agent workflows
360                                        if !evt.author.is_empty()
361                                            && evt.author != "user"
362                                            && evt.author != current_agent
363                                        {
364                                            if !current_agent.is_empty() {
365                                                println!();
366                                            }
367                                            current_agent = evt.author.clone();
368                                            // Only show agent label in multi-agent scenarios
369                                            if printed_header {
370                                                print!("\x1b[33m[{current_agent}]\x1b[0m ");
371                                                let _ = io::stdout().flush();
372                                            }
373                                            printed_header = true;
374                                        }
375
376                                        // Show agent transfer requests
377                                        if let Some(target) = &evt.actions.transfer_to_agent {
378                                            print!("\x1b[90m[transfer -> {target}]\x1b[0m ");
379                                            let _ = io::stdout().flush();
380                                        }
381
382                                        if let Some(content) = &evt.llm_response.content {
383                                            for part in &content.parts {
384                                                printer.handle_part(part);
385                                            }
386                                        }
387                                    }
388                                    Err(e) => {
389                                        error!("stream error: {e}");
390                                    }
391                                }
392                            }
393                            signal = tokio::signal::ctrl_c() => {
394                                match signal {
395                                    Ok(()) => {
396                                        cancellation_token.cancel();
397                                        interrupted = true;
398                                        break;
399                                    }
400                                    Err(err) => {
401                                        error!("failed to listen for Ctrl+C: {err}");
402                                    }
403                                }
404                            }
405                        }
406                    }
407
408                    printer.finish();
409                    if interrupted {
410                        println!("\nInterrupted.\n");
411                        continue;
412                    }
413
414                    println!("\n");
415                }
416                Err(rustyline::error::ReadlineError::Interrupted) => {
417                    println!("\nInterrupted. Type exit to quit.\n");
418                    continue;
419                }
420                Err(rustyline::error::ReadlineError::Eof) => {
421                    println!("\nGoodbye.\n");
422                    break;
423                }
424                Err(err) => {
425                    error!("readline error: {err}");
426                    break;
427                }
428            }
429        }
430
431        Ok(())
432    }
433
434    fn init_telemetry(&self) -> Option<Arc<adk_telemetry::AdkSpanExporter>> {
435        match &self.telemetry_config {
436            TelemetryConfig::AdkExporter { service_name } => {
437                match adk_telemetry::init_with_adk_exporter(service_name) {
438                    Ok(exporter) => Some(exporter),
439                    Err(e) => {
440                        warn!("failed to initialize telemetry: {e}");
441                        None
442                    }
443                }
444            }
445            TelemetryConfig::Otlp { service_name, endpoint } => {
446                if let Err(e) = adk_telemetry::init_with_otlp(service_name, endpoint) {
447                    warn!("failed to initialize otlp telemetry: {e}");
448                }
449                None
450            }
451            TelemetryConfig::None => None,
452        }
453    }
454
455    fn into_server_config(
456        self,
457        span_exporter: Option<Arc<adk_telemetry::AdkSpanExporter>>,
458    ) -> ServerConfig {
459        let session_service =
460            self.session_service.unwrap_or_else(|| Arc::new(InMemorySessionService::new()));
461        let agent_loader = Arc::new(adk_core::SingleAgentLoader::new(self.agent));
462
463        let mut config = ServerConfig::new(agent_loader, session_service)
464            .with_artifact_service_opt(self.artifact_service);
465
466        if let Some(memory_service) = self.memory_service {
467            config = config.with_memory_service(memory_service);
468        }
469
470        if let Some(compaction_config) = self.compaction_config {
471            config = config.with_compaction(compaction_config);
472        }
473
474        if let (Some(context_cache_config), Some(cache_capable)) =
475            (self.context_cache_config, self.cache_capable)
476        {
477            config = config.with_context_cache(context_cache_config, cache_capable);
478        }
479
480        if let Some(security) = self.security_config {
481            config = config.with_security(security);
482        }
483
484        if let Some(extractor) = self.request_context_extractor {
485            config = config.with_request_context(extractor);
486        }
487
488        if let Some(exporter) = span_exporter {
489            config = config.with_span_exporter(exporter);
490        }
491
492        config
493    }
494
495    /// Build the Axum application without serving it.
496    ///
497    /// This is the production escape hatch for adding custom routes,
498    /// middleware, metrics, or owning the serve loop yourself.
499    pub fn build_app(self) -> Result<Router> {
500        let span_exporter = self.init_telemetry();
501        let a2a_base_url = self.a2a_base_url.clone();
502        let config = self.into_server_config(span_exporter);
503
504        Ok(match a2a_base_url {
505            Some(base_url) => create_app_with_a2a(config, Some(&base_url)),
506            None => create_app(config),
507        })
508    }
509
510    /// Build the Axum application with A2A routes enabled.
511    pub fn build_app_with_a2a(mut self, base_url: impl Into<String>) -> Result<Router> {
512        self.a2a_base_url = Some(base_url.into());
513        self.build_app()
514    }
515
516    /// Run web server without parsing CLI arguments.
517    ///
518    /// Use this when you already know you want serve mode (e.g. from
519    /// your own CLI parser). [`run`](Self::run) calls this internally.
520    pub async fn run_serve_directly(self, port: u16) -> Result<()> {
521        let app = self.build_app()?;
522
523        let addr = format!("0.0.0.0:{port}");
524        let listener = tokio::net::TcpListener::bind(&addr)
525            .await
526            .map_err(|e| adk_core::AdkError::config(format!("failed to bind {addr}: {e}")))?;
527
528        println!("ADK Server starting on http://localhost:{port}");
529        println!("Open http://localhost:{port} in your browser");
530        println!("Press Ctrl+C to stop\n");
531
532        axum::serve(listener, app)
533            .with_graceful_shutdown(shutdown_signal())
534            .await
535            .map_err(|e| adk_core::AdkError::config(format!("server error: {e}")))?;
536
537        Ok(())
538    }
539}
540
541/// Print the ADK-Rust welcome banner.
542fn print_banner(agent_name: &str) {
543    let version = env!("CARGO_PKG_VERSION");
544    let title = format!("ADK-Rust  v{version}");
545    let subtitle = "Rust Agent Development Kit";
546    // Box inner width = 49 (between the ║ chars)
547    let inner: usize = 49;
548    let pad_title = (inner.saturating_sub(title.len())) / 2;
549    let pad_subtitle = (inner.saturating_sub(subtitle.len())) / 2;
550
551    println!();
552    println!("    ╔{:═<inner$}╗", "");
553    println!(
554        "    ║{:>w$}{title}{:<r$}║",
555        "",
556        "",
557        w = pad_title,
558        r = inner - pad_title - title.len()
559    );
560    println!(
561        "    ║{:>w$}{subtitle}{:<r$}║",
562        "",
563        "",
564        w = pad_subtitle,
565        r = inner - pad_subtitle - subtitle.len()
566    );
567    println!("    ╚{:═<inner$}╝", "");
568    println!();
569    println!("  Agent    : {agent_name}");
570    println!("  Runtime  : Tokio async, streaming responses");
571    println!("  Features : tool calling, multi-provider, multi-agent, think blocks");
572    println!();
573    println!("  Type a message to chat. /help for commands.");
574    println!();
575}
576
577/// Print available REPL commands.
578fn print_help() {
579    println!();
580    println!("  Commands:");
581    println!("    /help   Show this help");
582    println!("    /clear  Clear display (session state is kept)");
583    println!("    quit    Exit the REPL");
584    println!("    exit    Exit the REPL");
585    println!("    /quit   Exit the REPL");
586    println!("    /exit   Exit the REPL");
587    println!();
588    println!("  Tips:");
589    println!("    - Up/Down arrows browse history");
590    println!("    - Ctrl+C interrupts the current operation");
591    println!("    - Multi-agent workflows show [agent_name] on handoff");
592    println!();
593}
594
595fn is_exit_command(input: &str) -> bool {
596    matches!(input, "quit" | "exit" | "/quit" | "/exit")
597}
598
599/// Handles streaming output with special rendering for think blocks,
600/// tool calls, function responses, and inline/file data.
601pub struct StreamPrinter {
602    thinking_mode: ThinkingDisplayMode,
603    in_think_block: bool,
604    in_thinking_part_stream: bool,
605    think_buffer: String,
606}
607
608impl StreamPrinter {
609    /// Create a printer with the selected thinking display mode.
610    pub fn new(thinking_mode: ThinkingDisplayMode) -> Self {
611        Self {
612            thinking_mode,
613            in_think_block: false,
614            in_thinking_part_stream: false,
615            think_buffer: String::new(),
616        }
617    }
618
619    /// Process a single response part, rendering it to stdout.
620    pub fn handle_part(&mut self, part: &Part) {
621        match part {
622            Part::Text { text } => {
623                self.flush_part_thinking_if_needed();
624                self.handle_text_chunk(text);
625            }
626            Part::Thinking { thinking, .. } => {
627                if matches!(self.thinking_mode, ThinkingDisplayMode::Hide) {
628                    return;
629                }
630                if !self.in_thinking_part_stream {
631                    print!("\n[thinking] ");
632                    let _ = io::stdout().flush();
633                    self.in_thinking_part_stream = true;
634                }
635                self.think_buffer.push_str(thinking);
636                print!("{thinking}");
637                let _ = io::stdout().flush();
638            }
639            Part::FunctionCall { name, args, .. } => {
640                self.flush_pending_thinking();
641                print!("\n[tool-call] {name} {args}\n");
642                let _ = io::stdout().flush();
643            }
644            Part::FunctionResponse { function_response, .. } => {
645                self.flush_pending_thinking();
646                self.print_tool_response(&function_response.name, &function_response.response);
647            }
648            Part::InlineData { mime_type, data } => {
649                self.flush_pending_thinking();
650                print!("\n[inline-data] mime={mime_type} bytes={}\n", data.len());
651                let _ = io::stdout().flush();
652            }
653            Part::FileData { mime_type, file_uri } => {
654                self.flush_pending_thinking();
655                print!("\n[file-data] mime={mime_type} uri={file_uri}\n");
656                let _ = io::stdout().flush();
657            }
658            Part::ServerToolCall { server_tool_call } => {
659                self.flush_pending_thinking();
660                print!("\n[server-tool-call] {server_tool_call}\n");
661                let _ = io::stdout().flush();
662            }
663            Part::ServerToolResponse { server_tool_response } => {
664                self.flush_pending_thinking();
665                print!("\n[server-tool-response] {}B\n", server_tool_response.to_string().len());
666                let _ = io::stdout().flush();
667            }
668        }
669    }
670
671    fn handle_text_chunk(&mut self, chunk: &str) {
672        if matches!(self.thinking_mode, ThinkingDisplayMode::Hide) {
673            let mut visible = String::with_capacity(chunk.len());
674            let mut remaining = chunk;
675
676            while let Some(start_idx) = remaining.find("<think>") {
677                visible.push_str(&remaining[..start_idx]);
678                let after_start = &remaining[start_idx + "<think>".len()..];
679                if let Some(end_idx) = after_start.find("</think>") {
680                    remaining = &after_start[end_idx + "</think>".len()..];
681                } else {
682                    remaining = "";
683                    break;
684                }
685            }
686
687            visible.push_str(remaining);
688            if !visible.is_empty() {
689                print!("{visible}");
690                let _ = io::stdout().flush();
691            }
692            return;
693        }
694
695        const THINK_START: &str = "<think>";
696        const THINK_END: &str = "</think>";
697
698        let mut remaining = chunk;
699
700        while !remaining.is_empty() {
701            if self.in_think_block {
702                if let Some(end_idx) = remaining.find(THINK_END) {
703                    self.think_buffer.push_str(&remaining[..end_idx]);
704                    self.flush_think();
705                    self.in_think_block = false;
706                    remaining = &remaining[end_idx + THINK_END.len()..];
707                } else {
708                    self.think_buffer.push_str(remaining);
709                    break;
710                }
711            } else if let Some(start_idx) = remaining.find(THINK_START) {
712                let visible = &remaining[..start_idx];
713                if !visible.is_empty() {
714                    print!("{visible}");
715                    let _ = io::stdout().flush();
716                }
717                self.in_think_block = true;
718                self.think_buffer.clear();
719                remaining = &remaining[start_idx + THINK_START.len()..];
720            } else {
721                print!("{remaining}");
722                let _ = io::stdout().flush();
723                break;
724            }
725        }
726    }
727
728    fn flush_think(&mut self) {
729        let content = self.think_buffer.trim();
730        if !content.is_empty() {
731            print!("\n[think] {content}\n");
732            let _ = io::stdout().flush();
733        }
734        self.think_buffer.clear();
735    }
736
737    /// Flush any pending think block content.
738    pub fn finish(&mut self) {
739        self.flush_pending_thinking();
740    }
741
742    fn print_tool_response(&self, name: &str, response: &Value) {
743        print!("\n[tool-response] {name} {response}\n");
744        let _ = io::stdout().flush();
745    }
746
747    fn flush_part_thinking_if_needed(&mut self) {
748        if self.in_thinking_part_stream {
749            println!();
750            let _ = io::stdout().flush();
751            self.think_buffer.clear();
752            self.in_thinking_part_stream = false;
753        }
754    }
755
756    fn flush_pending_thinking(&mut self) {
757        self.flush_part_thinking_if_needed();
758        if self.in_think_block {
759            self.flush_think_with_label("think");
760            self.in_think_block = false;
761        }
762    }
763
764    fn flush_think_with_label(&mut self, label: &str) {
765        let content = self.think_buffer.trim();
766        if !content.is_empty() {
767            print!("\n[{label}] {content}\n");
768            let _ = io::stdout().flush();
769        }
770        self.think_buffer.clear();
771    }
772}
773
774impl Default for StreamPrinter {
775    fn default() -> Self {
776        Self::new(ThinkingDisplayMode::Auto)
777    }
778}
779
780#[cfg(test)]
781mod tests {
782    use super::*;
783    use adk_core::{Agent, EventStream, InvocationContext, Result as AdkResult};
784    use async_trait::async_trait;
785    use axum::{
786        body::{Body, to_bytes},
787        http::{Request, StatusCode},
788    };
789    use futures::stream;
790    use std::sync::Arc;
791    use tower::ServiceExt;
792
793    struct TestAgent;
794
795    #[async_trait]
796    impl Agent for TestAgent {
797        fn name(&self) -> &str {
798            "launcher_test_agent"
799        }
800
801        fn description(&self) -> &str {
802            "launcher test agent"
803        }
804
805        fn sub_agents(&self) -> &[Arc<dyn Agent>] {
806            &[]
807        }
808
809        async fn run(&self, _ctx: Arc<dyn InvocationContext>) -> AdkResult<EventStream> {
810            Ok(Box::pin(stream::empty()))
811        }
812    }
813
814    fn test_launcher() -> Launcher {
815        Launcher::new(Arc::new(TestAgent)).with_telemetry(TelemetryConfig::None)
816    }
817
818    #[test]
819    fn stream_printer_tracks_think_block_state() {
820        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
821        assert!(!printer.in_think_block);
822
823        // Opening a think block sets the flag
824        printer.handle_text_chunk("<think>reasoning");
825        assert!(printer.in_think_block);
826        assert_eq!(printer.think_buffer, "reasoning");
827
828        // Closing the think block clears the flag and buffer
829        printer.handle_text_chunk(" more</think>visible");
830        assert!(!printer.in_think_block);
831        assert!(printer.think_buffer.is_empty());
832    }
833
834    #[test]
835    fn stream_printer_handles_think_block_across_chunks() {
836        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
837
838        printer.handle_text_chunk("before<think>start");
839        assert!(printer.in_think_block);
840        assert_eq!(printer.think_buffer, "start");
841
842        printer.handle_text_chunk(" middle");
843        assert!(printer.in_think_block);
844        assert_eq!(printer.think_buffer, "start middle");
845
846        printer.handle_text_chunk(" end</think>after");
847        assert!(!printer.in_think_block);
848        assert!(printer.think_buffer.is_empty());
849    }
850
851    #[test]
852    fn stream_printer_finish_flushes_open_think_block() {
853        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
854
855        printer.handle_text_chunk("<think>unclosed reasoning");
856        assert!(printer.in_think_block);
857
858        printer.finish();
859        assert!(!printer.in_think_block);
860        assert!(printer.think_buffer.is_empty());
861    }
862
863    #[test]
864    fn stream_printer_finish_is_noop_when_no_think_block() {
865        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
866        printer.finish();
867        assert!(!printer.in_think_block);
868        assert!(printer.think_buffer.is_empty());
869    }
870
871    #[test]
872    fn stream_printer_handles_multiple_think_blocks_in_one_chunk() {
873        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
874
875        printer.handle_text_chunk("a<think>first</think>b<think>second</think>c");
876        assert!(!printer.in_think_block);
877        assert!(printer.think_buffer.is_empty());
878    }
879
880    #[test]
881    fn stream_printer_handles_empty_think_block() {
882        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
883
884        printer.handle_text_chunk("<think></think>after");
885        assert!(!printer.in_think_block);
886        assert!(printer.think_buffer.is_empty());
887    }
888
889    #[test]
890    fn stream_printer_handles_all_part_types() {
891        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
892
893        // Text
894        printer.handle_part(&Part::Text { text: "hello".into() });
895        assert!(!printer.in_think_block);
896
897        // Thinking
898        printer.handle_part(&Part::Thinking { thinking: "reasoning".into(), signature: None });
899        assert!(printer.in_thinking_part_stream);
900
901        // FunctionCall
902        printer.handle_part(&Part::FunctionCall {
903            name: "get_weather".into(),
904            args: serde_json::json!({"city": "Seattle"}),
905            id: None,
906            thought_signature: None,
907        });
908
909        // FunctionResponse
910        printer.handle_part(&Part::FunctionResponse {
911            function_response: adk_core::FunctionResponseData {
912                name: "get_weather".into(),
913                response: serde_json::json!({"temp": 72}),
914            },
915            id: None,
916        });
917
918        // InlineData
919        printer
920            .handle_part(&Part::InlineData { mime_type: "image/png".into(), data: vec![0u8; 100] });
921
922        // FileData
923        printer.handle_part(&Part::FileData {
924            mime_type: "audio/wav".into(),
925            file_uri: "gs://bucket/file.wav".into(),
926        });
927
928        // No panics, no state corruption
929        assert!(!printer.in_think_block);
930        assert!(!printer.in_thinking_part_stream);
931    }
932
933    #[test]
934    fn stream_printer_text_without_think_tags_leaves_state_clean() {
935        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
936        printer.handle_text_chunk("just plain text with no tags");
937        assert!(!printer.in_think_block);
938        assert!(printer.think_buffer.is_empty());
939    }
940
941    #[test]
942    fn stream_printer_coalesces_streamed_thinking_parts() {
943        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
944
945        printer.handle_part(&Part::Thinking { thinking: "Okay".into(), signature: None });
946        printer.handle_part(&Part::Thinking { thinking: ", the".into(), signature: None });
947        printer.handle_part(&Part::Thinking { thinking: " user".into(), signature: None });
948
949        assert!(printer.in_thinking_part_stream);
950        assert_eq!(printer.think_buffer, "Okay, the user");
951
952        printer.handle_part(&Part::Text { text: "hello".into() });
953
954        assert!(!printer.in_thinking_part_stream);
955        assert!(printer.think_buffer.is_empty());
956    }
957
958    #[test]
959    fn stream_printer_finish_closes_streamed_thinking_state() {
960        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Auto);
961
962        printer.handle_part(&Part::Thinking { thinking: "reasoning".into(), signature: None });
963        assert!(printer.in_thinking_part_stream);
964
965        printer.finish();
966
967        assert!(!printer.in_thinking_part_stream);
968        assert!(printer.think_buffer.is_empty());
969    }
970
971    #[test]
972    fn stream_printer_hide_mode_ignores_emitted_thinking_state() {
973        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Hide);
974
975        printer.handle_part(&Part::Thinking { thinking: "secret".into(), signature: None });
976
977        assert!(!printer.in_thinking_part_stream);
978        assert!(printer.think_buffer.is_empty());
979    }
980
981    #[test]
982    fn stream_printer_hide_mode_drops_think_tags_from_text() {
983        let mut printer = StreamPrinter::new(ThinkingDisplayMode::Hide);
984
985        printer.handle_text_chunk("visible<think>hidden</think>after");
986
987        assert!(!printer.in_think_block);
988        assert!(printer.think_buffer.is_empty());
989    }
990
991    #[test]
992    fn exit_command_helper_accepts_plain_and_slash_variants() {
993        for command in ["quit", "exit", "/quit", "/exit"] {
994            assert!(is_exit_command(command));
995        }
996
997        assert!(!is_exit_command("hello"));
998    }
999
1000    #[tokio::test]
1001    async fn build_app_includes_health_route() {
1002        let app = test_launcher().build_app().expect("launcher app should build");
1003
1004        let response = app
1005            .oneshot(Request::builder().uri("/api/health").body(Body::empty()).unwrap())
1006            .await
1007            .expect("health request should succeed");
1008
1009        assert_eq!(response.status(), StatusCode::OK);
1010
1011        let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
1012        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1013        assert_eq!(json["status"], "healthy");
1014    }
1015
1016    #[tokio::test]
1017    async fn build_app_does_not_enable_a2a_routes_by_default() {
1018        let app = test_launcher().build_app().expect("launcher app should build");
1019
1020        let response = app
1021            .oneshot(Request::builder().uri("/.well-known/agent.json").body(Body::empty()).unwrap())
1022            .await
1023            .expect("agent card request should complete");
1024
1025        assert_eq!(response.status(), StatusCode::NOT_FOUND);
1026    }
1027
1028    #[tokio::test]
1029    async fn build_app_with_a2a_enables_agent_card_route() {
1030        let app = test_launcher()
1031            .build_app_with_a2a("http://localhost:8080")
1032            .expect("launcher app with a2a should build");
1033
1034        let response = app
1035            .oneshot(Request::builder().uri("/.well-known/agent.json").body(Body::empty()).unwrap())
1036            .await
1037            .expect("agent card request should complete");
1038
1039        assert_eq!(response.status(), StatusCode::OK);
1040
1041        let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
1042        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1043        assert_eq!(json["name"], "launcher_test_agent");
1044        assert_eq!(json["description"], "launcher test agent");
1045    }
1046}