1#![doc = include_str!("../README.md")]
2#![warn(missing_docs)]
3#![deny(rustdoc::broken_intra_doc_links)]
4#![cfg_attr(test, allow(clippy::unwrap_used))]
5
6pub mod canvas;
8mod canvas_dispatch;
9#[cfg(feature = "bundled-cli")]
11pub(crate) mod embeddedcli;
12mod errors;
13pub use errors::*;
14pub mod handler;
16pub mod hooks;
18mod jsonrpc;
19pub mod permission;
21pub(crate) mod resolve;
23mod router;
24pub mod session;
26pub mod session_fs;
28mod session_fs_dispatch;
29pub mod subscription;
31pub mod tool;
33pub mod trace_context;
35pub mod transforms;
37pub mod types;
39mod wire;
40
41pub mod session_events;
43
44pub mod rpc;
47
48pub(crate) mod generated;
53
54pub mod mode;
57
58use std::ffi::OsString;
59use std::path::{Path, PathBuf};
60use std::process::Stdio;
61use std::sync::{Arc, OnceLock};
62use std::time::{Duration, Instant};
63
64use async_trait::async_trait;
65pub(crate) use jsonrpc::{
68 JsonRpcClient, JsonRpcError, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, error_codes,
69};
70pub use mode::{BUILTIN_TOOLS_ISOLATED, ClientMode, ToolSet};
71
72#[cfg(feature = "test-support")]
74pub mod test_support {
75 pub use crate::jsonrpc::{
76 JsonRpcClient, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
77 error_codes,
78 };
79}
80use serde::{Deserialize, Serialize};
81use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, BufReader};
82use tokio::net::TcpStream;
83use tokio::process::{Child, Command};
84use tokio::sync::{broadcast, mpsc, oneshot};
85use tracing::{Instrument, debug, error, info, warn};
86pub use types::*;
87
88mod sdk_protocol_version;
89pub use sdk_protocol_version::{SDK_PROTOCOL_VERSION, get_sdk_protocol_version};
90pub use subscription::{EventSubscription, LifecycleSubscription};
91
92const MIN_PROTOCOL_VERSION: u32 = 3;
94const RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
95
96#[derive(Debug, Default)]
98#[non_exhaustive]
99pub enum Transport {
100 #[default]
102 Stdio,
103 Tcp {
105 port: u16,
107 connection_token: Option<String>,
111 },
112 External {
114 host: String,
116 port: u16,
118 connection_token: Option<String>,
121 },
122}
123
124#[derive(Debug, Clone, Default)]
126pub enum CliProgram {
127 #[default]
130 Resolve,
131 Path(PathBuf),
133}
134
135impl From<PathBuf> for CliProgram {
136 fn from(path: PathBuf) -> Self {
137 Self::Path(path)
138 }
139}
140
141pub const HAS_BUNDLED_CLI: bool = cfg!(has_bundled_cli);
148
149pub fn install_bundled_cli() -> Option<PathBuf> {
171 #[cfg(feature = "bundled-cli")]
172 {
173 embeddedcli::path()
174 }
175 #[cfg(not(feature = "bundled-cli"))]
176 {
177 None
178 }
179}
180
181#[non_exhaustive]
191pub struct ClientOptions {
192 pub program: CliProgram,
194 pub prefix_args: Vec<OsString>,
196 pub working_directory: PathBuf,
198 pub env: Vec<(OsString, OsString)>,
200 pub env_remove: Vec<OsString>,
202 pub extra_args: Vec<String>,
204 pub transport: Transport,
206 pub github_token: Option<String>,
211 pub use_logged_in_user: Option<bool>,
215 pub log_level: Option<LogLevel>,
219 pub session_idle_timeout_seconds: Option<u64>,
225 pub on_list_models: Option<Arc<dyn ListModelsHandler>>,
233 pub session_fs: Option<SessionFsConfig>,
241 pub on_get_trace_context: Option<Arc<dyn TraceContextProvider>>,
251 pub telemetry: Option<TelemetryConfig>,
255 pub base_directory: Option<PathBuf>,
260 pub enable_remote_sessions: bool,
266 pub bundled_cli_extract_dir: Option<PathBuf>,
285 pub mode: ClientMode,
289}
290
291impl std::fmt::Debug for ClientOptions {
292 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293 f.debug_struct("ClientOptions")
294 .field("program", &self.program)
295 .field("prefix_args", &self.prefix_args)
296 .field("working_directory", &self.working_directory)
297 .field("env", &self.env)
298 .field("env_remove", &self.env_remove)
299 .field("extra_args", &self.extra_args)
300 .field("transport", &self.transport)
301 .field(
302 "github_token",
303 &self.github_token.as_ref().map(|_| "<redacted>"),
304 )
305 .field("use_logged_in_user", &self.use_logged_in_user)
306 .field("log_level", &self.log_level)
307 .field(
308 "session_idle_timeout_seconds",
309 &self.session_idle_timeout_seconds,
310 )
311 .field(
312 "on_list_models",
313 &self.on_list_models.as_ref().map(|_| "<set>"),
314 )
315 .field("session_fs", &self.session_fs)
316 .field(
317 "on_get_trace_context",
318 &self.on_get_trace_context.as_ref().map(|_| "<set>"),
319 )
320 .field("telemetry", &self.telemetry)
321 .field("base_directory", &self.base_directory)
322 .field("enable_remote_sessions", &self.enable_remote_sessions)
323 .field("bundled_cli_extract_dir", &self.bundled_cli_extract_dir)
324 .finish()
325 }
326}
327
328#[async_trait]
337pub trait ListModelsHandler: Send + Sync + 'static {
338 async fn list_models(&self) -> Result<Vec<Model>>;
340}
341
342#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
344#[serde(rename_all = "lowercase")]
345pub enum LogLevel {
346 None,
348 Error,
350 Warning,
352 Info,
354 Debug,
356 All,
358}
359
360impl LogLevel {
361 pub fn as_str(self) -> &'static str {
363 match self {
364 Self::None => "none",
365 Self::Error => "error",
366 Self::Warning => "warning",
367 Self::Info => "info",
368 Self::Debug => "debug",
369 Self::All => "all",
370 }
371 }
372}
373
374impl std::fmt::Display for LogLevel {
375 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376 f.write_str(self.as_str())
377 }
378}
379
380#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
385#[serde(rename_all = "kebab-case")]
386#[non_exhaustive]
387pub enum OtelExporterType {
388 OtlpHttp,
391 File,
394}
395
396impl OtelExporterType {
397 pub fn as_str(self) -> &'static str {
399 match self {
400 Self::OtlpHttp => "otlp-http",
401 Self::File => "file",
402 }
403 }
404}
405
406#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
412#[non_exhaustive]
413pub enum OtlpHttpProtocol {
414 #[serde(rename = "http/json")]
416 HttpJson,
417 #[serde(rename = "http/protobuf")]
419 HttpProtobuf,
420}
421
422impl OtlpHttpProtocol {
423 pub fn as_str(self) -> &'static str {
425 match self {
426 Self::HttpJson => "http/json",
427 Self::HttpProtobuf => "http/protobuf",
428 }
429 }
430}
431
432#[derive(Debug, Clone, Default)]
467#[non_exhaustive]
468pub struct TelemetryConfig {
469 pub otlp_endpoint: Option<String>,
471 pub otlp_protocol: Option<OtlpHttpProtocol>,
473 pub file_path: Option<PathBuf>,
475 pub exporter_type: Option<OtelExporterType>,
478 pub source_name: Option<String>,
482 pub capture_content: Option<bool>,
486}
487
488impl TelemetryConfig {
489 pub fn new() -> Self {
492 Self::default()
493 }
494
495 pub fn with_otlp_endpoint(mut self, endpoint: impl Into<String>) -> Self {
497 self.otlp_endpoint = Some(endpoint.into());
498 self
499 }
500
501 pub fn with_otlp_protocol(mut self, protocol: OtlpHttpProtocol) -> Self {
503 self.otlp_protocol = Some(protocol);
504 self
505 }
506
507 pub fn with_file_path(mut self, path: impl Into<PathBuf>) -> Self {
509 self.file_path = Some(path.into());
510 self
511 }
512
513 pub fn with_exporter_type(mut self, exporter_type: OtelExporterType) -> Self {
515 self.exporter_type = Some(exporter_type);
516 self
517 }
518
519 pub fn with_source_name(mut self, source_name: impl Into<String>) -> Self {
523 self.source_name = Some(source_name.into());
524 self
525 }
526
527 pub fn with_capture_content(mut self, capture: bool) -> Self {
531 self.capture_content = Some(capture);
532 self
533 }
534
535 pub fn is_empty(&self) -> bool {
538 self.otlp_endpoint.is_none()
539 && self.otlp_protocol.is_none()
540 && self.file_path.is_none()
541 && self.exporter_type.is_none()
542 && self.source_name.is_none()
543 && self.capture_content.is_none()
544 }
545}
546
547impl Default for ClientOptions {
548 fn default() -> Self {
549 Self {
550 program: CliProgram::Resolve,
551 prefix_args: Vec::new(),
552 working_directory: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
553 env: Vec::new(),
554 env_remove: Vec::new(),
555 extra_args: Vec::new(),
556 transport: Transport::default(),
557 github_token: None,
558 use_logged_in_user: None,
559 log_level: None,
560 session_idle_timeout_seconds: None,
561 on_list_models: None,
562 session_fs: None,
563 on_get_trace_context: None,
564 telemetry: None,
565 base_directory: None,
566 enable_remote_sessions: false,
567 bundled_cli_extract_dir: None,
568 mode: ClientMode::default(),
569 }
570 }
571}
572
573impl ClientOptions {
574 pub fn new() -> Self {
590 Self::default()
591 }
592
593 pub fn with_program(mut self, program: impl Into<CliProgram>) -> Self {
595 self.program = program.into();
596 self
597 }
598
599 pub fn with_prefix_args<I, S>(mut self, args: I) -> Self
601 where
602 I: IntoIterator<Item = S>,
603 S: Into<OsString>,
604 {
605 self.prefix_args = args.into_iter().map(Into::into).collect();
606 self
607 }
608
609 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
611 self.working_directory = cwd.into();
612 self
613 }
614
615 pub fn with_env<I, K, V>(mut self, env: I) -> Self
617 where
618 I: IntoIterator<Item = (K, V)>,
619 K: Into<OsString>,
620 V: Into<OsString>,
621 {
622 self.env = env.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
623 self
624 }
625
626 pub fn with_env_remove<I, S>(mut self, names: I) -> Self
628 where
629 I: IntoIterator<Item = S>,
630 S: Into<OsString>,
631 {
632 self.env_remove = names.into_iter().map(Into::into).collect();
633 self
634 }
635
636 pub fn with_extra_args<I, S>(mut self, args: I) -> Self
638 where
639 I: IntoIterator<Item = S>,
640 S: Into<String>,
641 {
642 self.extra_args = args.into_iter().map(Into::into).collect();
643 self
644 }
645
646 pub fn with_transport(mut self, transport: Transport) -> Self {
648 self.transport = transport;
649 self
650 }
651
652 pub fn with_github_token(mut self, token: impl Into<String>) -> Self {
655 self.github_token = Some(token.into());
656 self
657 }
658
659 pub fn with_use_logged_in_user(mut self, use_logged_in: bool) -> Self {
662 self.use_logged_in_user = Some(use_logged_in);
663 self
664 }
665
666 pub fn with_log_level(mut self, level: LogLevel) -> Self {
668 self.log_level = Some(level);
669 self
670 }
671
672 pub fn with_session_idle_timeout_seconds(mut self, seconds: u64) -> Self {
675 self.session_idle_timeout_seconds = Some(seconds);
676 self
677 }
678
679 pub fn with_list_models_handler<H>(mut self, handler: H) -> Self
682 where
683 H: ListModelsHandler + 'static,
684 {
685 self.on_list_models = Some(Arc::new(handler));
686 self
687 }
688
689 pub fn with_session_fs(mut self, config: SessionFsConfig) -> Self {
691 self.session_fs = Some(config);
692 self
693 }
694
695 pub fn with_trace_context_provider<P>(mut self, provider: P) -> Self
699 where
700 P: TraceContextProvider + 'static,
701 {
702 self.on_get_trace_context = Some(Arc::new(provider));
703 self
704 }
705
706 pub fn with_telemetry(mut self, config: TelemetryConfig) -> Self {
708 self.telemetry = Some(config);
709 self
710 }
711
712 pub fn with_base_directory(mut self, dir: impl Into<PathBuf>) -> Self {
715 self.base_directory = Some(dir.into());
716 self
717 }
718
719 pub fn with_enable_remote_sessions(mut self, enabled: bool) -> Self {
722 self.enable_remote_sessions = enabled;
723 self
724 }
725
726 pub fn with_bundled_cli_extract_dir(mut self, dir: impl Into<PathBuf>) -> Self {
736 self.bundled_cli_extract_dir = Some(dir.into());
737 self
738 }
739
740 pub fn with_mode(mut self, mode: ClientMode) -> Self {
745 self.mode = mode;
746 self
747 }
748}
749
750fn validate_session_fs_config(cfg: &SessionFsConfig) -> Result<()> {
752 if cfg.initial_cwd.trim().is_empty() {
753 return Err(Error::with_message(
754 ErrorKind::Session(SessionErrorKind::InvalidSessionFsConfig),
755 "invalid SessionFsConfig: initial_cwd must not be empty",
756 ));
757 }
758 if cfg.session_state_path.trim().is_empty() {
759 return Err(Error::with_message(
760 ErrorKind::Session(SessionErrorKind::InvalidSessionFsConfig),
761 "invalid SessionFsConfig: session_state_path must not be empty",
762 ));
763 }
764 Ok(())
765}
766
767fn generate_connection_token() -> String {
774 let mut bytes = [0u8; 16];
775 getrandom::getrandom(&mut bytes)
776 .expect("OS CSPRNG (getrandom) is unavailable; cannot generate connection token");
777 let mut hex = String::with_capacity(32);
778 for byte in bytes {
779 use std::fmt::Write;
780 let _ = write!(hex, "{byte:02x}");
781 }
782 hex
783}
784
785#[derive(Clone)]
790pub struct Client {
791 inner: Arc<ClientInner>,
792}
793
794impl std::fmt::Debug for Client {
795 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
796 f.debug_struct("Client")
797 .field("working_directory", &self.inner.cwd)
798 .field("pid", &self.pid())
799 .finish()
800 }
801}
802
803struct ClientInner {
804 child: parking_lot::Mutex<Option<Child>>,
805 rpc: JsonRpcClient,
806 cwd: PathBuf,
807 request_rx: parking_lot::Mutex<Option<mpsc::UnboundedReceiver<JsonRpcRequest>>>,
808 notification_tx: broadcast::Sender<JsonRpcNotification>,
809 router: router::SessionRouter,
810 negotiated_protocol_version: OnceLock<u32>,
811 state: parking_lot::Mutex<ConnectionState>,
812 lifecycle_tx: broadcast::Sender<SessionLifecycleEvent>,
813 on_list_models: Option<Arc<dyn ListModelsHandler>>,
814 models_cache: parking_lot::Mutex<Arc<tokio::sync::OnceCell<Vec<Model>>>>,
815 session_fs_configured: bool,
816 session_fs_sqlite_declared: bool,
817 on_get_trace_context: Option<Arc<dyn TraceContextProvider>>,
818 effective_connection_token: Option<String>,
823 pub(crate) mode: ClientMode,
826}
827
828impl Client {
829 pub async fn start(options: ClientOptions) -> Result<Self> {
842 let start_time = Instant::now();
843 if options.mode == ClientMode::Empty
844 && options.base_directory.is_none()
845 && options.session_fs.is_none()
846 {
847 return Err(Error::with_message(
848 ErrorKind::InvalidConfig,
849 "ClientMode::Empty requires either `base_directory` or \
850 `session_fs` to be set (no implicit ~/.copilot fallback).",
851 ));
852 }
853 if let Some(cfg) = &options.session_fs {
854 validate_session_fs_config(cfg)?;
855 }
856 if matches!(options.transport, Transport::External { .. }) {
859 if options.github_token.is_some() {
860 return Err(Error::with_message(
861 ErrorKind::InvalidConfig,
862 "invalid client configuration: github_token cannot be used with \
863 Transport::External (external server manages its own auth)",
864 ));
865 }
866 if options.use_logged_in_user == Some(true) {
867 return Err(Error::with_message(
868 ErrorKind::InvalidConfig,
869 "invalid client configuration: use_logged_in_user cannot be used with \
870 Transport::External (external server manages its own auth)",
871 ));
872 }
873 }
874 match &options.transport {
878 Transport::Tcp {
879 connection_token: Some(t),
880 ..
881 }
882 | Transport::External {
883 connection_token: Some(t),
884 ..
885 } if t.is_empty() => {
886 return Err(Error::with_message(
887 ErrorKind::InvalidConfig,
888 "invalid client configuration: connection_token must be a non-empty string",
889 ));
890 }
891 _ => {}
892 }
893 let mut options = options;
898 let effective_connection_token: Option<String> = match &mut options.transport {
899 Transport::Stdio => None,
900 Transport::Tcp {
901 connection_token, ..
902 } => Some(
903 connection_token
904 .get_or_insert_with(generate_connection_token)
905 .clone(),
906 ),
907 Transport::External {
908 connection_token, ..
909 } => connection_token.clone(),
910 };
911 let session_fs_config = options.session_fs.clone();
912 let session_fs_sqlite_declared = session_fs_config
913 .as_ref()
914 .and_then(|c| c.capabilities.as_ref())
915 .is_some_and(|caps| caps.sqlite);
916 let program = match &options.program {
917 CliProgram::Path(path) => {
918 info!(path = %path.display(), "using explicit copilot CLI path");
919 path.clone()
920 }
921 CliProgram::Resolve => {
922 let resolved = resolve::copilot_binary_with_extract_dir(
923 options.bundled_cli_extract_dir.as_deref(),
924 )?;
925 info!(path = %resolved.display(), "resolved copilot CLI");
926 #[cfg(windows)]
927 {
928 if let Some(ext) = resolved.extension().and_then(|e| e.to_str()).filter(|ext| {
929 ext.eq_ignore_ascii_case("cmd") || ext.eq_ignore_ascii_case("bat")
930 }) {
931 warn!(
932 path = %resolved.display(),
933 ext = %ext,
934 "resolved copilot CLI is a .cmd/.bat wrapper; \
935 this may cause console window flashes on Windows"
936 );
937 }
938 }
939 resolved
940 }
941 };
942
943 let client = match options.transport {
944 Transport::External {
945 ref host,
946 port,
947 connection_token: _,
948 } => {
949 info!(host = %host, port = %port, "connecting to external CLI server");
950 let connect_start = Instant::now();
951 let stream = TcpStream::connect((host.as_str(), port)).await?;
952 debug!(
953 elapsed_ms = connect_start.elapsed().as_millis(),
954 host = %host,
955 port,
956 "Client::start TCP connect complete"
957 );
958 let (reader, writer) = tokio::io::split(stream);
959 Self::from_transport(
960 reader,
961 writer,
962 None,
963 options.working_directory,
964 options.on_list_models,
965 session_fs_config.is_some(),
966 session_fs_sqlite_declared,
967 options.on_get_trace_context,
968 effective_connection_token.clone(),
969 options.mode,
970 )?
971 }
972 Transport::Tcp {
973 port,
974 connection_token: _,
975 } => {
976 let (mut child, actual_port) = Self::spawn_tcp(&program, &options, port).await?;
977 let connect_start = Instant::now();
978 let stream = TcpStream::connect(("127.0.0.1", actual_port)).await?;
979 debug!(
980 elapsed_ms = connect_start.elapsed().as_millis(),
981 port = actual_port,
982 "Client::start TCP connect complete"
983 );
984 let (reader, writer) = tokio::io::split(stream);
985 Self::drain_stderr(&mut child);
986 Self::from_transport(
987 reader,
988 writer,
989 Some(child),
990 options.working_directory,
991 options.on_list_models,
992 session_fs_config.is_some(),
993 session_fs_sqlite_declared,
994 options.on_get_trace_context,
995 effective_connection_token.clone(),
996 options.mode,
997 )?
998 }
999 Transport::Stdio => {
1000 let mut child = Self::spawn_stdio(&program, &options)?;
1001 let stdin = child.stdin.take().expect("stdin is piped");
1002 let stdout = child.stdout.take().expect("stdout is piped");
1003 Self::drain_stderr(&mut child);
1004 Self::from_transport(
1005 stdout,
1006 stdin,
1007 Some(child),
1008 options.working_directory,
1009 options.on_list_models,
1010 session_fs_config.is_some(),
1011 session_fs_sqlite_declared,
1012 options.on_get_trace_context,
1013 effective_connection_token.clone(),
1014 options.mode,
1015 )?
1016 }
1017 };
1018
1019 debug!(
1020 elapsed_ms = start_time.elapsed().as_millis(),
1021 "Client::start transport setup complete"
1022 );
1023 client.verify_protocol_version().await?;
1024 debug!(
1025 elapsed_ms = start_time.elapsed().as_millis(),
1026 "Client::start protocol verification complete"
1027 );
1028 if let Some(cfg) = session_fs_config {
1029 let session_fs_start = Instant::now();
1030 let capabilities = cfg.capabilities.as_ref().map(|c| {
1031 crate::generated::api_types::SessionFsSetProviderCapabilities {
1032 sqlite: Some(c.sqlite),
1033 }
1034 });
1035 let request = crate::generated::api_types::SessionFsSetProviderRequest {
1036 capabilities,
1037 conventions: cfg.conventions.into_wire(),
1038 initial_cwd: cfg.initial_cwd,
1039 session_state_path: cfg.session_state_path,
1040 };
1041 client.rpc().session_fs().set_provider(request).await?;
1042 debug!(
1043 elapsed_ms = session_fs_start.elapsed().as_millis(),
1044 "Client::start session filesystem setup complete"
1045 );
1046 }
1047 debug!(
1048 elapsed_ms = start_time.elapsed().as_millis(),
1049 "Client::start complete"
1050 );
1051 Ok(client)
1052 }
1053
1054 pub fn from_streams(
1058 reader: impl AsyncRead + Unpin + Send + 'static,
1059 writer: impl AsyncWrite + Unpin + Send + 'static,
1060 cwd: PathBuf,
1061 ) -> Result<Self> {
1062 Self::from_transport(
1063 reader,
1064 writer,
1065 None,
1066 cwd,
1067 None,
1068 false,
1069 false,
1070 None,
1071 None,
1072 ClientMode::default(),
1073 )
1074 }
1075
1076 #[cfg(any(test, feature = "test-support"))]
1084 pub fn from_streams_with_trace_provider(
1085 reader: impl AsyncRead + Unpin + Send + 'static,
1086 writer: impl AsyncWrite + Unpin + Send + 'static,
1087 cwd: PathBuf,
1088 provider: Arc<dyn TraceContextProvider>,
1089 ) -> Result<Self> {
1090 Self::from_transport(
1091 reader,
1092 writer,
1093 None,
1094 cwd,
1095 None,
1096 false,
1097 false,
1098 Some(provider),
1099 None,
1100 ClientMode::default(),
1101 )
1102 }
1103
1104 #[cfg(any(test, feature = "test-support"))]
1108 pub fn from_streams_with_connection_token(
1109 reader: impl AsyncRead + Unpin + Send + 'static,
1110 writer: impl AsyncWrite + Unpin + Send + 'static,
1111 cwd: PathBuf,
1112 token: Option<String>,
1113 ) -> Result<Self> {
1114 Self::from_transport(
1115 reader,
1116 writer,
1117 None,
1118 cwd,
1119 None,
1120 false,
1121 false,
1122 None,
1123 token,
1124 ClientMode::default(),
1125 )
1126 }
1127
1128 #[cfg(any(test, feature = "test-support"))]
1134 pub fn generate_connection_token_for_test() -> String {
1135 generate_connection_token()
1136 }
1137
1138 #[allow(clippy::too_many_arguments)]
1139 fn from_transport(
1140 reader: impl AsyncRead + Unpin + Send + 'static,
1141 writer: impl AsyncWrite + Unpin + Send + 'static,
1142 child: Option<Child>,
1143 cwd: PathBuf,
1144 on_list_models: Option<Arc<dyn ListModelsHandler>>,
1145 session_fs_configured: bool,
1146 session_fs_sqlite_declared: bool,
1147 on_get_trace_context: Option<Arc<dyn TraceContextProvider>>,
1148 effective_connection_token: Option<String>,
1149 mode: ClientMode,
1150 ) -> Result<Self> {
1151 let setup_start = Instant::now();
1152 let (request_tx, request_rx) = mpsc::unbounded_channel::<JsonRpcRequest>();
1153 let (notification_broadcast_tx, _) = broadcast::channel::<JsonRpcNotification>(1024);
1154 let rpc = JsonRpcClient::new(
1155 writer,
1156 reader,
1157 notification_broadcast_tx.clone(),
1158 request_tx,
1159 );
1160
1161 let pid = child.as_ref().and_then(|c| c.id());
1162 info!(pid = ?pid, "copilot CLI client ready");
1163
1164 let client = Self {
1165 inner: Arc::new(ClientInner {
1166 child: parking_lot::Mutex::new(child),
1167 rpc,
1168 cwd,
1169 request_rx: parking_lot::Mutex::new(Some(request_rx)),
1170 notification_tx: notification_broadcast_tx,
1171 router: router::SessionRouter::new(),
1172 negotiated_protocol_version: OnceLock::new(),
1173 state: parking_lot::Mutex::new(ConnectionState::Connected),
1174 lifecycle_tx: broadcast::channel(256).0,
1175 on_list_models,
1176 models_cache: parking_lot::Mutex::new(Arc::new(tokio::sync::OnceCell::new())),
1177 session_fs_configured,
1178 session_fs_sqlite_declared,
1179 on_get_trace_context,
1180 effective_connection_token,
1181 mode,
1182 }),
1183 };
1184 client.spawn_lifecycle_dispatcher();
1185 debug!(
1186 elapsed_ms = setup_start.elapsed().as_millis(),
1187 pid = ?pid,
1188 "Client::from_transport setup complete"
1189 );
1190 Ok(client)
1191 }
1192
1193 fn spawn_lifecycle_dispatcher(&self) {
1197 let inner = Arc::clone(&self.inner);
1198 let mut notif_rx = inner.notification_tx.subscribe();
1199 tokio::spawn(async move {
1200 loop {
1201 match notif_rx.recv().await {
1202 Ok(notification) => {
1203 if notification.method != "session.lifecycle" {
1204 continue;
1205 }
1206 let Some(params) = notification.params.as_ref() else {
1207 continue;
1208 };
1209 let event: SessionLifecycleEvent =
1210 match serde_json::from_value(params.clone()) {
1211 Ok(e) => e,
1212 Err(e) => {
1213 warn!(
1214 error = %e,
1215 "failed to deserialize session.lifecycle notification"
1216 );
1217 continue;
1218 }
1219 };
1220 let _ = inner.lifecycle_tx.send(event);
1223 }
1224 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1225 warn!(missed = n, "lifecycle dispatcher lagged");
1226 }
1227 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1228 }
1229 }
1230 });
1231 }
1232
1233 fn build_command(program: &Path, options: &ClientOptions) -> Command {
1234 let mut command = Command::new(program);
1235 for arg in &options.prefix_args {
1236 command.arg(arg);
1237 }
1238 if let Some(token) = &options.github_token {
1241 command.env("COPILOT_SDK_AUTH_TOKEN", token);
1242 }
1243 if let Some(telemetry) = &options.telemetry {
1246 command.env("COPILOT_OTEL_ENABLED", "true");
1247 if let Some(endpoint) = &telemetry.otlp_endpoint {
1248 command.env("OTEL_EXPORTER_OTLP_ENDPOINT", endpoint);
1249 }
1250 if let Some(protocol) = telemetry.otlp_protocol {
1251 command.env("OTEL_EXPORTER_OTLP_PROTOCOL", protocol.as_str());
1252 }
1253 if let Some(path) = &telemetry.file_path {
1254 command.env("COPILOT_OTEL_FILE_EXPORTER_PATH", path);
1255 }
1256 if let Some(exporter) = telemetry.exporter_type {
1257 command.env("COPILOT_OTEL_EXPORTER_TYPE", exporter.as_str());
1258 }
1259 if let Some(source) = &telemetry.source_name {
1260 command.env("COPILOT_OTEL_SOURCE_NAME", source);
1261 }
1262 if let Some(capture) = telemetry.capture_content {
1263 command.env(
1264 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT",
1265 if capture { "true" } else { "false" },
1266 );
1267 }
1268 }
1269 if let Some(dir) = &options.base_directory {
1270 command.env("COPILOT_HOME", dir);
1271 }
1272 if options.mode == ClientMode::Empty {
1275 command.env("COPILOT_DISABLE_KEYTAR", "1");
1276 }
1277 if let Transport::Tcp {
1278 connection_token: Some(token),
1279 ..
1280 } = &options.transport
1281 {
1282 command.env("COPILOT_CONNECTION_TOKEN", token);
1283 }
1284 for (key, value) in &options.env {
1285 command.env(key, value);
1286 }
1287 for key in &options.env_remove {
1288 command.env_remove(key);
1289 }
1290 command
1291 .current_dir(&options.working_directory)
1292 .stdout(Stdio::piped())
1293 .stderr(Stdio::piped());
1294
1295 #[cfg(windows)]
1296 {
1297 use std::os::windows::process::CommandExt;
1298 const CREATE_NO_WINDOW: u32 = 0x08000000;
1299 command.as_std_mut().creation_flags(CREATE_NO_WINDOW);
1300 }
1301
1302 command
1303 }
1304
1305 fn auth_args(options: &ClientOptions) -> Vec<&'static str> {
1313 let mut args: Vec<&'static str> = Vec::new();
1314 if options.github_token.is_some() {
1315 args.push("--auth-token-env");
1316 args.push("COPILOT_SDK_AUTH_TOKEN");
1317 }
1318 let use_logged_in = options
1319 .use_logged_in_user
1320 .unwrap_or(options.github_token.is_none());
1321 if !use_logged_in {
1322 args.push("--no-auto-login");
1323 }
1324 args
1325 }
1326
1327 fn session_idle_timeout_args(options: &ClientOptions) -> Vec<String> {
1331 match options.session_idle_timeout_seconds {
1332 Some(secs) if secs > 0 => {
1333 vec!["--session-idle-timeout".to_string(), secs.to_string()]
1334 }
1335 _ => Vec::new(),
1336 }
1337 }
1338
1339 fn remote_args(options: &ClientOptions) -> Vec<String> {
1340 if options.enable_remote_sessions {
1341 vec!["--remote".to_string()]
1342 } else {
1343 Vec::new()
1344 }
1345 }
1346
1347 fn log_level_args(options: &ClientOptions) -> Vec<&'static str> {
1348 match options.log_level {
1349 Some(level) => vec!["--log-level", level.as_str()],
1350 None => Vec::new(),
1351 }
1352 }
1353
1354 fn spawn_stdio(program: &Path, options: &ClientOptions) -> Result<Child> {
1355 info!(cwd = ?options.working_directory, program = %program.display(), "spawning copilot CLI (stdio)");
1356 let mut command = Self::build_command(program, options);
1357 command
1358 .args(["--server", "--stdio", "--no-auto-update"])
1359 .args(Self::log_level_args(options))
1360 .args(Self::auth_args(options))
1361 .args(Self::session_idle_timeout_args(options))
1362 .args(Self::remote_args(options))
1363 .args(&options.extra_args)
1364 .stdin(Stdio::piped());
1365 let spawn_start = Instant::now();
1366 let child = command.spawn()?;
1367 debug!(
1368 elapsed_ms = spawn_start.elapsed().as_millis(),
1369 "Client::spawn_stdio subprocess spawned"
1370 );
1371 Ok(child)
1372 }
1373
1374 async fn spawn_tcp(program: &Path, options: &ClientOptions, port: u16) -> Result<(Child, u16)> {
1375 info!(cwd = ?options.working_directory, program = %program.display(), port = %port, "spawning copilot CLI (tcp)");
1376 let mut command = Self::build_command(program, options);
1377 command
1378 .args(["--server", "--port", &port.to_string(), "--no-auto-update"])
1379 .args(Self::log_level_args(options))
1380 .args(Self::auth_args(options))
1381 .args(Self::session_idle_timeout_args(options))
1382 .args(Self::remote_args(options))
1383 .args(&options.extra_args)
1384 .stdin(Stdio::null());
1385 let spawn_start = Instant::now();
1386 let mut child = command.spawn()?;
1387 debug!(
1388 elapsed_ms = spawn_start.elapsed().as_millis(),
1389 "Client::spawn_tcp subprocess spawned"
1390 );
1391 let stdout = child.stdout.take().expect("stdout is piped");
1392
1393 let (port_tx, port_rx) = oneshot::channel::<u16>();
1394 let span = tracing::error_span!("copilot_cli_port_scan");
1395 tokio::spawn(
1396 async move {
1397 let port_re = regex::Regex::new(r"listening on port (\d+)").expect("valid regex");
1399 let mut lines = BufReader::new(stdout).lines();
1400 let mut port_tx = Some(port_tx);
1401 while let Ok(Some(line)) = lines.next_line().await {
1402 debug!(line = %line, "CLI stdout");
1403 if let Some(tx) = port_tx.take() {
1404 if let Some(caps) = port_re.captures(&line)
1405 && let Some(p) =
1406 caps.get(1).and_then(|m| m.as_str().parse::<u16>().ok())
1407 {
1408 let _ = tx.send(p);
1409 continue;
1410 }
1411 port_tx = Some(tx);
1413 }
1414 }
1415 }
1416 .instrument(span),
1417 );
1418
1419 let port_wait_start = Instant::now();
1420 let actual_port = tokio::time::timeout(std::time::Duration::from_secs(10), port_rx)
1421 .await
1422 .map_err(|_| Error::from(ErrorKind::Protocol(ProtocolErrorKind::CliStartupTimeout)))?
1423 .map_err(|_| Error::from(ErrorKind::Protocol(ProtocolErrorKind::CliStartupFailed)))?;
1424
1425 debug!(
1426 elapsed_ms = port_wait_start.elapsed().as_millis(),
1427 port = actual_port,
1428 "Client::spawn_tcp TCP port wait complete"
1429 );
1430 info!(port = %actual_port, "CLI server listening");
1431 Ok((child, actual_port))
1432 }
1433
1434 fn drain_stderr(child: &mut Child) {
1435 if let Some(stderr) = child.stderr.take() {
1436 let span = tracing::error_span!("copilot_cli");
1437 tokio::spawn(
1438 async move {
1439 let mut reader = BufReader::new(stderr).lines();
1440 while let Ok(Some(line)) = reader.next_line().await {
1441 warn!(line = %line, "CLI stderr");
1442 }
1443 }
1444 .instrument(span),
1445 );
1446 }
1447 }
1448
1449 pub fn cwd(&self) -> &PathBuf {
1451 &self.inner.cwd
1452 }
1453
1454 pub fn mode(&self) -> ClientMode {
1456 self.inner.mode
1457 }
1458
1459 pub fn rpc(&self) -> crate::generated::rpc::ClientRpc<'_> {
1470 crate::generated::rpc::ClientRpc { client: self }
1471 }
1472
1473 #[allow(dead_code, reason = "convenience for future internal use")]
1475 pub(crate) async fn send_request(
1476 &self,
1477 method: &str,
1478 params: Option<serde_json::Value>,
1479 ) -> Result<JsonRpcResponse> {
1480 self.inner.rpc.send_request(method, params).await
1481 }
1482
1483 pub async fn call(
1503 &self,
1504 method: &str,
1505 params: Option<serde_json::Value>,
1506 ) -> Result<serde_json::Value> {
1507 self.call_with_inline_callback(method, params, None).await
1508 }
1509
1510 pub(crate) async fn call_with_inline_callback(
1525 &self,
1526 method: &str,
1527 params: Option<serde_json::Value>,
1528 inline_callback: Option<crate::jsonrpc::InlineResponseCallback>,
1529 ) -> Result<serde_json::Value> {
1530 let session_id: Option<SessionId> = params
1531 .as_ref()
1532 .and_then(|p| p.get("sessionId"))
1533 .and_then(|v| v.as_str())
1534 .map(SessionId::from);
1535 let response = self
1536 .inner
1537 .rpc
1538 .send_request_with_inline_callback(method, params, inline_callback)
1539 .await?;
1540 if let Some(err) = response.error {
1541 if err.message.contains("Session not found") {
1542 return Err(ErrorKind::Session(SessionErrorKind::NotFound(
1543 session_id.unwrap_or_else(|| "unknown".into()),
1544 ))
1545 .into());
1546 }
1547 return Err(Error::with_message(
1548 ErrorKind::Rpc { code: err.code },
1549 err.message,
1550 ));
1551 }
1552 Ok(response.result.unwrap_or(serde_json::Value::Null))
1553 }
1554
1555 pub(crate) async fn send_response(&self, response: &JsonRpcResponse) -> Result<()> {
1557 self.inner.rpc.write(response).await
1558 }
1559
1560 #[expect(dead_code, reason = "reserved for future pub(crate) use")]
1564 pub(crate) fn take_request_rx(&self) -> Option<mpsc::UnboundedReceiver<JsonRpcRequest>> {
1565 self.inner.request_rx.lock().take()
1566 }
1567
1568 pub(crate) fn register_session(
1576 &self,
1577 session_id: &SessionId,
1578 ) -> crate::router::SessionChannels {
1579 self.inner
1580 .router
1581 .ensure_started(&self.inner.notification_tx, &self.inner.request_rx);
1582 self.inner.router.register(session_id)
1583 }
1584
1585 pub(crate) fn unregister_session(&self, session_id: &SessionId) {
1587 self.inner.router.unregister(session_id);
1588 }
1589
1590 pub fn protocol_version(&self) -> Option<u32> {
1597 self.inner.negotiated_protocol_version.get().copied()
1598 }
1599
1600 pub async fn verify_protocol_version(&self) -> Result<()> {
1624 let handshake_start = Instant::now();
1625 let mut used_fallback_ping = false;
1626 let server_version = match self.connect_handshake().await {
1630 Ok(v) => v,
1631 Err(ref e) if e.rpc_code() == Some(error_codes::METHOD_NOT_FOUND) => {
1632 used_fallback_ping = true;
1633 self.ping(None).await?.protocol_version
1634 }
1635 Err(e) => return Err(e),
1636 };
1637
1638 match server_version {
1639 None => {
1640 warn!("CLI server did not report protocolVersion; skipping version check");
1641 }
1642 Some(v) if !(MIN_PROTOCOL_VERSION..=SDK_PROTOCOL_VERSION).contains(&v) => {
1643 return Err(ErrorKind::Protocol(ProtocolErrorKind::VersionMismatch {
1644 server: v,
1645 min: MIN_PROTOCOL_VERSION,
1646 max: SDK_PROTOCOL_VERSION,
1647 })
1648 .into());
1649 }
1650 Some(v) => {
1651 if let Some(&existing) = self.inner.negotiated_protocol_version.get() {
1652 if existing != v {
1653 return Err(ErrorKind::Protocol(ProtocolErrorKind::VersionChanged {
1654 previous: existing,
1655 current: v,
1656 })
1657 .into());
1658 }
1659 } else {
1660 let _ = self.inner.negotiated_protocol_version.set(v);
1661 }
1662 }
1663 }
1664
1665 debug!(
1666 elapsed_ms = handshake_start.elapsed().as_millis(),
1667 protocol_version = ?server_version,
1668 used_fallback_ping,
1669 "Client::verify_protocol_version protocol handshake complete"
1670 );
1671 Ok(())
1672 }
1673
1674 async fn connect_handshake(&self) -> Result<Option<u32>> {
1681 let result = self
1682 .rpc()
1683 .connect(crate::generated::api_types::ConnectRequest {
1684 token: self.inner.effective_connection_token.clone(),
1685 })
1686 .await?;
1687 Ok(u32::try_from(result.protocol_version).ok())
1688 }
1689
1690 pub async fn ping(&self, message: Option<&str>) -> Result<crate::types::PingResponse> {
1698 let params = match message {
1699 Some(m) => serde_json::json!({ "message": m }),
1700 None => serde_json::json!({}),
1701 };
1702 let value = self
1703 .call(generated::api_types::rpc_methods::PING, Some(params))
1704 .await?;
1705 Ok(serde_json::from_value(value)?)
1706 }
1707
1708 pub async fn list_sessions(
1711 &self,
1712 filter: Option<SessionListFilter>,
1713 ) -> Result<Vec<SessionMetadata>> {
1714 let params = match filter {
1715 Some(f) => serde_json::json!({ "filter": f }),
1716 None => serde_json::json!({}),
1717 };
1718 let result = self.call("session.list", Some(params)).await?;
1719 let response: ListSessionsResponse = serde_json::from_value(result)?;
1720 Ok(response.sessions)
1721 }
1722
1723 pub async fn get_session_metadata(
1741 &self,
1742 session_id: &SessionId,
1743 ) -> Result<Option<SessionMetadata>> {
1744 let result = self
1745 .call(
1746 "session.getMetadata",
1747 Some(serde_json::json!({ "sessionId": session_id })),
1748 )
1749 .await?;
1750 let response: GetSessionMetadataResponse = serde_json::from_value(result)?;
1751 Ok(response.session)
1752 }
1753
1754 pub async fn delete_session(&self, session_id: &SessionId) -> Result<()> {
1756 self.call(
1757 "session.delete",
1758 Some(serde_json::json!({ "sessionId": session_id })),
1759 )
1760 .await?;
1761 Ok(())
1762 }
1763
1764 pub async fn get_last_session_id(&self) -> Result<Option<SessionId>> {
1780 let result = self
1781 .call("session.getLastId", Some(serde_json::json!({})))
1782 .await?;
1783 let response: GetLastSessionIdResponse = serde_json::from_value(result)?;
1784 Ok(response.session_id)
1785 }
1786
1787 pub async fn get_foreground_session_id(&self) -> Result<Option<SessionId>> {
1792 let result = self
1793 .call("session.getForeground", Some(serde_json::json!({})))
1794 .await?;
1795 let response: GetForegroundSessionResponse = serde_json::from_value(result)?;
1796 Ok(response.session_id)
1797 }
1798
1799 pub async fn set_foreground_session_id(&self, session_id: &SessionId) -> Result<()> {
1804 self.call(
1805 "session.setForeground",
1806 Some(serde_json::json!({ "sessionId": session_id })),
1807 )
1808 .await?;
1809 Ok(())
1810 }
1811
1812 pub async fn get_status(&self) -> Result<GetStatusResponse> {
1814 let result = self.call("status.get", Some(serde_json::json!({}))).await?;
1815 Ok(serde_json::from_value(result)?)
1816 }
1817
1818 pub async fn get_auth_status(&self) -> Result<GetAuthStatusResponse> {
1820 let result = self
1821 .call("auth.getStatus", Some(serde_json::json!({})))
1822 .await?;
1823 Ok(serde_json::from_value(result)?)
1824 }
1825
1826 pub async fn list_models(&self) -> Result<Vec<Model>> {
1831 let cache = self.inner.models_cache.lock().clone();
1832 let models = cache
1833 .get_or_try_init(|| async {
1834 if let Some(handler) = &self.inner.on_list_models {
1835 handler.list_models().await
1836 } else {
1837 Ok(self.rpc().models().list().await?.models)
1838 }
1839 })
1840 .await?;
1841 Ok(models.clone())
1842 }
1843
1844 pub(crate) async fn resolve_trace_context(&self) -> TraceContext {
1847 if let Some(provider) = &self.inner.on_get_trace_context {
1848 provider.get_trace_context().await
1849 } else {
1850 TraceContext::default()
1851 }
1852 }
1853
1854 pub fn pid(&self) -> Option<u32> {
1856 self.inner.child.lock().as_ref().and_then(|c| c.id())
1857 }
1858
1859 pub async fn stop(&self) -> std::result::Result<(), StopErrors> {
1886 let pid = self.pid();
1887 info!(pid = ?pid, "stopping CLI process");
1888 let mut errors: Vec<Error> = Vec::new();
1889
1890 for session_id in self.inner.router.session_ids() {
1893 match self
1894 .call(
1895 "session.destroy",
1896 Some(serde_json::json!({ "sessionId": session_id })),
1897 )
1898 .await
1899 {
1900 Ok(_) => {}
1901 Err(e) => {
1902 warn!(
1903 session_id = %session_id,
1904 error = %e,
1905 "session.destroy failed during Client::stop",
1906 );
1907 errors.push(e);
1908 }
1909 }
1910 self.inner.router.unregister(&session_id);
1911 }
1912
1913 let should_shutdown_runtime = self.inner.child.lock().is_some();
1914 let mut runtime_shutdown_completed = false;
1915 if should_shutdown_runtime {
1916 let runtime_shutdown_start = Instant::now();
1917 match tokio::time::timeout(RUNTIME_SHUTDOWN_TIMEOUT, self.rpc().runtime().shutdown())
1918 .await
1919 {
1920 Ok(Ok(())) => {
1921 runtime_shutdown_completed = true;
1922 debug!(
1923 elapsed_ms = runtime_shutdown_start.elapsed().as_millis(),
1924 "Client::stop runtime shutdown complete"
1925 );
1926 }
1927 Ok(Err(e)) => {
1928 warn!(
1929 elapsed_ms = runtime_shutdown_start.elapsed().as_millis(),
1930 error = %e,
1931 "runtime.shutdown failed during Client::stop",
1932 );
1933 errors.push(e);
1934 }
1935 Err(_) => {
1936 let e = std::io::Error::new(
1937 std::io::ErrorKind::TimedOut,
1938 "runtime.shutdown timed out during Client::stop",
1939 );
1940 warn!(
1941 elapsed_ms = runtime_shutdown_start.elapsed().as_millis(),
1942 timeout = ?RUNTIME_SHUTDOWN_TIMEOUT,
1943 error = %e,
1944 "runtime.shutdown timed out during Client::stop",
1945 );
1946 errors.push(e.into());
1947 }
1948 }
1949 }
1950
1951 let child = self.inner.child.lock().take();
1952 *self.inner.state.lock() = ConnectionState::Disconnected;
1953 *self.inner.models_cache.lock() = Arc::new(tokio::sync::OnceCell::new());
1954 if let Some(mut child) = child {
1955 match child.try_wait() {
1956 Ok(Some(_status)) => {}
1957 Ok(None) => {
1958 if runtime_shutdown_completed {
1959 match tokio::time::timeout(RUNTIME_SHUTDOWN_TIMEOUT, child.wait()).await {
1960 Ok(Ok(_status)) => {}
1961 Ok(Err(e)) => errors.push(e.into()),
1962 Err(_) => {
1963 if let Err(e) = child.kill().await {
1964 errors.push(e.into());
1965 }
1966 }
1967 }
1968 } else if let Err(e) = child.kill().await {
1969 errors.push(e.into());
1970 }
1971 }
1972 Err(e) => errors.push(e.into()),
1973 }
1974 }
1975
1976 info!(pid = ?pid, errors = errors.len(), "CLI process stopped");
1977 if errors.is_empty() {
1978 Ok(())
1979 } else {
1980 Err(StopErrors(errors))
1981 }
1982 }
1983
1984 pub fn force_stop(&self) {
2014 let pid = self.pid();
2015 info!(pid = ?pid, "force-stopping CLI process");
2016 if let Some(mut child) = self.inner.child.lock().take()
2017 && let Err(e) = child.start_kill()
2018 {
2019 error!(pid = ?pid, error = %e, "failed to send kill signal");
2020 }
2021 self.inner.rpc.force_close();
2022 self.inner.router.clear();
2025 *self.inner.state.lock() = ConnectionState::Disconnected;
2026 *self.inner.models_cache.lock() = Arc::new(tokio::sync::OnceCell::new());
2027 }
2028
2029 pub fn subscribe_lifecycle(&self) -> LifecycleSubscription {
2064 LifecycleSubscription::new(self.inner.lifecycle_tx.subscribe())
2065 }
2066}
2067
2068impl Drop for ClientInner {
2069 fn drop(&mut self) {
2070 if let Some(ref mut child) = *self.child.lock() {
2071 let pid = child.id();
2072 if let Err(e) = child.start_kill() {
2073 error!(pid = ?pid, error = %e, "failed to kill CLI process on drop");
2074 } else {
2075 info!(pid = ?pid, "kill signal sent for CLI process on drop");
2076 }
2077 }
2078 }
2079}
2080
2081#[cfg(test)]
2082mod tests {
2083 use super::*;
2084
2085 #[test]
2086 fn is_transport_failure_matches_request_cancelled() {
2087 let err = Error::from(ErrorKind::Protocol(ProtocolErrorKind::RequestCancelled));
2088 assert!(err.is_transport_failure());
2089 }
2090
2091 #[test]
2092 fn is_transport_failure_matches_io_error() {
2093 let err = Error::from(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "gone"));
2094 assert!(err.is_transport_failure());
2095 }
2096
2097 #[test]
2098 fn is_transport_failure_rejects_rpc_error() {
2099 let err = Error::with_message(ErrorKind::Rpc { code: -1 }, "bad");
2100 assert!(!err.is_transport_failure());
2101 }
2102
2103 #[test]
2104 fn is_transport_failure_rejects_session_error() {
2105 let err = Error::from(ErrorKind::Session(SessionErrorKind::NotFound("s1".into())));
2106 assert!(!err.is_transport_failure());
2107 }
2108
2109 #[test]
2110 fn client_options_builder_composes() {
2111 let opts = ClientOptions::new()
2112 .with_program(CliProgram::Path(PathBuf::from("/usr/local/bin/copilot")))
2113 .with_prefix_args(["node"])
2114 .with_cwd(PathBuf::from("/tmp"))
2115 .with_env([("KEY", "value")])
2116 .with_env_remove(["UNWANTED"])
2117 .with_extra_args(["--quiet"])
2118 .with_github_token("ghp_test")
2119 .with_use_logged_in_user(false)
2120 .with_log_level(LogLevel::Debug)
2121 .with_session_idle_timeout_seconds(120)
2122 .with_enable_remote_sessions(true);
2123 assert!(matches!(opts.program, CliProgram::Path(_)));
2124 assert_eq!(opts.prefix_args, vec![std::ffi::OsString::from("node")]);
2125 assert_eq!(opts.working_directory, PathBuf::from("/tmp"));
2126 assert_eq!(
2127 opts.env,
2128 vec![(
2129 std::ffi::OsString::from("KEY"),
2130 std::ffi::OsString::from("value")
2131 )]
2132 );
2133 assert_eq!(opts.env_remove, vec![std::ffi::OsString::from("UNWANTED")]);
2134 assert_eq!(opts.extra_args, vec!["--quiet".to_string()]);
2135 assert_eq!(opts.github_token.as_deref(), Some("ghp_test"));
2136 assert_eq!(opts.use_logged_in_user, Some(false));
2137 assert!(matches!(opts.log_level, Some(LogLevel::Debug)));
2138 assert_eq!(opts.session_idle_timeout_seconds, Some(120));
2139 assert!(opts.enable_remote_sessions);
2140 }
2141
2142 #[test]
2143 fn is_transport_failure_rejects_other_protocol_errors() {
2144 let err = Error::from(ErrorKind::Protocol(ProtocolErrorKind::CliStartupTimeout));
2145 assert!(!err.is_transport_failure());
2146 }
2147
2148 #[test]
2149 fn build_command_lets_env_remove_strip_injected_token() {
2150 let opts = ClientOptions {
2151 github_token: Some("secret".to_string()),
2152 env_remove: vec![std::ffi::OsString::from("COPILOT_SDK_AUTH_TOKEN")],
2153 ..Default::default()
2154 };
2155 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2156 let action = cmd
2158 .as_std()
2159 .get_envs()
2160 .find(|(k, _)| *k == std::ffi::OsStr::new("COPILOT_SDK_AUTH_TOKEN"))
2161 .map(|(_, v)| v);
2162 assert_eq!(
2163 action,
2164 Some(None),
2165 "env_remove should win over github_token"
2166 );
2167 }
2168
2169 #[test]
2170 fn build_command_lets_env_override_injected_token() {
2171 let opts = ClientOptions {
2172 github_token: Some("from-options".to_string()),
2173 env: vec![(
2174 std::ffi::OsString::from("COPILOT_SDK_AUTH_TOKEN"),
2175 std::ffi::OsString::from("from-env"),
2176 )],
2177 ..Default::default()
2178 };
2179 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2180 let value = cmd
2181 .as_std()
2182 .get_envs()
2183 .find(|(k, _)| *k == std::ffi::OsStr::new("COPILOT_SDK_AUTH_TOKEN"))
2184 .and_then(|(_, v)| v);
2185 assert_eq!(value, Some(std::ffi::OsStr::new("from-env")));
2186 }
2187
2188 #[test]
2189 fn build_command_injects_github_token_by_default() {
2190 let opts = ClientOptions {
2191 github_token: Some("just-the-token".to_string()),
2192 ..Default::default()
2193 };
2194 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2195 let value = cmd
2196 .as_std()
2197 .get_envs()
2198 .find(|(k, _)| *k == std::ffi::OsStr::new("COPILOT_SDK_AUTH_TOKEN"))
2199 .and_then(|(_, v)| v);
2200 assert_eq!(value, Some(std::ffi::OsStr::new("just-the-token")));
2201 }
2202
2203 fn env_value<'a>(cmd: &'a tokio::process::Command, key: &str) -> Option<&'a std::ffi::OsStr> {
2204 cmd.as_std()
2205 .get_envs()
2206 .find(|(k, _)| *k == std::ffi::OsStr::new(key))
2207 .and_then(|(_, v)| v)
2208 }
2209
2210 #[test]
2211 fn telemetry_config_builder_composes() {
2212 let cfg = TelemetryConfig::new()
2213 .with_otlp_endpoint("http://collector:4318")
2214 .with_otlp_protocol(OtlpHttpProtocol::HttpProtobuf)
2215 .with_file_path(PathBuf::from("/var/log/copilot.jsonl"))
2216 .with_exporter_type(OtelExporterType::OtlpHttp)
2217 .with_source_name("my-app")
2218 .with_capture_content(true);
2219
2220 assert_eq!(cfg.otlp_endpoint.as_deref(), Some("http://collector:4318"));
2221 assert_eq!(cfg.otlp_protocol, Some(OtlpHttpProtocol::HttpProtobuf));
2222 assert_eq!(
2223 cfg.file_path.as_deref(),
2224 Some(Path::new("/var/log/copilot.jsonl")),
2225 );
2226 assert_eq!(cfg.exporter_type, Some(OtelExporterType::OtlpHttp));
2227 assert_eq!(cfg.source_name.as_deref(), Some("my-app"));
2228 assert_eq!(cfg.capture_content, Some(true));
2229 assert!(!cfg.is_empty());
2230 assert!(TelemetryConfig::new().is_empty());
2231 }
2232
2233 #[test]
2234 fn otlp_http_protocol_serde_matches_env_value() {
2235 for (protocol, wire) in [
2236 (OtlpHttpProtocol::HttpJson, "http/json"),
2237 (OtlpHttpProtocol::HttpProtobuf, "http/protobuf"),
2238 ] {
2239 assert_eq!(protocol.as_str(), wire);
2240
2241 let serialized = serde_json::to_string(&protocol).unwrap();
2242 assert_eq!(serialized, format!("\"{wire}\""));
2243
2244 let deserialized: OtlpHttpProtocol = serde_json::from_str(&serialized).unwrap();
2245 assert_eq!(deserialized, protocol);
2246 }
2247 }
2248
2249 #[test]
2250 fn build_command_sets_otel_env_when_telemetry_enabled() {
2251 let opts = ClientOptions {
2252 telemetry: Some(TelemetryConfig {
2253 otlp_endpoint: Some("http://collector:4318".to_string()),
2254 otlp_protocol: Some(OtlpHttpProtocol::HttpProtobuf),
2255 file_path: Some(PathBuf::from("/var/log/copilot.jsonl")),
2256 exporter_type: Some(OtelExporterType::OtlpHttp),
2257 source_name: Some("my-app".to_string()),
2258 capture_content: Some(true),
2259 }),
2260 ..Default::default()
2261 };
2262 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2263 assert_eq!(
2264 env_value(&cmd, "COPILOT_OTEL_ENABLED"),
2265 Some(std::ffi::OsStr::new("true")),
2266 );
2267 assert_eq!(
2268 env_value(&cmd, "OTEL_EXPORTER_OTLP_ENDPOINT"),
2269 Some(std::ffi::OsStr::new("http://collector:4318")),
2270 );
2271 assert_eq!(
2272 env_value(&cmd, "OTEL_EXPORTER_OTLP_PROTOCOL"),
2273 Some(std::ffi::OsStr::new("http/protobuf")),
2274 );
2275 assert_eq!(
2276 env_value(&cmd, "COPILOT_OTEL_FILE_EXPORTER_PATH"),
2277 Some(std::ffi::OsStr::new("/var/log/copilot.jsonl")),
2278 );
2279 assert_eq!(
2280 env_value(&cmd, "COPILOT_OTEL_EXPORTER_TYPE"),
2281 Some(std::ffi::OsStr::new("otlp-http")),
2282 );
2283 assert_eq!(
2284 env_value(&cmd, "COPILOT_OTEL_SOURCE_NAME"),
2285 Some(std::ffi::OsStr::new("my-app")),
2286 );
2287 assert_eq!(
2288 env_value(&cmd, "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"),
2289 Some(std::ffi::OsStr::new("true")),
2290 );
2291 }
2292
2293 #[test]
2294 fn build_command_omits_otel_env_when_telemetry_none() {
2295 let opts = ClientOptions::default();
2296 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2297 for key in [
2298 "COPILOT_OTEL_ENABLED",
2299 "OTEL_EXPORTER_OTLP_ENDPOINT",
2300 "OTEL_EXPORTER_OTLP_PROTOCOL",
2301 "COPILOT_OTEL_FILE_EXPORTER_PATH",
2302 "COPILOT_OTEL_EXPORTER_TYPE",
2303 "COPILOT_OTEL_SOURCE_NAME",
2304 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT",
2305 ] {
2306 assert!(
2307 env_value(&cmd, key).is_none(),
2308 "expected {key} to be unset when telemetry is None",
2309 );
2310 }
2311 }
2312
2313 #[test]
2314 fn build_command_omits_unset_telemetry_fields() {
2315 let opts = ClientOptions {
2316 telemetry: Some(TelemetryConfig {
2317 otlp_endpoint: Some("http://collector:4318".to_string()),
2318 ..Default::default()
2319 }),
2320 ..Default::default()
2321 };
2322 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2323 assert_eq!(
2325 env_value(&cmd, "COPILOT_OTEL_ENABLED"),
2326 Some(std::ffi::OsStr::new("true")),
2327 );
2328 assert_eq!(
2329 env_value(&cmd, "OTEL_EXPORTER_OTLP_ENDPOINT"),
2330 Some(std::ffi::OsStr::new("http://collector:4318")),
2331 );
2332 for key in [
2334 "OTEL_EXPORTER_OTLP_PROTOCOL",
2335 "COPILOT_OTEL_FILE_EXPORTER_PATH",
2336 "COPILOT_OTEL_EXPORTER_TYPE",
2337 "COPILOT_OTEL_SOURCE_NAME",
2338 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT",
2339 ] {
2340 assert!(env_value(&cmd, key).is_none(), "{key} should be unset");
2341 }
2342 }
2343
2344 #[test]
2345 fn build_command_lets_user_env_override_telemetry() {
2346 let opts = ClientOptions {
2347 telemetry: Some(TelemetryConfig {
2348 otlp_endpoint: Some("http://from-config:4318".to_string()),
2349 ..Default::default()
2350 }),
2351 env: vec![(
2352 std::ffi::OsString::from("OTEL_EXPORTER_OTLP_ENDPOINT"),
2353 std::ffi::OsString::from("http://from-user-env:4318"),
2354 )],
2355 ..Default::default()
2356 };
2357 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2358 assert_eq!(
2359 env_value(&cmd, "OTEL_EXPORTER_OTLP_ENDPOINT"),
2360 Some(std::ffi::OsStr::new("http://from-user-env:4318")),
2361 "user-supplied options.env should override telemetry config",
2362 );
2363 }
2364
2365 #[test]
2366 fn build_command_sets_copilot_home_env_when_configured() {
2367 let opts = ClientOptions::new().with_base_directory(PathBuf::from("/custom/copilot"));
2368 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2369 assert_eq!(
2370 env_value(&cmd, "COPILOT_HOME"),
2371 Some(std::ffi::OsStr::new("/custom/copilot")),
2372 );
2373
2374 let opts = ClientOptions::default();
2375 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2376 assert!(env_value(&cmd, "COPILOT_HOME").is_none());
2377 }
2378
2379 #[test]
2380 fn build_command_sets_connection_token_env_when_configured() {
2381 let opts = ClientOptions::new().with_transport(Transport::Tcp {
2382 port: 0,
2383 connection_token: Some("secret-token".to_string()),
2384 });
2385 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2386 assert_eq!(
2387 env_value(&cmd, "COPILOT_CONNECTION_TOKEN"),
2388 Some(std::ffi::OsStr::new("secret-token")),
2389 );
2390
2391 let opts = ClientOptions::default();
2392 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2393 assert!(env_value(&cmd, "COPILOT_CONNECTION_TOKEN").is_none());
2394 }
2395
2396 #[tokio::test]
2397 async fn start_rejects_empty_connection_token() {
2398 let opts = ClientOptions::new()
2399 .with_transport(Transport::Tcp {
2400 port: 0,
2401 connection_token: Some(String::new()),
2402 })
2403 .with_program(CliProgram::Path(PathBuf::from("/bin/echo")));
2404 let err = Client::start(opts).await.unwrap_err();
2405 assert!(
2406 matches!(err.kind(), ErrorKind::InvalidConfig),
2407 "got {err:?}"
2408 );
2409 }
2410
2411 #[tokio::test]
2412 async fn start_rejects_empty_external_connection_token() {
2413 let opts = ClientOptions::new()
2414 .with_transport(Transport::External {
2415 host: "127.0.0.1".to_string(),
2416 port: 1,
2417 connection_token: Some(String::new()),
2418 })
2419 .with_program(CliProgram::Path(PathBuf::from("/bin/echo")));
2420 let err = Client::start(opts).await.unwrap_err();
2421 assert!(
2422 matches!(err.kind(), ErrorKind::InvalidConfig),
2423 "got {err:?}"
2424 );
2425 }
2426
2427 #[test]
2428 fn telemetry_config_capture_content_serializes_as_lowercase_bool() {
2429 let opts_true = ClientOptions {
2430 telemetry: Some(TelemetryConfig {
2431 capture_content: Some(true),
2432 ..Default::default()
2433 }),
2434 ..Default::default()
2435 };
2436 let opts_false = ClientOptions {
2437 telemetry: Some(TelemetryConfig {
2438 capture_content: Some(false),
2439 ..Default::default()
2440 }),
2441 ..Default::default()
2442 };
2443 let cmd_true = Client::build_command(Path::new("/bin/echo"), &opts_true);
2444 let cmd_false = Client::build_command(Path::new("/bin/echo"), &opts_false);
2445 assert_eq!(
2446 env_value(
2447 &cmd_true,
2448 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"
2449 ),
2450 Some(std::ffi::OsStr::new("true")),
2451 );
2452 assert_eq!(
2453 env_value(
2454 &cmd_false,
2455 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"
2456 ),
2457 Some(std::ffi::OsStr::new("false")),
2458 );
2459 }
2460
2461 #[test]
2462 fn session_idle_timeout_args_are_omitted_by_default() {
2463 let opts = ClientOptions::default();
2464 assert!(Client::session_idle_timeout_args(&opts).is_empty());
2465 }
2466
2467 #[test]
2468 fn session_idle_timeout_args_omitted_for_zero() {
2469 let opts = ClientOptions {
2470 session_idle_timeout_seconds: Some(0),
2471 ..Default::default()
2472 };
2473 assert!(Client::session_idle_timeout_args(&opts).is_empty());
2474 }
2475
2476 #[test]
2477 fn session_idle_timeout_args_emit_flag_for_positive_value() {
2478 let opts = ClientOptions {
2479 session_idle_timeout_seconds: Some(300),
2480 ..Default::default()
2481 };
2482 assert_eq!(
2483 Client::session_idle_timeout_args(&opts),
2484 vec!["--session-idle-timeout".to_string(), "300".to_string()]
2485 );
2486 }
2487
2488 #[test]
2489 fn remote_args_omitted_by_default() {
2490 let opts = ClientOptions::default();
2491 assert!(Client::remote_args(&opts).is_empty());
2492 }
2493
2494 #[test]
2495 fn remote_args_emit_flag_when_enabled() {
2496 let opts = ClientOptions {
2497 enable_remote_sessions: true,
2498 ..Default::default()
2499 };
2500 assert_eq!(Client::remote_args(&opts), vec!["--remote".to_string()]);
2501 }
2502
2503 #[test]
2504 fn log_level_args_omitted_when_unset() {
2505 let opts = ClientOptions::default();
2506 assert!(opts.log_level.is_none());
2507 assert!(
2508 Client::log_level_args(&opts).is_empty(),
2509 "with no caller-supplied log_level the SDK must not pass --log-level"
2510 );
2511 }
2512
2513 #[test]
2514 fn log_level_args_emit_flag_when_set() {
2515 let opts = ClientOptions::default().with_log_level(LogLevel::Debug);
2516 assert_eq!(Client::log_level_args(&opts), vec!["--log-level", "debug"]);
2517 }
2518
2519 #[test]
2520 fn log_level_str_round_trips() {
2521 for level in [
2522 LogLevel::None,
2523 LogLevel::Error,
2524 LogLevel::Warning,
2525 LogLevel::Info,
2526 LogLevel::Debug,
2527 LogLevel::All,
2528 ] {
2529 let s = level.as_str();
2530 let json = serde_json::to_string(&level).unwrap();
2531 assert_eq!(json, format!("\"{s}\""));
2532 let parsed: LogLevel = serde_json::from_str(&json).unwrap();
2533 assert_eq!(parsed, level);
2534 }
2535 }
2536
2537 #[test]
2538 fn client_options_debug_redacts_handler() {
2539 struct StubHandler;
2540 #[async_trait]
2541 impl ListModelsHandler for StubHandler {
2542 async fn list_models(&self) -> Result<Vec<Model>> {
2543 Ok(vec![])
2544 }
2545 }
2546 let opts = ClientOptions {
2547 on_list_models: Some(Arc::new(StubHandler)),
2548 github_token: Some("secret-token".into()),
2549 ..Default::default()
2550 };
2551 let debug = format!("{opts:?}");
2552 assert!(debug.contains("on_list_models: Some(\"<set>\")"));
2553 assert!(debug.contains("github_token: Some(\"<redacted>\")"));
2554 assert!(!debug.contains("secret-token"));
2555 }
2556
2557 #[tokio::test]
2558 async fn list_models_uses_on_list_models_handler_when_set() {
2559 use std::sync::atomic::{AtomicUsize, Ordering};
2560
2561 struct CountingHandler {
2562 calls: Arc<AtomicUsize>,
2563 models: Vec<Model>,
2564 }
2565 #[async_trait]
2566 impl ListModelsHandler for CountingHandler {
2567 async fn list_models(&self) -> Result<Vec<Model>> {
2568 self.calls.fetch_add(1, Ordering::SeqCst);
2569 Ok(self.models.clone())
2570 }
2571 }
2572
2573 let calls = Arc::new(AtomicUsize::new(0));
2574 let model = Model {
2575 id: "byok-gpt-4".into(),
2576 name: "BYOK GPT-4".into(),
2577 ..Default::default()
2578 };
2579 let handler: Arc<dyn ListModelsHandler> = Arc::new(CountingHandler {
2580 calls: Arc::clone(&calls),
2581 models: vec![model.clone()],
2582 });
2583
2584 let client = client_with_list_models_handler(handler);
2585
2586 let result = client.list_models().await.unwrap();
2587 assert_eq!(result.len(), 1);
2588 assert_eq!(result[0].id, "byok-gpt-4");
2589 assert_eq!(calls.load(Ordering::SeqCst), 1);
2590 }
2591
2592 #[tokio::test]
2593 async fn list_models_serializes_concurrent_cache_misses() {
2594 use std::sync::atomic::{AtomicUsize, Ordering};
2595
2596 struct SlowCountingHandler {
2597 calls: Arc<AtomicUsize>,
2598 models: Vec<Model>,
2599 }
2600 #[async_trait]
2601 impl ListModelsHandler for SlowCountingHandler {
2602 async fn list_models(&self) -> Result<Vec<Model>> {
2603 self.calls.fetch_add(1, Ordering::SeqCst);
2604 tokio::time::sleep(std::time::Duration::from_millis(25)).await;
2605 Ok(self.models.clone())
2606 }
2607 }
2608
2609 let calls = Arc::new(AtomicUsize::new(0));
2610 let model = Model {
2611 id: "single-flight-model".into(),
2612 name: "Single Flight Model".into(),
2613 ..Default::default()
2614 };
2615 let handler: Arc<dyn ListModelsHandler> = Arc::new(SlowCountingHandler {
2616 calls: Arc::clone(&calls),
2617 models: vec![model],
2618 });
2619 let client = client_with_list_models_handler(handler);
2620
2621 let (first, second) = tokio::join!(client.list_models(), client.list_models());
2622 assert_eq!(first.unwrap()[0].id, "single-flight-model");
2623 assert_eq!(second.unwrap()[0].id, "single-flight-model");
2624 assert_eq!(calls.load(Ordering::SeqCst), 1);
2625 }
2626
2627 #[tokio::test]
2628 async fn cancelled_resume_session_unregisters_pending_session() {
2629 let (client_write, _server_read) = tokio::io::duplex(8192);
2630 let (_server_write, client_read) = tokio::io::duplex(8192);
2631 let client = Client::from_streams(client_read, client_write, std::env::temp_dir()).unwrap();
2632 let session_id = SessionId::new("resume-cancel-test");
2633 let handle = tokio::spawn({
2634 let client = client.clone();
2635 async move {
2636 client
2637 .resume_session(ResumeSessionConfig::new(session_id))
2638 .await
2639 }
2640 });
2641
2642 wait_for_pending_session_registration(&client).await;
2643 handle.abort();
2644 let _ = handle.await;
2645
2646 assert!(client.inner.router.session_ids().is_empty());
2647 client.force_stop();
2648 }
2649
2650 fn client_with_list_models_handler(handler: Arc<dyn ListModelsHandler>) -> Client {
2651 Client {
2652 inner: Arc::new(ClientInner {
2653 child: parking_lot::Mutex::new(None),
2654 rpc: {
2655 let (req_tx, _req_rx) = mpsc::unbounded_channel();
2656 let (notif_tx, _notif_rx) = broadcast::channel(16);
2657 let (read_pipe, _write_pipe) = tokio::io::duplex(64);
2658 let (_unused_read, write_pipe) = tokio::io::duplex(64);
2659 JsonRpcClient::new(write_pipe, read_pipe, notif_tx, req_tx)
2660 },
2661 cwd: PathBuf::from("."),
2662 request_rx: parking_lot::Mutex::new(None),
2663 notification_tx: broadcast::channel(16).0,
2664 router: router::SessionRouter::new(),
2665 negotiated_protocol_version: OnceLock::new(),
2666 state: parking_lot::Mutex::new(ConnectionState::Connected),
2667 lifecycle_tx: broadcast::channel(16).0,
2668 on_list_models: Some(handler),
2669 models_cache: parking_lot::Mutex::new(Arc::new(tokio::sync::OnceCell::new())),
2670 session_fs_configured: false,
2671 session_fs_sqlite_declared: false,
2672 on_get_trace_context: None,
2673 effective_connection_token: None,
2674 mode: ClientMode::default(),
2675 }),
2676 }
2677 }
2678
2679 async fn wait_for_pending_session_registration(client: &Client) {
2680 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1);
2681 while client.inner.router.session_ids().is_empty() {
2682 assert!(
2683 tokio::time::Instant::now() < deadline,
2684 "session was not registered"
2685 );
2686 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2687 }
2688 }
2689}