1#![doc = include_str!("../README.md")]
2#![warn(missing_docs)]
3#![deny(rustdoc::broken_intra_doc_links)]
4#![cfg_attr(test, allow(clippy::unwrap_used))]
5
6pub mod embeddedcli;
8pub mod handler;
10pub mod hooks;
12mod jsonrpc;
13pub mod permission;
15pub mod resolve;
17mod router;
18pub mod session;
20pub mod session_fs;
22mod session_fs_dispatch;
23pub mod subscription;
25pub mod tool;
27pub mod trace_context;
29pub mod transforms;
31pub mod types;
33
34pub mod generated;
36
37use std::ffi::OsString;
38use std::path::{Path, PathBuf};
39use std::process::Stdio;
40use std::sync::{Arc, OnceLock};
41
42use async_trait::async_trait;
43pub(crate) use jsonrpc::{
46 JsonRpcClient, JsonRpcError, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, error_codes,
47};
48
/// Re-exports of the crate-internal JSON-RPC types.
///
/// Only compiled when the `test-support` feature is enabled.
#[cfg(feature = "test-support")]
pub mod test_support {
    pub use crate::jsonrpc::{
        JsonRpcClient, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
        error_codes,
    };
}
57use serde::{Deserialize, Serialize};
58use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, BufReader};
59use tokio::net::TcpStream;
60use tokio::process::{Child, Command};
61use tokio::sync::{broadcast, mpsc, oneshot};
62use tracing::{Instrument, debug, error, info, warn};
63pub use types::*;
64
65mod sdk_protocol_version;
66pub use sdk_protocol_version::{SDK_PROTOCOL_VERSION, get_sdk_protocol_version};
67pub use subscription::{EventSubscription, Lagged, LifecycleSubscription, RecvError};
68
/// Lowest server protocol version accepted by `Client::verify_protocol_version`;
/// the upper bound of the accepted range is `SDK_PROTOCOL_VERSION`.
const MIN_PROTOCOL_VERSION: u32 = 2;
71
/// Top-level error type returned by this crate's fallible operations.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// A protocol-level failure; see [`ProtocolError`] for the individual cases.
    #[error("protocol error: {0}")]
    Protocol(ProtocolError),

    /// The server answered a request with a JSON-RPC error object.
    #[error("RPC error {code}: {message}")]
    Rpc {
        /// Error code copied from the response.
        code: i32,
        /// Error message copied from the response.
        message: String,
    },

    /// A session-level failure; see [`SessionError`] for the individual cases.
    #[error("session error: {0}")]
    Session(SessionError),

    /// An I/O error, displayed transparently. Convertible from
    /// `std::io::Error` via `?`.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// A JSON (de)serialization error, displayed transparently. Convertible
    /// from `serde_json::Error` via `?`.
    #[error(transparent)]
    Json(#[from] serde_json::Error),

    /// A required executable could not be located.
    // NOTE(review): the construction site is not in this part of the file;
    // presumably raised by the `resolve` module — confirm.
    #[error("binary not found: {name} ({hint})")]
    BinaryNotFound {
        /// Name of the binary that was looked for.
        name: &'static str,
        /// Human-readable hint included in the message.
        hint: &'static str,
    },

    /// The supplied client configuration was rejected; the payload explains
    /// why. `Client::start` returns this for an empty `tcp_connection_token`
    /// or for a token combined with `Transport::Stdio`.
    #[error("invalid client configuration: {0}")]
    InvalidConfig(String),
}
117
118impl Error {
119 pub fn is_transport_failure(&self) -> bool {
123 matches!(
124 self,
125 Error::Protocol(ProtocolError::RequestCancelled) | Error::Io(_)
126 )
127 }
128}
129
/// A collection of [`Error`]s reported together.
///
/// The `Display` implementation summarises the collection (count plus the
/// first error) and `source()` exposes the first error.
#[derive(Debug)]
pub struct StopErrors(Vec<Error>);
143
144impl StopErrors {
145 pub fn errors(&self) -> &[Error] {
148 &self.0
149 }
150
151 pub fn into_errors(self) -> Vec<Error> {
153 self.0
154 }
155}
156
157impl std::fmt::Display for StopErrors {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 match self.0.as_slice() {
160 [] => write!(f, "stop completed with no errors"),
161 [only] => write!(f, "stop failed: {only}"),
162 [first, rest @ ..] => write!(
163 f,
164 "stop failed with {n} errors; first: {first}",
165 n = 1 + rest.len(),
166 ),
167 }
168 }
169}
170
171impl std::error::Error for StopErrors {
172 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
173 self.0
174 .first()
175 .map(|e| e as &(dyn std::error::Error + 'static))
176 }
177}
178
/// Protocol-level failures, wrapped by [`Error::Protocol`].
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ProtocolError {
    /// A message did not carry a `Content-Length` header.
    #[error("missing Content-Length header")]
    MissingContentLength,

    /// The `Content-Length` header value (the payload) was not valid.
    #[error("invalid Content-Length value: \"{0}\"")]
    InvalidContentLength(String),

    /// A request was cancelled. `Error::is_transport_failure` reports this
    /// case as a transport failure.
    #[error("request cancelled")]
    RequestCancelled,

    /// The spawned CLI did not report its listening port within the startup
    /// timeout (TCP transport).
    #[error("timed out waiting for CLI to report listening port")]
    CliStartupTimeout,

    /// The spawned CLI's stdout ended before a listening port was reported
    /// (TCP transport).
    #[error("CLI exited before reporting listening port")]
    CliStartupFailed,

    /// The server's protocol version lies outside the supported range.
    #[error("version mismatch: server={server}, supported={min}–{max}")]
    VersionMismatch {
        /// Version reported by the server.
        server: u32,
        /// Lowest version this SDK accepts.
        min: u32,
        /// Highest version this SDK accepts.
        max: u32,
    },

    /// A later version check reported a different version than the one
    /// recorded earlier on the same client.
    #[error("version changed: was {previous}, now {current}")]
    VersionChanged {
        /// Version recorded by the earlier check.
        previous: u32,
        /// Version reported by the latest check.
        current: u32,
    },
}
223
/// Session-level failures, wrapped by [`Error::Session`].
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum SessionError {
    /// The server reported that the session does not exist. `Client::call`
    /// produces this when the RPC error message contains "Session not found".
    #[error("session not found: {0}")]
    NotFound(SessionId),

    /// An error message displayed verbatim.
    // NOTE(review): presumably text reported by the agent — the construction
    // site is not in this part of the file; confirm.
    #[error("{0}")]
    AgentError(String),

    /// An operation did not finish within the given duration.
    #[error("timed out after {0:?}")]
    Timeout(std::time::Duration),

    /// A send was attempted while a `send_and_wait` was still in flight.
    #[error("cannot send while send_and_wait is in flight")]
    SendWhileWaiting,

    /// The event loop closed before the session reached idle.
    #[error("event loop closed before session reached idle")]
    EventLoopClosed,

    /// The host does not support elicitation.
    #[error(
        "elicitation not supported by host — check session.capabilities().ui.elicitation first"
    )]
    ElicitationNotSupported,

    /// The client was configured with `session_fs` but the session was given
    /// no `SessionFsProvider`.
    #[error(
        "session was created on a client with session_fs configured but no SessionFsProvider was supplied"
    )]
    SessionFsProviderRequired,

    /// The `SessionFsConfig` failed validation; the payload names the
    /// offending field.
    #[error("invalid SessionFsConfig: {0}")]
    InvalidSessionFsConfig(String),
}
270
/// How `Client::start` reaches the CLI server.
#[derive(Debug, Default)]
#[non_exhaustive]
pub enum Transport {
    /// Spawn the CLI with `--stdio` and talk over the child's stdin/stdout.
    /// This is the default.
    #[default]
    Stdio,
    /// Spawn the CLI with `--port`, read the port it reports on stdout, and
    /// connect to it on `127.0.0.1`.
    Tcp {
        /// Port passed to the CLI via `--port`. The connection uses the port
        /// the CLI actually reports.
        port: u16,
    },
    /// Connect over TCP to a server that is already running; no child
    /// process is spawned.
    External {
        /// Host name or address of the server.
        host: String,
        /// TCP port of the server.
        port: u16,
    },
}
291
/// Selects the CLI executable that `Client::start` launches.
#[derive(Debug, Clone, Default)]
pub enum CliProgram {
    /// Locate the executable through `resolve::copilot_binary()`. This is
    /// the default.
    #[default]
    Resolve,
    /// Use the executable at this exact path.
    Path(PathBuf),
}
302
303impl From<PathBuf> for CliProgram {
304 fn from(path: PathBuf) -> Self {
305 Self::Path(path)
306 }
307}
308
/// Configuration consumed by `Client::start`.
///
/// Build one with `ClientOptions::new()` and the `with_*` methods.
#[non_exhaustive]
pub struct ClientOptions {
    /// Which CLI executable to launch.
    pub program: CliProgram,
    /// Arguments placed before every SDK-supplied argument.
    pub prefix_args: Vec<OsString>,
    /// Working directory for the spawned CLI; also reported by `Client::cwd`.
    pub cwd: PathBuf,
    /// Extra environment variables for the spawned CLI. Applied after the
    /// SDK-managed variables, so an entry here overrides them.
    pub env: Vec<(OsString, OsString)>,
    /// Environment variables removed from the spawned CLI's environment.
    /// Applied last.
    pub env_remove: Vec<OsString>,
    /// Arguments appended after every SDK-supplied argument.
    pub extra_args: Vec<String>,
    /// How to reach the CLI server.
    pub transport: Transport,
    /// Auth token handed to the CLI through the `COPILOT_SDK_AUTH_TOKEN`
    /// environment variable (announced with `--auth-token-env`). Redacted in
    /// `Debug` output.
    pub github_token: Option<String>,
    /// Whether the CLI may use the logged-in user. When `None`, defaults to
    /// `true` only if `github_token` is unset. A `false` result adds
    /// `--no-auto-login`.
    pub use_logged_in_user: Option<bool>,
    /// Value for `--log-level`; `LogLevel::Info` when unset.
    pub log_level: Option<LogLevel>,
    /// Value for `--session-idle-timeout`; the flag is omitted when unset or
    /// zero.
    pub session_idle_timeout_seconds: Option<u64>,
    /// Optional handler that `Client::list_models` delegates to when set.
    pub on_list_models: Option<Arc<dyn ListModelsHandler>>,
    /// Optional session filesystem configuration. Validated by
    /// `Client::start` and registered with the server after the version
    /// handshake.
    pub session_fs: Option<SessionFsConfig>,
    /// Optional trace-context provider stored on the client.
    // NOTE(review): where the provider is invoked is not visible in this
    // part of the file.
    pub on_get_trace_context: Option<Arc<dyn TraceContextProvider>>,
    /// Optional telemetry settings, exported to the spawned CLI as
    /// environment variables.
    pub telemetry: Option<TelemetryConfig>,
    /// Value for the spawned CLI's `COPILOT_HOME` environment variable.
    pub copilot_home: Option<PathBuf>,
    /// Connection token for TCP transports. Must be non-empty and cannot be
    /// combined with `Transport::Stdio`. With `Transport::Tcp` a random token
    /// is generated when unset. Redacted in `Debug` output.
    pub tcp_connection_token: Option<String>,
}
397
398impl std::fmt::Debug for ClientOptions {
399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400 f.debug_struct("ClientOptions")
401 .field("program", &self.program)
402 .field("prefix_args", &self.prefix_args)
403 .field("cwd", &self.cwd)
404 .field("env", &self.env)
405 .field("env_remove", &self.env_remove)
406 .field("extra_args", &self.extra_args)
407 .field("transport", &self.transport)
408 .field(
409 "github_token",
410 &self.github_token.as_ref().map(|_| "<redacted>"),
411 )
412 .field("use_logged_in_user", &self.use_logged_in_user)
413 .field("log_level", &self.log_level)
414 .field(
415 "session_idle_timeout_seconds",
416 &self.session_idle_timeout_seconds,
417 )
418 .field(
419 "on_list_models",
420 &self.on_list_models.as_ref().map(|_| "<set>"),
421 )
422 .field("session_fs", &self.session_fs)
423 .field(
424 "on_get_trace_context",
425 &self.on_get_trace_context.as_ref().map(|_| "<set>"),
426 )
427 .field("telemetry", &self.telemetry)
428 .field("copilot_home", &self.copilot_home)
429 .field(
430 "tcp_connection_token",
431 &self.tcp_connection_token.as_ref().map(|_| "<redacted>"),
432 )
433 .finish()
434 }
435}
436
/// Custom source for the model list.
///
/// When a handler is configured on the client, `Client::list_models` returns
/// the handler's result directly.
#[async_trait]
pub trait ListModelsHandler: Send + Sync + 'static {
    /// Produces the list of models, or an error.
    async fn list_models(&self) -> Result<Vec<Model>, Error>;
}
450
/// Log verbosity passed to the CLI through `--log-level`.
///
/// Serialized in lowercase (`"none"`, `"error"`, ...), matching
/// `LogLevel::as_str`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
    /// `"none"`.
    None,
    /// `"error"`.
    Error,
    /// `"warning"`.
    Warning,
    /// `"info"`.
    Info,
    /// `"debug"`.
    Debug,
    /// `"all"`.
    All,
}
468
469impl LogLevel {
470 pub fn as_str(self) -> &'static str {
472 match self {
473 Self::None => "none",
474 Self::Error => "error",
475 Self::Warning => "warning",
476 Self::Info => "info",
477 Self::Debug => "debug",
478 Self::All => "all",
479 }
480 }
481}
482
483impl std::fmt::Display for LogLevel {
484 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
485 f.write_str(self.as_str())
486 }
487}
488
/// Telemetry exporter selection, exported to the CLI as
/// `COPILOT_OTEL_EXPORTER_TYPE`.
///
/// Serialized in kebab-case, matching `OtelExporterType::as_str`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[non_exhaustive]
pub enum OtelExporterType {
    /// `"otlp-http"`.
    OtlpHttp,
    /// `"file"`.
    File,
}
504
505impl OtelExporterType {
506 pub fn as_str(self) -> &'static str {
508 match self {
509 Self::OtlpHttp => "otlp-http",
510 Self::File => "file",
511 }
512 }
513}
514
/// Telemetry settings forwarded to the spawned CLI as environment variables.
///
/// Supplying a `TelemetryConfig` at all sets `COPILOT_OTEL_ENABLED=true`;
/// each field that is `Some` sets one further variable.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct TelemetryConfig {
    /// Exported as `OTEL_EXPORTER_OTLP_ENDPOINT`.
    pub otlp_endpoint: Option<String>,
    /// Exported as `COPILOT_OTEL_FILE_EXPORTER_PATH`.
    pub file_path: Option<PathBuf>,
    /// Exported as `COPILOT_OTEL_EXPORTER_TYPE`.
    pub exporter_type: Option<OtelExporterType>,
    /// Exported as `COPILOT_OTEL_SOURCE_NAME`.
    pub source_name: Option<String>,
    /// Exported as `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT`
    /// (`"true"` or `"false"`).
    pub capture_content: Option<bool>,
}
566
567impl TelemetryConfig {
568 pub fn new() -> Self {
571 Self::default()
572 }
573
574 pub fn with_otlp_endpoint(mut self, endpoint: impl Into<String>) -> Self {
576 self.otlp_endpoint = Some(endpoint.into());
577 self
578 }
579
580 pub fn with_file_path(mut self, path: impl Into<PathBuf>) -> Self {
582 self.file_path = Some(path.into());
583 self
584 }
585
586 pub fn with_exporter_type(mut self, exporter_type: OtelExporterType) -> Self {
588 self.exporter_type = Some(exporter_type);
589 self
590 }
591
592 pub fn with_source_name(mut self, source_name: impl Into<String>) -> Self {
596 self.source_name = Some(source_name.into());
597 self
598 }
599
600 pub fn with_capture_content(mut self, capture: bool) -> Self {
604 self.capture_content = Some(capture);
605 self
606 }
607
608 pub fn is_empty(&self) -> bool {
611 self.otlp_endpoint.is_none()
612 && self.file_path.is_none()
613 && self.exporter_type.is_none()
614 && self.source_name.is_none()
615 && self.capture_content.is_none()
616 }
617}
618
619impl Default for ClientOptions {
620 fn default() -> Self {
621 Self {
622 program: CliProgram::Resolve,
623 prefix_args: Vec::new(),
624 cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
625 env: Vec::new(),
626 env_remove: Vec::new(),
627 extra_args: Vec::new(),
628 transport: Transport::default(),
629 github_token: None,
630 use_logged_in_user: None,
631 log_level: None,
632 session_idle_timeout_seconds: None,
633 on_list_models: None,
634 session_fs: None,
635 on_get_trace_context: None,
636 telemetry: None,
637 copilot_home: None,
638 tcp_connection_token: None,
639 }
640 }
641}
642
643impl ClientOptions {
644 pub fn new() -> Self {
660 Self::default()
661 }
662
663 pub fn with_program(mut self, program: impl Into<CliProgram>) -> Self {
665 self.program = program.into();
666 self
667 }
668
669 pub fn with_prefix_args<I, S>(mut self, args: I) -> Self
671 where
672 I: IntoIterator<Item = S>,
673 S: Into<OsString>,
674 {
675 self.prefix_args = args.into_iter().map(Into::into).collect();
676 self
677 }
678
679 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
681 self.cwd = cwd.into();
682 self
683 }
684
685 pub fn with_env<I, K, V>(mut self, env: I) -> Self
687 where
688 I: IntoIterator<Item = (K, V)>,
689 K: Into<OsString>,
690 V: Into<OsString>,
691 {
692 self.env = env.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
693 self
694 }
695
696 pub fn with_env_remove<I, S>(mut self, names: I) -> Self
698 where
699 I: IntoIterator<Item = S>,
700 S: Into<OsString>,
701 {
702 self.env_remove = names.into_iter().map(Into::into).collect();
703 self
704 }
705
706 pub fn with_extra_args<I, S>(mut self, args: I) -> Self
708 where
709 I: IntoIterator<Item = S>,
710 S: Into<String>,
711 {
712 self.extra_args = args.into_iter().map(Into::into).collect();
713 self
714 }
715
716 pub fn with_transport(mut self, transport: Transport) -> Self {
718 self.transport = transport;
719 self
720 }
721
722 pub fn with_github_token(mut self, token: impl Into<String>) -> Self {
725 self.github_token = Some(token.into());
726 self
727 }
728
729 pub fn with_use_logged_in_user(mut self, use_logged_in: bool) -> Self {
732 self.use_logged_in_user = Some(use_logged_in);
733 self
734 }
735
736 pub fn with_log_level(mut self, level: LogLevel) -> Self {
738 self.log_level = Some(level);
739 self
740 }
741
742 pub fn with_session_idle_timeout_seconds(mut self, seconds: u64) -> Self {
745 self.session_idle_timeout_seconds = Some(seconds);
746 self
747 }
748
749 pub fn with_list_models_handler<H>(mut self, handler: H) -> Self
752 where
753 H: ListModelsHandler + 'static,
754 {
755 self.on_list_models = Some(Arc::new(handler));
756 self
757 }
758
759 pub fn with_session_fs(mut self, config: SessionFsConfig) -> Self {
761 self.session_fs = Some(config);
762 self
763 }
764
765 pub fn with_trace_context_provider<P>(mut self, provider: P) -> Self
769 where
770 P: TraceContextProvider + 'static,
771 {
772 self.on_get_trace_context = Some(Arc::new(provider));
773 self
774 }
775
776 pub fn with_telemetry(mut self, config: TelemetryConfig) -> Self {
778 self.telemetry = Some(config);
779 self
780 }
781
782 pub fn with_copilot_home(mut self, home: impl Into<PathBuf>) -> Self {
785 self.copilot_home = Some(home.into());
786 self
787 }
788
789 pub fn with_tcp_connection_token(mut self, token: impl Into<String>) -> Self {
793 self.tcp_connection_token = Some(token.into());
794 self
795 }
796}
797
798fn validate_session_fs_config(cfg: &SessionFsConfig) -> Result<(), Error> {
800 if cfg.initial_cwd.trim().is_empty() {
801 return Err(Error::Session(SessionError::InvalidSessionFsConfig(
802 "initial_cwd must not be empty".to_string(),
803 )));
804 }
805 if cfg.session_state_path.trim().is_empty() {
806 return Err(Error::Session(SessionError::InvalidSessionFsConfig(
807 "session_state_path must not be empty".to_string(),
808 )));
809 }
810 Ok(())
811}
812
813fn generate_connection_token() -> String {
820 let mut bytes = [0u8; 16];
821 getrandom::getrandom(&mut bytes)
822 .expect("OS CSPRNG (getrandom) is unavailable; cannot generate connection token");
823 let mut hex = String::with_capacity(32);
824 for byte in bytes {
825 use std::fmt::Write;
826 let _ = write!(hex, "{byte:02x}");
827 }
828 hex
829}
830
/// Handle to a connection with the CLI server.
///
/// Cloning is cheap: every clone shares the same inner state through an
/// `Arc`.
#[derive(Clone)]
pub struct Client {
    /// Shared connection state.
    inner: Arc<ClientInner>,
}
839
840impl std::fmt::Debug for Client {
841 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
842 f.debug_struct("Client")
843 .field("cwd", &self.inner.cwd)
844 .field("pid", &self.pid())
845 .finish()
846 }
847}
848
/// State shared by every clone of a `Client`.
struct ClientInner {
    /// The spawned CLI process, or `None` when no process was spawned
    /// (external server or caller-supplied streams).
    child: parking_lot::Mutex<Option<Child>>,
    /// JSON-RPC connection to the server.
    rpc: JsonRpcClient,
    /// Working directory the client was created with.
    cwd: PathBuf,
    /// Receiver for server-initiated requests, held in an `Option` so it can
    /// be taken out.
    request_rx: parking_lot::Mutex<Option<mpsc::UnboundedReceiver<JsonRpcRequest>>>,
    /// Broadcast sender for incoming notifications.
    notification_tx: broadcast::Sender<JsonRpcNotification>,
    /// Per-session router.
    // NOTE(review): routing semantics live in the `router` module, which is
    // not visible here.
    router: router::SessionRouter,
    /// Protocol version recorded by the first successful version check.
    negotiated_protocol_version: OnceLock<u32>,
    /// Connection state; initialised to `ConnectionState::Connected`.
    state: parking_lot::Mutex<ConnectionState>,
    /// Broadcast sender for decoded `session.lifecycle` events.
    lifecycle_tx: broadcast::Sender<SessionLifecycleEvent>,
    /// Optional handler that `list_models` delegates to.
    on_list_models: Option<Arc<dyn ListModelsHandler>>,
    /// Whether the client was started with a `session_fs` configuration.
    session_fs_configured: bool,
    /// Optional trace-context provider.
    // NOTE(review): the code that invokes it is not in this part of the file.
    on_get_trace_context: Option<Arc<dyn TraceContextProvider>>,
    /// Token sent in the `connect` handshake, if any.
    effective_connection_token: Option<String>,
}
868
869impl Client {
870 pub async fn start(options: ClientOptions) -> Result<Self, Error> {
883 if let Some(cfg) = &options.session_fs {
884 validate_session_fs_config(cfg)?;
885 }
886 if let Some(token) = &options.tcp_connection_token {
890 if token.is_empty() {
891 return Err(Error::InvalidConfig(
892 "tcp_connection_token must be a non-empty string".to_string(),
893 ));
894 }
895 if matches!(options.transport, Transport::Stdio) {
896 return Err(Error::InvalidConfig(
897 "tcp_connection_token cannot be used with Transport::Stdio".to_string(),
898 ));
899 }
900 }
901 let effective_connection_token: Option<String> = match &options.transport {
902 Transport::Stdio => None,
903 Transport::Tcp { .. } => Some(
904 options
905 .tcp_connection_token
906 .clone()
907 .unwrap_or_else(generate_connection_token),
908 ),
909 Transport::External { .. } => options.tcp_connection_token.clone(),
910 };
911 let mut options = options;
912 if matches!(options.transport, Transport::Tcp { .. })
913 && options.tcp_connection_token.is_none()
914 {
915 options.tcp_connection_token = effective_connection_token.clone();
918 }
919 let session_fs_config = options.session_fs.clone();
920 let program = match &options.program {
921 CliProgram::Path(path) => {
922 info!(path = %path.display(), "using explicit copilot CLI path");
923 path.clone()
924 }
925 CliProgram::Resolve => {
926 let resolved = resolve::copilot_binary()?;
927 info!(path = %resolved.display(), "resolved copilot CLI");
928 #[cfg(windows)]
929 {
930 if let Some(ext) = resolved.extension().and_then(|e| e.to_str()) {
931 if ext.eq_ignore_ascii_case("cmd") || ext.eq_ignore_ascii_case("bat") {
932 warn!(
933 path = %resolved.display(),
934 ext = %ext,
935 "resolved copilot CLI is a .cmd/.bat wrapper; \
936 this may cause console window flashes on Windows"
937 );
938 }
939 }
940 }
941 resolved
942 }
943 };
944
945 let client = match options.transport {
946 Transport::External { ref host, port } => {
947 info!(host = %host, port = %port, "connecting to external CLI server");
948 let stream = TcpStream::connect((host.as_str(), port)).await?;
949 let (reader, writer) = tokio::io::split(stream);
950 Self::from_transport(
951 reader,
952 writer,
953 None,
954 options.cwd,
955 options.on_list_models,
956 session_fs_config.is_some(),
957 options.on_get_trace_context,
958 effective_connection_token.clone(),
959 )?
960 }
961 Transport::Tcp { port } => {
962 let (mut child, actual_port) = Self::spawn_tcp(&program, &options, port).await?;
963 let stream = TcpStream::connect(("127.0.0.1", actual_port)).await?;
964 let (reader, writer) = tokio::io::split(stream);
965 Self::drain_stderr(&mut child);
966 Self::from_transport(
967 reader,
968 writer,
969 Some(child),
970 options.cwd,
971 options.on_list_models,
972 session_fs_config.is_some(),
973 options.on_get_trace_context,
974 effective_connection_token.clone(),
975 )?
976 }
977 Transport::Stdio => {
978 let mut child = Self::spawn_stdio(&program, &options)?;
979 let stdin = child.stdin.take().expect("stdin is piped");
980 let stdout = child.stdout.take().expect("stdout is piped");
981 Self::drain_stderr(&mut child);
982 Self::from_transport(
983 stdout,
984 stdin,
985 Some(child),
986 options.cwd,
987 options.on_list_models,
988 session_fs_config.is_some(),
989 options.on_get_trace_context,
990 effective_connection_token.clone(),
991 )?
992 }
993 };
994
995 client.verify_protocol_version().await?;
996 if let Some(cfg) = session_fs_config {
997 let request = crate::generated::api_types::SessionFsSetProviderRequest {
998 conventions: cfg.conventions.into_wire(),
999 initial_cwd: cfg.initial_cwd,
1000 session_state_path: cfg.session_state_path,
1001 };
1002 client.rpc().session_fs().set_provider(request).await?;
1003 }
1004 Ok(client)
1005 }
1006
1007 pub fn from_streams(
1011 reader: impl AsyncRead + Unpin + Send + 'static,
1012 writer: impl AsyncWrite + Unpin + Send + 'static,
1013 cwd: PathBuf,
1014 ) -> Result<Self, Error> {
1015 Self::from_transport(reader, writer, None, cwd, None, false, None, None)
1016 }
1017
1018 #[cfg(any(test, feature = "test-support"))]
1026 pub fn from_streams_with_trace_provider(
1027 reader: impl AsyncRead + Unpin + Send + 'static,
1028 writer: impl AsyncWrite + Unpin + Send + 'static,
1029 cwd: PathBuf,
1030 provider: Arc<dyn TraceContextProvider>,
1031 ) -> Result<Self, Error> {
1032 Self::from_transport(reader, writer, None, cwd, None, false, Some(provider), None)
1033 }
1034
1035 #[cfg(any(test, feature = "test-support"))]
1039 pub fn from_streams_with_connection_token(
1040 reader: impl AsyncRead + Unpin + Send + 'static,
1041 writer: impl AsyncWrite + Unpin + Send + 'static,
1042 cwd: PathBuf,
1043 token: Option<String>,
1044 ) -> Result<Self, Error> {
1045 Self::from_transport(reader, writer, None, cwd, None, false, None, token)
1046 }
1047
    /// Exposes the private `generate_connection_token` helper.
    ///
    /// Only available in tests or with the `test-support` feature.
    #[cfg(any(test, feature = "test-support"))]
    pub fn generate_connection_token_for_test() -> String {
        generate_connection_token()
    }
1057
1058 #[allow(clippy::too_many_arguments)]
1059 fn from_transport(
1060 reader: impl AsyncRead + Unpin + Send + 'static,
1061 writer: impl AsyncWrite + Unpin + Send + 'static,
1062 child: Option<Child>,
1063 cwd: PathBuf,
1064 on_list_models: Option<Arc<dyn ListModelsHandler>>,
1065 session_fs_configured: bool,
1066 on_get_trace_context: Option<Arc<dyn TraceContextProvider>>,
1067 effective_connection_token: Option<String>,
1068 ) -> Result<Self, Error> {
1069 let (request_tx, request_rx) = mpsc::unbounded_channel::<JsonRpcRequest>();
1070 let (notification_broadcast_tx, _) = broadcast::channel::<JsonRpcNotification>(1024);
1071 let rpc = JsonRpcClient::new(
1072 writer,
1073 reader,
1074 notification_broadcast_tx.clone(),
1075 request_tx,
1076 );
1077
1078 let pid = child.as_ref().and_then(|c| c.id());
1079 info!(pid = ?pid, "copilot CLI client ready");
1080
1081 let client = Self {
1082 inner: Arc::new(ClientInner {
1083 child: parking_lot::Mutex::new(child),
1084 rpc,
1085 cwd,
1086 request_rx: parking_lot::Mutex::new(Some(request_rx)),
1087 notification_tx: notification_broadcast_tx,
1088 router: router::SessionRouter::new(),
1089 negotiated_protocol_version: OnceLock::new(),
1090 state: parking_lot::Mutex::new(ConnectionState::Connected),
1091 lifecycle_tx: broadcast::channel(256).0,
1092 on_list_models,
1093 session_fs_configured,
1094 on_get_trace_context,
1095 effective_connection_token,
1096 }),
1097 };
1098 client.spawn_lifecycle_dispatcher();
1099 Ok(client)
1100 }
1101
1102 fn spawn_lifecycle_dispatcher(&self) {
1106 let inner = Arc::clone(&self.inner);
1107 let mut notif_rx = inner.notification_tx.subscribe();
1108 tokio::spawn(async move {
1109 loop {
1110 match notif_rx.recv().await {
1111 Ok(notification) => {
1112 if notification.method != "session.lifecycle" {
1113 continue;
1114 }
1115 let Some(params) = notification.params.as_ref() else {
1116 continue;
1117 };
1118 let event: SessionLifecycleEvent =
1119 match serde_json::from_value(params.clone()) {
1120 Ok(e) => e,
1121 Err(e) => {
1122 warn!(
1123 error = %e,
1124 "failed to deserialize session.lifecycle notification"
1125 );
1126 continue;
1127 }
1128 };
1129 let _ = inner.lifecycle_tx.send(event);
1132 }
1133 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1134 warn!(missed = n, "lifecycle dispatcher lagged");
1135 }
1136 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1137 }
1138 }
1139 });
1140 }
1141
1142 fn build_command(program: &Path, options: &ClientOptions) -> Command {
1143 let mut command = Command::new(program);
1144 for arg in &options.prefix_args {
1145 command.arg(arg);
1146 }
1147 if let Some(token) = &options.github_token {
1150 command.env("COPILOT_SDK_AUTH_TOKEN", token);
1151 }
1152 if let Some(telemetry) = &options.telemetry {
1155 command.env("COPILOT_OTEL_ENABLED", "true");
1156 if let Some(endpoint) = &telemetry.otlp_endpoint {
1157 command.env("OTEL_EXPORTER_OTLP_ENDPOINT", endpoint);
1158 }
1159 if let Some(path) = &telemetry.file_path {
1160 command.env("COPILOT_OTEL_FILE_EXPORTER_PATH", path);
1161 }
1162 if let Some(exporter) = telemetry.exporter_type {
1163 command.env("COPILOT_OTEL_EXPORTER_TYPE", exporter.as_str());
1164 }
1165 if let Some(source) = &telemetry.source_name {
1166 command.env("COPILOT_OTEL_SOURCE_NAME", source);
1167 }
1168 if let Some(capture) = telemetry.capture_content {
1169 command.env(
1170 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT",
1171 if capture { "true" } else { "false" },
1172 );
1173 }
1174 }
1175 if let Some(home) = &options.copilot_home {
1176 command.env("COPILOT_HOME", home);
1177 }
1178 if let Some(token) = &options.tcp_connection_token {
1179 command.env("COPILOT_CONNECTION_TOKEN", token);
1180 }
1181 for (key, value) in &options.env {
1182 command.env(key, value);
1183 }
1184 for key in &options.env_remove {
1185 command.env_remove(key);
1186 }
1187 command
1188 .current_dir(&options.cwd)
1189 .stdout(Stdio::piped())
1190 .stderr(Stdio::piped());
1191
1192 #[cfg(windows)]
1193 {
1194 use std::os::windows::process::CommandExt;
1195 const CREATE_NO_WINDOW: u32 = 0x08000000;
1196 command.as_std_mut().creation_flags(CREATE_NO_WINDOW);
1197 }
1198
1199 command
1200 }
1201
1202 fn auth_args(options: &ClientOptions) -> Vec<&'static str> {
1210 let mut args: Vec<&'static str> = Vec::new();
1211 if options.github_token.is_some() {
1212 args.push("--auth-token-env");
1213 args.push("COPILOT_SDK_AUTH_TOKEN");
1214 }
1215 let use_logged_in = options
1216 .use_logged_in_user
1217 .unwrap_or(options.github_token.is_none());
1218 if !use_logged_in {
1219 args.push("--no-auto-login");
1220 }
1221 args
1222 }
1223
1224 fn session_idle_timeout_args(options: &ClientOptions) -> Vec<String> {
1228 match options.session_idle_timeout_seconds {
1229 Some(secs) if secs > 0 => {
1230 vec!["--session-idle-timeout".to_string(), secs.to_string()]
1231 }
1232 _ => Vec::new(),
1233 }
1234 }
1235
1236 fn spawn_stdio(program: &Path, options: &ClientOptions) -> Result<Child, Error> {
1237 info!(cwd = ?options.cwd, program = %program.display(), "spawning copilot CLI (stdio)");
1238 let mut command = Self::build_command(program, options);
1239 let log_level = options.log_level.unwrap_or(LogLevel::Info);
1240 command
1241 .args([
1242 "--server",
1243 "--stdio",
1244 "--no-auto-update",
1245 "--log-level",
1246 log_level.as_str(),
1247 ])
1248 .args(Self::auth_args(options))
1249 .args(Self::session_idle_timeout_args(options))
1250 .args(&options.extra_args)
1251 .stdin(Stdio::piped());
1252 Ok(command.spawn()?)
1253 }
1254
1255 async fn spawn_tcp(
1256 program: &Path,
1257 options: &ClientOptions,
1258 port: u16,
1259 ) -> Result<(Child, u16), Error> {
1260 info!(cwd = ?options.cwd, program = %program.display(), port = %port, "spawning copilot CLI (tcp)");
1261 let mut command = Self::build_command(program, options);
1262 let log_level = options.log_level.unwrap_or(LogLevel::Info);
1263 command
1264 .args([
1265 "--server",
1266 "--port",
1267 &port.to_string(),
1268 "--no-auto-update",
1269 "--log-level",
1270 log_level.as_str(),
1271 ])
1272 .args(Self::auth_args(options))
1273 .args(Self::session_idle_timeout_args(options))
1274 .args(&options.extra_args)
1275 .stdin(Stdio::null());
1276 let mut child = command.spawn()?;
1277 let stdout = child.stdout.take().expect("stdout is piped");
1278
1279 let (port_tx, port_rx) = oneshot::channel::<u16>();
1280 let span = tracing::error_span!("copilot_cli_port_scan");
1281 tokio::spawn(
1282 async move {
1283 let port_re = regex::Regex::new(r"listening on port (\d+)").expect("valid regex");
1285 let mut lines = BufReader::new(stdout).lines();
1286 let mut port_tx = Some(port_tx);
1287 while let Ok(Some(line)) = lines.next_line().await {
1288 debug!(line = %line, "CLI stdout");
1289 if let Some(tx) = port_tx.take() {
1290 if let Some(caps) = port_re.captures(&line)
1291 && let Some(p) =
1292 caps.get(1).and_then(|m| m.as_str().parse::<u16>().ok())
1293 {
1294 let _ = tx.send(p);
1295 continue;
1296 }
1297 port_tx = Some(tx);
1299 }
1300 }
1301 }
1302 .instrument(span),
1303 );
1304
1305 let actual_port = tokio::time::timeout(std::time::Duration::from_secs(10), port_rx)
1306 .await
1307 .map_err(|_| Error::Protocol(ProtocolError::CliStartupTimeout))?
1308 .map_err(|_| Error::Protocol(ProtocolError::CliStartupFailed))?;
1309
1310 info!(port = %actual_port, "CLI server listening");
1311 Ok((child, actual_port))
1312 }
1313
1314 fn drain_stderr(child: &mut Child) {
1315 if let Some(stderr) = child.stderr.take() {
1316 let span = tracing::error_span!("copilot_cli");
1317 tokio::spawn(
1318 async move {
1319 let mut reader = BufReader::new(stderr).lines();
1320 while let Ok(Some(line)) = reader.next_line().await {
1321 warn!(line = %line, "CLI stderr");
1322 }
1323 }
1324 .instrument(span),
1325 );
1326 }
1327 }
1328
    /// Returns the working directory this client was created with.
    pub fn cwd(&self) -> &PathBuf {
        &self.inner.cwd
    }
1333
    /// Returns the generated RPC facade, which borrows this client.
    pub fn rpc(&self) -> crate::generated::rpc::ClientRpc<'_> {
        crate::generated::rpc::ClientRpc { client: self }
    }
1347
    /// Sends a JSON-RPC request and returns the raw response, without
    /// interpreting the response's `error` field.
    pub(crate) async fn send_request(
        &self,
        method: &str,
        params: Option<serde_json::Value>,
    ) -> Result<JsonRpcResponse, Error> {
        self.inner.rpc.send_request(method, params).await
    }
1356
1357 pub async fn call(
1377 &self,
1378 method: &str,
1379 params: Option<serde_json::Value>,
1380 ) -> Result<serde_json::Value, Error> {
1381 let session_id: Option<SessionId> = params
1382 .as_ref()
1383 .and_then(|p| p.get("sessionId"))
1384 .and_then(|v| v.as_str())
1385 .map(SessionId::from);
1386 let response = self.send_request(method, params).await?;
1387 if let Some(err) = response.error {
1388 if err.message.contains("Session not found") {
1389 return Err(Error::Session(SessionError::NotFound(
1390 session_id.unwrap_or_else(|| "unknown".into()),
1391 )));
1392 }
1393 return Err(Error::Rpc {
1394 code: err.code,
1395 message: err.message,
1396 });
1397 }
1398 Ok(response.result.unwrap_or(serde_json::Value::Null))
1399 }
1400
    /// Writes a JSON-RPC response to the server over the RPC connection.
    pub(crate) async fn send_response(&self, response: &JsonRpcResponse) -> Result<(), Error> {
        self.inner.rpc.write(response).await
    }
1405
    /// Takes the receiver for server-initiated requests out of the client,
    /// leaving `None` behind.
    ///
    /// Returns `None` if the receiver has already been taken.
    #[expect(dead_code, reason = "reserved for future pub(crate) use")]
    pub(crate) fn take_request_rx(&self) -> Option<mpsc::UnboundedReceiver<JsonRpcRequest>> {
        self.inner.request_rx.lock().take()
    }
1413
1414 pub(crate) fn register_session(
1422 &self,
1423 session_id: &SessionId,
1424 ) -> crate::router::SessionChannels {
1425 self.inner
1426 .router
1427 .ensure_started(&self.inner.notification_tx, &self.inner.request_rx);
1428 self.inner.router.register(session_id)
1429 }
1430
    /// Removes `session_id` from the router.
    pub(crate) fn unregister_session(&self, session_id: &SessionId) {
        self.inner.router.unregister(session_id);
    }
1435
    /// Returns the protocol version recorded by a successful
    /// `verify_protocol_version`, or `None` if none has been recorded yet.
    pub fn protocol_version(&self) -> Option<u32> {
        self.inner.negotiated_protocol_version.get().copied()
    }
1445
1446 pub async fn verify_protocol_version(&self) -> Result<(), Error> {
1470 let server_version = match self.connect_handshake().await {
1475 Ok(v) => v,
1476 Err(Error::Rpc { code, .. }) if code == error_codes::METHOD_NOT_FOUND => {
1477 self.ping(None).await?.protocol_version
1478 }
1479 Err(e) => return Err(e),
1480 };
1481
1482 match server_version {
1483 None => {
1484 warn!("CLI server did not report protocolVersion; skipping version check");
1485 }
1486 Some(v) if !(MIN_PROTOCOL_VERSION..=SDK_PROTOCOL_VERSION).contains(&v) => {
1487 return Err(Error::Protocol(ProtocolError::VersionMismatch {
1488 server: v,
1489 min: MIN_PROTOCOL_VERSION,
1490 max: SDK_PROTOCOL_VERSION,
1491 }));
1492 }
1493 Some(v) => {
1494 if let Some(&existing) = self.inner.negotiated_protocol_version.get() {
1495 if existing != v {
1496 return Err(Error::Protocol(ProtocolError::VersionChanged {
1497 previous: existing,
1498 current: v,
1499 }));
1500 }
1501 } else {
1502 let _ = self.inner.negotiated_protocol_version.set(v);
1503 }
1504 }
1505 }
1506
1507 Ok(())
1508 }
1509
1510 async fn connect_handshake(&self) -> Result<Option<u32>, Error> {
1517 let result = self
1518 .rpc()
1519 .connect(crate::generated::api_types::ConnectRequest {
1520 token: self.inner.effective_connection_token.clone(),
1521 })
1522 .await?;
1523 Ok(u32::try_from(result.protocol_version).ok())
1524 }
1525
1526 pub async fn ping(&self, message: Option<&str>) -> Result<crate::types::PingResponse, Error> {
1534 let params = match message {
1535 Some(m) => serde_json::json!({ "message": m }),
1536 None => serde_json::json!({}),
1537 };
1538 let value = self
1539 .call(generated::api_types::rpc_methods::PING, Some(params))
1540 .await?;
1541 Ok(serde_json::from_value(value)?)
1542 }
1543
1544 pub async fn list_sessions(
1547 &self,
1548 filter: Option<SessionListFilter>,
1549 ) -> Result<Vec<SessionMetadata>, Error> {
1550 let params = match filter {
1551 Some(f) => serde_json::json!({ "filter": f }),
1552 None => serde_json::json!({}),
1553 };
1554 let result = self.call("session.list", Some(params)).await?;
1555 let response: ListSessionsResponse = serde_json::from_value(result)?;
1556 Ok(response.sessions)
1557 }
1558
1559 pub async fn get_session_metadata(
1577 &self,
1578 session_id: &SessionId,
1579 ) -> Result<Option<SessionMetadata>, Error> {
1580 let result = self
1581 .call(
1582 "session.getMetadata",
1583 Some(serde_json::json!({ "sessionId": session_id })),
1584 )
1585 .await?;
1586 let response: GetSessionMetadataResponse = serde_json::from_value(result)?;
1587 Ok(response.session)
1588 }
1589
1590 pub async fn delete_session(&self, session_id: &SessionId) -> Result<(), Error> {
1592 self.call(
1593 "session.delete",
1594 Some(serde_json::json!({ "sessionId": session_id })),
1595 )
1596 .await?;
1597 Ok(())
1598 }
1599
1600 pub async fn get_last_session_id(&self) -> Result<Option<SessionId>, Error> {
1616 let result = self
1617 .call("session.getLastId", Some(serde_json::json!({})))
1618 .await?;
1619 let response: GetLastSessionIdResponse = serde_json::from_value(result)?;
1620 Ok(response.session_id)
1621 }
1622
1623 pub async fn get_foreground_session_id(&self) -> Result<Option<SessionId>, Error> {
1628 let result = self
1629 .call("session.getForeground", Some(serde_json::json!({})))
1630 .await?;
1631 let response: GetForegroundSessionResponse = serde_json::from_value(result)?;
1632 Ok(response.session_id)
1633 }
1634
1635 pub async fn set_foreground_session_id(&self, session_id: &SessionId) -> Result<(), Error> {
1640 self.call(
1641 "session.setForeground",
1642 Some(serde_json::json!({ "sessionId": session_id })),
1643 )
1644 .await?;
1645 Ok(())
1646 }
1647
1648 pub async fn get_status(&self) -> Result<GetStatusResponse, Error> {
1650 let result = self.call("status.get", Some(serde_json::json!({}))).await?;
1651 Ok(serde_json::from_value(result)?)
1652 }
1653
1654 pub async fn get_auth_status(&self) -> Result<GetAuthStatusResponse, Error> {
1656 let result = self
1657 .call("auth.getStatus", Some(serde_json::json!({})))
1658 .await?;
1659 Ok(serde_json::from_value(result)?)
1660 }
1661
1662 pub async fn list_models(&self) -> Result<Vec<Model>, Error> {
1667 if let Some(handler) = &self.inner.on_list_models {
1668 return handler.list_models().await;
1669 }
1670 Ok(self.rpc().models().list().await?.models)
1671 }
1672
1673 pub(crate) async fn resolve_trace_context(&self) -> TraceContext {
1676 if let Some(provider) = &self.inner.on_get_trace_context {
1677 provider.get_trace_context().await
1678 } else {
1679 TraceContext::default()
1680 }
1681 }
1682
1683 pub fn pid(&self) -> Option<u32> {
1685 self.inner.child.lock().as_ref().and_then(|c| c.id())
1686 }
1687
1688 pub async fn stop(&self) -> Result<(), StopErrors> {
1714 let pid = self.pid();
1715 info!(pid = ?pid, "stopping CLI process");
1716 let mut errors: Vec<Error> = Vec::new();
1717
1718 for session_id in self.inner.router.session_ids() {
1721 match self
1722 .call(
1723 "session.destroy",
1724 Some(serde_json::json!({ "sessionId": session_id })),
1725 )
1726 .await
1727 {
1728 Ok(_) => {}
1729 Err(e) => {
1730 warn!(
1731 session_id = %session_id,
1732 error = %e,
1733 "session.destroy failed during Client::stop",
1734 );
1735 errors.push(e);
1736 }
1737 }
1738 self.inner.router.unregister(&session_id);
1739 }
1740
1741 let child = self.inner.child.lock().take();
1742 *self.inner.state.lock() = ConnectionState::Disconnected;
1743 if let Some(mut child) = child
1744 && let Err(e) = child.kill().await
1745 {
1746 errors.push(Error::Io(e));
1747 }
1748
1749 info!(pid = ?pid, errors = errors.len(), "CLI process stopped");
1750 if errors.is_empty() {
1751 Ok(())
1752 } else {
1753 Err(StopErrors(errors))
1754 }
1755 }
1756
1757 pub fn force_stop(&self) {
1787 let pid = self.pid();
1788 info!(pid = ?pid, "force-stopping CLI process");
1789 if let Some(mut child) = self.inner.child.lock().take()
1790 && let Err(e) = child.start_kill()
1791 {
1792 error!(pid = ?pid, error = %e, "failed to send kill signal");
1793 }
1794 self.inner.router.clear();
1797 *self.inner.state.lock() = ConnectionState::Disconnected;
1798 }
1799
1800 pub fn subscribe_lifecycle(&self) -> LifecycleSubscription {
1834 LifecycleSubscription::new(self.inner.lifecycle_tx.subscribe())
1835 }
1836
1837 pub fn state(&self) -> ConnectionState {
1844 *self.inner.state.lock()
1845 }
1846}
1847
1848impl Drop for ClientInner {
1849 fn drop(&mut self) {
1850 if let Some(ref mut child) = *self.child.lock() {
1851 let pid = child.id();
1852 if let Err(e) = child.start_kill() {
1853 error!(pid = ?pid, error = %e, "failed to kill CLI process on drop");
1854 } else {
1855 info!(pid = ?pid, "kill signal sent for CLI process on drop");
1856 }
1857 }
1858 }
1859}
1860
1861#[cfg(test)]
1862mod tests {
1863 use super::*;
1864
1865 #[test]
1866 fn is_transport_failure_matches_request_cancelled() {
1867 let err = Error::Protocol(ProtocolError::RequestCancelled);
1868 assert!(err.is_transport_failure());
1869 }
1870
1871 #[test]
1872 fn is_transport_failure_matches_io_error() {
1873 let err = Error::Io(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "gone"));
1874 assert!(err.is_transport_failure());
1875 }
1876
1877 #[test]
1878 fn is_transport_failure_rejects_rpc_error() {
1879 let err = Error::Rpc {
1880 code: -1,
1881 message: "bad".into(),
1882 };
1883 assert!(!err.is_transport_failure());
1884 }
1885
1886 #[test]
1887 fn is_transport_failure_rejects_session_error() {
1888 let err = Error::Session(SessionError::NotFound("s1".into()));
1889 assert!(!err.is_transport_failure());
1890 }
1891
1892 #[test]
1893 fn client_options_builder_composes() {
1894 let opts = ClientOptions::new()
1895 .with_program(CliProgram::Path(PathBuf::from("/usr/local/bin/copilot")))
1896 .with_prefix_args(["node"])
1897 .with_cwd(PathBuf::from("/tmp"))
1898 .with_env([("KEY", "value")])
1899 .with_env_remove(["UNWANTED"])
1900 .with_extra_args(["--quiet"])
1901 .with_github_token("ghp_test")
1902 .with_use_logged_in_user(false)
1903 .with_log_level(LogLevel::Debug)
1904 .with_session_idle_timeout_seconds(120);
1905 assert!(matches!(opts.program, CliProgram::Path(_)));
1906 assert_eq!(opts.prefix_args, vec![std::ffi::OsString::from("node")]);
1907 assert_eq!(opts.cwd, PathBuf::from("/tmp"));
1908 assert_eq!(
1909 opts.env,
1910 vec![(
1911 std::ffi::OsString::from("KEY"),
1912 std::ffi::OsString::from("value")
1913 )]
1914 );
1915 assert_eq!(opts.env_remove, vec![std::ffi::OsString::from("UNWANTED")]);
1916 assert_eq!(opts.extra_args, vec!["--quiet".to_string()]);
1917 assert_eq!(opts.github_token.as_deref(), Some("ghp_test"));
1918 assert_eq!(opts.use_logged_in_user, Some(false));
1919 assert!(matches!(opts.log_level, Some(LogLevel::Debug)));
1920 assert_eq!(opts.session_idle_timeout_seconds, Some(120));
1921 }
1922
1923 #[test]
1924 fn is_transport_failure_rejects_other_protocol_errors() {
1925 let err = Error::Protocol(ProtocolError::CliStartupTimeout);
1926 assert!(!err.is_transport_failure());
1927 }
1928
1929 #[test]
1930 fn build_command_lets_env_remove_strip_injected_token() {
1931 let opts = ClientOptions {
1932 github_token: Some("secret".to_string()),
1933 env_remove: vec![std::ffi::OsString::from("COPILOT_SDK_AUTH_TOKEN")],
1934 ..Default::default()
1935 };
1936 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
1937 let action = cmd
1939 .as_std()
1940 .get_envs()
1941 .find(|(k, _)| *k == std::ffi::OsStr::new("COPILOT_SDK_AUTH_TOKEN"))
1942 .map(|(_, v)| v);
1943 assert_eq!(
1944 action,
1945 Some(None),
1946 "env_remove should win over github_token"
1947 );
1948 }
1949
1950 #[test]
1951 fn build_command_lets_env_override_injected_token() {
1952 let opts = ClientOptions {
1953 github_token: Some("from-options".to_string()),
1954 env: vec![(
1955 std::ffi::OsString::from("COPILOT_SDK_AUTH_TOKEN"),
1956 std::ffi::OsString::from("from-env"),
1957 )],
1958 ..Default::default()
1959 };
1960 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
1961 let value = cmd
1962 .as_std()
1963 .get_envs()
1964 .find(|(k, _)| *k == std::ffi::OsStr::new("COPILOT_SDK_AUTH_TOKEN"))
1965 .and_then(|(_, v)| v);
1966 assert_eq!(value, Some(std::ffi::OsStr::new("from-env")));
1967 }
1968
1969 #[test]
1970 fn build_command_injects_github_token_by_default() {
1971 let opts = ClientOptions {
1972 github_token: Some("just-the-token".to_string()),
1973 ..Default::default()
1974 };
1975 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
1976 let value = cmd
1977 .as_std()
1978 .get_envs()
1979 .find(|(k, _)| *k == std::ffi::OsStr::new("COPILOT_SDK_AUTH_TOKEN"))
1980 .and_then(|(_, v)| v);
1981 assert_eq!(value, Some(std::ffi::OsStr::new("just-the-token")));
1982 }
1983
1984 fn env_value<'a>(cmd: &'a tokio::process::Command, key: &str) -> Option<&'a std::ffi::OsStr> {
1985 cmd.as_std()
1986 .get_envs()
1987 .find(|(k, _)| *k == std::ffi::OsStr::new(key))
1988 .and_then(|(_, v)| v)
1989 }
1990
1991 #[test]
1992 fn telemetry_config_builder_composes() {
1993 let cfg = TelemetryConfig::new()
1994 .with_otlp_endpoint("http://collector:4318")
1995 .with_file_path(PathBuf::from("/var/log/copilot.jsonl"))
1996 .with_exporter_type(OtelExporterType::OtlpHttp)
1997 .with_source_name("my-app")
1998 .with_capture_content(true);
1999
2000 assert_eq!(cfg.otlp_endpoint.as_deref(), Some("http://collector:4318"));
2001 assert_eq!(
2002 cfg.file_path.as_deref(),
2003 Some(Path::new("/var/log/copilot.jsonl")),
2004 );
2005 assert_eq!(cfg.exporter_type, Some(OtelExporterType::OtlpHttp));
2006 assert_eq!(cfg.source_name.as_deref(), Some("my-app"));
2007 assert_eq!(cfg.capture_content, Some(true));
2008 assert!(!cfg.is_empty());
2009 assert!(TelemetryConfig::new().is_empty());
2010 }
2011
2012 #[test]
2013 fn build_command_sets_otel_env_when_telemetry_enabled() {
2014 let opts = ClientOptions {
2015 telemetry: Some(TelemetryConfig {
2016 otlp_endpoint: Some("http://collector:4318".to_string()),
2017 file_path: Some(PathBuf::from("/var/log/copilot.jsonl")),
2018 exporter_type: Some(OtelExporterType::OtlpHttp),
2019 source_name: Some("my-app".to_string()),
2020 capture_content: Some(true),
2021 }),
2022 ..Default::default()
2023 };
2024 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2025 assert_eq!(
2026 env_value(&cmd, "COPILOT_OTEL_ENABLED"),
2027 Some(std::ffi::OsStr::new("true")),
2028 );
2029 assert_eq!(
2030 env_value(&cmd, "OTEL_EXPORTER_OTLP_ENDPOINT"),
2031 Some(std::ffi::OsStr::new("http://collector:4318")),
2032 );
2033 assert_eq!(
2034 env_value(&cmd, "COPILOT_OTEL_FILE_EXPORTER_PATH"),
2035 Some(std::ffi::OsStr::new("/var/log/copilot.jsonl")),
2036 );
2037 assert_eq!(
2038 env_value(&cmd, "COPILOT_OTEL_EXPORTER_TYPE"),
2039 Some(std::ffi::OsStr::new("otlp-http")),
2040 );
2041 assert_eq!(
2042 env_value(&cmd, "COPILOT_OTEL_SOURCE_NAME"),
2043 Some(std::ffi::OsStr::new("my-app")),
2044 );
2045 assert_eq!(
2046 env_value(&cmd, "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"),
2047 Some(std::ffi::OsStr::new("true")),
2048 );
2049 }
2050
2051 #[test]
2052 fn build_command_omits_otel_env_when_telemetry_none() {
2053 let opts = ClientOptions::default();
2054 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2055 for key in [
2056 "COPILOT_OTEL_ENABLED",
2057 "OTEL_EXPORTER_OTLP_ENDPOINT",
2058 "COPILOT_OTEL_FILE_EXPORTER_PATH",
2059 "COPILOT_OTEL_EXPORTER_TYPE",
2060 "COPILOT_OTEL_SOURCE_NAME",
2061 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT",
2062 ] {
2063 assert!(
2064 env_value(&cmd, key).is_none(),
2065 "expected {key} to be unset when telemetry is None",
2066 );
2067 }
2068 }
2069
2070 #[test]
2071 fn build_command_omits_unset_telemetry_fields() {
2072 let opts = ClientOptions {
2073 telemetry: Some(TelemetryConfig {
2074 otlp_endpoint: Some("http://collector:4318".to_string()),
2075 ..Default::default()
2076 }),
2077 ..Default::default()
2078 };
2079 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2080 assert_eq!(
2082 env_value(&cmd, "COPILOT_OTEL_ENABLED"),
2083 Some(std::ffi::OsStr::new("true")),
2084 );
2085 assert_eq!(
2086 env_value(&cmd, "OTEL_EXPORTER_OTLP_ENDPOINT"),
2087 Some(std::ffi::OsStr::new("http://collector:4318")),
2088 );
2089 for key in [
2091 "COPILOT_OTEL_FILE_EXPORTER_PATH",
2092 "COPILOT_OTEL_EXPORTER_TYPE",
2093 "COPILOT_OTEL_SOURCE_NAME",
2094 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT",
2095 ] {
2096 assert!(env_value(&cmd, key).is_none(), "{key} should be unset");
2097 }
2098 }
2099
2100 #[test]
2101 fn build_command_lets_user_env_override_telemetry() {
2102 let opts = ClientOptions {
2103 telemetry: Some(TelemetryConfig {
2104 otlp_endpoint: Some("http://from-config:4318".to_string()),
2105 ..Default::default()
2106 }),
2107 env: vec![(
2108 std::ffi::OsString::from("OTEL_EXPORTER_OTLP_ENDPOINT"),
2109 std::ffi::OsString::from("http://from-user-env:4318"),
2110 )],
2111 ..Default::default()
2112 };
2113 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2114 assert_eq!(
2115 env_value(&cmd, "OTEL_EXPORTER_OTLP_ENDPOINT"),
2116 Some(std::ffi::OsStr::new("http://from-user-env:4318")),
2117 "user-supplied options.env should override telemetry config",
2118 );
2119 }
2120
2121 #[test]
2122 fn build_command_sets_copilot_home_env_when_configured() {
2123 let opts = ClientOptions::new().with_copilot_home(PathBuf::from("/custom/copilot"));
2124 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2125 assert_eq!(
2126 env_value(&cmd, "COPILOT_HOME"),
2127 Some(std::ffi::OsStr::new("/custom/copilot")),
2128 );
2129
2130 let opts = ClientOptions::default();
2131 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2132 assert!(env_value(&cmd, "COPILOT_HOME").is_none());
2133 }
2134
2135 #[test]
2136 fn build_command_sets_connection_token_env_when_configured() {
2137 let opts = ClientOptions::new().with_tcp_connection_token("secret-token");
2138 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2139 assert_eq!(
2140 env_value(&cmd, "COPILOT_CONNECTION_TOKEN"),
2141 Some(std::ffi::OsStr::new("secret-token")),
2142 );
2143
2144 let opts = ClientOptions::default();
2145 let cmd = Client::build_command(Path::new("/bin/echo"), &opts);
2146 assert!(env_value(&cmd, "COPILOT_CONNECTION_TOKEN").is_none());
2147 }
2148
2149 #[tokio::test]
2150 async fn start_rejects_token_with_stdio_transport() {
2151 let opts = ClientOptions::new()
2152 .with_tcp_connection_token("token-123")
2153 .with_program(CliProgram::Path(PathBuf::from("/bin/echo")));
2154 let err = Client::start(opts).await.unwrap_err();
2155 assert!(matches!(err, Error::InvalidConfig(_)), "got {err:?}");
2156 let Error::InvalidConfig(msg) = err else {
2157 unreachable!()
2158 };
2159 assert!(
2160 msg.contains("Stdio"),
2161 "error should explain the stdio incompatibility: {msg}"
2162 );
2163 }
2164
2165 #[tokio::test]
2166 async fn start_rejects_empty_connection_token() {
2167 let opts = ClientOptions::new()
2168 .with_tcp_connection_token("")
2169 .with_transport(Transport::Tcp { port: 0 })
2170 .with_program(CliProgram::Path(PathBuf::from("/bin/echo")));
2171 let err = Client::start(opts).await.unwrap_err();
2172 assert!(matches!(err, Error::InvalidConfig(_)), "got {err:?}");
2173 }
2174
2175 #[test]
2176 fn telemetry_config_capture_content_serializes_as_lowercase_bool() {
2177 let opts_true = ClientOptions {
2178 telemetry: Some(TelemetryConfig {
2179 capture_content: Some(true),
2180 ..Default::default()
2181 }),
2182 ..Default::default()
2183 };
2184 let opts_false = ClientOptions {
2185 telemetry: Some(TelemetryConfig {
2186 capture_content: Some(false),
2187 ..Default::default()
2188 }),
2189 ..Default::default()
2190 };
2191 let cmd_true = Client::build_command(Path::new("/bin/echo"), &opts_true);
2192 let cmd_false = Client::build_command(Path::new("/bin/echo"), &opts_false);
2193 assert_eq!(
2194 env_value(
2195 &cmd_true,
2196 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"
2197 ),
2198 Some(std::ffi::OsStr::new("true")),
2199 );
2200 assert_eq!(
2201 env_value(
2202 &cmd_false,
2203 "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"
2204 ),
2205 Some(std::ffi::OsStr::new("false")),
2206 );
2207 }
2208
2209 #[test]
2210 fn session_idle_timeout_args_are_omitted_by_default() {
2211 let opts = ClientOptions::default();
2212 assert!(Client::session_idle_timeout_args(&opts).is_empty());
2213 }
2214
2215 #[test]
2216 fn session_idle_timeout_args_omitted_for_zero() {
2217 let opts = ClientOptions {
2218 session_idle_timeout_seconds: Some(0),
2219 ..Default::default()
2220 };
2221 assert!(Client::session_idle_timeout_args(&opts).is_empty());
2222 }
2223
2224 #[test]
2225 fn session_idle_timeout_args_emit_flag_for_positive_value() {
2226 let opts = ClientOptions {
2227 session_idle_timeout_seconds: Some(300),
2228 ..Default::default()
2229 };
2230 assert_eq!(
2231 Client::session_idle_timeout_args(&opts),
2232 vec!["--session-idle-timeout".to_string(), "300".to_string()]
2233 );
2234 }
2235
2236 #[test]
2237 fn log_level_str_round_trips() {
2238 for level in [
2239 LogLevel::None,
2240 LogLevel::Error,
2241 LogLevel::Warning,
2242 LogLevel::Info,
2243 LogLevel::Debug,
2244 LogLevel::All,
2245 ] {
2246 let s = level.as_str();
2247 let json = serde_json::to_string(&level).unwrap();
2248 assert_eq!(json, format!("\"{s}\""));
2249 let parsed: LogLevel = serde_json::from_str(&json).unwrap();
2250 assert_eq!(parsed, level);
2251 }
2252 }
2253
2254 #[test]
2255 fn client_options_debug_redacts_handler() {
2256 struct StubHandler;
2257 #[async_trait]
2258 impl ListModelsHandler for StubHandler {
2259 async fn list_models(&self) -> Result<Vec<Model>, Error> {
2260 Ok(vec![])
2261 }
2262 }
2263 let opts = ClientOptions {
2264 on_list_models: Some(Arc::new(StubHandler)),
2265 github_token: Some("secret-token".into()),
2266 ..Default::default()
2267 };
2268 let debug = format!("{opts:?}");
2269 assert!(debug.contains("on_list_models: Some(\"<set>\")"));
2270 assert!(debug.contains("github_token: Some(\"<redacted>\")"));
2271 assert!(!debug.contains("secret-token"));
2272 }
2273
2274 #[tokio::test]
2275 async fn list_models_uses_on_list_models_handler_when_set() {
2276 use std::sync::atomic::{AtomicUsize, Ordering};
2277
2278 struct CountingHandler {
2279 calls: Arc<AtomicUsize>,
2280 models: Vec<Model>,
2281 }
2282 #[async_trait]
2283 impl ListModelsHandler for CountingHandler {
2284 async fn list_models(&self) -> Result<Vec<Model>, Error> {
2285 self.calls.fetch_add(1, Ordering::SeqCst);
2286 Ok(self.models.clone())
2287 }
2288 }
2289
2290 let calls = Arc::new(AtomicUsize::new(0));
2291 let model = Model {
2292 billing: None,
2293 capabilities: ModelCapabilities {
2294 limits: None,
2295 supports: None,
2296 },
2297 default_reasoning_effort: None,
2298 id: "byok-gpt-4".into(),
2299 name: "BYOK GPT-4".into(),
2300 policy: None,
2301 supported_reasoning_efforts: Vec::new(),
2302 };
2303 let handler = Arc::new(CountingHandler {
2304 calls: Arc::clone(&calls),
2305 models: vec![model.clone()],
2306 });
2307
2308 let inner = ClientInner {
2313 child: parking_lot::Mutex::new(None),
2314 rpc: {
2315 let (req_tx, _req_rx) = mpsc::unbounded_channel();
2316 let (notif_tx, _notif_rx) = broadcast::channel(16);
2317 let (read_pipe, _write_pipe) = tokio::io::duplex(64);
2318 let (_unused_read, write_pipe) = tokio::io::duplex(64);
2319 JsonRpcClient::new(write_pipe, read_pipe, notif_tx, req_tx)
2320 },
2321 cwd: PathBuf::from("."),
2322 request_rx: parking_lot::Mutex::new(None),
2323 notification_tx: broadcast::channel(16).0,
2324 router: router::SessionRouter::new(),
2325 negotiated_protocol_version: OnceLock::new(),
2326 state: parking_lot::Mutex::new(ConnectionState::Connected),
2327 lifecycle_tx: broadcast::channel(16).0,
2328 on_list_models: Some(handler),
2329 session_fs_configured: false,
2330 on_get_trace_context: None,
2331 effective_connection_token: None,
2332 };
2333 let client = Client {
2334 inner: Arc::new(inner),
2335 };
2336
2337 let result = client.list_models().await.unwrap();
2338 assert_eq!(result.len(), 1);
2339 assert_eq!(result[0].id, "byok-gpt-4");
2340 assert_eq!(calls.load(Ordering::SeqCst), 1);
2341 }
2342}