1#![doc = include_str!("../README.md")]
2#![warn(missing_docs)]
3#![deny(rustdoc::broken_intra_doc_links)]
4#![cfg_attr(test, allow(clippy::unwrap_used))]
5
6pub mod canvas;
8mod canvas_dispatch;
9#[cfg(feature = "bundled-cli")]
11pub(crate) mod embeddedcli;
12mod errors;
13pub use errors::*;
14pub mod copilot_request_handler;
18pub mod handler;
20pub mod hooks;
22mod jsonrpc;
23pub mod permission;
25pub mod provider_token;
27mod provider_token_dispatch;
28pub(crate) mod resolve;
30mod router;
31pub mod session;
33pub mod session_fs;
35mod session_fs_dispatch;
36pub mod subscription;
38pub mod tool;
40pub mod trace_context;
42pub mod transforms;
44pub mod types;
46mod wire;
47
48pub mod session_events;
50
51pub mod rpc;
54
55pub(crate) mod generated;
60
61pub mod mode;
64
65use std::ffi::OsString;
66use std::path::{Path, PathBuf};
67use std::process::Stdio;
68use std::sync::{Arc, OnceLock};
69use std::time::{Duration, Instant};
70
71use async_trait::async_trait;
72pub(crate) use jsonrpc::{
75 JsonRpcClient, JsonRpcError, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, error_codes,
76};
77pub use mode::{BUILTIN_TOOLS_ISOLATED, ClientMode, ToolSet};
78pub use provider_token::{BearerTokenError, BearerTokenProvider, ProviderTokenArgs};
79
/// JSON-RPC primitives re-exported for test code.
///
/// Only compiled when the `test-support` feature is enabled.
#[cfg(feature = "test-support")]
pub mod test_support {
    pub use crate::jsonrpc::error_codes;
    pub use crate::jsonrpc::{
        JsonRpcClient, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
    };
}
88use serde::{Deserialize, Serialize};
89use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, BufReader};
90use tokio::net::TcpStream;
91use tokio::process::{Child, Command};
92use tokio::sync::{broadcast, mpsc, oneshot};
93use tracing::{Instrument, debug, error, info, warn};
94pub use types::*;
95
96mod sdk_protocol_version;
97pub use sdk_protocol_version::{SDK_PROTOCOL_VERSION, get_sdk_protocol_version};
98pub use subscription::{EventSubscription, LifecycleSubscription};
99
// NOTE(review): presumably the lowest CLI protocol version accepted during
// protocol verification — the check itself is outside this view; confirm.
const MIN_PROTOCOL_VERSION: u32 = 3;

// NOTE(review): presumably bounds how long runtime shutdown may take — the
// consumer is outside this view; confirm.
const RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
103
/// How the client reaches the Copilot CLI server.
///
/// The `Debug` representation never prints a connection token: a present
/// token is rendered as `Some("<redacted>")`, mirroring how `ClientOptions`
/// redacts `github_token`.
#[derive(Default)]
#[non_exhaustive]
pub enum Transport {
    /// Spawn the CLI and talk to it over its stdin/stdout pipes (the default).
    #[default]
    Stdio,
    /// Spawn the CLI with `--port` and connect to it over TCP on `127.0.0.1`.
    Tcp {
        /// Port passed to the CLI via `--port`; the port actually connected to
        /// is the one the CLI announces on stdout.
        port: u16,
        /// Token exported to the child as `COPILOT_CONNECTION_TOKEN`. When
        /// `None`, a random token is generated at start. Must not be empty.
        connection_token: Option<String>,
    },
    /// Connect to an already-running server; no process is spawned.
    External {
        /// Host name or address of the server.
        host: String,
        /// TCP port of the server.
        port: u16,
        /// Optional connection token. Must not be empty when present.
        connection_token: Option<String>,
    },
}

impl std::fmt::Debug for Transport {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Show whether a token is present without ever revealing its value.
        fn redact(token: &Option<String>) -> Option<&'static str> {
            token.as_ref().map(|_| "<redacted>")
        }

        match self {
            Self::Stdio => f.write_str("Stdio"),
            Self::Tcp {
                port,
                connection_token,
            } => f
                .debug_struct("Tcp")
                .field("port", port)
                .field("connection_token", &redact(connection_token))
                .finish(),
            Self::External {
                host,
                port,
                connection_token,
            } => f
                .debug_struct("External")
                .field("host", host)
                .field("port", port)
                .field("connection_token", &redact(connection_token))
                .finish(),
        }
    }
}
131
/// Selects the CLI executable the client launches.
#[derive(Debug, Clone, Default)]
pub enum CliProgram {
    /// Let the SDK locate the CLI binary at start time (the default).
    #[default]
    Resolve,
    /// Use exactly this executable path.
    Path(PathBuf),
}

/// A bare path converts to [`CliProgram::Path`].
impl From<PathBuf> for CliProgram {
    fn from(explicit: PathBuf) -> Self {
        CliProgram::Path(explicit)
    }
}
148
/// `true` when the crate was compiled with the `has_bundled_cli` cfg set.
// NOTE(review): this is a custom cfg (presumably emitted by the build script),
// distinct from the `bundled-cli` Cargo feature — confirm.
pub const HAS_BUNDLED_CLI: bool = cfg!(has_bundled_cli);
156
/// Returns the path of the bundled CLI, if this build has one.
///
/// Without the `bundled-cli` feature this always returns `None`; with it, the
/// result is whatever `embeddedcli::path()` reports.
pub fn install_bundled_cli() -> Option<PathBuf> {
    #[cfg(not(feature = "bundled-cli"))]
    {
        None
    }
    #[cfg(feature = "bundled-cli")]
    {
        embeddedcli::path()
    }
}
190
191#[non_exhaustive]
201pub struct ClientOptions {
202 pub program: CliProgram,
204 pub prefix_args: Vec<OsString>,
206 pub working_directory: PathBuf,
208 pub env: Vec<(OsString, OsString)>,
210 pub env_remove: Vec<OsString>,
212 pub extra_args: Vec<String>,
214 pub transport: Transport,
216 pub github_token: Option<String>,
221 pub use_logged_in_user: Option<bool>,
225 pub log_level: Option<LogLevel>,
229 pub session_idle_timeout_seconds: Option<u64>,
235 pub on_list_models: Option<Arc<dyn ListModelsHandler>>,
243 pub session_fs: Option<SessionFsConfig>,
251 pub request_handler: Option<Arc<dyn crate::copilot_request_handler::CopilotRequestHandler>>,
260 pub on_get_trace_context: Option<Arc<dyn TraceContextProvider>>,
270 pub telemetry: Option<TelemetryConfig>,
274 pub base_directory: Option<PathBuf>,
279 pub enable_remote_sessions: bool,
285 pub bundled_cli_extract_dir: Option<PathBuf>,
304 pub mode: ClientMode,
308}
309
310impl std::fmt::Debug for ClientOptions {
311 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
312 f.debug_struct("ClientOptions")
313 .field("program", &self.program)
314 .field("prefix_args", &self.prefix_args)
315 .field("working_directory", &self.working_directory)
316 .field("env", &self.env)
317 .field("env_remove", &self.env_remove)
318 .field("extra_args", &self.extra_args)
319 .field("transport", &self.transport)
320 .field(
321 "github_token",
322 &self.github_token.as_ref().map(|_| "<redacted>"),
323 )
324 .field("use_logged_in_user", &self.use_logged_in_user)
325 .field("log_level", &self.log_level)
326 .field(
327 "session_idle_timeout_seconds",
328 &self.session_idle_timeout_seconds,
329 )
330 .field(
331 "on_list_models",
332 &self.on_list_models.as_ref().map(|_| "<set>"),
333 )
334 .field("session_fs", &self.session_fs)
335 .field(
336 "request_handler",
337 &self.request_handler.as_ref().map(|_| "<set>"),
338 )
339 .field(
340 "on_get_trace_context",
341 &self.on_get_trace_context.as_ref().map(|_| "<set>"),
342 )
343 .field("telemetry", &self.telemetry)
344 .field("base_directory", &self.base_directory)
345 .field("enable_remote_sessions", &self.enable_remote_sessions)
346 .field("bundled_cli_extract_dir", &self.bundled_cli_extract_dir)
347 .finish()
348 }
349}
350
351#[async_trait]
360pub trait ListModelsHandler: Send + Sync + 'static {
361 async fn list_models(&self) -> Result<Vec<Model>>;
363}
364
365#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
367#[serde(rename_all = "lowercase")]
368pub enum LogLevel {
369 None,
371 Error,
373 Warning,
375 Info,
377 Debug,
379 All,
381}
382
383impl LogLevel {
384 pub fn as_str(self) -> &'static str {
386 match self {
387 Self::None => "none",
388 Self::Error => "error",
389 Self::Warning => "warning",
390 Self::Info => "info",
391 Self::Debug => "debug",
392 Self::All => "all",
393 }
394 }
395}
396
397impl std::fmt::Display for LogLevel {
398 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
399 f.write_str(self.as_str())
400 }
401}
402
403#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
408#[serde(rename_all = "kebab-case")]
409#[non_exhaustive]
410pub enum OtelExporterType {
411 OtlpHttp,
414 File,
417}
418
419impl OtelExporterType {
420 pub fn as_str(self) -> &'static str {
422 match self {
423 Self::OtlpHttp => "otlp-http",
424 Self::File => "file",
425 }
426 }
427}
428
429#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
435#[non_exhaustive]
436pub enum OtlpHttpProtocol {
437 #[serde(rename = "http/json")]
439 HttpJson,
440 #[serde(rename = "http/protobuf")]
442 HttpProtobuf,
443}
444
445impl OtlpHttpProtocol {
446 pub fn as_str(self) -> &'static str {
448 match self {
449 Self::HttpJson => "http/json",
450 Self::HttpProtobuf => "http/protobuf",
451 }
452 }
453}
454
455#[derive(Debug, Clone, Default)]
490#[non_exhaustive]
491pub struct TelemetryConfig {
492 pub otlp_endpoint: Option<String>,
494 pub otlp_protocol: Option<OtlpHttpProtocol>,
496 pub file_path: Option<PathBuf>,
498 pub exporter_type: Option<OtelExporterType>,
501 pub source_name: Option<String>,
505 pub capture_content: Option<bool>,
509}
510
511impl TelemetryConfig {
512 pub fn new() -> Self {
515 Self::default()
516 }
517
518 pub fn with_otlp_endpoint(mut self, endpoint: impl Into<String>) -> Self {
520 self.otlp_endpoint = Some(endpoint.into());
521 self
522 }
523
524 pub fn with_otlp_protocol(mut self, protocol: OtlpHttpProtocol) -> Self {
526 self.otlp_protocol = Some(protocol);
527 self
528 }
529
530 pub fn with_file_path(mut self, path: impl Into<PathBuf>) -> Self {
532 self.file_path = Some(path.into());
533 self
534 }
535
536 pub fn with_exporter_type(mut self, exporter_type: OtelExporterType) -> Self {
538 self.exporter_type = Some(exporter_type);
539 self
540 }
541
542 pub fn with_source_name(mut self, source_name: impl Into<String>) -> Self {
546 self.source_name = Some(source_name.into());
547 self
548 }
549
550 pub fn with_capture_content(mut self, capture: bool) -> Self {
554 self.capture_content = Some(capture);
555 self
556 }
557
558 pub fn is_empty(&self) -> bool {
561 self.otlp_endpoint.is_none()
562 && self.otlp_protocol.is_none()
563 && self.file_path.is_none()
564 && self.exporter_type.is_none()
565 && self.source_name.is_none()
566 && self.capture_content.is_none()
567 }
568}
569
570impl Default for ClientOptions {
571 fn default() -> Self {
572 Self {
573 program: CliProgram::Resolve,
574 prefix_args: Vec::new(),
575 working_directory: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
576 env: Vec::new(),
577 env_remove: Vec::new(),
578 extra_args: Vec::new(),
579 transport: Transport::default(),
580 github_token: None,
581 use_logged_in_user: None,
582 log_level: None,
583 session_idle_timeout_seconds: None,
584 on_list_models: None,
585 session_fs: None,
586 request_handler: None,
587 on_get_trace_context: None,
588 telemetry: None,
589 base_directory: None,
590 enable_remote_sessions: false,
591 bundled_cli_extract_dir: None,
592 mode: ClientMode::default(),
593 }
594 }
595}
596
597impl ClientOptions {
598 pub fn new() -> Self {
614 Self::default()
615 }
616
617 pub fn with_program(mut self, program: impl Into<CliProgram>) -> Self {
619 self.program = program.into();
620 self
621 }
622
623 pub fn with_prefix_args<I, S>(mut self, args: I) -> Self
625 where
626 I: IntoIterator<Item = S>,
627 S: Into<OsString>,
628 {
629 self.prefix_args = args.into_iter().map(Into::into).collect();
630 self
631 }
632
633 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
635 self.working_directory = cwd.into();
636 self
637 }
638
639 pub fn with_env<I, K, V>(mut self, env: I) -> Self
641 where
642 I: IntoIterator<Item = (K, V)>,
643 K: Into<OsString>,
644 V: Into<OsString>,
645 {
646 self.env = env.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
647 self
648 }
649
650 pub fn with_env_remove<I, S>(mut self, names: I) -> Self
652 where
653 I: IntoIterator<Item = S>,
654 S: Into<OsString>,
655 {
656 self.env_remove = names.into_iter().map(Into::into).collect();
657 self
658 }
659
660 pub fn with_extra_args<I, S>(mut self, args: I) -> Self
662 where
663 I: IntoIterator<Item = S>,
664 S: Into<String>,
665 {
666 self.extra_args = args.into_iter().map(Into::into).collect();
667 self
668 }
669
670 pub fn with_transport(mut self, transport: Transport) -> Self {
672 self.transport = transport;
673 self
674 }
675
676 pub fn with_github_token(mut self, token: impl Into<String>) -> Self {
679 self.github_token = Some(token.into());
680 self
681 }
682
683 pub fn with_use_logged_in_user(mut self, use_logged_in: bool) -> Self {
686 self.use_logged_in_user = Some(use_logged_in);
687 self
688 }
689
690 pub fn with_log_level(mut self, level: LogLevel) -> Self {
692 self.log_level = Some(level);
693 self
694 }
695
696 pub fn with_session_idle_timeout_seconds(mut self, seconds: u64) -> Self {
699 self.session_idle_timeout_seconds = Some(seconds);
700 self
701 }
702
703 pub fn with_list_models_handler<H>(mut self, handler: H) -> Self
706 where
707 H: ListModelsHandler + 'static,
708 {
709 self.on_list_models = Some(Arc::new(handler));
710 self
711 }
712
713 pub fn with_session_fs(mut self, config: SessionFsConfig) -> Self {
715 self.session_fs = Some(config);
716 self
717 }
718
719 pub fn with_request_handler<H>(mut self, handler: H) -> Self
724 where
725 H: crate::copilot_request_handler::CopilotRequestHandler,
726 {
727 self.request_handler = Some(Arc::new(handler));
728 self
729 }
730
731 pub fn with_trace_context_provider<P>(mut self, provider: P) -> Self
735 where
736 P: TraceContextProvider + 'static,
737 {
738 self.on_get_trace_context = Some(Arc::new(provider));
739 self
740 }
741
742 pub fn with_telemetry(mut self, config: TelemetryConfig) -> Self {
744 self.telemetry = Some(config);
745 self
746 }
747
748 pub fn with_base_directory(mut self, dir: impl Into<PathBuf>) -> Self {
751 self.base_directory = Some(dir.into());
752 self
753 }
754
755 pub fn with_enable_remote_sessions(mut self, enabled: bool) -> Self {
758 self.enable_remote_sessions = enabled;
759 self
760 }
761
762 pub fn with_bundled_cli_extract_dir(mut self, dir: impl Into<PathBuf>) -> Self {
772 self.bundled_cli_extract_dir = Some(dir.into());
773 self
774 }
775
776 pub fn with_mode(mut self, mode: ClientMode) -> Self {
781 self.mode = mode;
782 self
783 }
784}
785
786fn validate_session_fs_config(cfg: &SessionFsConfig) -> Result<()> {
788 if cfg.initial_cwd.trim().is_empty() {
789 return Err(Error::with_message(
790 ErrorKind::Session(SessionErrorKind::InvalidSessionFsConfig),
791 "invalid SessionFsConfig: initial_cwd must not be empty",
792 ));
793 }
794 if cfg.session_state_path.trim().is_empty() {
795 return Err(Error::with_message(
796 ErrorKind::Session(SessionErrorKind::InvalidSessionFsConfig),
797 "invalid SessionFsConfig: session_state_path must not be empty",
798 ));
799 }
800 Ok(())
801}
802
803fn generate_connection_token() -> String {
810 let mut bytes = [0u8; 16];
811 getrandom::getrandom(&mut bytes)
812 .expect("OS CSPRNG (getrandom) is unavailable; cannot generate connection token");
813 let mut hex = String::with_capacity(32);
814 for byte in bytes {
815 use std::fmt::Write;
816 let _ = write!(hex, "{byte:02x}");
817 }
818 hex
819}
820
821#[derive(Clone)]
826pub struct Client {
827 inner: Arc<ClientInner>,
828}
829
830impl std::fmt::Debug for Client {
831 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
832 f.debug_struct("Client")
833 .field("working_directory", &self.inner.cwd)
834 .field("pid", &self.pid())
835 .finish()
836 }
837}
838
839struct ClientInner {
840 child: parking_lot::Mutex<Option<Child>>,
841 rpc: JsonRpcClient,
842 cwd: PathBuf,
843 request_rx: parking_lot::Mutex<Option<mpsc::UnboundedReceiver<JsonRpcRequest>>>,
844 notification_tx: broadcast::Sender<JsonRpcNotification>,
845 router: router::SessionRouter,
846 negotiated_protocol_version: OnceLock<u32>,
847 state: parking_lot::Mutex<ConnectionState>,
848 lifecycle_tx: broadcast::Sender<SessionLifecycleEvent>,
849 on_list_models: Option<Arc<dyn ListModelsHandler>>,
850 models_cache: parking_lot::Mutex<Arc<tokio::sync::OnceCell<Vec<Model>>>>,
851 session_fs_configured: bool,
852 session_fs_sqlite_declared: bool,
853 llm_inference: OnceLock<Arc<copilot_request_handler::CopilotRequestDispatcher>>,
856 on_get_trace_context: Option<Arc<dyn TraceContextProvider>>,
857 effective_connection_token: Option<String>,
862 pub(crate) mode: ClientMode,
865}
866
867impl Client {
868 pub async fn start(options: ClientOptions) -> Result<Self> {
881 let start_time = Instant::now();
882 if options.mode == ClientMode::Empty
883 && options.base_directory.is_none()
884 && options.session_fs.is_none()
885 {
886 return Err(Error::with_message(
887 ErrorKind::InvalidConfig,
888 "ClientMode::Empty requires either `base_directory` or \
889 `session_fs` to be set (no implicit ~/.copilot fallback).",
890 ));
891 }
892 if let Some(cfg) = &options.session_fs {
893 validate_session_fs_config(cfg)?;
894 }
895 if matches!(options.transport, Transport::External { .. }) {
898 if options.github_token.is_some() {
899 return Err(Error::with_message(
900 ErrorKind::InvalidConfig,
901 "invalid client configuration: github_token cannot be used with \
902 Transport::External (external server manages its own auth)",
903 ));
904 }
905 if options.use_logged_in_user == Some(true) {
906 return Err(Error::with_message(
907 ErrorKind::InvalidConfig,
908 "invalid client configuration: use_logged_in_user cannot be used with \
909 Transport::External (external server manages its own auth)",
910 ));
911 }
912 }
913 match &options.transport {
917 Transport::Tcp {
918 connection_token: Some(t),
919 ..
920 }
921 | Transport::External {
922 connection_token: Some(t),
923 ..
924 } if t.is_empty() => {
925 return Err(Error::with_message(
926 ErrorKind::InvalidConfig,
927 "invalid client configuration: connection_token must be a non-empty string",
928 ));
929 }
930 _ => {}
931 }
932 let mut options = options;
937 let effective_connection_token: Option<String> = match &mut options.transport {
938 Transport::Stdio => None,
939 Transport::Tcp {
940 connection_token, ..
941 } => Some(
942 connection_token
943 .get_or_insert_with(generate_connection_token)
944 .clone(),
945 ),
946 Transport::External {
947 connection_token, ..
948 } => connection_token.clone(),
949 };
950 let session_fs_config = options.session_fs.clone();
951 let request_handler = options.request_handler.clone();
952 let session_fs_sqlite_declared = session_fs_config
953 .as_ref()
954 .and_then(|c| c.capabilities.as_ref())
955 .is_some_and(|caps| caps.sqlite);
956 let program = match &options.program {
957 CliProgram::Path(path) => {
958 info!(path = %path.display(), "using explicit copilot CLI path");
959 path.clone()
960 }
961 CliProgram::Resolve => {
962 let resolved = resolve::copilot_binary_with_extract_dir(
963 options.bundled_cli_extract_dir.as_deref(),
964 )?;
965 info!(path = %resolved.display(), "resolved copilot CLI");
966 #[cfg(windows)]
967 {
968 if let Some(ext) = resolved.extension().and_then(|e| e.to_str()).filter(|ext| {
969 ext.eq_ignore_ascii_case("cmd") || ext.eq_ignore_ascii_case("bat")
970 }) {
971 warn!(
972 path = %resolved.display(),
973 ext = %ext,
974 "resolved copilot CLI is a .cmd/.bat wrapper; \
975 this may cause console window flashes on Windows"
976 );
977 }
978 }
979 resolved
980 }
981 };
982
983 let client = match options.transport {
984 Transport::External {
985 ref host,
986 port,
987 connection_token: _,
988 } => {
989 info!(host = %host, port = %port, "connecting to external CLI server");
990 let connect_start = Instant::now();
991 let stream = TcpStream::connect((host.as_str(), port)).await?;
992 debug!(
993 elapsed_ms = connect_start.elapsed().as_millis(),
994 host = %host,
995 port,
996 "Client::start TCP connect complete"
997 );
998 let (reader, writer) = tokio::io::split(stream);
999 Self::from_transport(
1000 reader,
1001 writer,
1002 None,
1003 options.working_directory,
1004 options.on_list_models,
1005 session_fs_config.is_some(),
1006 session_fs_sqlite_declared,
1007 options.on_get_trace_context,
1008 effective_connection_token.clone(),
1009 options.mode,
1010 )?
1011 }
1012 Transport::Tcp {
1013 port,
1014 connection_token: _,
1015 } => {
1016 let (mut child, actual_port) = Self::spawn_tcp(&program, &options, port).await?;
1017 let connect_start = Instant::now();
1018 let stream = TcpStream::connect(("127.0.0.1", actual_port)).await?;
1019 debug!(
1020 elapsed_ms = connect_start.elapsed().as_millis(),
1021 port = actual_port,
1022 "Client::start TCP connect complete"
1023 );
1024 let (reader, writer) = tokio::io::split(stream);
1025 Self::drain_stderr(&mut child);
1026 Self::from_transport(
1027 reader,
1028 writer,
1029 Some(child),
1030 options.working_directory,
1031 options.on_list_models,
1032 session_fs_config.is_some(),
1033 session_fs_sqlite_declared,
1034 options.on_get_trace_context,
1035 effective_connection_token.clone(),
1036 options.mode,
1037 )?
1038 }
1039 Transport::Stdio => {
1040 let mut child = Self::spawn_stdio(&program, &options)?;
1041 let stdin = child.stdin.take().expect("stdin is piped");
1042 let stdout = child.stdout.take().expect("stdout is piped");
1043 Self::drain_stderr(&mut child);
1044 Self::from_transport(
1045 stdout,
1046 stdin,
1047 Some(child),
1048 options.working_directory,
1049 options.on_list_models,
1050 session_fs_config.is_some(),
1051 session_fs_sqlite_declared,
1052 options.on_get_trace_context,
1053 effective_connection_token.clone(),
1054 options.mode,
1055 )?
1056 }
1057 };
1058
1059 debug!(
1060 elapsed_ms = start_time.elapsed().as_millis(),
1061 "Client::start transport setup complete"
1062 );
1063 client.verify_protocol_version().await?;
1064 debug!(
1065 elapsed_ms = start_time.elapsed().as_millis(),
1066 "Client::start protocol verification complete"
1067 );
1068 if let Some(cfg) = session_fs_config {
1069 let session_fs_start = Instant::now();
1070 let capabilities = cfg.capabilities.as_ref().map(|c| {
1071 crate::generated::api_types::SessionFsSetProviderCapabilities {
1072 sqlite: Some(c.sqlite),
1073 }
1074 });
1075 let request = crate::generated::api_types::SessionFsSetProviderRequest {
1076 capabilities,
1077 conventions: cfg.conventions.into_wire(),
1078 initial_cwd: cfg.initial_cwd,
1079 session_state_path: cfg.session_state_path,
1080 };
1081 client.rpc().session_fs().set_provider(request).await?;
1082 debug!(
1083 elapsed_ms = session_fs_start.elapsed().as_millis(),
1084 "Client::start session filesystem setup complete"
1085 );
1086 }
1087 if let Some(handler) = request_handler {
1088 let llm_inference_start = Instant::now();
1089 let dispatcher = Arc::new(copilot_request_handler::CopilotRequestDispatcher::new(
1090 handler,
1091 ));
1092 dispatcher.set_client(Arc::downgrade(&client.inner));
1093 let _ = client.inner.llm_inference.set(dispatcher.clone());
1094 client.inner.router.ensure_started(
1097 &client.inner.notification_tx,
1098 &client.inner.request_rx,
1099 Some(dispatcher.clone()),
1100 );
1101 client.rpc().llm_inference().set_provider().await?;
1102 debug!(
1103 elapsed_ms = llm_inference_start.elapsed().as_millis(),
1104 "Client::start Copilot request handler registration complete"
1105 );
1106 }
1107 debug!(
1108 elapsed_ms = start_time.elapsed().as_millis(),
1109 "Client::start complete"
1110 );
1111 Ok(client)
1112 }
1113
1114 pub fn from_streams(
1118 reader: impl AsyncRead + Unpin + Send + 'static,
1119 writer: impl AsyncWrite + Unpin + Send + 'static,
1120 cwd: PathBuf,
1121 ) -> Result<Self> {
1122 Self::from_transport(
1123 reader,
1124 writer,
1125 None,
1126 cwd,
1127 None,
1128 false,
1129 false,
1130 None,
1131 None,
1132 ClientMode::default(),
1133 )
1134 }
1135
1136 #[cfg(any(test, feature = "test-support"))]
1144 pub fn from_streams_with_trace_provider(
1145 reader: impl AsyncRead + Unpin + Send + 'static,
1146 writer: impl AsyncWrite + Unpin + Send + 'static,
1147 cwd: PathBuf,
1148 provider: Arc<dyn TraceContextProvider>,
1149 ) -> Result<Self> {
1150 Self::from_transport(
1151 reader,
1152 writer,
1153 None,
1154 cwd,
1155 None,
1156 false,
1157 false,
1158 Some(provider),
1159 None,
1160 ClientMode::default(),
1161 )
1162 }
1163
1164 #[cfg(any(test, feature = "test-support"))]
1168 pub fn from_streams_with_connection_token(
1169 reader: impl AsyncRead + Unpin + Send + 'static,
1170 writer: impl AsyncWrite + Unpin + Send + 'static,
1171 cwd: PathBuf,
1172 token: Option<String>,
1173 ) -> Result<Self> {
1174 Self::from_transport(
1175 reader,
1176 writer,
1177 None,
1178 cwd,
1179 None,
1180 false,
1181 false,
1182 None,
1183 token,
1184 ClientMode::default(),
1185 )
1186 }
1187
1188 #[cfg(any(test, feature = "test-support"))]
1194 pub fn generate_connection_token_for_test() -> String {
1195 generate_connection_token()
1196 }
1197
1198 #[allow(clippy::too_many_arguments)]
1199 fn from_transport(
1200 reader: impl AsyncRead + Unpin + Send + 'static,
1201 writer: impl AsyncWrite + Unpin + Send + 'static,
1202 child: Option<Child>,
1203 cwd: PathBuf,
1204 on_list_models: Option<Arc<dyn ListModelsHandler>>,
1205 session_fs_configured: bool,
1206 session_fs_sqlite_declared: bool,
1207 on_get_trace_context: Option<Arc<dyn TraceContextProvider>>,
1208 effective_connection_token: Option<String>,
1209 mode: ClientMode,
1210 ) -> Result<Self> {
1211 let setup_start = Instant::now();
1212 let (request_tx, request_rx) = mpsc::unbounded_channel::<JsonRpcRequest>();
1213 let (notification_broadcast_tx, _) = broadcast::channel::<JsonRpcNotification>(1024);
1214 let rpc = JsonRpcClient::new(
1215 writer,
1216 reader,
1217 notification_broadcast_tx.clone(),
1218 request_tx,
1219 );
1220
1221 let pid = child.as_ref().and_then(|c| c.id());
1222 info!(pid = ?pid, "copilot CLI client ready");
1223
1224 let client = Self {
1225 inner: Arc::new(ClientInner {
1226 child: parking_lot::Mutex::new(child),
1227 rpc,
1228 cwd,
1229 request_rx: parking_lot::Mutex::new(Some(request_rx)),
1230 notification_tx: notification_broadcast_tx,
1231 router: router::SessionRouter::new(),
1232 negotiated_protocol_version: OnceLock::new(),
1233 state: parking_lot::Mutex::new(ConnectionState::Connected),
1234 lifecycle_tx: broadcast::channel(256).0,
1235 on_list_models,
1236 models_cache: parking_lot::Mutex::new(Arc::new(tokio::sync::OnceCell::new())),
1237 session_fs_configured,
1238 session_fs_sqlite_declared,
1239 llm_inference: OnceLock::new(),
1240 on_get_trace_context,
1241 effective_connection_token,
1242 mode,
1243 }),
1244 };
1245 client.spawn_lifecycle_dispatcher();
1246 debug!(
1247 elapsed_ms = setup_start.elapsed().as_millis(),
1248 pid = ?pid,
1249 "Client::from_transport setup complete"
1250 );
1251 Ok(client)
1252 }
1253
1254 fn spawn_lifecycle_dispatcher(&self) {
1258 let inner = Arc::clone(&self.inner);
1259 let mut notif_rx = inner.notification_tx.subscribe();
1260 tokio::spawn(async move {
1261 loop {
1262 match notif_rx.recv().await {
1263 Ok(notification) => {
1264 if notification.method != "session.lifecycle" {
1265 continue;
1266 }
1267 let Some(params) = notification.params.as_ref() else {
1268 continue;
1269 };
1270 let event: SessionLifecycleEvent =
1271 match serde_json::from_value(params.clone()) {
1272 Ok(e) => e,
1273 Err(e) => {
1274 warn!(
1275 error = %e,
1276 "failed to deserialize session.lifecycle notification"
1277 );
1278 continue;
1279 }
1280 };
1281 let _ = inner.lifecycle_tx.send(event);
1284 }
1285 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1286 warn!(missed = n, "lifecycle dispatcher lagged");
1287 }
1288 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1289 }
1290 }
1291 });
1292 }
1293
1294 fn build_command(program: &Path, options: &ClientOptions) -> Command {
1295 let mut command = Command::new(program);
1296 for arg in &options.prefix_args {
1297 command.arg(arg);
1298 }
1299 if let Some(token) = &options.github_token {
1302 command.env("COPILOT_SDK_AUTH_TOKEN", token);
1303 }
1304 if let Some(telemetry) = &options.telemetry {
1307 command.env("COPILOT_OTEL_ENABLED", "true");
1308 if let Some(endpoint) = &telemetry.otlp_endpoint {
1309 command.env("OTEL_EXPORTER_OTLP_ENDPOINT", endpoint);
1310 }
1311 if let Some(protocol) = telemetry.otlp_protocol {
1312 command.env("OTEL_EXPORTER_OTLP_PROTOCOL", protocol.as_str());
1313 }
1314 if let Some(path) = &telemetry.file_path {
1315 command.env("COPILOT_OTEL_FILE_EXPORTER_PATH", path);
1316 }
1317 if let Some(exporter) = telemetry.exporter_type {
1318 command.env("COPILOT_OTEL_EXPORTER_TYPE", exporter.as_str());
1319 }
1320 if let Some(source) = &telemetry.source_name {
1321 command.env("COPILOT_OTEL_SOURCE_NAME", source);
1322 }
1323 if let Some(capture) = telemetry.capture_content {
1324 command.env(
1325 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT",
1326 if capture { "true" } else { "false" },
1327 );
1328 }
1329 }
1330 if let Some(dir) = &options.base_directory {
1331 command.env("COPILOT_HOME", dir);
1332 }
1333 if options.mode == ClientMode::Empty {
1336 command.env("COPILOT_DISABLE_KEYTAR", "1");
1337 }
1338 if let Transport::Tcp {
1339 connection_token: Some(token),
1340 ..
1341 } = &options.transport
1342 {
1343 command.env("COPILOT_CONNECTION_TOKEN", token);
1344 }
1345 for (key, value) in &options.env {
1346 command.env(key, value);
1347 }
1348 for key in &options.env_remove {
1349 command.env_remove(key);
1350 }
1351 command
1352 .current_dir(&options.working_directory)
1353 .stdout(Stdio::piped())
1354 .stderr(Stdio::piped());
1355
1356 #[cfg(windows)]
1357 {
1358 use std::os::windows::process::CommandExt;
1359 const CREATE_NO_WINDOW: u32 = 0x08000000;
1360 command.as_std_mut().creation_flags(CREATE_NO_WINDOW);
1361 }
1362
1363 command
1364 }
1365
1366 fn auth_args(options: &ClientOptions) -> Vec<&'static str> {
1374 let mut args: Vec<&'static str> = Vec::new();
1375 if options.github_token.is_some() {
1376 args.push("--auth-token-env");
1377 args.push("COPILOT_SDK_AUTH_TOKEN");
1378 }
1379 let use_logged_in = options
1380 .use_logged_in_user
1381 .unwrap_or(options.github_token.is_none());
1382 if !use_logged_in {
1383 args.push("--no-auto-login");
1384 }
1385 args
1386 }
1387
1388 fn session_idle_timeout_args(options: &ClientOptions) -> Vec<String> {
1392 match options.session_idle_timeout_seconds {
1393 Some(secs) if secs > 0 => {
1394 vec!["--session-idle-timeout".to_string(), secs.to_string()]
1395 }
1396 _ => Vec::new(),
1397 }
1398 }
1399
1400 fn remote_args(options: &ClientOptions) -> Vec<String> {
1401 if options.enable_remote_sessions {
1402 vec!["--remote".to_string()]
1403 } else {
1404 Vec::new()
1405 }
1406 }
1407
1408 fn log_level_args(options: &ClientOptions) -> Vec<&'static str> {
1409 match options.log_level {
1410 Some(level) => vec!["--log-level", level.as_str()],
1411 None => Vec::new(),
1412 }
1413 }
1414
1415 fn spawn_stdio(program: &Path, options: &ClientOptions) -> Result<Child> {
1416 info!(cwd = ?options.working_directory, program = %program.display(), "spawning copilot CLI (stdio)");
1417 let mut command = Self::build_command(program, options);
1418 command
1419 .args(["--server", "--stdio", "--no-auto-update"])
1420 .args(Self::log_level_args(options))
1421 .args(Self::auth_args(options))
1422 .args(Self::session_idle_timeout_args(options))
1423 .args(Self::remote_args(options))
1424 .args(&options.extra_args)
1425 .stdin(Stdio::piped());
1426 let spawn_start = Instant::now();
1427 let child = command.spawn()?;
1428 debug!(
1429 elapsed_ms = spawn_start.elapsed().as_millis(),
1430 "Client::spawn_stdio subprocess spawned"
1431 );
1432 Ok(child)
1433 }
1434
1435 async fn spawn_tcp(program: &Path, options: &ClientOptions, port: u16) -> Result<(Child, u16)> {
1436 info!(cwd = ?options.working_directory, program = %program.display(), port = %port, "spawning copilot CLI (tcp)");
1437 let mut command = Self::build_command(program, options);
1438 command
1439 .args(["--server", "--port", &port.to_string(), "--no-auto-update"])
1440 .args(Self::log_level_args(options))
1441 .args(Self::auth_args(options))
1442 .args(Self::session_idle_timeout_args(options))
1443 .args(Self::remote_args(options))
1444 .args(&options.extra_args)
1445 .stdin(Stdio::null());
1446 let spawn_start = Instant::now();
1447 let mut child = command.spawn()?;
1448 debug!(
1449 elapsed_ms = spawn_start.elapsed().as_millis(),
1450 "Client::spawn_tcp subprocess spawned"
1451 );
1452 let stdout = child.stdout.take().expect("stdout is piped");
1453
1454 let (port_tx, port_rx) = oneshot::channel::<u16>();
1455 let span = tracing::error_span!("copilot_cli_port_scan");
1456 tokio::spawn(
1457 async move {
1458 let port_re = regex::Regex::new(r"listening on port (\d+)").expect("valid regex");
1460 let mut lines = BufReader::new(stdout).lines();
1461 let mut port_tx = Some(port_tx);
1462 while let Ok(Some(line)) = lines.next_line().await {
1463 debug!(line = %line, "CLI stdout");
1464 if let Some(tx) = port_tx.take() {
1465 if let Some(caps) = port_re.captures(&line)
1466 && let Some(p) =
1467 caps.get(1).and_then(|m| m.as_str().parse::<u16>().ok())
1468 {
1469 let _ = tx.send(p);
1470 continue;
1471 }
1472 port_tx = Some(tx);
1474 }
1475 }
1476 }
1477 .instrument(span),
1478 );
1479
1480 let port_wait_start = Instant::now();
1481 let actual_port = tokio::time::timeout(std::time::Duration::from_secs(10), port_rx)
1482 .await
1483 .map_err(|_| Error::from(ErrorKind::Protocol(ProtocolErrorKind::CliStartupTimeout)))?
1484 .map_err(|_| Error::from(ErrorKind::Protocol(ProtocolErrorKind::CliStartupFailed)))?;
1485
1486 debug!(
1487 elapsed_ms = port_wait_start.elapsed().as_millis(),
1488 port = actual_port,
1489 "Client::spawn_tcp TCP port wait complete"
1490 );
1491 info!(port = %actual_port, "CLI server listening");
1492 Ok((child, actual_port))
1493 }
1494
1495 fn drain_stderr(child: &mut Child) {
1496 if let Some(stderr) = child.stderr.take() {
1497 let span = tracing::error_span!("copilot_cli");
1498 tokio::spawn(
1499 async move {
1500 let mut reader = BufReader::new(stderr).lines();
1501 while let Ok(Some(line)) = reader.next_line().await {
1502 warn!(line = %line, "CLI stderr");
1503 }
1504 }
1505 .instrument(span),
1506 );
1507 }
1508 }
1509
    /// Returns the working-directory path stored in this client's shared state.
    pub fn cwd(&self) -> &PathBuf {
        &self.inner.cwd
    }
1514
    /// Returns the `ClientMode` stored in this client's shared state.
    pub fn mode(&self) -> ClientMode {
        self.inner.mode
    }
1519
    /// Returns a `ClientRpc` value that borrows this client.
    ///
    /// Other methods in this file reach typed calls through it, for example
    /// `self.rpc().connect(..)` and `self.rpc().runtime().shutdown()`.
    pub fn rpc(&self) -> crate::generated::rpc::ClientRpc<'_> {
        crate::generated::rpc::ClientRpc { client: self }
    }
1533
    /// Sends a JSON-RPC request through the underlying RPC client and returns
    /// the raw `JsonRpcResponse`.
    ///
    /// Unlike `call`, this does not inspect the response's `error` field; the
    /// caller receives the response as-is.
    #[allow(dead_code, reason = "convenience for future internal use")]
    pub(crate) async fn send_request(
        &self,
        method: &str,
        params: Option<serde_json::Value>,
    ) -> Result<JsonRpcResponse> {
        self.inner.rpc.send_request(method, params).await
    }
1543
    /// Invokes the JSON-RPC `method` with `params` and returns the response's
    /// `result` value.
    ///
    /// This is `call_with_inline_callback` with no inline callback, so error
    /// mapping is the same as documented by that method's body: an RPC error
    /// becomes an `Err`, and a missing `result` becomes `serde_json::Value::Null`.
    pub async fn call(
        &self,
        method: &str,
        params: Option<serde_json::Value>,
    ) -> Result<serde_json::Value> {
        self.call_with_inline_callback(method, params, None).await
    }
1570
1571 pub(crate) async fn call_with_inline_callback(
1586 &self,
1587 method: &str,
1588 params: Option<serde_json::Value>,
1589 inline_callback: Option<crate::jsonrpc::InlineResponseCallback>,
1590 ) -> Result<serde_json::Value> {
1591 let session_id: Option<SessionId> = params
1592 .as_ref()
1593 .and_then(|p| p.get("sessionId"))
1594 .and_then(|v| v.as_str())
1595 .map(SessionId::from);
1596 let response = self
1597 .inner
1598 .rpc
1599 .send_request_with_inline_callback(method, params, inline_callback)
1600 .await?;
1601 if let Some(err) = response.error {
1602 if err.message.contains("Session not found") {
1603 return Err(ErrorKind::Session(SessionErrorKind::NotFound(
1604 session_id.unwrap_or_else(|| "unknown".into()),
1605 ))
1606 .into());
1607 }
1608 return Err(Error::with_message(
1609 ErrorKind::Rpc { code: err.code },
1610 err.message,
1611 ));
1612 }
1613 Ok(response.result.unwrap_or(serde_json::Value::Null))
1614 }
1615
    /// Writes `response` to the peer through the underlying RPC client.
    pub(crate) async fn send_response(&self, response: &JsonRpcResponse) -> Result<()> {
        self.inner.rpc.write(response).await
    }
1620
    /// Builds a `Client` handle around an existing shared `ClientInner`.
    pub(crate) fn from_inner(inner: Arc<ClientInner>) -> Self {
        Self { inner }
    }
1625
    /// Takes the receiver of incoming JSON-RPC requests out of the client.
    ///
    /// The slot is left empty afterwards, so this returns `None` when the
    /// receiver has already been taken.
    #[expect(dead_code, reason = "reserved for future pub(crate) use")]
    pub(crate) fn take_request_rx(&self) -> Option<mpsc::UnboundedReceiver<JsonRpcRequest>> {
        self.inner.request_rx.lock().take()
    }
1633
1634 pub(crate) fn register_session(
1642 &self,
1643 session_id: &SessionId,
1644 ) -> crate::router::SessionChannels {
1645 self.inner.router.ensure_started(
1646 &self.inner.notification_tx,
1647 &self.inner.request_rx,
1648 self.inner.llm_inference.get().cloned(),
1649 );
1650 self.inner.router.register(session_id)
1651 }
1652
    /// Removes `session_id` from the session router.
    pub(crate) fn unregister_session(&self, session_id: &SessionId) {
        self.inner.router.unregister(session_id);
    }
1657
    /// Returns the protocol version recorded by `verify_protocol_version`.
    ///
    /// `None` means no version has been recorded yet.
    pub fn protocol_version(&self) -> Option<u32> {
        self.inner.negotiated_protocol_version.get().copied()
    }
1667
1668 pub async fn verify_protocol_version(&self) -> Result<()> {
1692 let handshake_start = Instant::now();
1693 let mut used_fallback_ping = false;
1694 let server_version = match self.connect_handshake().await {
1698 Ok(v) => v,
1699 Err(ref e) if e.rpc_code() == Some(error_codes::METHOD_NOT_FOUND) => {
1700 used_fallback_ping = true;
1701 self.ping(None).await?.protocol_version
1702 }
1703 Err(e) => return Err(e),
1704 };
1705
1706 match server_version {
1707 None => {
1708 warn!("CLI server did not report protocolVersion; skipping version check");
1709 }
1710 Some(v) if !(MIN_PROTOCOL_VERSION..=SDK_PROTOCOL_VERSION).contains(&v) => {
1711 return Err(ErrorKind::Protocol(ProtocolErrorKind::VersionMismatch {
1712 server: v,
1713 min: MIN_PROTOCOL_VERSION,
1714 max: SDK_PROTOCOL_VERSION,
1715 })
1716 .into());
1717 }
1718 Some(v) => {
1719 if let Some(&existing) = self.inner.negotiated_protocol_version.get() {
1720 if existing != v {
1721 return Err(ErrorKind::Protocol(ProtocolErrorKind::VersionChanged {
1722 previous: existing,
1723 current: v,
1724 })
1725 .into());
1726 }
1727 } else {
1728 let _ = self.inner.negotiated_protocol_version.set(v);
1729 }
1730 }
1731 }
1732
1733 debug!(
1734 elapsed_ms = handshake_start.elapsed().as_millis(),
1735 protocol_version = ?server_version,
1736 used_fallback_ping,
1737 "Client::verify_protocol_version protocol handshake complete"
1738 );
1739 Ok(())
1740 }
1741
1742 async fn connect_handshake(&self) -> Result<Option<u32>> {
1749 let result = self
1750 .rpc()
1751 .connect(crate::generated::api_types::ConnectRequest {
1752 token: self.inner.effective_connection_token.clone(),
1753 })
1754 .await?;
1755 Ok(u32::try_from(result.protocol_version).ok())
1756 }
1757
1758 pub async fn ping(&self, message: Option<&str>) -> Result<crate::types::PingResponse> {
1766 let params = match message {
1767 Some(m) => serde_json::json!({ "message": m }),
1768 None => serde_json::json!({}),
1769 };
1770 let value = self
1771 .call(generated::api_types::rpc_methods::PING, Some(params))
1772 .await?;
1773 Ok(serde_json::from_value(value)?)
1774 }
1775
1776 pub async fn list_sessions(
1779 &self,
1780 filter: Option<SessionListFilter>,
1781 ) -> Result<Vec<SessionMetadata>> {
1782 let params = match filter {
1783 Some(f) => serde_json::json!({ "filter": f }),
1784 None => serde_json::json!({}),
1785 };
1786 let result = self.call("session.list", Some(params)).await?;
1787 let response: ListSessionsResponse = serde_json::from_value(result)?;
1788 Ok(response.sessions)
1789 }
1790
1791 pub async fn get_session_metadata(
1809 &self,
1810 session_id: &SessionId,
1811 ) -> Result<Option<SessionMetadata>> {
1812 let result = self
1813 .call(
1814 "session.getMetadata",
1815 Some(serde_json::json!({ "sessionId": session_id })),
1816 )
1817 .await?;
1818 let response: GetSessionMetadataResponse = serde_json::from_value(result)?;
1819 Ok(response.session)
1820 }
1821
1822 pub async fn delete_session(&self, session_id: &SessionId) -> Result<()> {
1824 self.call(
1825 "session.delete",
1826 Some(serde_json::json!({ "sessionId": session_id })),
1827 )
1828 .await?;
1829 Ok(())
1830 }
1831
1832 pub async fn get_last_session_id(&self) -> Result<Option<SessionId>> {
1848 let result = self
1849 .call("session.getLastId", Some(serde_json::json!({})))
1850 .await?;
1851 let response: GetLastSessionIdResponse = serde_json::from_value(result)?;
1852 Ok(response.session_id)
1853 }
1854
1855 pub async fn get_foreground_session_id(&self) -> Result<Option<SessionId>> {
1860 let result = self
1861 .call("session.getForeground", Some(serde_json::json!({})))
1862 .await?;
1863 let response: GetForegroundSessionResponse = serde_json::from_value(result)?;
1864 Ok(response.session_id)
1865 }
1866
1867 pub async fn set_foreground_session_id(&self, session_id: &SessionId) -> Result<()> {
1872 self.call(
1873 "session.setForeground",
1874 Some(serde_json::json!({ "sessionId": session_id })),
1875 )
1876 .await?;
1877 Ok(())
1878 }
1879
1880 pub async fn get_status(&self) -> Result<GetStatusResponse> {
1882 let result = self.call("status.get", Some(serde_json::json!({}))).await?;
1883 Ok(serde_json::from_value(result)?)
1884 }
1885
1886 pub async fn get_auth_status(&self) -> Result<GetAuthStatusResponse> {
1888 let result = self
1889 .call("auth.getStatus", Some(serde_json::json!({})))
1890 .await?;
1891 Ok(serde_json::from_value(result)?)
1892 }
1893
1894 pub async fn list_models(&self) -> Result<Vec<Model>> {
1899 let cache = self.inner.models_cache.lock().clone();
1900 let models = cache
1901 .get_or_try_init(|| async {
1902 if let Some(handler) = &self.inner.on_list_models {
1903 handler.list_models().await
1904 } else {
1905 Ok(self.rpc().models().list().await?.models)
1906 }
1907 })
1908 .await?;
1909 Ok(models.clone())
1910 }
1911
1912 pub(crate) async fn resolve_trace_context(&self) -> TraceContext {
1915 if let Some(provider) = &self.inner.on_get_trace_context {
1916 provider.get_trace_context().await
1917 } else {
1918 TraceContext::default()
1919 }
1920 }
1921
1922 pub fn pid(&self) -> Option<u32> {
1924 self.inner.child.lock().as_ref().and_then(|c| c.id())
1925 }
1926
    /// Shuts the client down cooperatively, collecting every error met along
    /// the way instead of stopping at the first one.
    ///
    /// Steps, in order:
    ///
    /// 1. Send `session.destroy` for each session id known to the router, then
    ///    unregister it whether or not the call succeeded.
    /// 2. If the client holds a child process, call `runtime.shutdown`, bounded
    ///    by `RUNTIME_SHUTDOWN_TIMEOUT`.
    /// 3. Take the child out of the client, mark the connection
    ///    `Disconnected`, and replace the models cache with an empty cell.
    /// 4. Kill the child if `try_wait` shows it has not exited yet.
    ///
    /// # Errors
    ///
    /// Returns `StopErrors` wrapping every failure from the steps above when
    /// at least one occurred.
    pub async fn stop(&self) -> std::result::Result<(), StopErrors> {
        // Read the pid up front: the child is taken out of the client below.
        let pid = self.pid();
        info!(pid = ?pid, "stopping CLI process");
        let mut errors: Vec<Error> = Vec::new();

        for session_id in self.inner.router.session_ids() {
            match self
                .call(
                    "session.destroy",
                    Some(serde_json::json!({ "sessionId": session_id })),
                )
                .await
            {
                Ok(_) => {}
                Err(e) => {
                    warn!(
                        session_id = %session_id,
                        error = %e,
                        "session.destroy failed during Client::stop",
                    );
                    errors.push(e);
                }
            }
            // Unregister regardless of the outcome of `session.destroy`.
            self.inner.router.unregister(&session_id);
        }

        // The guard is a temporary of this statement, so the mutex is released
        // before the awaited shutdown call below.
        let should_shutdown_runtime = self.inner.child.lock().is_some();
        if should_shutdown_runtime {
            let runtime_shutdown_start = Instant::now();
            match tokio::time::timeout(RUNTIME_SHUTDOWN_TIMEOUT, self.rpc().runtime().shutdown())
                .await
            {
                Ok(Ok(())) => {
                    debug!(
                        elapsed_ms = runtime_shutdown_start.elapsed().as_millis(),
                        "Client::stop runtime shutdown complete"
                    );
                }
                Ok(Err(e)) => {
                    warn!(
                        elapsed_ms = runtime_shutdown_start.elapsed().as_millis(),
                        error = %e,
                        "runtime.shutdown failed during Client::stop",
                    );
                    errors.push(e);
                }
                // Outer `Err` is the timeout elapsing; report it as an I/O
                // `TimedOut` error.
                Err(_) => {
                    let e = std::io::Error::new(
                        std::io::ErrorKind::TimedOut,
                        "runtime.shutdown timed out during Client::stop",
                    );
                    warn!(
                        elapsed_ms = runtime_shutdown_start.elapsed().as_millis(),
                        timeout = ?RUNTIME_SHUTDOWN_TIMEOUT,
                        error = %e,
                        "runtime.shutdown timed out during Client::stop",
                    );
                    errors.push(e.into());
                }
            }
        }

        let child = self.inner.child.lock().take();
        *self.inner.state.lock() = ConnectionState::Disconnected;
        // Install a fresh, empty cell so a later `list_models` fetches again.
        *self.inner.models_cache.lock() = Arc::new(tokio::sync::OnceCell::new());
        if let Some(mut child) = child {
            match child.try_wait() {
                // Already exited: nothing to kill.
                Ok(Some(_status)) => {}
                // Still running: kill it and wait for the kill to complete.
                Ok(None) => {
                    if let Err(e) = child.kill().await {
                        errors.push(e.into());
                    }
                }
                Err(e) => errors.push(e.into()),
            }
        }

        info!(pid = ?pid, errors = errors.len(), "CLI process stopped");
        if errors.is_empty() {
            Ok(())
        } else {
            Err(StopErrors(errors))
        }
    }
2045
    /// Tears the client down immediately, without any RPC round-trips.
    ///
    /// Takes the child process out of the client and sends it a kill signal
    /// via `start_kill`, without waiting for it to exit. A failure to signal
    /// is logged, not returned. It then force-closes the RPC client, clears
    /// the session router, marks the connection `Disconnected`, and replaces
    /// the models cache with an empty cell.
    pub fn force_stop(&self) {
        // Read the pid before the child is taken out of the slot.
        let pid = self.pid();
        info!(pid = ?pid, "force-stopping CLI process");
        if let Some(mut child) = self.inner.child.lock().take()
            && let Err(e) = child.start_kill()
        {
            error!(pid = ?pid, error = %e, "failed to send kill signal");
        }
        self.inner.rpc.force_close();
        self.inner.router.clear();
        *self.inner.state.lock() = ConnectionState::Disconnected;
        *self.inner.models_cache.lock() = Arc::new(tokio::sync::OnceCell::new());
    }
2090
    /// Returns a new `LifecycleSubscription` backed by a fresh receiver on the
    /// client's lifecycle broadcast channel.
    pub fn subscribe_lifecycle(&self) -> LifecycleSubscription {
        LifecycleSubscription::new(self.inner.lifecycle_tx.subscribe())
    }
2128}
2129
2130impl Drop for ClientInner {
2131 fn drop(&mut self) {
2132 if let Some(ref mut child) = *self.child.lock() {
2133 let pid = child.id();
2134 if let Err(e) = child.start_kill() {
2135 error!(pid = ?pid, error = %e, "failed to kill CLI process on drop");
2136 } else {
2137 info!(pid = ?pid, "kill signal sent for CLI process on drop");
2138 }
2139 }
2140 }
2141}
2142
2143#[cfg(test)]
2144mod tests {
2145 use super::*;
2146
2147 #[test]
2148 fn is_transport_failure_matches_request_cancelled() {
2149 let err = Error::from(ErrorKind::Protocol(ProtocolErrorKind::RequestCancelled));
2150 assert!(err.is_transport_failure());
2151 }
2152
2153 #[test]
2154 fn is_transport_failure_matches_io_error() {
2155 let err = Error::from(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "gone"));
2156 assert!(err.is_transport_failure());
2157 }
2158
2159 #[test]
2160 fn is_transport_failure_rejects_rpc_error() {
2161 let err = Error::with_message(ErrorKind::Rpc { code: -1 }, "bad");
2162 assert!(!err.is_transport_failure());
2163 }
2164
2165 #[test]
2166 fn is_transport_failure_rejects_session_error() {
2167 let err = Error::from(ErrorKind::Session(SessionErrorKind::NotFound("s1".into())));
2168 assert!(!err.is_transport_failure());
2169 }
2170
2171 #[test]
2172 fn client_options_builder_composes() {
2173 let opts = ClientOptions::new()
2174 .with_program(CliProgram::Path(PathBuf::from("/usr/local/bin/copilot")))
2175 .with_prefix_args(["node"])
2176 .with_cwd(PathBuf::from("/tmp"))
2177 .with_env([("KEY", "value")])
2178 .with_env_remove(["UNWANTED"])
2179 .with_extra_args(["--quiet"])
2180 .with_github_token("ghp_test")
2181 .with_use_logged_in_user(false)
2182 .with_log_level(LogLevel::Debug)
2183 .with_session_idle_timeout_seconds(120)
2184 .with_enable_remote_sessions(true);
2185 assert!(matches!(opts.program, CliProgram::Path(_)));
2186 assert_eq!(opts.prefix_args, vec![std::ffi::OsString::from("node")]);
2187 assert_eq!(opts.working_directory, PathBuf::from("/tmp"));
2188 assert_eq!(
2189 opts.env,
2190 vec![(
2191 std::ffi::OsString::from("KEY"),
2192 std::ffi::OsString::from("value")
2193 )]
2194 );
2195 assert_eq!(opts.env_remove, vec![std::ffi::OsString::from("UNWANTED")]);
2196 assert_eq!(opts.extra_args, vec!["--quiet".to_string()]);
2197 assert_eq!(opts.github_token.as_deref(), Some("ghp_test"));
2198 assert_eq!(opts.use_logged_in_user, Some(false));
2199 assert!(matches!(opts.log_level, Some(LogLevel::Debug)));
2200 assert_eq!(opts.session_idle_timeout_seconds, Some(120));
2201 assert!(opts.enable_remote_sessions);
2202 }
2203
2204 #[test]
2205 fn is_transport_failure_rejects_other_protocol_errors() {
2206 let err = Error::from(ErrorKind::Protocol(ProtocolErrorKind::CliStartupTimeout));
2207 assert!(!err.is_transport_failure());
2208 }
2209
2210 #[test]
2211 fn build_command_lets_env_remove_strip_injected_token() {
2212 let opts = ClientOptions {
2213 github_token: Some("secret".to_string()),
2214 env_remove: vec![std::ffi::OsString::from("COPILOT_SDK_AUTH_TOKEN")],
2215 ..Default::default()
2216 };
2217 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2218 let action = cmd
2220 .as_std()
2221 .get_envs()
2222 .find(|(k, _)| *k == std::ffi::OsStr::new("COPILOT_SDK_AUTH_TOKEN"))
2223 .map(|(_, v)| v);
2224 assert_eq!(
2225 action,
2226 Some(None),
2227 "env_remove should win over github_token"
2228 );
2229 }
2230
2231 #[test]
2232 fn build_command_lets_env_override_injected_token() {
2233 let opts = ClientOptions {
2234 github_token: Some("from-options".to_string()),
2235 env: vec![(
2236 std::ffi::OsString::from("COPILOT_SDK_AUTH_TOKEN"),
2237 std::ffi::OsString::from("from-env"),
2238 )],
2239 ..Default::default()
2240 };
2241 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2242 let value = cmd
2243 .as_std()
2244 .get_envs()
2245 .find(|(k, _)| *k == std::ffi::OsStr::new("COPILOT_SDK_AUTH_TOKEN"))
2246 .and_then(|(_, v)| v);
2247 assert_eq!(value, Some(std::ffi::OsStr::new("from-env")));
2248 }
2249
2250 #[test]
2251 fn build_command_injects_github_token_by_default() {
2252 let opts = ClientOptions {
2253 github_token: Some("just-the-token".to_string()),
2254 ..Default::default()
2255 };
2256 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2257 let value = cmd
2258 .as_std()
2259 .get_envs()
2260 .find(|(k, _)| *k == std::ffi::OsStr::new("COPILOT_SDK_AUTH_TOKEN"))
2261 .and_then(|(_, v)| v);
2262 assert_eq!(value, Some(std::ffi::OsStr::new("just-the-token")));
2263 }
2264
2265 fn env_value<'a>(cmd: &'a tokio::process::Command, key: &str) -> Option<&'a std::ffi::OsStr> {
2266 cmd.as_std()
2267 .get_envs()
2268 .find(|(k, _)| *k == std::ffi::OsStr::new(key))
2269 .and_then(|(_, v)| v)
2270 }
2271
2272 #[test]
2273 fn telemetry_config_builder_composes() {
2274 let cfg = TelemetryConfig::new()
2275 .with_otlp_endpoint("http://collector:4318")
2276 .with_otlp_protocol(OtlpHttpProtocol::HttpProtobuf)
2277 .with_file_path(PathBuf::from("/var/log/copilot.jsonl"))
2278 .with_exporter_type(OtelExporterType::OtlpHttp)
2279 .with_source_name("my-app")
2280 .with_capture_content(true);
2281
2282 assert_eq!(cfg.otlp_endpoint.as_deref(), Some("http://collector:4318"));
2283 assert_eq!(cfg.otlp_protocol, Some(OtlpHttpProtocol::HttpProtobuf));
2284 assert_eq!(
2285 cfg.file_path.as_deref(),
2286 Some(Path::new("/var/log/copilot.jsonl")),
2287 );
2288 assert_eq!(cfg.exporter_type, Some(OtelExporterType::OtlpHttp));
2289 assert_eq!(cfg.source_name.as_deref(), Some("my-app"));
2290 assert_eq!(cfg.capture_content, Some(true));
2291 assert!(!cfg.is_empty());
2292 assert!(TelemetryConfig::new().is_empty());
2293 }
2294
2295 #[test]
2296 fn otlp_http_protocol_serde_matches_env_value() {
2297 for (protocol, wire) in [
2298 (OtlpHttpProtocol::HttpJson, "http/json"),
2299 (OtlpHttpProtocol::HttpProtobuf, "http/protobuf"),
2300 ] {
2301 assert_eq!(protocol.as_str(), wire);
2302
2303 let serialized = serde_json::to_string(&protocol).unwrap();
2304 assert_eq!(serialized, format!("\"{wire}\""));
2305
2306 let deserialized: OtlpHttpProtocol = serde_json::from_str(&serialized).unwrap();
2307 assert_eq!(deserialized, protocol);
2308 }
2309 }
2310
2311 #[test]
2312 fn build_command_sets_otel_env_when_telemetry_enabled() {
2313 let opts = ClientOptions {
2314 telemetry: Some(TelemetryConfig {
2315 otlp_endpoint: Some("http://collector:4318".to_string()),
2316 otlp_protocol: Some(OtlpHttpProtocol::HttpProtobuf),
2317 file_path: Some(PathBuf::from("/var/log/copilot.jsonl")),
2318 exporter_type: Some(OtelExporterType::OtlpHttp),
2319 source_name: Some("my-app".to_string()),
2320 capture_content: Some(true),
2321 }),
2322 ..Default::default()
2323 };
2324 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2325 assert_eq!(
2326 env_value(&cmd, "COPILOT_OTEL_ENABLED"),
2327 Some(std::ffi::OsStr::new("true")),
2328 );
2329 assert_eq!(
2330 env_value(&cmd, "OTEL_EXPORTER_OTLP_ENDPOINT"),
2331 Some(std::ffi::OsStr::new("http://collector:4318")),
2332 );
2333 assert_eq!(
2334 env_value(&cmd, "OTEL_EXPORTER_OTLP_PROTOCOL"),
2335 Some(std::ffi::OsStr::new("http/protobuf")),
2336 );
2337 assert_eq!(
2338 env_value(&cmd, "COPILOT_OTEL_FILE_EXPORTER_PATH"),
2339 Some(std::ffi::OsStr::new("/var/log/copilot.jsonl")),
2340 );
2341 assert_eq!(
2342 env_value(&cmd, "COPILOT_OTEL_EXPORTER_TYPE"),
2343 Some(std::ffi::OsStr::new("otlp-http")),
2344 );
2345 assert_eq!(
2346 env_value(&cmd, "COPILOT_OTEL_SOURCE_NAME"),
2347 Some(std::ffi::OsStr::new("my-app")),
2348 );
2349 assert_eq!(
2350 env_value(&cmd, "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"),
2351 Some(std::ffi::OsStr::new("true")),
2352 );
2353 }
2354
2355 #[test]
2356 fn build_command_omits_otel_env_when_telemetry_none() {
2357 let opts = ClientOptions::default();
2358 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2359 for key in [
2360 "COPILOT_OTEL_ENABLED",
2361 "OTEL_EXPORTER_OTLP_ENDPOINT",
2362 "OTEL_EXPORTER_OTLP_PROTOCOL",
2363 "COPILOT_OTEL_FILE_EXPORTER_PATH",
2364 "COPILOT_OTEL_EXPORTER_TYPE",
2365 "COPILOT_OTEL_SOURCE_NAME",
2366 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT",
2367 ] {
2368 assert!(
2369 env_value(&cmd, key).is_none(),
2370 "expected {key} to be unset when telemetry is None",
2371 );
2372 }
2373 }
2374
2375 #[test]
2376 fn build_command_omits_unset_telemetry_fields() {
2377 let opts = ClientOptions {
2378 telemetry: Some(TelemetryConfig {
2379 otlp_endpoint: Some("http://collector:4318".to_string()),
2380 ..Default::default()
2381 }),
2382 ..Default::default()
2383 };
2384 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2385 assert_eq!(
2387 env_value(&cmd, "COPILOT_OTEL_ENABLED"),
2388 Some(std::ffi::OsStr::new("true")),
2389 );
2390 assert_eq!(
2391 env_value(&cmd, "OTEL_EXPORTER_OTLP_ENDPOINT"),
2392 Some(std::ffi::OsStr::new("http://collector:4318")),
2393 );
2394 for key in [
2396 "OTEL_EXPORTER_OTLP_PROTOCOL",
2397 "COPILOT_OTEL_FILE_EXPORTER_PATH",
2398 "COPILOT_OTEL_EXPORTER_TYPE",
2399 "COPILOT_OTEL_SOURCE_NAME",
2400 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT",
2401 ] {
2402 assert!(env_value(&cmd, key).is_none(), "{key} should be unset");
2403 }
2404 }
2405
2406 #[test]
2407 fn build_command_lets_user_env_override_telemetry() {
2408 let opts = ClientOptions {
2409 telemetry: Some(TelemetryConfig {
2410 otlp_endpoint: Some("http://from-config:4318".to_string()),
2411 ..Default::default()
2412 }),
2413 env: vec![(
2414 std::ffi::OsString::from("OTEL_EXPORTER_OTLP_ENDPOINT"),
2415 std::ffi::OsString::from("http://from-user-env:4318"),
2416 )],
2417 ..Default::default()
2418 };
2419 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2420 assert_eq!(
2421 env_value(&cmd, "OTEL_EXPORTER_OTLP_ENDPOINT"),
2422 Some(std::ffi::OsStr::new("http://from-user-env:4318")),
2423 "user-supplied options.env should override telemetry config",
2424 );
2425 }
2426
2427 #[test]
2428 fn build_command_sets_copilot_home_env_when_configured() {
2429 let opts = ClientOptions::new().with_base_directory(PathBuf::from("/custom/copilot"));
2430 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2431 assert_eq!(
2432 env_value(&cmd, "COPILOT_HOME"),
2433 Some(std::ffi::OsStr::new("/custom/copilot")),
2434 );
2435
2436 let opts = ClientOptions::default();
2437 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2438 assert!(env_value(&cmd, "COPILOT_HOME").is_none());
2439 }
2440
2441 #[test]
2442 fn build_command_sets_connection_token_env_when_configured() {
2443 let opts = ClientOptions::new().with_transport(Transport::Tcp {
2444 port: 0,
2445 connection_token: Some("secret-token".to_string()),
2446 });
2447 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2448 assert_eq!(
2449 env_value(&cmd, "COPILOT_CONNECTION_TOKEN"),
2450 Some(std::ffi::OsStr::new("secret-token")),
2451 );
2452
2453 let opts = ClientOptions::default();
2454 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2455 assert!(env_value(&cmd, "COPILOT_CONNECTION_TOKEN").is_none());
2456 }
2457
2458 #[tokio::test]
2459 async fn start_rejects_empty_connection_token() {
2460 let opts = ClientOptions::new()
2461 .with_transport(Transport::Tcp {
2462 port: 0,
2463 connection_token: Some(String::new()),
2464 })
2465 .with_program(CliProgram::Path(PathBuf::from("/bin/echo")));
2466 let err = Client::start(opts).await.unwrap_err();
2467 assert!(
2468 matches!(err.kind(), ErrorKind::InvalidConfig),
2469 "got {err:?}"
2470 );
2471 }
2472
2473 #[tokio::test]
2474 async fn start_rejects_empty_external_connection_token() {
2475 let opts = ClientOptions::new()
2476 .with_transport(Transport::External {
2477 host: "127.0.0.1".to_string(),
2478 port: 1,
2479 connection_token: Some(String::new()),
2480 })
2481 .with_program(CliProgram::Path(PathBuf::from("/bin/echo")));
2482 let err = Client::start(opts).await.unwrap_err();
2483 assert!(
2484 matches!(err.kind(), ErrorKind::InvalidConfig),
2485 "got {err:?}"
2486 );
2487 }
2488
2489 #[test]
2490 fn telemetry_config_capture_content_serializes_as_lowercase_bool() {
2491 let opts_true = ClientOptions {
2492 telemetry: Some(TelemetryConfig {
2493 capture_content: Some(true),
2494 ..Default::default()
2495 }),
2496 ..Default::default()
2497 };
2498 let opts_false = ClientOptions {
2499 telemetry: Some(TelemetryConfig {
2500 capture_content: Some(false),
2501 ..Default::default()
2502 }),
2503 ..Default::default()
2504 };
2505 let cmd_true = Client::build_command(Path::new("/bin/echo"), &opts_true);
2506 let cmd_false = Client::build_command(Path::new("/bin/echo"), &opts_false);
2507 assert_eq!(
2508 env_value(
2509 &cmd_true,
2510 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"
2511 ),
2512 Some(std::ffi::OsStr::new("true")),
2513 );
2514 assert_eq!(
2515 env_value(
2516 &cmd_false,
2517 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"
2518 ),
2519 Some(std::ffi::OsStr::new("false")),
2520 );
2521 }
2522
2523 #[test]
2524 fn session_idle_timeout_args_are_omitted_by_default() {
2525 let opts = ClientOptions::default();
2526 assert!(Client::session_idle_timeout_args(&opts).is_empty());
2527 }
2528
2529 #[test]
2530 fn session_idle_timeout_args_omitted_for_zero() {
2531 let opts = ClientOptions {
2532 session_idle_timeout_seconds: Some(0),
2533 ..Default::default()
2534 };
2535 assert!(Client::session_idle_timeout_args(&opts).is_empty());
2536 }
2537
2538 #[test]
2539 fn session_idle_timeout_args_emit_flag_for_positive_value() {
2540 let opts = ClientOptions {
2541 session_idle_timeout_seconds: Some(300),
2542 ..Default::default()
2543 };
2544 assert_eq!(
2545 Client::session_idle_timeout_args(&opts),
2546 vec!["--session-idle-timeout".to_string(), "300".to_string()]
2547 );
2548 }
2549
2550 #[test]
2551 fn remote_args_omitted_by_default() {
2552 let opts = ClientOptions::default();
2553 assert!(Client::remote_args(&opts).is_empty());
2554 }
2555
2556 #[test]
2557 fn remote_args_emit_flag_when_enabled() {
2558 let opts = ClientOptions {
2559 enable_remote_sessions: true,
2560 ..Default::default()
2561 };
2562 assert_eq!(Client::remote_args(&opts), vec!["--remote".to_string()]);
2563 }
2564
2565 #[test]
2566 fn log_level_args_omitted_when_unset() {
2567 let opts = ClientOptions::default();
2568 assert!(opts.log_level.is_none());
2569 assert!(
2570 Client::log_level_args(&opts).is_empty(),
2571 "with no caller-supplied log_level the SDK must not pass --log-level"
2572 );
2573 }
2574
2575 #[test]
2576 fn log_level_args_emit_flag_when_set() {
2577 let opts = ClientOptions::default().with_log_level(LogLevel::Debug);
2578 assert_eq!(Client::log_level_args(&opts), vec!["--log-level", "debug"]);
2579 }
2580
2581 #[test]
2582 fn log_level_str_round_trips() {
2583 for level in [
2584 LogLevel::None,
2585 LogLevel::Error,
2586 LogLevel::Warning,
2587 LogLevel::Info,
2588 LogLevel::Debug,
2589 LogLevel::All,
2590 ] {
2591 let s = level.as_str();
2592 let json = serde_json::to_string(&level).unwrap();
2593 assert_eq!(json, format!("\"{s}\""));
2594 let parsed: LogLevel = serde_json::from_str(&json).unwrap();
2595 assert_eq!(parsed, level);
2596 }
2597 }
2598
2599 #[test]
2600 fn client_options_debug_redacts_handler() {
2601 struct StubHandler;
2602 #[async_trait]
2603 impl ListModelsHandler for StubHandler {
2604 async fn list_models(&self) -> Result<Vec<Model>> {
2605 Ok(vec![])
2606 }
2607 }
2608 let opts = ClientOptions {
2609 on_list_models: Some(Arc::new(StubHandler)),
2610 github_token: Some("secret-token".into()),
2611 ..Default::default()
2612 };
2613 let debug = format!("{opts:?}");
2614 assert!(debug.contains("on_list_models: Some(\"<set>\")"));
2615 assert!(debug.contains("github_token: Some(\"<redacted>\")"));
2616 assert!(!debug.contains("secret-token"));
2617 }
2618
2619 #[tokio::test]
2620 async fn list_models_uses_on_list_models_handler_when_set() {
2621 use std::sync::atomic::{AtomicUsize, Ordering};
2622
2623 struct CountingHandler {
2624 calls: Arc<AtomicUsize>,
2625 models: Vec<Model>,
2626 }
2627 #[async_trait]
2628 impl ListModelsHandler for CountingHandler {
2629 async fn list_models(&self) -> Result<Vec<Model>> {
2630 self.calls.fetch_add(1, Ordering::SeqCst);
2631 Ok(self.models.clone())
2632 }
2633 }
2634
2635 let calls = Arc::new(AtomicUsize::new(0));
2636 let model = Model {
2637 id: "byok-gpt-4".into(),
2638 name: "BYOK GPT-4".into(),
2639 ..Default::default()
2640 };
2641 let handler: Arc<dyn ListModelsHandler> = Arc::new(CountingHandler {
2642 calls: Arc::clone(&calls),
2643 models: vec![model.clone()],
2644 });
2645
2646 let client = client_with_list_models_handler(handler);
2647
2648 let result = client.list_models().await.unwrap();
2649 assert_eq!(result.len(), 1);
2650 assert_eq!(result[0].id, "byok-gpt-4");
2651 assert_eq!(calls.load(Ordering::SeqCst), 1);
2652 }
2653
2654 #[tokio::test]
2655 async fn list_models_serializes_concurrent_cache_misses() {
2656 use std::sync::atomic::{AtomicUsize, Ordering};
2657
2658 struct SlowCountingHandler {
2659 calls: Arc<AtomicUsize>,
2660 models: Vec<Model>,
2661 }
2662 #[async_trait]
2663 impl ListModelsHandler for SlowCountingHandler {
2664 async fn list_models(&self) -> Result<Vec<Model>> {
2665 self.calls.fetch_add(1, Ordering::SeqCst);
2666 tokio::time::sleep(std::time::Duration::from_millis(25)).await;
2667 Ok(self.models.clone())
2668 }
2669 }
2670
2671 let calls = Arc::new(AtomicUsize::new(0));
2672 let model = Model {
2673 id: "single-flight-model".into(),
2674 name: "Single Flight Model".into(),
2675 ..Default::default()
2676 };
2677 let handler: Arc<dyn ListModelsHandler> = Arc::new(SlowCountingHandler {
2678 calls: Arc::clone(&calls),
2679 models: vec![model],
2680 });
2681 let client = client_with_list_models_handler(handler);
2682
2683 let (first, second) = tokio::join!(client.list_models(), client.list_models());
2684 assert_eq!(first.unwrap()[0].id, "single-flight-model");
2685 assert_eq!(second.unwrap()[0].id, "single-flight-model");
2686 assert_eq!(calls.load(Ordering::SeqCst), 1);
2687 }
2688
2689 #[tokio::test]
2690 async fn cancelled_resume_session_unregisters_pending_session() {
2691 let (client_write, _server_read) = tokio::io::duplex(8192);
2692 let (_server_write, client_read) = tokio::io::duplex(8192);
2693 let client = Client::from_streams(client_read, client_write, std::env::temp_dir()).unwrap();
2694 let session_id = SessionId::new("resume-cancel-test");
2695 let handle = tokio::spawn({
2696 let client = client.clone();
2697 async move {
2698 client
2699 .resume_session(ResumeSessionConfig::new(session_id))
2700 .await
2701 }
2702 });
2703
2704 wait_for_pending_session_registration(&client).await;
2705 handle.abort();
2706 let _ = handle.await;
2707
2708 assert!(client.inner.router.session_ids().is_empty());
2709 client.force_stop();
2710 }
2711
2712 fn client_with_list_models_handler(handler: Arc<dyn ListModelsHandler>) -> Client {
2713 Client {
2714 inner: Arc::new(ClientInner {
2715 child: parking_lot::Mutex::new(None),
2716 rpc: {
2717 let (req_tx, _req_rx) = mpsc::unbounded_channel();
2718 let (notif_tx, _notif_rx) = broadcast::channel(16);
2719 let (read_pipe, _write_pipe) = tokio::io::duplex(64);
2720 let (_unused_read, write_pipe) = tokio::io::duplex(64);
2721 JsonRpcClient::new(write_pipe, read_pipe, notif_tx, req_tx)
2722 },
2723 cwd: PathBuf::from("."),
2724 request_rx: parking_lot::Mutex::new(None),
2725 notification_tx: broadcast::channel(16).0,
2726 router: router::SessionRouter::new(),
2727 negotiated_protocol_version: OnceLock::new(),
2728 state: parking_lot::Mutex::new(ConnectionState::Connected),
2729 lifecycle_tx: broadcast::channel(16).0,
2730 on_list_models: Some(handler),
2731 models_cache: parking_lot::Mutex::new(Arc::new(tokio::sync::OnceCell::new())),
2732 session_fs_configured: false,
2733 session_fs_sqlite_declared: false,
2734 llm_inference: OnceLock::new(),
2735 on_get_trace_context: None,
2736 effective_connection_token: None,
2737 mode: ClientMode::default(),
2738 }),
2739 }
2740 }
2741
2742 async fn wait_for_pending_session_registration(client: &Client) {
2743 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1);
2744 while client.inner.router.session_ids().is_empty() {
2745 assert!(
2746 tokio::time::Instant::now() < deadline,
2747 "session was not registered"
2748 );
2749 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2750 }
2751 }
2752}