1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::Mutex as ParkingLotMutex;
7use serde_json::Value;
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tracing::{Instrument, warn};
12
13use crate::canvas::CanvasHandler;
14use crate::generated::api_types::{LogRequest, ModelSwitchToRequest, OpenCanvasInstance};
15use crate::generated::session_events::{
16 CommandExecuteData, ElicitationRequestedData, ExternalToolRequestedData,
17 SessionCanvasClosedData, SessionErrorData, SessionEventType,
18};
19use crate::handler::{
20 AutoModeSwitchHandler, AutoModeSwitchResponse, ElicitationHandler, ExitPlanModeHandler,
21 PermissionHandler, PermissionResult, UserInputHandler, UserInputResponse,
22};
23use crate::hooks::SessionHooks;
24use crate::provider_token::BearerTokenProvider;
25use crate::session_fs::SessionFsProvider;
26use crate::trace_context::inject_trace_context;
27use crate::transforms::SystemMessageTransform;
28use crate::types::{
29 CommandContext, CommandDefinition, CommandHandler, CreateSessionResult, ElicitationRequest,
30 ElicitationResult, ExitPlanModeData, GetMessagesResponse, MessageOptions,
31 PermissionRequestData, RequestId, ResumeSessionConfig, ResumeSessionResult, SectionOverride,
32 SessionCapabilities, SessionConfig, SessionEvent, SessionId, SetModelOptions,
33 SystemMessageConfig, ToolInvocation, ToolResult, ToolResultExpanded, TraceContext,
34 UiInputOptions, ensure_attachment_display_names,
35};
36use crate::{
37 Client, Error, ErrorKind, JsonRpcResponse, SessionErrorKind, SessionEventNotification,
38 error_codes,
39};
40
41#[derive(Clone)]
49pub(crate) struct SessionHandlers {
50 pub permission: Option<Arc<dyn PermissionHandler>>,
51 pub elicitation: Option<Arc<dyn ElicitationHandler>>,
52 pub user_input: Option<Arc<dyn UserInputHandler>>,
53 pub exit_plan_mode: Option<Arc<dyn ExitPlanModeHandler>>,
54 pub auto_mode_switch: Option<Arc<dyn AutoModeSwitchHandler>>,
55 pub tools: Arc<HashMap<String, Arc<dyn crate::tool::ToolHandler>>>,
56}
57
58struct IdleWaiter {
60 tx: oneshot::Sender<Result<Option<SessionEvent>, Error>>,
61 last_assistant_message: Option<SessionEvent>,
62 started_at: Instant,
63 first_assistant_message_seen: bool,
64}
65
66struct WaiterGuard {
78 slot: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
79}
80
81impl Drop for WaiterGuard {
82 fn drop(&mut self) {
83 self.slot.lock().take();
84 }
85}
86
87struct PendingSessionRegistration {
88 client: Client,
89 session_id: SessionId,
90 shutdown: CancellationToken,
91 disarmed: bool,
92}
93
94impl PendingSessionRegistration {
95 fn new(client: Client, session_id: SessionId, shutdown: CancellationToken) -> Self {
96 Self {
97 client,
98 session_id,
99 shutdown,
100 disarmed: false,
101 }
102 }
103
104 async fn cleanup(mut self, event_loop: JoinHandle<()>) {
105 self.shutdown.cancel();
106 let _ = event_loop.await;
107 self.client.unregister_session(&self.session_id);
108 self.disarmed = true;
109 }
110
111 fn disarm(&mut self) {
112 self.disarmed = true;
113 }
114}
115
116impl Drop for PendingSessionRegistration {
117 fn drop(&mut self) {
118 if !self.disarmed {
119 self.shutdown.cancel();
120 self.client.unregister_session(&self.session_id);
121 }
122 }
123}
124
125pub struct Session {
138 id: SessionId,
139 cwd: PathBuf,
140 workspace_path: Option<PathBuf>,
141 remote_url: Option<String>,
142 client: Client,
143 event_loop: ParkingLotMutex<Option<JoinHandle<()>>>,
148 shutdown: CancellationToken,
162 idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
169 capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
171 open_canvases: Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
173 event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
175}
176
177impl Session {
178 pub fn id(&self) -> &SessionId {
180 &self.id
181 }
182
183 pub fn cwd(&self) -> &PathBuf {
185 &self.cwd
186 }
187
188 pub fn workspace_path(&self) -> Option<&Path> {
190 self.workspace_path.as_deref()
191 }
192
193 pub fn remote_url(&self) -> Option<&str> {
195 self.remote_url.as_deref()
196 }
197
198 pub fn capabilities(&self) -> SessionCapabilities {
203 self.capabilities.read().clone()
204 }
205
206 pub fn open_canvases(&self) -> Vec<OpenCanvasInstance> {
209 self.open_canvases.read().clone()
210 }
211
212 pub fn cancellation_token(&self) -> CancellationToken {
239 self.shutdown.child_token()
240 }
241
242 pub fn subscribe(&self) -> crate::subscription::EventSubscription {
281 crate::subscription::EventSubscription::new(self.event_tx.subscribe())
282 }
283
284 pub fn client(&self) -> &Client {
286 &self.client
287 }
288
289 pub fn rpc(&self) -> crate::generated::rpc::SessionRpc<'_> {
300 crate::generated::rpc::SessionRpc { session: self }
301 }
302
303 pub async fn stop_event_loop(&self) {
311 self.shutdown.cancel();
312 let handle = self.event_loop.lock().take();
313 if let Some(handle) = handle {
314 let _ = handle.await;
315 }
316 if let Some(waiter) = self.idle_waiter.lock().take() {
318 let _ = waiter.tx.send(Err(
319 ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()
320 ));
321 }
322 }
323
324 pub async fn send(&self, opts: impl Into<MessageOptions>) -> Result<String, Error> {
349 if self.idle_waiter.lock().is_some() {
350 return Err(ErrorKind::Session(SessionErrorKind::SendWhileWaiting).into());
351 }
352 self.send_inner(opts.into()).await
353 }
354
355 async fn send_inner(&self, opts: MessageOptions) -> Result<String, Error> {
356 let mut params = serde_json::json!({
357 "sessionId": self.id,
358 "prompt": opts.prompt,
359 });
360 if let Some(m) = opts.mode {
361 params["mode"] = serde_json::to_value(m)?;
362 }
363 if let Some(am) = opts.agent_mode {
364 params["agentMode"] = serde_json::to_value(am)?;
365 }
366 if let Some(mut a) = opts.attachments {
367 ensure_attachment_display_names(&mut a);
368 params["attachments"] = serde_json::to_value(a)?;
369 }
370 if let Some(headers) = opts.request_headers
371 && !headers.is_empty()
372 {
373 params["requestHeaders"] = serde_json::to_value(headers)?;
374 }
375 if let Some(display_prompt) = opts.display_prompt {
376 params["displayPrompt"] = serde_json::to_value(display_prompt)?;
377 }
378 let trace_ctx = if opts.traceparent.is_some() || opts.tracestate.is_some() {
379 TraceContext {
380 traceparent: opts.traceparent,
381 tracestate: opts.tracestate,
382 }
383 } else {
384 self.client.resolve_trace_context().await
385 };
386 inject_trace_context(&mut params, &trace_ctx);
387 let rpc_start = Instant::now();
388 let result = self.client.call("session.send", Some(params)).await?;
389 let message_id = result
390 .get("messageId")
391 .and_then(|v| v.as_str())
392 .map(|s| s.to_string())
393 .unwrap_or_default();
394 tracing::debug!(
395 elapsed_ms = rpc_start.elapsed().as_millis(),
396 session_id = %self.id,
397 message_id = %message_id,
398 "Session::send completed successfully"
399 );
400 Ok(message_id)
401 }
402
403 pub async fn send_and_wait(
423 &self,
424 opts: impl Into<MessageOptions>,
425 ) -> Result<Option<SessionEvent>, Error> {
426 let total_start = Instant::now();
427 let opts = opts.into();
428 let timeout_duration = opts.wait_timeout.unwrap_or(Duration::from_secs(60));
429 let (tx, rx) = oneshot::channel();
430
431 {
432 let mut guard = self.idle_waiter.lock();
433 if guard.is_some() {
434 return Err(ErrorKind::Session(SessionErrorKind::SendWhileWaiting).into());
435 }
436 *guard = Some(IdleWaiter {
437 tx,
438 last_assistant_message: None,
439 started_at: total_start,
440 first_assistant_message_seen: false,
441 });
442 }
443
444 let _waiter_guard = WaiterGuard {
449 slot: self.idle_waiter.clone(),
450 };
451
452 let result = tokio::time::timeout(timeout_duration, async {
453 self.send_inner(opts).await?;
454 match rx.await {
455 Ok(result) => result,
456 Err(_) => Err(ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()),
457 }
458 })
459 .await;
460
461 match result {
462 Ok(inner) => {
463 tracing::debug!(
464 elapsed_ms = total_start.elapsed().as_millis(),
465 session_id = %self.id,
466 completed_by = if inner.is_ok() { "idle" } else { "error" },
467 "Session::send_and_wait complete"
468 );
469 inner
470 }
471 Err(_) => {
472 tracing::warn!(
473 elapsed_ms = total_start.elapsed().as_millis(),
474 session_id = %self.id,
475 completed_by = "timeout",
476 "Session::send_and_wait failed"
477 );
478 Err(ErrorKind::Session(SessionErrorKind::Timeout(timeout_duration)).into())
479 }
480 }
481 }
482
483 pub async fn get_events(&self) -> Result<Vec<SessionEvent>, Error> {
485 let result = self
486 .client
487 .call(
488 "session.getMessages",
489 Some(serde_json::json!({ "sessionId": self.id })),
490 )
491 .await?;
492 let response: GetMessagesResponse = serde_json::from_value(result)?;
493 Ok(response.events)
494 }
495
496 #[deprecated(since = "0.1.0", note = "Use `get_events()` instead")]
498 pub async fn get_messages(&self) -> Result<Vec<SessionEvent>, Error> {
499 self.get_events().await
500 }
501
502 pub async fn abort(&self) -> Result<(), Error> {
510 self.client
511 .call(
512 "session.abort",
513 Some(serde_json::json!({ "sessionId": self.id })),
514 )
515 .await?;
516 Ok(())
517 }
518
519 pub async fn set_model(&self, model: &str, opts: Option<SetModelOptions>) -> Result<(), Error> {
523 let opts = opts.unwrap_or_default();
524 let request = ModelSwitchToRequest {
525 model_id: model.to_string(),
526 reasoning_effort: opts.reasoning_effort,
527 reasoning_summary: opts.reasoning_summary,
528 context_tier: opts.context_tier,
529 model_capabilities: opts.model_capabilities,
530 };
531 self.rpc().model().switch_to(request).await?;
532 Ok(())
533 }
534
535 pub async fn disconnect(&self) -> Result<(), Error> {
552 self.client
553 .call(
554 "session.destroy",
555 Some(serde_json::json!({ "sessionId": self.id })),
556 )
557 .await?;
558 self.stop_event_loop().await;
559 self.client.unregister_session(&self.id);
560 Ok(())
561 }
562
563 #[deprecated(since = "0.1.0", note = "Use `disconnect()` instead")]
568 pub async fn destroy(&self) -> Result<(), Error> {
569 self.disconnect().await
570 }
571
572 pub async fn log(
576 &self,
577 message: &str,
578 opts: Option<crate::types::LogOptions>,
579 ) -> Result<(), Error> {
580 let opts = opts.unwrap_or_default();
581 let level = match opts.level {
582 Some(level) => Some(serde_json::from_value(serde_json::to_value(level)?)?),
583 None => None,
584 };
585 let request = LogRequest {
586 message: message.to_string(),
587 level,
588 ephemeral: opts.ephemeral,
589 r#type: None,
590 tip: None,
591 url: None,
592 };
593 self.rpc().log(request).await?;
594 Ok(())
595 }
596
597 pub fn ui(&self) -> SessionUi<'_> {
603 SessionUi { session: self }
604 }
605
606 fn assert_elicitation(&self) -> Result<(), Error> {
608 if self
609 .capabilities
610 .read()
611 .ui
612 .as_ref()
613 .and_then(|u| u.elicitation)
614 != Some(true)
615 {
616 return Err(ErrorKind::Session(SessionErrorKind::ElicitationNotSupported).into());
617 }
618 Ok(())
619 }
620}
621
622impl Drop for Session {
623 fn drop(&mut self) {
624 self.shutdown.cancel();
636 self.client.unregister_session(&self.id);
637 }
638}
639
640pub struct SessionUi<'a> {
647 session: &'a Session,
648}
649
650impl<'a> SessionUi<'a> {
651 pub async fn elicitation(
659 &self,
660 message: &str,
661 schema: Value,
662 ) -> Result<ElicitationResult, Error> {
663 self.session.assert_elicitation()?;
664 let result = self
665 .session
666 .client
667 .call(
668 "session.ui.elicitation",
669 Some(serde_json::json!({
670 "sessionId": self.session.id,
671 "message": message,
672 "requestedSchema": schema,
673 })),
674 )
675 .await?;
676 let elicitation: ElicitationResult = serde_json::from_value(result)?;
677 Ok(elicitation)
678 }
679
680 pub async fn confirm(&self, message: &str) -> Result<bool, Error> {
684 self.session.assert_elicitation()?;
685 let schema = serde_json::json!({
686 "type": "object",
687 "properties": {
688 "confirmed": {
689 "type": "boolean",
690 "default": true,
691 }
692 },
693 "required": ["confirmed"]
694 });
695 let result = self.elicitation(message, schema).await?;
696 Ok(result.action == "accept"
697 && result
698 .content
699 .and_then(|c| c.get("confirmed").and_then(|v| v.as_bool()))
700 == Some(true))
701 }
702
703 pub async fn select(&self, message: &str, options: &[&str]) -> Result<Option<String>, Error> {
707 self.session.assert_elicitation()?;
708 let schema = serde_json::json!({
709 "type": "object",
710 "properties": {
711 "selection": {
712 "type": "string",
713 "enum": options,
714 }
715 },
716 "required": ["selection"]
717 });
718 let result = self.elicitation(message, schema).await?;
719 if result.action != "accept" {
720 return Ok(None);
721 }
722 let selection = result.content.and_then(|c| {
723 c.get("selection")
724 .and_then(|v| v.as_str())
725 .map(String::from)
726 });
727 Ok(selection)
728 }
729
730 pub async fn input(
735 &self,
736 message: &str,
737 options: Option<&UiInputOptions<'_>>,
738 ) -> Result<Option<String>, Error> {
739 self.session.assert_elicitation()?;
740 let mut field = serde_json::json!({ "type": "string" });
741 if let Some(opts) = options {
742 if let Some(title) = opts.title {
743 field["title"] = Value::String(title.to_string());
744 }
745 if let Some(desc) = opts.description {
746 field["description"] = Value::String(desc.to_string());
747 }
748 if let Some(min) = opts.min_length {
749 field["minLength"] = Value::Number(min.into());
750 }
751 if let Some(max) = opts.max_length {
752 field["maxLength"] = Value::Number(max.into());
753 }
754 if let Some(fmt) = &opts.format {
755 field["format"] = Value::String(fmt.as_str().to_string());
756 }
757 if let Some(default) = opts.default {
758 field["default"] = Value::String(default.to_string());
759 }
760 }
761 let schema = serde_json::json!({
762 "type": "object",
763 "properties": { "value": field },
764 "required": ["value"]
765 });
766 let result = self.elicitation(message, schema).await?;
767 if result.action != "accept" {
768 return Ok(None);
769 }
770 let value = result
771 .content
772 .and_then(|c| c.get("value").and_then(|v| v.as_str()).map(String::from));
773 Ok(value)
774 }
775}
776
777impl Client {
778 pub async fn create_session(&self, mut config: SessionConfig) -> Result<Session, Error> {
800 let total_start = Instant::now();
801 let caller_session_id = config.session_id.clone();
809 let use_server_generated_id = config.cloud.is_some() && caller_session_id.is_none();
810 let local_session_id: Option<SessionId> = if use_server_generated_id {
811 None
812 } else {
813 Some(
814 caller_session_id
815 .clone()
816 .unwrap_or_else(|| SessionId::new(uuid::Uuid::new_v4().to_string())),
817 )
818 };
819 if config.hooks_handler.is_some() && config.hooks.is_none() {
820 config.hooks = Some(true);
821 }
822 if let Some(transforms) = config.system_message_transform.clone() {
823 inject_transform_sections(&mut config, transforms.as_ref());
824 }
825 let mode = self.inner.mode;
826 if mode == crate::ClientMode::Empty && config.available_tools.is_none() {
827 return Err(Error::with_message(
828 ErrorKind::InvalidConfig,
829 "ClientMode::Empty requires available_tools to be set on the session config. \
830 Use ToolSet to specify which tools the session may use (e.g. \
831 ToolSet::new().add_builtin_many(BUILTIN_TOOLS_ISOLATED)).",
832 ));
833 }
834 crate::mode::validate_tool_filter_list(
835 "available_tools",
836 config.available_tools.as_deref(),
837 )?;
838 crate::mode::validate_tool_filter_list("excluded_tools", config.excluded_tools.as_deref())?;
839 config.system_message =
840 crate::mode::system_message_for_mode(mode, config.system_message.take());
841 config.memory = crate::mode::memory_for_mode(mode, config.memory.take());
842 if mode == crate::ClientMode::Empty {
843 if config.enable_session_telemetry.is_none() {
844 config.enable_session_telemetry = Some(false);
845 }
846 if config.skip_embedding_retrieval.is_none() {
847 config.skip_embedding_retrieval = Some(true);
848 }
849 if config.enable_on_demand_instruction_discovery.is_none() {
850 config.enable_on_demand_instruction_discovery = Some(false);
851 }
852 if config.enable_file_hooks.is_none() {
853 config.enable_file_hooks = Some(false);
854 }
855 if config.enable_host_git_operations.is_none() {
856 config.enable_host_git_operations = Some(false);
857 }
858 if config.enable_session_store.is_none() {
859 config.enable_session_store = Some(false);
860 }
861 if config.enable_skills.is_none() {
862 config.enable_skills = Some(false);
863 }
864 }
865 if mode == crate::ClientMode::Empty && config.mcp_oauth_token_storage.is_none() {
866 config.mcp_oauth_token_storage = Some("in-memory".into());
867 }
868 if mode == crate::ClientMode::Empty && config.embedding_cache_storage.is_none() {
869 config.embedding_cache_storage = Some("in-memory".into());
870 }
871 let opt_skip_custom_instructions = config.skip_custom_instructions;
872 let opt_custom_agents_local_only = config.custom_agents_local_only;
873 let opt_coauthor_enabled = config.coauthor_enabled;
874 let opt_manage_schedule_enabled = config.manage_schedule_enabled;
875 let (wire, mut runtime) = config.into_wire(local_session_id.clone())?;
876
877 let permission_handler = crate::permission::resolve_handler(
878 runtime.permission_handler.take(),
879 runtime.permission_policy.take(),
880 );
881 let handlers = SessionHandlers {
882 permission: permission_handler,
883 elicitation: runtime.elicitation_handler.take(),
884 user_input: runtime.user_input_handler.take(),
885 exit_plan_mode: runtime.exit_plan_mode_handler.take(),
886 auto_mode_switch: runtime.auto_mode_switch_handler.take(),
887 tools: Arc::new(std::mem::take(&mut runtime.tool_handlers)),
888 };
889 let hooks = runtime.hooks_handler.take();
890 let transforms = runtime.system_message_transform.take();
891 let tools_count = wire.tools.as_ref().map_or(0, Vec::len);
892 let commands_count = runtime.commands.as_ref().map_or(0, Vec::len);
893 let has_hooks = hooks.is_some();
894 let command_handlers = build_command_handler_map(runtime.commands.as_deref());
895 let canvas_handler = runtime.canvas_handler.take();
896 let session_fs_provider = runtime.session_fs_provider.take();
897 let bearer_token_providers = std::mem::take(&mut runtime.bearer_token_providers);
898 if self.inner.session_fs_configured && session_fs_provider.is_none() {
899 return Err(ErrorKind::Session(SessionErrorKind::SessionFsProviderRequired).into());
900 }
901 if self.inner.session_fs_sqlite_declared
902 && let Some(ref provider) = session_fs_provider
903 && provider.sqlite().is_none()
904 {
905 return Err(Error::with_message(
906 ErrorKind::InvalidConfig,
907 "SessionFs capabilities declare SQLite support but the provider \
908 does not implement SessionFsSqliteProvider",
909 ));
910 }
911
912 let mut params = serde_json::to_value(&wire)?;
913 let trace_ctx = self.resolve_trace_context().await;
914 inject_trace_context(&mut params, &trace_ctx);
915
916 let setup_start = Instant::now();
917 let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
918 let idle_waiter = Arc::new(ParkingLotMutex::new(None));
919 let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new()));
920 let shutdown = CancellationToken::new();
921 let (event_tx, _) = tokio::sync::broadcast::channel(512);
922
923 let inline_stash: Arc<
929 ParkingLotMutex<Option<(SessionId, crate::router::SessionChannels)>>,
930 > = Arc::new(ParkingLotMutex::new(None));
931
932 let inline_callback: Option<crate::jsonrpc::InlineResponseCallback> = if let Some(ref sid) =
933 local_session_id
934 {
935 let channels = self.register_session(sid);
936 *inline_stash.lock() = Some((sid.clone(), channels));
937 None
938 } else {
939 let client = self.clone();
940 let stash = inline_stash.clone();
941 let expected = caller_session_id.clone();
942 Some(Box::new(move |response| {
943 let result = response.result.as_ref().ok_or_else(|| {
944 Error::with_message(ErrorKind::Json, "session.create response had no result")
945 })?;
946 let parsed: CreateSessionResult =
947 serde_json::from_value(result.clone()).map_err(Error::from)?;
948 if let Some(requested) = expected.as_ref()
949 && parsed.session_id != *requested
950 {
951 return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
952 requested: requested.clone(),
953 returned: parsed.session_id,
954 })
955 .into());
956 }
957 let channels = client.register_session(&parsed.session_id);
958 *stash.lock() = Some((parsed.session_id, channels));
959 Ok(())
960 }))
961 };
962
963 let rpc_start = Instant::now();
964 let result = match self
965 .call_with_inline_callback("session.create", Some(params), inline_callback)
966 .await
967 {
968 Ok(result) => result,
969 Err(error) => {
970 if let Some((id, _channels)) = inline_stash.lock().take() {
971 self.unregister_session(&id);
972 }
973 return Err(error);
974 }
975 };
976 tracing::debug!(
977 elapsed_ms = rpc_start.elapsed().as_millis(),
978 "Client::create_session session creation request completed successfully"
979 );
980 let create_result: CreateSessionResult = match serde_json::from_value(result) {
981 Ok(result) => result,
982 Err(error) => {
983 if let Some((id, _channels)) = inline_stash.lock().take() {
984 self.unregister_session(&id);
985 }
986 return Err(error.into());
987 }
988 };
989
990 if let Some(ref requested) = local_session_id
991 && create_result.session_id != *requested
992 {
993 if let Some((id, _channels)) = inline_stash.lock().take() {
994 self.unregister_session(&id);
995 }
996 return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
997 requested: requested.clone(),
998 returned: create_result.session_id.clone(),
999 })
1000 .into());
1001 }
1002
1003 let (session_id, channels) = inline_stash
1004 .lock()
1005 .take()
1006 .expect("session registration must have populated stash on success");
1007 let event_loop = spawn_event_loop(
1008 session_id.clone(),
1009 self.clone(),
1010 handlers,
1011 hooks,
1012 transforms,
1013 command_handlers,
1014 canvas_handler,
1015 session_fs_provider,
1016 bearer_token_providers,
1017 channels,
1018 idle_waiter.clone(),
1019 capabilities.clone(),
1020 open_canvases.clone(),
1021 event_tx.clone(),
1022 shutdown.clone(),
1023 );
1024 tracing::debug!(
1025 elapsed_ms = setup_start.elapsed().as_millis(),
1026 session_id = %session_id,
1027 tools_count,
1028 commands_count,
1029 has_hooks,
1030 "Client::create_session local setup complete"
1031 );
1032 *capabilities.write() = create_result.capabilities.unwrap_or_default();
1033
1034 tracing::debug!(
1035 elapsed_ms = total_start.elapsed().as_millis(),
1036 session_id = %session_id,
1037 "Client::create_session complete"
1038 );
1039 let session = Session {
1040 id: session_id,
1041 cwd: self.cwd().clone(),
1042 workspace_path: create_result.workspace_path,
1043 remote_url: create_result.remote_url,
1044 client: self.clone(),
1045 event_loop: ParkingLotMutex::new(Some(event_loop)),
1046 shutdown,
1047 idle_waiter,
1048 capabilities,
1049 open_canvases,
1050 event_tx,
1051 };
1052 apply_mode_post_create_patch(
1053 &session,
1054 mode,
1055 opt_skip_custom_instructions,
1056 opt_custom_agents_local_only,
1057 opt_coauthor_enabled,
1058 opt_manage_schedule_enabled,
1059 )
1060 .await?;
1061 Ok(session)
1062 }
1063
1064 pub async fn resume_session(&self, mut config: ResumeSessionConfig) -> Result<Session, Error> {
1075 let total_start = Instant::now();
1076 let session_id = config.session_id.clone();
1077 if config.hooks_handler.is_some() && config.hooks.is_none() {
1078 config.hooks = Some(true);
1079 }
1080 if let Some(transforms) = config.system_message_transform.clone() {
1081 inject_transform_sections_resume(&mut config, transforms.as_ref());
1082 }
1083 let mode = self.inner.mode;
1084 if mode == crate::ClientMode::Empty && config.available_tools.is_none() {
1085 return Err(Error::with_message(
1086 ErrorKind::InvalidConfig,
1087 "ClientMode::Empty requires available_tools to be set on the session config. \
1088 Use ToolSet to specify which tools the session may use (e.g. \
1089 ToolSet::new().add_builtin_many(BUILTIN_TOOLS_ISOLATED)).",
1090 ));
1091 }
1092 crate::mode::validate_tool_filter_list(
1093 "available_tools",
1094 config.available_tools.as_deref(),
1095 )?;
1096 crate::mode::validate_tool_filter_list("excluded_tools", config.excluded_tools.as_deref())?;
1097 config.system_message =
1098 crate::mode::system_message_for_mode(mode, config.system_message.take());
1099 config.memory = crate::mode::memory_for_mode(mode, config.memory.take());
1100 if mode == crate::ClientMode::Empty {
1101 if config.enable_session_telemetry.is_none() {
1102 config.enable_session_telemetry = Some(false);
1103 }
1104 if config.skip_embedding_retrieval.is_none() {
1105 config.skip_embedding_retrieval = Some(true);
1106 }
1107 if config.enable_on_demand_instruction_discovery.is_none() {
1108 config.enable_on_demand_instruction_discovery = Some(false);
1109 }
1110 if config.enable_file_hooks.is_none() {
1111 config.enable_file_hooks = Some(false);
1112 }
1113 if config.enable_host_git_operations.is_none() {
1114 config.enable_host_git_operations = Some(false);
1115 }
1116 if config.enable_session_store.is_none() {
1117 config.enable_session_store = Some(false);
1118 }
1119 if config.enable_skills.is_none() {
1120 config.enable_skills = Some(false);
1121 }
1122 }
1123 if mode == crate::ClientMode::Empty && config.mcp_oauth_token_storage.is_none() {
1124 config.mcp_oauth_token_storage = Some("in-memory".into());
1125 }
1126 if mode == crate::ClientMode::Empty && config.embedding_cache_storage.is_none() {
1127 config.embedding_cache_storage = Some("in-memory".into());
1128 }
1129 let opt_skip_custom_instructions = config.skip_custom_instructions;
1130 let opt_custom_agents_local_only = config.custom_agents_local_only;
1131 let opt_coauthor_enabled = config.coauthor_enabled;
1132 let opt_manage_schedule_enabled = config.manage_schedule_enabled;
1133 let (wire, mut runtime) = config.into_wire()?;
1134
1135 let permission_handler = crate::permission::resolve_handler(
1136 runtime.permission_handler.take(),
1137 runtime.permission_policy.take(),
1138 );
1139 let handlers = SessionHandlers {
1140 permission: permission_handler,
1141 elicitation: runtime.elicitation_handler.take(),
1142 user_input: runtime.user_input_handler.take(),
1143 exit_plan_mode: runtime.exit_plan_mode_handler.take(),
1144 auto_mode_switch: runtime.auto_mode_switch_handler.take(),
1145 tools: Arc::new(std::mem::take(&mut runtime.tool_handlers)),
1146 };
1147 let hooks = runtime.hooks_handler.take();
1148 let transforms = runtime.system_message_transform.take();
1149 let tools_count = wire.tools.as_ref().map_or(0, Vec::len);
1150 let commands_count = runtime.commands.as_ref().map_or(0, Vec::len);
1151 let has_hooks = hooks.is_some();
1152 let command_handlers = build_command_handler_map(runtime.commands.as_deref());
1153 let canvas_handler = runtime.canvas_handler.take();
1154 let session_fs_provider = runtime.session_fs_provider.take();
1155 let bearer_token_providers = std::mem::take(&mut runtime.bearer_token_providers);
1156 if self.inner.session_fs_configured && session_fs_provider.is_none() {
1157 return Err(ErrorKind::Session(SessionErrorKind::SessionFsProviderRequired).into());
1158 }
1159 if self.inner.session_fs_sqlite_declared
1160 && let Some(ref provider) = session_fs_provider
1161 && provider.sqlite().is_none()
1162 {
1163 return Err(Error::with_message(
1164 ErrorKind::InvalidConfig,
1165 "SessionFs capabilities declare SQLite support but the provider \
1166 does not implement SessionFsSqliteProvider",
1167 ));
1168 }
1169
1170 let mut params = serde_json::to_value(&wire)?;
1171 let trace_ctx = self.resolve_trace_context().await;
1172 inject_trace_context(&mut params, &trace_ctx);
1173
1174 let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
1175 let setup_start = Instant::now();
1176 let channels = self.register_session(&session_id);
1177 let idle_waiter = Arc::new(ParkingLotMutex::new(None));
1178 let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new()));
1179 let shutdown = CancellationToken::new();
1180 let (event_tx, _) = tokio::sync::broadcast::channel(512);
1181 let event_loop = spawn_event_loop(
1182 session_id.clone(),
1183 self.clone(),
1184 handlers,
1185 hooks,
1186 transforms,
1187 command_handlers,
1188 canvas_handler,
1189 session_fs_provider,
1190 bearer_token_providers,
1191 channels,
1192 idle_waiter.clone(),
1193 capabilities.clone(),
1194 open_canvases.clone(),
1195 event_tx.clone(),
1196 shutdown.clone(),
1197 );
1198 let mut registration =
1199 PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
1200 tracing::debug!(
1201 elapsed_ms = setup_start.elapsed().as_millis(),
1202 session_id = %session_id,
1203 tools_count,
1204 commands_count,
1205 has_hooks,
1206 "Client::resume_session local setup complete"
1207 );
1208
1209 let rpc_start = Instant::now();
1210 let result = match self.call("session.resume", Some(params)).await {
1211 Ok(result) => result,
1212 Err(error) => {
1213 registration.cleanup(event_loop).await;
1214 return Err(error);
1215 }
1216 };
1217 tracing::debug!(
1218 elapsed_ms = rpc_start.elapsed().as_millis(),
1219 session_id = %session_id,
1220 "Client::resume_session session resume request completed successfully"
1221 );
1222
1223 let resume_result: ResumeSessionResult = match serde_json::from_value(result) {
1224 Ok(result) => result,
1225 Err(error) => {
1226 registration.cleanup(event_loop).await;
1227 return Err(error.into());
1228 }
1229 };
1230 let cli_session_id = resume_result
1231 .session_id
1232 .clone()
1233 .unwrap_or_else(|| session_id.clone());
1234 if cli_session_id != session_id {
1235 registration.cleanup(event_loop).await;
1236 return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
1237 requested: session_id,
1238 returned: cli_session_id,
1239 })
1240 .into());
1241 }
1242
1243 let skills_reload_start = Instant::now();
1245 if let Err(e) = self
1246 .call(
1247 "session.skills.reload",
1248 Some(serde_json::json!({ "sessionId": session_id })),
1249 )
1250 .await
1251 {
1252 warn!(
1253 elapsed_ms = skills_reload_start.elapsed().as_millis(),
1254 session_id = %session_id,
1255 error = %e,
1256 "Client::resume_session skills reload request failed"
1257 );
1258 } else {
1259 tracing::debug!(
1260 elapsed_ms = skills_reload_start.elapsed().as_millis(),
1261 session_id = %session_id,
1262 "Client::resume_session skills reload request completed successfully"
1263 );
1264 }
1265
1266 *capabilities.write() = resume_result.capabilities.unwrap_or_default();
1267 {
1272 let mut snapshots = open_canvases.write();
1273 for snapshot in resume_result.open_canvases.unwrap_or_default() {
1274 upsert_open_canvas_snapshot(&mut snapshots, snapshot);
1275 }
1276 }
1277
1278 tracing::debug!(
1279 elapsed_ms = total_start.elapsed().as_millis(),
1280 session_id = %session_id,
1281 "Client::resume_session complete"
1282 );
1283 registration.disarm();
1284 let session = Session {
1285 id: session_id,
1286 cwd: self.cwd().clone(),
1287 workspace_path: resume_result.workspace_path,
1288 remote_url: resume_result.remote_url,
1289 client: self.clone(),
1290 event_loop: ParkingLotMutex::new(Some(event_loop)),
1291 shutdown,
1292 idle_waiter,
1293 capabilities,
1294 open_canvases,
1295 event_tx,
1296 };
1297 apply_mode_post_create_patch(
1298 &session,
1299 mode,
1300 opt_skip_custom_instructions,
1301 opt_custom_agents_local_only,
1302 opt_coauthor_enabled,
1303 opt_manage_schedule_enabled,
1304 )
1305 .await?;
1306 Ok(session)
1307 }
1308}
1309
1310type CommandHandlerMap = HashMap<String, Arc<dyn CommandHandler>>;
1311
1312async fn apply_mode_post_create_patch(
1313 session: &Session,
1314 mode: crate::ClientMode,
1315 opt_skip_custom_instructions: Option<bool>,
1316 opt_custom_agents_local_only: Option<bool>,
1317 opt_coauthor_enabled: Option<bool>,
1318 opt_manage_schedule_enabled: Option<bool>,
1319) -> Result<(), Error> {
1320 use crate::generated::api_types::SessionUpdateOptionsParams;
1321 let mut patch = SessionUpdateOptionsParams::default();
1322 let should_send = if mode == crate::ClientMode::Empty {
1323 patch.skip_custom_instructions = Some(opt_skip_custom_instructions.unwrap_or(true));
1324 patch.custom_agents_local_only = Some(opt_custom_agents_local_only.unwrap_or(true));
1325 patch.coauthor_enabled = Some(opt_coauthor_enabled.unwrap_or(false));
1326 patch.manage_schedule_enabled = Some(opt_manage_schedule_enabled.unwrap_or(false));
1327 patch.installed_plugins = Some(Vec::new());
1328 true
1329 } else {
1330 let mut any = false;
1331 if let Some(v) = opt_skip_custom_instructions {
1332 patch.skip_custom_instructions = Some(v);
1333 any = true;
1334 }
1335 if let Some(v) = opt_custom_agents_local_only {
1336 patch.custom_agents_local_only = Some(v);
1337 any = true;
1338 }
1339 if let Some(v) = opt_coauthor_enabled {
1340 patch.coauthor_enabled = Some(v);
1341 any = true;
1342 }
1343 if let Some(v) = opt_manage_schedule_enabled {
1344 patch.manage_schedule_enabled = Some(v);
1345 any = true;
1346 }
1347 any
1348 };
1349 if !should_send {
1350 return Ok(());
1351 }
1352 if let Err(error) = session.rpc().options().update(patch).await {
1353 let _ = session.disconnect().await;
1354 return Err(error);
1355 }
1356 Ok(())
1357}
1358
1359fn build_command_handler_map(commands: Option<&[CommandDefinition]>) -> Arc<CommandHandlerMap> {
1360 let map = match commands {
1361 Some(commands) => commands
1362 .iter()
1363 .filter(|cmd| !cmd.name.is_empty())
1364 .map(|cmd| (cmd.name.clone(), cmd.handler.clone()))
1365 .collect(),
1366 None => HashMap::new(),
1367 };
1368 Arc::new(map)
1369}
1370
1371fn upsert_open_canvas_snapshot(
1372 snapshots: &mut Vec<OpenCanvasInstance>,
1373 snapshot: OpenCanvasInstance,
1374) {
1375 if let Some(existing) = snapshots
1376 .iter_mut()
1377 .find(|open| open.instance_id == snapshot.instance_id)
1378 {
1379 *existing = snapshot;
1380 } else {
1381 snapshots.push(snapshot);
1382 }
1383}
1384
1385fn remove_open_canvas_snapshot(snapshots: &mut Vec<OpenCanvasInstance>, instance_id: &str) {
1386 snapshots.retain(|open| open.instance_id != instance_id);
1387}
1388
1389#[allow(clippy::too_many_arguments)]
1390fn spawn_event_loop(
1391 session_id: SessionId,
1392 client: Client,
1393 handlers: SessionHandlers,
1394 hooks: Option<Arc<dyn SessionHooks>>,
1395 transforms: Option<Arc<dyn SystemMessageTransform>>,
1396 command_handlers: Arc<CommandHandlerMap>,
1397 canvas_handler: Option<Arc<dyn CanvasHandler>>,
1398 session_fs_provider: Option<Arc<dyn SessionFsProvider>>,
1399 bearer_token_providers: HashMap<String, Arc<dyn BearerTokenProvider>>,
1400 channels: crate::router::SessionChannels,
1401 idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1402 capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
1403 open_canvases: Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
1404 event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
1405 shutdown: CancellationToken,
1406) -> JoinHandle<()> {
1407 let crate::router::SessionChannels {
1408 mut notifications,
1409 mut requests,
1410 } = channels;
1411
1412 let span = tracing::error_span!("session_event_loop", session_id = %session_id);
1413 tokio::spawn(
1414 async move {
1415 loop {
1416 tokio::select! {
1427 _ = shutdown.cancelled() => break,
1428 Some(notification) = notifications.recv() => {
1429 handle_notification(
1430 &session_id, &client, &handlers, &command_handlers, notification, &idle_waiter, &capabilities, &open_canvases, &event_tx,
1431 ).await;
1432 }
1433 Some(request) = requests.recv() => {
1434 let ctx = RequestDispatchContext {
1435 client: &client,
1436 handlers: &handlers,
1437 hooks: hooks.as_deref(),
1438 transforms: transforms.as_deref(),
1439 canvas_handler: canvas_handler.as_ref(),
1440 session_fs_provider: session_fs_provider.as_ref(),
1441 bearer_token_providers: &bearer_token_providers,
1442 };
1443 handle_request(&session_id, ctx, request).await;
1444 }
1445 else => break,
1446 }
1447 }
1448 if let Some(waiter) = idle_waiter.lock().take() {
1451 let _ = waiter
1452 .tx
1453 .send(Err(ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()));
1454 }
1455 }
1456 .instrument(span),
1457 )
1458}
1459
1460fn extract_request_id(data: &Value) -> Option<RequestId> {
1461 data.get("requestId")
1462 .and_then(|v| v.as_str())
1463 .filter(|s| !s.is_empty())
1464 .map(RequestId::new)
1465}
1466
1467fn notification_permission_payload(result: &PermissionResult) -> Option<Value> {
1472 match result {
1473 PermissionResult::NoResult => None,
1474 PermissionResult::Decision(decision) => Some(
1475 serde_json::to_value(decision).expect("serializing permission decision should succeed"),
1476 ),
1477 }
1478}
1479
1480fn tool_failure_result(message: impl Into<String>) -> ToolResult {
1481 let message = message.into();
1482 ToolResult::Expanded(ToolResultExpanded {
1483 text_result_for_llm: message.clone(),
1484 result_type: "failure".to_string(),
1485 binary_results_for_llm: None,
1486 session_log: None,
1487 error: Some(message),
1488 tool_telemetry: None,
1489 })
1490}
1491
1492#[allow(clippy::too_many_arguments)]
1494async fn handle_notification(
1495 session_id: &SessionId,
1496 client: &Client,
1497 handlers: &SessionHandlers,
1498 command_handlers: &Arc<CommandHandlerMap>,
1499 notification: SessionEventNotification,
1500 idle_waiter: &Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1501 capabilities: &Arc<parking_lot::RwLock<SessionCapabilities>>,
1502 open_canvases: &Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
1503 event_tx: &tokio::sync::broadcast::Sender<SessionEvent>,
1504) {
1505 let dispatch_start = Instant::now();
1506 let event = notification.event.clone();
1507 let event_type = event.parsed_type();
1508 if event_type == SessionEventType::PermissionRequested {
1509 tracing::debug!(
1510 session_id = %session_id,
1511 event_type = %event.event_type,
1512 "Session::handle_notification permission request received"
1513 );
1514 }
1515
1516 match event_type {
1519 SessionEventType::AssistantMessage
1520 | SessionEventType::SessionIdle
1521 | SessionEventType::SessionError => {
1522 let mut guard = idle_waiter.lock();
1523 if let Some(waiter) = guard.as_mut() {
1524 match event_type {
1525 SessionEventType::AssistantMessage => {
1526 if !waiter.first_assistant_message_seen {
1527 waiter.first_assistant_message_seen = true;
1528 tracing::debug!(
1529 elapsed_ms = waiter.started_at.elapsed().as_millis(),
1530 session_id = %session_id,
1531 "Session::send_and_wait first assistant message"
1532 );
1533 }
1534 waiter.last_assistant_message = Some(event.clone());
1535 }
1536 SessionEventType::SessionIdle | SessionEventType::SessionError => {
1537 if let Some(waiter) = guard.take() {
1538 if event_type == SessionEventType::SessionIdle {
1539 tracing::debug!(
1540 elapsed_ms = waiter.started_at.elapsed().as_millis(),
1541 session_id = %session_id,
1542 "Session::send_and_wait idle received"
1543 );
1544 let _ = waiter.tx.send(Ok(waiter.last_assistant_message));
1545 } else {
1546 let error_msg = event
1547 .typed_data::<SessionErrorData>()
1548 .map(|d| d.message)
1549 .or_else(|| {
1550 event
1551 .data
1552 .get("message")
1553 .and_then(|v| v.as_str())
1554 .map(|s| s.to_string())
1555 })
1556 .unwrap_or_else(|| "session error".to_string());
1557 let _ = waiter.tx.send(Err(Error::with_message(
1558 ErrorKind::Session(SessionErrorKind::AgentError),
1559 error_msg,
1560 )));
1561 }
1562 }
1563 }
1564 _ => {}
1565 }
1566 }
1567 }
1568 _ => {}
1569 }
1570
1571 if event_type == SessionEventType::CapabilitiesChanged {
1575 match serde_json::from_value::<SessionCapabilities>(notification.event.data.clone()) {
1576 Ok(changed) => *capabilities.write() = changed,
1577 Err(e) => warn!(error = %e, "failed to deserialize capabilities.changed payload"),
1578 }
1579 }
1580 if event_type == SessionEventType::SessionCanvasOpened {
1581 match serde_json::from_value::<OpenCanvasInstance>(notification.event.data.clone()) {
1582 Ok(open_canvas) => {
1583 upsert_open_canvas_snapshot(&mut open_canvases.write(), open_canvas);
1584 }
1585 Err(e) => warn!(error = %e, "failed to deserialize session.canvas.opened payload"),
1586 }
1587 }
1588 if event_type == SessionEventType::SessionCanvasClosed {
1589 match serde_json::from_value::<SessionCanvasClosedData>(notification.event.data.clone()) {
1590 Ok(closed) => {
1591 if closed.instance_id.is_empty() {
1592 warn!("failed to deserialize session.canvas.closed payload");
1593 } else {
1594 remove_open_canvas_snapshot(&mut open_canvases.write(), &closed.instance_id);
1595 }
1596 }
1597 Err(e) => warn!(error = %e, "failed to deserialize session.canvas.closed payload"),
1598 }
1599 }
1600
1601 let _ = event_tx.send(event.clone());
1605
1606 tracing::debug!(
1607 elapsed_ms = dispatch_start.elapsed().as_millis(),
1608 session_id = %session_id,
1609 event_type = %notification.event.event_type,
1610 "Session::handle_notification dispatch"
1611 );
1612
1613 match event_type {
1616 SessionEventType::PermissionRequested => {
1617 let Some(request_id) = extract_request_id(¬ification.event.data) else {
1618 return;
1619 };
1620 if notification
1624 .event
1625 .data
1626 .get("resolvedByHook")
1627 .and_then(|v| v.as_bool())
1628 .unwrap_or(false)
1629 {
1630 return;
1631 }
1632 let Some(permission_handler) = handlers.permission.clone() else {
1636 return;
1637 };
1638 let client = client.clone();
1639 let sid = session_id.clone();
1640 let data: PermissionRequestData =
1641 serde_json::from_value(notification.event.data.clone()).unwrap_or_else(|_| {
1642 PermissionRequestData {
1643 kind: None,
1644 tool_call_id: None,
1645 extra: notification.event.data.clone(),
1646 }
1647 });
1648 let span = tracing::error_span!(
1649 "permission_request_handler",
1650 session_id = %sid,
1651 request_id = %request_id
1652 );
1653 tokio::spawn(
1654 async move {
1655 let handler_start = Instant::now();
1656 let result = permission_handler
1657 .handle(sid.clone(), request_id.clone(), data)
1658 .await;
1659 tracing::debug!(
1660 elapsed_ms = handler_start.elapsed().as_millis(),
1661 session_id = %sid,
1662 request_id = %request_id,
1663 "PermissionHandler::handle dispatch"
1664 );
1665 let Some(result_value) = notification_permission_payload(&result) else {
1666 return;
1670 };
1671 let rpc_start = Instant::now();
1672 let _ = client
1673 .call(
1674 "session.permissions.handlePendingPermissionRequest",
1675 Some(serde_json::json!({
1676 "sessionId": sid,
1677 "requestId": request_id,
1678 "result": result_value,
1679 })),
1680 )
1681 .await;
1682 tracing::debug!(
1683 elapsed_ms = rpc_start.elapsed().as_millis(),
1684 session_id = %sid,
1685 request_id = %request_id,
1686 "Session::handle_notification response sent successfully"
1687 );
1688 }
1689 .instrument(span),
1690 );
1691 }
1692 SessionEventType::ExternalToolRequested => {
1693 let Some(request_id) = extract_request_id(¬ification.event.data) else {
1694 return;
1695 };
1696 let data: ExternalToolRequestedData =
1697 match serde_json::from_value(notification.event.data.clone()) {
1698 Ok(d) => d,
1699 Err(e) => {
1700 warn!(error = %e, "failed to deserialize external_tool.requested");
1701 let client = client.clone();
1702 let sid = session_id.clone();
1703 let span = tracing::error_span!(
1704 "external_tool_deserialize_error",
1705 session_id = %sid,
1706 request_id = %request_id
1707 );
1708 tokio::spawn(
1709 async move {
1710 let rpc_start = Instant::now();
1711 let _ = client
1712 .call(
1713 "session.tools.handlePendingToolCall",
1714 Some(serde_json::json!({
1715 "sessionId": sid,
1716 "requestId": request_id,
1717 "error": format!("Failed to deserialize tool request: {e}"),
1718 })),
1719 )
1720 .await;
1721 tracing::debug!(
1722 elapsed_ms = rpc_start.elapsed().as_millis(),
1723 session_id = %sid,
1724 request_id = %request_id,
1725 "Session::handle_notification response sent successfully"
1726 );
1727 }
1728 .instrument(span),
1729 );
1730 return;
1731 }
1732 };
1733 let tool_handler = if data.tool_name.is_empty() {
1737 None
1738 } else {
1739 handlers.tools.get(&data.tool_name).cloned()
1740 };
1741 let Some(tool_handler) = tool_handler else {
1742 return;
1743 };
1744 let client = client.clone();
1745 let sid = session_id.clone();
1746 let span = tracing::error_span!(
1747 "external_tool_handler",
1748 session_id = %sid,
1749 request_id = %request_id
1750 );
1751 tokio::spawn(
1752 async move {
1753 if data.tool_call_id.is_empty() {
1758 let error_msg = "Missing toolCallId";
1759 let rpc_start = Instant::now();
1760 let _ = client
1761 .call(
1762 "session.tools.handlePendingToolCall",
1763 Some(serde_json::json!({
1764 "sessionId": sid,
1765 "requestId": request_id,
1766 "error": error_msg,
1767 })),
1768 )
1769 .await;
1770 tracing::debug!(
1771 elapsed_ms = rpc_start.elapsed().as_millis(),
1772 session_id = %sid,
1773 request_id = %request_id,
1774 "Session::handle_notification response sent successfully"
1775 );
1776 return;
1777 }
1778 let tool_call_id = data.tool_call_id.clone();
1779 let tool_name = data.tool_name.clone();
1780 let invocation = ToolInvocation {
1781 session_id: sid.clone(),
1782 tool_call_id: data.tool_call_id,
1783 tool_name: data.tool_name,
1784 arguments: data
1785 .arguments
1786 .unwrap_or(Value::Object(serde_json::Map::new())),
1787 traceparent: data.traceparent,
1788 tracestate: data.tracestate,
1789 };
1790 let handler_start = Instant::now();
1791 let tool_result = match tool_handler.call(invocation).await {
1792 Ok(r) => r,
1793 Err(e) => tool_failure_result(e.to_string()),
1794 };
1795 tracing::debug!(
1796 elapsed_ms = handler_start.elapsed().as_millis(),
1797 session_id = %sid,
1798 request_id = %request_id,
1799 tool_call_id = %tool_call_id,
1800 tool_name = %tool_name,
1801 "ToolHandler::call dispatch"
1802 );
1803 let result_value = serde_json::to_value(tool_result).unwrap_or(Value::Null);
1804 let rpc_start = Instant::now();
1805 let _ = client
1806 .call(
1807 "session.tools.handlePendingToolCall",
1808 Some(serde_json::json!({
1809 "sessionId": sid,
1810 "requestId": request_id,
1811 "result": result_value,
1812 })),
1813 )
1814 .await;
1815 tracing::debug!(
1816 elapsed_ms = rpc_start.elapsed().as_millis(),
1817 session_id = %sid,
1818 request_id = %request_id,
1819 tool_call_id = %tool_call_id,
1820 tool_name = %tool_name,
1821 "Session::handle_notification response sent successfully"
1822 );
1823 }
1824 .instrument(span),
1825 );
1826 }
1827 SessionEventType::UserInputRequested => {
1828 }
1835 SessionEventType::ElicitationRequested => {
1836 let Some(request_id) = extract_request_id(¬ification.event.data) else {
1837 return;
1838 };
1839 let Some(elicitation_handler) = handlers.elicitation.clone() else {
1843 return;
1844 };
1845 let elicitation_data: ElicitationRequestedData =
1846 match serde_json::from_value(notification.event.data.clone()) {
1847 Ok(d) => d,
1848 Err(e) => {
1849 warn!(error = %e, "failed to deserialize elicitation request");
1850 return;
1851 }
1852 };
1853 let request = ElicitationRequest {
1854 message: elicitation_data.message,
1855 requested_schema: elicitation_data
1856 .requested_schema
1857 .map(|s| serde_json::to_value(s).unwrap_or(Value::Null)),
1858 mode: elicitation_data.mode.map(|m| match m {
1859 crate::generated::session_events::ElicitationRequestedMode::Form => {
1860 crate::types::ElicitationMode::Form
1861 }
1862 crate::generated::session_events::ElicitationRequestedMode::Url => {
1863 crate::types::ElicitationMode::Url
1864 }
1865 _ => crate::types::ElicitationMode::Unknown,
1866 }),
1867 elicitation_source: elicitation_data.elicitation_source,
1868 url: elicitation_data.url,
1869 };
1870 let client = client.clone();
1871 let sid = session_id.clone();
1872 let span = tracing::error_span!(
1873 "elicitation_request_handler",
1874 session_id = %sid,
1875 request_id = %request_id
1876 );
1877 tokio::spawn(
1878 async move {
1879 let cancel = ElicitationResult {
1880 action: "cancel".to_string(),
1881 content: None,
1882 };
1883 let handler_task = tokio::spawn({
1885 let sid = sid.clone();
1886 let request_id = request_id.clone();
1887 let span = tracing::error_span!(
1888 "elicitation_callback",
1889 session_id = %sid,
1890 request_id = %request_id
1891 );
1892 async move {
1893 let handler_start = Instant::now();
1894 let response = elicitation_handler
1895 .handle(sid.clone(), request_id.clone(), request)
1896 .await;
1897 tracing::debug!(
1898 elapsed_ms = handler_start.elapsed().as_millis(),
1899 session_id = %sid,
1900 request_id = %request_id,
1901 "ElicitationHandler::handle dispatch"
1902 );
1903 response
1904 }
1905 .instrument(span)
1906 });
1907 let result = match handler_task.await {
1908 Ok(r) => r,
1909 Err(_) => cancel.clone(),
1910 };
1911 let rpc_start = Instant::now();
1912 if let Err(e) = client
1913 .call(
1914 "session.ui.handlePendingElicitation",
1915 Some(serde_json::json!({
1916 "sessionId": sid,
1917 "requestId": request_id,
1918 "result": result,
1919 })),
1920 )
1921 .await
1922 {
1923 warn!(error = %e, "handlePendingElicitation failed, sending cancel");
1925 let _ = client
1926 .call(
1927 "session.ui.handlePendingElicitation",
1928 Some(serde_json::json!({
1929 "sessionId": sid,
1930 "requestId": request_id,
1931 "result": cancel,
1932 })),
1933 )
1934 .await;
1935 } else {
1936 tracing::debug!(
1937 elapsed_ms = rpc_start.elapsed().as_millis(),
1938 session_id = %sid,
1939 request_id = %request_id,
1940 "Session::handle_notification response sent successfully"
1941 );
1942 }
1943 }
1944 .instrument(span),
1945 );
1946 }
1947 SessionEventType::CommandExecute => {
1948 let data: CommandExecuteData =
1949 match serde_json::from_value(notification.event.data.clone()) {
1950 Ok(d) => d,
1951 Err(e) => {
1952 warn!(error = %e, "failed to deserialize command.execute");
1953 return;
1954 }
1955 };
1956 let client = client.clone();
1957 let command_handlers = command_handlers.clone();
1958 let sid = session_id.clone();
1959 let span = tracing::error_span!("command_handler", session_id = %sid);
1960 tokio::spawn(
1961 async move {
1962 let request_id = data.request_id;
1963 let ack_error = match command_handlers.get(&data.command_name).cloned() {
1964 None => Some(format!("Unknown command: {}", data.command_name)),
1965 Some(handler) => {
1966 let command_name = data.command_name.clone();
1967 let ctx = CommandContext {
1968 session_id: sid.clone(),
1969 command: data.command,
1970 command_name: data.command_name,
1971 args: data.args,
1972 };
1973 let handler_start = Instant::now();
1974 let result = handler.on_command(ctx).await;
1975 tracing::debug!(
1976 elapsed_ms = handler_start.elapsed().as_millis(),
1977 session_id = %sid,
1978 request_id = %request_id,
1979 command_name = %command_name,
1980 "CommandHandler::call dispatch"
1981 );
1982 match result {
1983 Ok(()) => None,
1984 Err(e) => Some(e.to_string()),
1985 }
1986 }
1987 };
1988 let mut params = serde_json::json!({
1989 "sessionId": sid,
1990 "requestId": request_id,
1991 });
1992 if let Some(error_msg) = ack_error {
1993 params["error"] = serde_json::Value::String(error_msg);
1994 }
1995 let rpc_start = Instant::now();
1996 let _ = client
1997 .call("session.commands.handlePendingCommand", Some(params))
1998 .await;
1999 tracing::debug!(
2000 elapsed_ms = rpc_start.elapsed().as_millis(),
2001 session_id = %sid,
2002 request_id = %request_id,
2003 "Session::handle_notification response sent successfully"
2004 );
2005 }
2006 .instrument(span),
2007 );
2008 }
2009 _ => {}
2010 }
2011}
2012
2013struct RequestDispatchContext<'a> {
2014 client: &'a Client,
2015 handlers: &'a SessionHandlers,
2016 hooks: Option<&'a dyn SessionHooks>,
2017 transforms: Option<&'a dyn SystemMessageTransform>,
2018 canvas_handler: Option<&'a Arc<dyn CanvasHandler>>,
2019 session_fs_provider: Option<&'a Arc<dyn SessionFsProvider>>,
2020 bearer_token_providers: &'a HashMap<String, Arc<dyn BearerTokenProvider>>,
2021}
2022
2023async fn handle_request(
2025 session_id: &SessionId,
2026 ctx: RequestDispatchContext<'_>,
2027 request: crate::JsonRpcRequest,
2028) {
2029 let sid = session_id.clone();
2030 let client = ctx.client;
2031 let handlers = ctx.handlers;
2032 let hooks = ctx.hooks;
2033 let transforms = ctx.transforms;
2034 let canvas_handler = ctx.canvas_handler;
2035 let session_fs_provider = ctx.session_fs_provider;
2036 let bearer_token_providers = ctx.bearer_token_providers;
2037
2038 if request.method.starts_with("sessionFs.") {
2039 crate::session_fs_dispatch::dispatch(client, session_fs_provider, request).await;
2040 return;
2041 }
2042
2043 if request.method.starts_with("canvas.") {
2044 crate::canvas_dispatch::dispatch(client, canvas_handler, request).await;
2045 return;
2046 }
2047
2048 if request.method == crate::generated::api_types::rpc_methods::PROVIDERTOKEN_GETTOKEN {
2049 crate::provider_token_dispatch::dispatch(client, bearer_token_providers, request).await;
2050 return;
2051 }
2052
2053 match request.method.as_str() {
2054 "hooks.invoke" => {
2055 let params = request.params.as_ref();
2056 let hook_type = params
2057 .and_then(|p| p.get("hookType"))
2058 .and_then(|v| v.as_str())
2059 .unwrap_or("");
2060 let input = params
2061 .and_then(|p| p.get("input"))
2062 .cloned()
2063 .unwrap_or(Value::Object(Default::default()));
2064
2065 let rpc_result = if let Some(hooks) = hooks {
2066 match crate::hooks::dispatch_hook(hooks, &sid, hook_type, input).await {
2067 Ok(output) => output,
2068 Err(e) => {
2069 warn!(error = %e, hook_type = hook_type, "hook dispatch failed");
2070 serde_json::json!({ "output": {} })
2071 }
2072 }
2073 } else {
2074 serde_json::json!({ "output": {} })
2075 };
2076
2077 let rpc_response = JsonRpcResponse {
2078 jsonrpc: "2.0".to_string(),
2079 id: request.id,
2080 result: Some(rpc_result),
2081 error: None,
2082 };
2083 let _ = client.send_response(&rpc_response).await;
2084 }
2085
2086 "userInput.request" => {
2087 let params = request.params.as_ref();
2088 let Some(question) = params
2089 .and_then(|p| p.get("question"))
2090 .and_then(|v| v.as_str())
2091 else {
2092 warn!("userInput.request missing 'question' field");
2093 let rpc_response = JsonRpcResponse {
2094 jsonrpc: "2.0".to_string(),
2095 id: request.id,
2096 result: None,
2097 error: Some(crate::JsonRpcError {
2098 code: error_codes::INVALID_PARAMS,
2099 message: "missing required field: question".to_string(),
2100 data: None,
2101 }),
2102 };
2103 let _ = client.send_response(&rpc_response).await;
2104 return;
2105 };
2106 let question = question.to_string();
2107 let choices = params
2108 .and_then(|p| p.get("choices"))
2109 .and_then(|v| v.as_array())
2110 .map(|arr| {
2111 arr.iter()
2112 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2113 .collect()
2114 });
2115 let allow_freeform = params
2116 .and_then(|p| p.get("allowFreeform"))
2117 .and_then(|v| v.as_bool());
2118
2119 let handler_start = Instant::now();
2120 let response = if let Some(user_input_handler) = handlers.user_input.as_ref() {
2121 user_input_handler
2122 .handle(sid.clone(), question, choices, allow_freeform)
2123 .await
2124 } else {
2125 None
2126 };
2127 tracing::debug!(
2128 elapsed_ms = handler_start.elapsed().as_millis(),
2129 session_id = %sid,
2130 "UserInputHandler::handle dispatch"
2131 );
2132
2133 let rpc_result = match response {
2134 Some(UserInputResponse {
2135 answer,
2136 was_freeform,
2137 }) => serde_json::json!({
2138 "answer": answer,
2139 "wasFreeform": was_freeform,
2140 }),
2141 None => serde_json::json!({ "noResponse": true }),
2142 };
2143 let rpc_response = JsonRpcResponse {
2144 jsonrpc: "2.0".to_string(),
2145 id: request.id,
2146 result: Some(rpc_result),
2147 error: None,
2148 };
2149 let _ = client.send_response(&rpc_response).await;
2150 }
2151
2152 "exitPlanMode.request" => {
2153 let params = request
2154 .params
2155 .as_ref()
2156 .cloned()
2157 .unwrap_or(Value::Object(serde_json::Map::new()));
2158 let data: ExitPlanModeData = match serde_json::from_value(params) {
2159 Ok(d) => d,
2160 Err(e) => {
2161 warn!(error = %e, "failed to deserialize exitPlanMode.request params, using defaults");
2162 ExitPlanModeData::default()
2163 }
2164 };
2165
2166 let rpc_result = if let Some(exit_plan_handler) = handlers.exit_plan_mode.as_ref() {
2167 let result = exit_plan_handler.handle(sid, data).await;
2168 serde_json::to_value(result).expect("ExitPlanModeResult serialization cannot fail")
2169 } else {
2170 serde_json::json!({ "approved": true })
2171 };
2172 let rpc_response = JsonRpcResponse {
2173 jsonrpc: "2.0".to_string(),
2174 id: request.id,
2175 result: Some(rpc_result),
2176 error: None,
2177 };
2178 let _ = client.send_response(&rpc_response).await;
2179 }
2180
2181 "autoModeSwitch.request" => {
2182 let error_code = request
2183 .params
2184 .as_ref()
2185 .and_then(|p| p.get("errorCode"))
2186 .and_then(|v| v.as_str())
2187 .map(|s| s.to_string());
2188 let retry_after_seconds = request
2189 .params
2190 .as_ref()
2191 .and_then(|p| p.get("retryAfterSeconds"))
2192 .and_then(|v| v.as_f64());
2193
2194 let answer = if let Some(auto_mode_handler) = handlers.auto_mode_switch.as_ref() {
2195 auto_mode_handler
2196 .handle(sid, error_code, retry_after_seconds)
2197 .await
2198 } else {
2199 AutoModeSwitchResponse::No
2200 };
2201 let rpc_response = JsonRpcResponse {
2202 jsonrpc: "2.0".to_string(),
2203 id: request.id,
2204 result: Some(serde_json::json!({ "response": answer })),
2205 error: None,
2206 };
2207 let _ = client.send_response(&rpc_response).await;
2208 }
2209
2210 "systemMessage.transform" => {
2211 let params = request.params.as_ref();
2212 let sections: HashMap<String, crate::transforms::TransformSection> =
2213 match params.and_then(|p| p.get("sections")) {
2214 Some(v) => match serde_json::from_value(v.clone()) {
2215 Ok(s) => s,
2216 Err(e) => {
2217 let _ = send_error_response(
2218 client,
2219 request.id,
2220 error_codes::INVALID_PARAMS,
2221 &format!("invalid sections: {e}"),
2222 )
2223 .await;
2224 return;
2225 }
2226 },
2227 None => {
2228 let _ = send_error_response(
2229 client,
2230 request.id,
2231 error_codes::INVALID_PARAMS,
2232 "missing sections parameter",
2233 )
2234 .await;
2235 return;
2236 }
2237 };
2238
2239 let rpc_result = if let Some(transforms) = transforms {
2240 let transform_start = Instant::now();
2241 let response =
2242 crate::transforms::dispatch_transform(transforms, &sid, sections).await;
2243 tracing::debug!(
2244 elapsed_ms = transform_start.elapsed().as_millis(),
2245 session_id = %sid,
2246 "SystemMessageTransform::transform_section dispatch"
2247 );
2248 match serde_json::to_value(response) {
2249 Ok(v) => v,
2250 Err(e) => {
2251 warn!(error = %e, "failed to serialize transform response");
2252 serde_json::json!({ "sections": {} })
2253 }
2254 }
2255 } else {
2256 let passthrough: HashMap<String, crate::transforms::TransformSection> = sections;
2258 serde_json::json!({ "sections": passthrough })
2259 };
2260
2261 let rpc_response = JsonRpcResponse {
2262 jsonrpc: "2.0".to_string(),
2263 id: request.id,
2264 result: Some(rpc_result),
2265 error: None,
2266 };
2267 let _ = client.send_response(&rpc_response).await;
2268 }
2269
2270 method => {
2271 warn!(
2272 method = method,
2273 "unhandled request method in session event loop"
2274 );
2275 let _ = send_error_response(
2276 client,
2277 request.id,
2278 error_codes::METHOD_NOT_FOUND,
2279 &format!("unknown method: {method}"),
2280 )
2281 .await;
2282 }
2283 }
2284}
2285
2286async fn send_error_response(
2287 client: &Client,
2288 id: u64,
2289 code: i32,
2290 message: &str,
2291) -> Result<(), Error> {
2292 let response = JsonRpcResponse {
2293 jsonrpc: "2.0".to_string(),
2294 id,
2295 result: None,
2296 error: Some(crate::JsonRpcError {
2297 code,
2298 message: message.to_string(),
2299 data: None,
2300 }),
2301 };
2302 client.send_response(&response).await
2303}
2304
2305fn apply_transform_sections(
2309 sys_msg: &mut SystemMessageConfig,
2310 transforms: &dyn SystemMessageTransform,
2311) {
2312 sys_msg.mode = Some("customize".to_string());
2313 let sections = sys_msg.sections.get_or_insert_with(HashMap::new);
2314 for id in transforms.section_ids() {
2315 sections.entry(id).or_insert_with(|| SectionOverride {
2316 action: Some("transform".to_string()),
2317 content: None,
2318 });
2319 }
2320}
2321
2322fn inject_transform_sections(config: &mut SessionConfig, transforms: &dyn SystemMessageTransform) {
2323 let sys_msg = config.system_message.get_or_insert_with(Default::default);
2324 apply_transform_sections(sys_msg, transforms);
2325}
2326
2327fn inject_transform_sections_resume(
2328 config: &mut ResumeSessionConfig,
2329 transforms: &dyn SystemMessageTransform,
2330) {
2331 let sys_msg = config.system_message.get_or_insert_with(Default::default);
2332 apply_transform_sections(sys_msg, transforms);
2333}
2334
2335#[cfg(test)]
2336mod tests {
2337 use serde_json::json;
2338
2339 use super::notification_permission_payload;
2340 use crate::handler::PermissionResult;
2341
2342 #[test]
2343 fn notification_payload_suppresses_no_result() {
2344 assert!(notification_permission_payload(&PermissionResult::NoResult).is_none());
2345 }
2346
2347 #[test]
2348 fn notification_payload_serializes_decisions() {
2349 assert_eq!(
2350 notification_permission_payload(&PermissionResult::approve_once()),
2351 Some(json!({ "kind": "approve-once" }))
2352 );
2353 assert_eq!(
2354 notification_permission_payload(&PermissionResult::reject(None)),
2355 Some(json!({ "kind": "reject" }))
2356 );
2357 assert_eq!(
2358 notification_permission_payload(&PermissionResult::reject(Some("bad".to_string()))),
2359 Some(json!({ "kind": "reject", "feedback": "bad" }))
2360 );
2361 assert_eq!(
2362 notification_permission_payload(&PermissionResult::user_not_available()),
2363 Some(json!({ "kind": "user-not-available" }))
2364 );
2365 }
2366}