1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::Mutex as ParkingLotMutex;
7use serde_json::Value;
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tracing::{Instrument, warn};
12
13use crate::canvas::CanvasHandler;
14use crate::generated::api_types::{LogRequest, ModelSwitchToRequest, OpenCanvasInstance};
15use crate::generated::session_events::{
16 CommandExecuteData, ElicitationRequestedData, ExternalToolRequestedData,
17 SessionCanvasClosedData, SessionErrorData, SessionEventType,
18};
19use crate::handler::{
20 AutoModeSwitchHandler, AutoModeSwitchResponse, ElicitationHandler, ExitPlanModeHandler,
21 PermissionHandler, PermissionResult, UserInputHandler, UserInputResponse,
22};
23use crate::hooks::SessionHooks;
24use crate::session_fs::SessionFsProvider;
25use crate::trace_context::inject_trace_context;
26use crate::transforms::SystemMessageTransform;
27use crate::types::{
28 CommandContext, CommandDefinition, CommandHandler, CreateSessionResult, ElicitationRequest,
29 ElicitationResult, ExitPlanModeData, GetMessagesResponse, MessageOptions,
30 PermissionRequestData, RequestId, ResumeSessionConfig, ResumeSessionResult, SectionOverride,
31 SessionCapabilities, SessionConfig, SessionEvent, SessionId, SetModelOptions,
32 SystemMessageConfig, ToolInvocation, ToolResult, ToolResultExpanded, TraceContext,
33 UiInputOptions, ensure_attachment_display_names,
34};
35use crate::{
36 Client, Error, ErrorKind, JsonRpcResponse, SessionErrorKind, SessionEventNotification,
37 error_codes,
38};
39
40#[derive(Clone)]
48pub(crate) struct SessionHandlers {
49 pub permission: Option<Arc<dyn PermissionHandler>>,
50 pub elicitation: Option<Arc<dyn ElicitationHandler>>,
51 pub user_input: Option<Arc<dyn UserInputHandler>>,
52 pub exit_plan_mode: Option<Arc<dyn ExitPlanModeHandler>>,
53 pub auto_mode_switch: Option<Arc<dyn AutoModeSwitchHandler>>,
54 pub tools: Arc<HashMap<String, Arc<dyn crate::tool::ToolHandler>>>,
55}
56
57struct IdleWaiter {
59 tx: oneshot::Sender<Result<Option<SessionEvent>, Error>>,
60 last_assistant_message: Option<SessionEvent>,
61 started_at: Instant,
62 first_assistant_message_seen: bool,
63}
64
65struct WaiterGuard {
77 slot: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
78}
79
80impl Drop for WaiterGuard {
81 fn drop(&mut self) {
82 self.slot.lock().take();
83 }
84}
85
86struct PendingSessionRegistration {
87 client: Client,
88 session_id: SessionId,
89 shutdown: CancellationToken,
90 disarmed: bool,
91}
92
93impl PendingSessionRegistration {
94 fn new(client: Client, session_id: SessionId, shutdown: CancellationToken) -> Self {
95 Self {
96 client,
97 session_id,
98 shutdown,
99 disarmed: false,
100 }
101 }
102
103 async fn cleanup(mut self, event_loop: JoinHandle<()>) {
104 self.shutdown.cancel();
105 let _ = event_loop.await;
106 self.client.unregister_session(&self.session_id);
107 self.disarmed = true;
108 }
109
110 fn disarm(&mut self) {
111 self.disarmed = true;
112 }
113}
114
115impl Drop for PendingSessionRegistration {
116 fn drop(&mut self) {
117 if !self.disarmed {
118 self.shutdown.cancel();
119 self.client.unregister_session(&self.session_id);
120 }
121 }
122}
123
124pub struct Session {
137 id: SessionId,
138 cwd: PathBuf,
139 workspace_path: Option<PathBuf>,
140 remote_url: Option<String>,
141 client: Client,
142 event_loop: ParkingLotMutex<Option<JoinHandle<()>>>,
147 shutdown: CancellationToken,
161 idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
168 capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
170 open_canvases: Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
172 event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
174}
175
176impl Session {
177 pub fn id(&self) -> &SessionId {
179 &self.id
180 }
181
182 pub fn cwd(&self) -> &PathBuf {
184 &self.cwd
185 }
186
187 pub fn workspace_path(&self) -> Option<&Path> {
189 self.workspace_path.as_deref()
190 }
191
192 pub fn remote_url(&self) -> Option<&str> {
194 self.remote_url.as_deref()
195 }
196
197 pub fn capabilities(&self) -> SessionCapabilities {
202 self.capabilities.read().clone()
203 }
204
205 pub fn open_canvases(&self) -> Vec<OpenCanvasInstance> {
208 self.open_canvases.read().clone()
209 }
210
211 pub fn cancellation_token(&self) -> CancellationToken {
238 self.shutdown.child_token()
239 }
240
241 pub fn subscribe(&self) -> crate::subscription::EventSubscription {
280 crate::subscription::EventSubscription::new(self.event_tx.subscribe())
281 }
282
283 pub fn client(&self) -> &Client {
285 &self.client
286 }
287
288 pub fn rpc(&self) -> crate::generated::rpc::SessionRpc<'_> {
299 crate::generated::rpc::SessionRpc { session: self }
300 }
301
302 pub async fn stop_event_loop(&self) {
310 self.shutdown.cancel();
311 let handle = self.event_loop.lock().take();
312 if let Some(handle) = handle {
313 let _ = handle.await;
314 }
315 if let Some(waiter) = self.idle_waiter.lock().take() {
317 let _ = waiter.tx.send(Err(
318 ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()
319 ));
320 }
321 }
322
323 pub async fn send(&self, opts: impl Into<MessageOptions>) -> Result<String, Error> {
348 if self.idle_waiter.lock().is_some() {
349 return Err(ErrorKind::Session(SessionErrorKind::SendWhileWaiting).into());
350 }
351 self.send_inner(opts.into()).await
352 }
353
354 async fn send_inner(&self, opts: MessageOptions) -> Result<String, Error> {
355 let mut params = serde_json::json!({
356 "sessionId": self.id,
357 "prompt": opts.prompt,
358 });
359 if let Some(m) = opts.mode {
360 params["mode"] = serde_json::to_value(m)?;
361 }
362 if let Some(am) = opts.agent_mode {
363 params["agentMode"] = serde_json::to_value(am)?;
364 }
365 if let Some(mut a) = opts.attachments {
366 ensure_attachment_display_names(&mut a);
367 params["attachments"] = serde_json::to_value(a)?;
368 }
369 if let Some(headers) = opts.request_headers
370 && !headers.is_empty()
371 {
372 params["requestHeaders"] = serde_json::to_value(headers)?;
373 }
374 if let Some(display_prompt) = opts.display_prompt {
375 params["displayPrompt"] = serde_json::to_value(display_prompt)?;
376 }
377 let trace_ctx = if opts.traceparent.is_some() || opts.tracestate.is_some() {
378 TraceContext {
379 traceparent: opts.traceparent,
380 tracestate: opts.tracestate,
381 }
382 } else {
383 self.client.resolve_trace_context().await
384 };
385 inject_trace_context(&mut params, &trace_ctx);
386 let rpc_start = Instant::now();
387 let result = self.client.call("session.send", Some(params)).await?;
388 let message_id = result
389 .get("messageId")
390 .and_then(|v| v.as_str())
391 .map(|s| s.to_string())
392 .unwrap_or_default();
393 tracing::debug!(
394 elapsed_ms = rpc_start.elapsed().as_millis(),
395 session_id = %self.id,
396 message_id = %message_id,
397 "Session::send completed successfully"
398 );
399 Ok(message_id)
400 }
401
402 pub async fn send_and_wait(
422 &self,
423 opts: impl Into<MessageOptions>,
424 ) -> Result<Option<SessionEvent>, Error> {
425 let total_start = Instant::now();
426 let opts = opts.into();
427 let timeout_duration = opts.wait_timeout.unwrap_or(Duration::from_secs(60));
428 let (tx, rx) = oneshot::channel();
429
430 {
431 let mut guard = self.idle_waiter.lock();
432 if guard.is_some() {
433 return Err(ErrorKind::Session(SessionErrorKind::SendWhileWaiting).into());
434 }
435 *guard = Some(IdleWaiter {
436 tx,
437 last_assistant_message: None,
438 started_at: total_start,
439 first_assistant_message_seen: false,
440 });
441 }
442
443 let _waiter_guard = WaiterGuard {
448 slot: self.idle_waiter.clone(),
449 };
450
451 let result = tokio::time::timeout(timeout_duration, async {
452 self.send_inner(opts).await?;
453 match rx.await {
454 Ok(result) => result,
455 Err(_) => Err(ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()),
456 }
457 })
458 .await;
459
460 match result {
461 Ok(inner) => {
462 tracing::debug!(
463 elapsed_ms = total_start.elapsed().as_millis(),
464 session_id = %self.id,
465 completed_by = if inner.is_ok() { "idle" } else { "error" },
466 "Session::send_and_wait complete"
467 );
468 inner
469 }
470 Err(_) => {
471 tracing::warn!(
472 elapsed_ms = total_start.elapsed().as_millis(),
473 session_id = %self.id,
474 completed_by = "timeout",
475 "Session::send_and_wait failed"
476 );
477 Err(ErrorKind::Session(SessionErrorKind::Timeout(timeout_duration)).into())
478 }
479 }
480 }
481
482 pub async fn get_events(&self) -> Result<Vec<SessionEvent>, Error> {
484 let result = self
485 .client
486 .call(
487 "session.getMessages",
488 Some(serde_json::json!({ "sessionId": self.id })),
489 )
490 .await?;
491 let response: GetMessagesResponse = serde_json::from_value(result)?;
492 Ok(response.events)
493 }
494
495 #[deprecated(since = "0.1.0", note = "Use `get_events()` instead")]
497 pub async fn get_messages(&self) -> Result<Vec<SessionEvent>, Error> {
498 self.get_events().await
499 }
500
501 pub async fn abort(&self) -> Result<(), Error> {
509 self.client
510 .call(
511 "session.abort",
512 Some(serde_json::json!({ "sessionId": self.id })),
513 )
514 .await?;
515 Ok(())
516 }
517
518 pub async fn set_model(&self, model: &str, opts: Option<SetModelOptions>) -> Result<(), Error> {
522 let opts = opts.unwrap_or_default();
523 let request = ModelSwitchToRequest {
524 model_id: model.to_string(),
525 reasoning_effort: opts.reasoning_effort,
526 reasoning_summary: opts.reasoning_summary,
527 context_tier: opts.context_tier,
528 model_capabilities: opts.model_capabilities,
529 };
530 self.rpc().model().switch_to(request).await?;
531 Ok(())
532 }
533
534 pub async fn disconnect(&self) -> Result<(), Error> {
551 self.client
552 .call(
553 "session.destroy",
554 Some(serde_json::json!({ "sessionId": self.id })),
555 )
556 .await?;
557 self.stop_event_loop().await;
558 self.client.unregister_session(&self.id);
559 Ok(())
560 }
561
562 #[deprecated(since = "0.1.0", note = "Use `disconnect()` instead")]
567 pub async fn destroy(&self) -> Result<(), Error> {
568 self.disconnect().await
569 }
570
571 pub async fn log(
575 &self,
576 message: &str,
577 opts: Option<crate::types::LogOptions>,
578 ) -> Result<(), Error> {
579 let opts = opts.unwrap_or_default();
580 let level = match opts.level {
581 Some(level) => Some(serde_json::from_value(serde_json::to_value(level)?)?),
582 None => None,
583 };
584 let request = LogRequest {
585 message: message.to_string(),
586 level,
587 ephemeral: opts.ephemeral,
588 r#type: None,
589 tip: None,
590 url: None,
591 };
592 self.rpc().log(request).await?;
593 Ok(())
594 }
595
596 pub fn ui(&self) -> SessionUi<'_> {
602 SessionUi { session: self }
603 }
604
605 fn assert_elicitation(&self) -> Result<(), Error> {
607 if self
608 .capabilities
609 .read()
610 .ui
611 .as_ref()
612 .and_then(|u| u.elicitation)
613 != Some(true)
614 {
615 return Err(ErrorKind::Session(SessionErrorKind::ElicitationNotSupported).into());
616 }
617 Ok(())
618 }
619}
620
621impl Drop for Session {
622 fn drop(&mut self) {
623 self.shutdown.cancel();
635 self.client.unregister_session(&self.id);
636 }
637}
638
639pub struct SessionUi<'a> {
646 session: &'a Session,
647}
648
649impl<'a> SessionUi<'a> {
650 pub async fn elicitation(
658 &self,
659 message: &str,
660 schema: Value,
661 ) -> Result<ElicitationResult, Error> {
662 self.session.assert_elicitation()?;
663 let result = self
664 .session
665 .client
666 .call(
667 "session.ui.elicitation",
668 Some(serde_json::json!({
669 "sessionId": self.session.id,
670 "message": message,
671 "requestedSchema": schema,
672 })),
673 )
674 .await?;
675 let elicitation: ElicitationResult = serde_json::from_value(result)?;
676 Ok(elicitation)
677 }
678
679 pub async fn confirm(&self, message: &str) -> Result<bool, Error> {
683 self.session.assert_elicitation()?;
684 let schema = serde_json::json!({
685 "type": "object",
686 "properties": {
687 "confirmed": {
688 "type": "boolean",
689 "default": true,
690 }
691 },
692 "required": ["confirmed"]
693 });
694 let result = self.elicitation(message, schema).await?;
695 Ok(result.action == "accept"
696 && result
697 .content
698 .and_then(|c| c.get("confirmed").and_then(|v| v.as_bool()))
699 == Some(true))
700 }
701
702 pub async fn select(&self, message: &str, options: &[&str]) -> Result<Option<String>, Error> {
706 self.session.assert_elicitation()?;
707 let schema = serde_json::json!({
708 "type": "object",
709 "properties": {
710 "selection": {
711 "type": "string",
712 "enum": options,
713 }
714 },
715 "required": ["selection"]
716 });
717 let result = self.elicitation(message, schema).await?;
718 if result.action != "accept" {
719 return Ok(None);
720 }
721 let selection = result.content.and_then(|c| {
722 c.get("selection")
723 .and_then(|v| v.as_str())
724 .map(String::from)
725 });
726 Ok(selection)
727 }
728
729 pub async fn input(
734 &self,
735 message: &str,
736 options: Option<&UiInputOptions<'_>>,
737 ) -> Result<Option<String>, Error> {
738 self.session.assert_elicitation()?;
739 let mut field = serde_json::json!({ "type": "string" });
740 if let Some(opts) = options {
741 if let Some(title) = opts.title {
742 field["title"] = Value::String(title.to_string());
743 }
744 if let Some(desc) = opts.description {
745 field["description"] = Value::String(desc.to_string());
746 }
747 if let Some(min) = opts.min_length {
748 field["minLength"] = Value::Number(min.into());
749 }
750 if let Some(max) = opts.max_length {
751 field["maxLength"] = Value::Number(max.into());
752 }
753 if let Some(fmt) = &opts.format {
754 field["format"] = Value::String(fmt.as_str().to_string());
755 }
756 if let Some(default) = opts.default {
757 field["default"] = Value::String(default.to_string());
758 }
759 }
760 let schema = serde_json::json!({
761 "type": "object",
762 "properties": { "value": field },
763 "required": ["value"]
764 });
765 let result = self.elicitation(message, schema).await?;
766 if result.action != "accept" {
767 return Ok(None);
768 }
769 let value = result
770 .content
771 .and_then(|c| c.get("value").and_then(|v| v.as_str()).map(String::from));
772 Ok(value)
773 }
774}
775
776impl Client {
777 pub async fn create_session(&self, mut config: SessionConfig) -> Result<Session, Error> {
799 let total_start = Instant::now();
800 let caller_session_id = config.session_id.clone();
808 let use_server_generated_id = config.cloud.is_some() && caller_session_id.is_none();
809 let local_session_id: Option<SessionId> = if use_server_generated_id {
810 None
811 } else {
812 Some(
813 caller_session_id
814 .clone()
815 .unwrap_or_else(|| SessionId::new(uuid::Uuid::new_v4().to_string())),
816 )
817 };
818 if config.hooks_handler.is_some() && config.hooks.is_none() {
819 config.hooks = Some(true);
820 }
821 if let Some(transforms) = config.system_message_transform.clone() {
822 inject_transform_sections(&mut config, transforms.as_ref());
823 }
824 let mode = self.inner.mode;
825 if mode == crate::ClientMode::Empty && config.available_tools.is_none() {
826 return Err(Error::with_message(
827 ErrorKind::InvalidConfig,
828 "ClientMode::Empty requires available_tools to be set on the session config. \
829 Use ToolSet to specify which tools the session may use (e.g. \
830 ToolSet::new().add_builtin_many(BUILTIN_TOOLS_ISOLATED)).",
831 ));
832 }
833 crate::mode::validate_tool_filter_list(
834 "available_tools",
835 config.available_tools.as_deref(),
836 )?;
837 crate::mode::validate_tool_filter_list("excluded_tools", config.excluded_tools.as_deref())?;
838 config.system_message =
839 crate::mode::system_message_for_mode(mode, config.system_message.take());
840 config.memory = crate::mode::memory_for_mode(mode, config.memory.take());
841 if mode == crate::ClientMode::Empty {
842 if config.enable_session_telemetry.is_none() {
843 config.enable_session_telemetry = Some(false);
844 }
845 if config.skip_embedding_retrieval.is_none() {
846 config.skip_embedding_retrieval = Some(true);
847 }
848 if config.enable_on_demand_instruction_discovery.is_none() {
849 config.enable_on_demand_instruction_discovery = Some(false);
850 }
851 if config.enable_file_hooks.is_none() {
852 config.enable_file_hooks = Some(false);
853 }
854 if config.enable_host_git_operations.is_none() {
855 config.enable_host_git_operations = Some(false);
856 }
857 if config.enable_session_store.is_none() {
858 config.enable_session_store = Some(false);
859 }
860 if config.enable_skills.is_none() {
861 config.enable_skills = Some(false);
862 }
863 }
864 if mode == crate::ClientMode::Empty && config.mcp_oauth_token_storage.is_none() {
865 config.mcp_oauth_token_storage = Some("in-memory".into());
866 }
867 if mode == crate::ClientMode::Empty && config.embedding_cache_storage.is_none() {
868 config.embedding_cache_storage = Some("in-memory".into());
869 }
870 let opt_skip_custom_instructions = config.skip_custom_instructions;
871 let opt_custom_agents_local_only = config.custom_agents_local_only;
872 let opt_coauthor_enabled = config.coauthor_enabled;
873 let opt_manage_schedule_enabled = config.manage_schedule_enabled;
874 let (wire, mut runtime) = config.into_wire(local_session_id.clone())?;
875
876 let permission_handler = crate::permission::resolve_handler(
877 runtime.permission_handler.take(),
878 runtime.permission_policy.take(),
879 );
880 let handlers = SessionHandlers {
881 permission: permission_handler,
882 elicitation: runtime.elicitation_handler.take(),
883 user_input: runtime.user_input_handler.take(),
884 exit_plan_mode: runtime.exit_plan_mode_handler.take(),
885 auto_mode_switch: runtime.auto_mode_switch_handler.take(),
886 tools: Arc::new(std::mem::take(&mut runtime.tool_handlers)),
887 };
888 let hooks = runtime.hooks_handler.take();
889 let transforms = runtime.system_message_transform.take();
890 let tools_count = wire.tools.as_ref().map_or(0, Vec::len);
891 let commands_count = runtime.commands.as_ref().map_or(0, Vec::len);
892 let has_hooks = hooks.is_some();
893 let command_handlers = build_command_handler_map(runtime.commands.as_deref());
894 let canvas_handler = runtime.canvas_handler.take();
895 let session_fs_provider = runtime.session_fs_provider.take();
896 if self.inner.session_fs_configured && session_fs_provider.is_none() {
897 return Err(ErrorKind::Session(SessionErrorKind::SessionFsProviderRequired).into());
898 }
899 if self.inner.session_fs_sqlite_declared
900 && let Some(ref provider) = session_fs_provider
901 && provider.sqlite().is_none()
902 {
903 return Err(Error::with_message(
904 ErrorKind::InvalidConfig,
905 "SessionFs capabilities declare SQLite support but the provider \
906 does not implement SessionFsSqliteProvider",
907 ));
908 }
909
910 let mut params = serde_json::to_value(&wire)?;
911 let trace_ctx = self.resolve_trace_context().await;
912 inject_trace_context(&mut params, &trace_ctx);
913
914 let setup_start = Instant::now();
915 let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
916 let idle_waiter = Arc::new(ParkingLotMutex::new(None));
917 let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new()));
918 let shutdown = CancellationToken::new();
919 let (event_tx, _) = tokio::sync::broadcast::channel(512);
920
921 let inline_stash: Arc<
927 ParkingLotMutex<Option<(SessionId, crate::router::SessionChannels)>>,
928 > = Arc::new(ParkingLotMutex::new(None));
929
930 let inline_callback: Option<crate::jsonrpc::InlineResponseCallback> = if let Some(ref sid) =
931 local_session_id
932 {
933 let channels = self.register_session(sid);
934 *inline_stash.lock() = Some((sid.clone(), channels));
935 None
936 } else {
937 let client = self.clone();
938 let stash = inline_stash.clone();
939 let expected = caller_session_id.clone();
940 Some(Box::new(move |response| {
941 let result = response.result.as_ref().ok_or_else(|| {
942 Error::with_message(ErrorKind::Json, "session.create response had no result")
943 })?;
944 let parsed: CreateSessionResult =
945 serde_json::from_value(result.clone()).map_err(Error::from)?;
946 if let Some(requested) = expected.as_ref()
947 && parsed.session_id != *requested
948 {
949 return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
950 requested: requested.clone(),
951 returned: parsed.session_id,
952 })
953 .into());
954 }
955 let channels = client.register_session(&parsed.session_id);
956 *stash.lock() = Some((parsed.session_id, channels));
957 Ok(())
958 }))
959 };
960
961 let rpc_start = Instant::now();
962 let result = match self
963 .call_with_inline_callback("session.create", Some(params), inline_callback)
964 .await
965 {
966 Ok(result) => result,
967 Err(error) => {
968 if let Some((id, _channels)) = inline_stash.lock().take() {
969 self.unregister_session(&id);
970 }
971 return Err(error);
972 }
973 };
974 tracing::debug!(
975 elapsed_ms = rpc_start.elapsed().as_millis(),
976 "Client::create_session session creation request completed successfully"
977 );
978 let create_result: CreateSessionResult = match serde_json::from_value(result) {
979 Ok(result) => result,
980 Err(error) => {
981 if let Some((id, _channels)) = inline_stash.lock().take() {
982 self.unregister_session(&id);
983 }
984 return Err(error.into());
985 }
986 };
987
988 if let Some(ref requested) = local_session_id
989 && create_result.session_id != *requested
990 {
991 if let Some((id, _channels)) = inline_stash.lock().take() {
992 self.unregister_session(&id);
993 }
994 return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
995 requested: requested.clone(),
996 returned: create_result.session_id.clone(),
997 })
998 .into());
999 }
1000
1001 let (session_id, channels) = inline_stash
1002 .lock()
1003 .take()
1004 .expect("session registration must have populated stash on success");
1005 let event_loop = spawn_event_loop(
1006 session_id.clone(),
1007 self.clone(),
1008 handlers,
1009 hooks,
1010 transforms,
1011 command_handlers,
1012 canvas_handler,
1013 session_fs_provider,
1014 channels,
1015 idle_waiter.clone(),
1016 capabilities.clone(),
1017 open_canvases.clone(),
1018 event_tx.clone(),
1019 shutdown.clone(),
1020 );
1021 tracing::debug!(
1022 elapsed_ms = setup_start.elapsed().as_millis(),
1023 session_id = %session_id,
1024 tools_count,
1025 commands_count,
1026 has_hooks,
1027 "Client::create_session local setup complete"
1028 );
1029 *capabilities.write() = create_result.capabilities.unwrap_or_default();
1030
1031 tracing::debug!(
1032 elapsed_ms = total_start.elapsed().as_millis(),
1033 session_id = %session_id,
1034 "Client::create_session complete"
1035 );
1036 let session = Session {
1037 id: session_id,
1038 cwd: self.cwd().clone(),
1039 workspace_path: create_result.workspace_path,
1040 remote_url: create_result.remote_url,
1041 client: self.clone(),
1042 event_loop: ParkingLotMutex::new(Some(event_loop)),
1043 shutdown,
1044 idle_waiter,
1045 capabilities,
1046 open_canvases,
1047 event_tx,
1048 };
1049 apply_mode_post_create_patch(
1050 &session,
1051 mode,
1052 opt_skip_custom_instructions,
1053 opt_custom_agents_local_only,
1054 opt_coauthor_enabled,
1055 opt_manage_schedule_enabled,
1056 )
1057 .await?;
1058 Ok(session)
1059 }
1060
1061 pub async fn resume_session(&self, mut config: ResumeSessionConfig) -> Result<Session, Error> {
1072 let total_start = Instant::now();
1073 let session_id = config.session_id.clone();
1074 if config.hooks_handler.is_some() && config.hooks.is_none() {
1075 config.hooks = Some(true);
1076 }
1077 if let Some(transforms) = config.system_message_transform.clone() {
1078 inject_transform_sections_resume(&mut config, transforms.as_ref());
1079 }
1080 let mode = self.inner.mode;
1081 if mode == crate::ClientMode::Empty && config.available_tools.is_none() {
1082 return Err(Error::with_message(
1083 ErrorKind::InvalidConfig,
1084 "ClientMode::Empty requires available_tools to be set on the session config. \
1085 Use ToolSet to specify which tools the session may use (e.g. \
1086 ToolSet::new().add_builtin_many(BUILTIN_TOOLS_ISOLATED)).",
1087 ));
1088 }
1089 crate::mode::validate_tool_filter_list(
1090 "available_tools",
1091 config.available_tools.as_deref(),
1092 )?;
1093 crate::mode::validate_tool_filter_list("excluded_tools", config.excluded_tools.as_deref())?;
1094 config.system_message =
1095 crate::mode::system_message_for_mode(mode, config.system_message.take());
1096 config.memory = crate::mode::memory_for_mode(mode, config.memory.take());
1097 if mode == crate::ClientMode::Empty {
1098 if config.enable_session_telemetry.is_none() {
1099 config.enable_session_telemetry = Some(false);
1100 }
1101 if config.skip_embedding_retrieval.is_none() {
1102 config.skip_embedding_retrieval = Some(true);
1103 }
1104 if config.enable_on_demand_instruction_discovery.is_none() {
1105 config.enable_on_demand_instruction_discovery = Some(false);
1106 }
1107 if config.enable_file_hooks.is_none() {
1108 config.enable_file_hooks = Some(false);
1109 }
1110 if config.enable_host_git_operations.is_none() {
1111 config.enable_host_git_operations = Some(false);
1112 }
1113 if config.enable_session_store.is_none() {
1114 config.enable_session_store = Some(false);
1115 }
1116 if config.enable_skills.is_none() {
1117 config.enable_skills = Some(false);
1118 }
1119 }
1120 if mode == crate::ClientMode::Empty && config.mcp_oauth_token_storage.is_none() {
1121 config.mcp_oauth_token_storage = Some("in-memory".into());
1122 }
1123 if mode == crate::ClientMode::Empty && config.embedding_cache_storage.is_none() {
1124 config.embedding_cache_storage = Some("in-memory".into());
1125 }
1126 let opt_skip_custom_instructions = config.skip_custom_instructions;
1127 let opt_custom_agents_local_only = config.custom_agents_local_only;
1128 let opt_coauthor_enabled = config.coauthor_enabled;
1129 let opt_manage_schedule_enabled = config.manage_schedule_enabled;
1130 let (wire, mut runtime) = config.into_wire()?;
1131
1132 let permission_handler = crate::permission::resolve_handler(
1133 runtime.permission_handler.take(),
1134 runtime.permission_policy.take(),
1135 );
1136 let handlers = SessionHandlers {
1137 permission: permission_handler,
1138 elicitation: runtime.elicitation_handler.take(),
1139 user_input: runtime.user_input_handler.take(),
1140 exit_plan_mode: runtime.exit_plan_mode_handler.take(),
1141 auto_mode_switch: runtime.auto_mode_switch_handler.take(),
1142 tools: Arc::new(std::mem::take(&mut runtime.tool_handlers)),
1143 };
1144 let hooks = runtime.hooks_handler.take();
1145 let transforms = runtime.system_message_transform.take();
1146 let tools_count = wire.tools.as_ref().map_or(0, Vec::len);
1147 let commands_count = runtime.commands.as_ref().map_or(0, Vec::len);
1148 let has_hooks = hooks.is_some();
1149 let command_handlers = build_command_handler_map(runtime.commands.as_deref());
1150 let canvas_handler = runtime.canvas_handler.take();
1151 let session_fs_provider = runtime.session_fs_provider.take();
1152 if self.inner.session_fs_configured && session_fs_provider.is_none() {
1153 return Err(ErrorKind::Session(SessionErrorKind::SessionFsProviderRequired).into());
1154 }
1155 if self.inner.session_fs_sqlite_declared
1156 && let Some(ref provider) = session_fs_provider
1157 && provider.sqlite().is_none()
1158 {
1159 return Err(Error::with_message(
1160 ErrorKind::InvalidConfig,
1161 "SessionFs capabilities declare SQLite support but the provider \
1162 does not implement SessionFsSqliteProvider",
1163 ));
1164 }
1165
1166 let mut params = serde_json::to_value(&wire)?;
1167 let trace_ctx = self.resolve_trace_context().await;
1168 inject_trace_context(&mut params, &trace_ctx);
1169
1170 let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
1171 let setup_start = Instant::now();
1172 let channels = self.register_session(&session_id);
1173 let idle_waiter = Arc::new(ParkingLotMutex::new(None));
1174 let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new()));
1175 let shutdown = CancellationToken::new();
1176 let (event_tx, _) = tokio::sync::broadcast::channel(512);
1177 let event_loop = spawn_event_loop(
1178 session_id.clone(),
1179 self.clone(),
1180 handlers,
1181 hooks,
1182 transforms,
1183 command_handlers,
1184 canvas_handler,
1185 session_fs_provider,
1186 channels,
1187 idle_waiter.clone(),
1188 capabilities.clone(),
1189 open_canvases.clone(),
1190 event_tx.clone(),
1191 shutdown.clone(),
1192 );
1193 let mut registration =
1194 PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
1195 tracing::debug!(
1196 elapsed_ms = setup_start.elapsed().as_millis(),
1197 session_id = %session_id,
1198 tools_count,
1199 commands_count,
1200 has_hooks,
1201 "Client::resume_session local setup complete"
1202 );
1203
1204 let rpc_start = Instant::now();
1205 let result = match self.call("session.resume", Some(params)).await {
1206 Ok(result) => result,
1207 Err(error) => {
1208 registration.cleanup(event_loop).await;
1209 return Err(error);
1210 }
1211 };
1212 tracing::debug!(
1213 elapsed_ms = rpc_start.elapsed().as_millis(),
1214 session_id = %session_id,
1215 "Client::resume_session session resume request completed successfully"
1216 );
1217
1218 let resume_result: ResumeSessionResult = match serde_json::from_value(result) {
1219 Ok(result) => result,
1220 Err(error) => {
1221 registration.cleanup(event_loop).await;
1222 return Err(error.into());
1223 }
1224 };
1225 let cli_session_id = resume_result
1226 .session_id
1227 .clone()
1228 .unwrap_or_else(|| session_id.clone());
1229 if cli_session_id != session_id {
1230 registration.cleanup(event_loop).await;
1231 return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
1232 requested: session_id,
1233 returned: cli_session_id,
1234 })
1235 .into());
1236 }
1237
1238 let skills_reload_start = Instant::now();
1240 if let Err(e) = self
1241 .call(
1242 "session.skills.reload",
1243 Some(serde_json::json!({ "sessionId": session_id })),
1244 )
1245 .await
1246 {
1247 warn!(
1248 elapsed_ms = skills_reload_start.elapsed().as_millis(),
1249 session_id = %session_id,
1250 error = %e,
1251 "Client::resume_session skills reload request failed"
1252 );
1253 } else {
1254 tracing::debug!(
1255 elapsed_ms = skills_reload_start.elapsed().as_millis(),
1256 session_id = %session_id,
1257 "Client::resume_session skills reload request completed successfully"
1258 );
1259 }
1260
1261 *capabilities.write() = resume_result.capabilities.unwrap_or_default();
1262 {
1267 let mut snapshots = open_canvases.write();
1268 for snapshot in resume_result.open_canvases.unwrap_or_default() {
1269 upsert_open_canvas_snapshot(&mut snapshots, snapshot);
1270 }
1271 }
1272
1273 tracing::debug!(
1274 elapsed_ms = total_start.elapsed().as_millis(),
1275 session_id = %session_id,
1276 "Client::resume_session complete"
1277 );
1278 registration.disarm();
1279 let session = Session {
1280 id: session_id,
1281 cwd: self.cwd().clone(),
1282 workspace_path: resume_result.workspace_path,
1283 remote_url: resume_result.remote_url,
1284 client: self.clone(),
1285 event_loop: ParkingLotMutex::new(Some(event_loop)),
1286 shutdown,
1287 idle_waiter,
1288 capabilities,
1289 open_canvases,
1290 event_tx,
1291 };
1292 apply_mode_post_create_patch(
1293 &session,
1294 mode,
1295 opt_skip_custom_instructions,
1296 opt_custom_agents_local_only,
1297 opt_coauthor_enabled,
1298 opt_manage_schedule_enabled,
1299 )
1300 .await?;
1301 Ok(session)
1302 }
1303}
1304
1305type CommandHandlerMap = HashMap<String, Arc<dyn CommandHandler>>;
1306
1307async fn apply_mode_post_create_patch(
1308 session: &Session,
1309 mode: crate::ClientMode,
1310 opt_skip_custom_instructions: Option<bool>,
1311 opt_custom_agents_local_only: Option<bool>,
1312 opt_coauthor_enabled: Option<bool>,
1313 opt_manage_schedule_enabled: Option<bool>,
1314) -> Result<(), Error> {
1315 use crate::generated::api_types::SessionUpdateOptionsParams;
1316 let mut patch = SessionUpdateOptionsParams::default();
1317 let should_send = if mode == crate::ClientMode::Empty {
1318 patch.skip_custom_instructions = Some(opt_skip_custom_instructions.unwrap_or(true));
1319 patch.custom_agents_local_only = Some(opt_custom_agents_local_only.unwrap_or(true));
1320 patch.coauthor_enabled = Some(opt_coauthor_enabled.unwrap_or(false));
1321 patch.manage_schedule_enabled = Some(opt_manage_schedule_enabled.unwrap_or(false));
1322 patch.installed_plugins = Some(Vec::new());
1323 true
1324 } else {
1325 let mut any = false;
1326 if let Some(v) = opt_skip_custom_instructions {
1327 patch.skip_custom_instructions = Some(v);
1328 any = true;
1329 }
1330 if let Some(v) = opt_custom_agents_local_only {
1331 patch.custom_agents_local_only = Some(v);
1332 any = true;
1333 }
1334 if let Some(v) = opt_coauthor_enabled {
1335 patch.coauthor_enabled = Some(v);
1336 any = true;
1337 }
1338 if let Some(v) = opt_manage_schedule_enabled {
1339 patch.manage_schedule_enabled = Some(v);
1340 any = true;
1341 }
1342 any
1343 };
1344 if !should_send {
1345 return Ok(());
1346 }
1347 if let Err(error) = session.rpc().options().update(patch).await {
1348 let _ = session.disconnect().await;
1349 return Err(error);
1350 }
1351 Ok(())
1352}
1353
1354fn build_command_handler_map(commands: Option<&[CommandDefinition]>) -> Arc<CommandHandlerMap> {
1355 let map = match commands {
1356 Some(commands) => commands
1357 .iter()
1358 .filter(|cmd| !cmd.name.is_empty())
1359 .map(|cmd| (cmd.name.clone(), cmd.handler.clone()))
1360 .collect(),
1361 None => HashMap::new(),
1362 };
1363 Arc::new(map)
1364}
1365
1366fn upsert_open_canvas_snapshot(
1367 snapshots: &mut Vec<OpenCanvasInstance>,
1368 snapshot: OpenCanvasInstance,
1369) {
1370 if let Some(existing) = snapshots
1371 .iter_mut()
1372 .find(|open| open.instance_id == snapshot.instance_id)
1373 {
1374 *existing = snapshot;
1375 } else {
1376 snapshots.push(snapshot);
1377 }
1378}
1379
1380fn remove_open_canvas_snapshot(snapshots: &mut Vec<OpenCanvasInstance>, instance_id: &str) {
1381 snapshots.retain(|open| open.instance_id != instance_id);
1382}
1383
1384#[allow(clippy::too_many_arguments)]
1385fn spawn_event_loop(
1386 session_id: SessionId,
1387 client: Client,
1388 handlers: SessionHandlers,
1389 hooks: Option<Arc<dyn SessionHooks>>,
1390 transforms: Option<Arc<dyn SystemMessageTransform>>,
1391 command_handlers: Arc<CommandHandlerMap>,
1392 canvas_handler: Option<Arc<dyn CanvasHandler>>,
1393 session_fs_provider: Option<Arc<dyn SessionFsProvider>>,
1394 channels: crate::router::SessionChannels,
1395 idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1396 capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
1397 open_canvases: Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
1398 event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
1399 shutdown: CancellationToken,
1400) -> JoinHandle<()> {
1401 let crate::router::SessionChannels {
1402 mut notifications,
1403 mut requests,
1404 } = channels;
1405
1406 let span = tracing::error_span!("session_event_loop", session_id = %session_id);
1407 tokio::spawn(
1408 async move {
1409 loop {
1410 tokio::select! {
1421 _ = shutdown.cancelled() => break,
1422 Some(notification) = notifications.recv() => {
1423 handle_notification(
1424 &session_id, &client, &handlers, &command_handlers, notification, &idle_waiter, &capabilities, &open_canvases, &event_tx,
1425 ).await;
1426 }
1427 Some(request) = requests.recv() => {
1428 let ctx = RequestDispatchContext {
1429 client: &client,
1430 handlers: &handlers,
1431 hooks: hooks.as_deref(),
1432 transforms: transforms.as_deref(),
1433 canvas_handler: canvas_handler.as_ref(),
1434 session_fs_provider: session_fs_provider.as_ref(),
1435 };
1436 handle_request(&session_id, ctx, request).await;
1437 }
1438 else => break,
1439 }
1440 }
1441 if let Some(waiter) = idle_waiter.lock().take() {
1444 let _ = waiter
1445 .tx
1446 .send(Err(ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()));
1447 }
1448 }
1449 .instrument(span),
1450 )
1451}
1452
1453fn extract_request_id(data: &Value) -> Option<RequestId> {
1454 data.get("requestId")
1455 .and_then(|v| v.as_str())
1456 .filter(|s| !s.is_empty())
1457 .map(RequestId::new)
1458}
1459
1460fn notification_permission_payload(result: &PermissionResult) -> Option<Value> {
1465 match result {
1466 PermissionResult::NoResult => None,
1467 PermissionResult::Decision(decision) => Some(
1468 serde_json::to_value(decision).expect("serializing permission decision should succeed"),
1469 ),
1470 }
1471}
1472
1473fn tool_failure_result(message: impl Into<String>) -> ToolResult {
1474 let message = message.into();
1475 ToolResult::Expanded(ToolResultExpanded {
1476 text_result_for_llm: message.clone(),
1477 result_type: "failure".to_string(),
1478 binary_results_for_llm: None,
1479 session_log: None,
1480 error: Some(message),
1481 tool_telemetry: None,
1482 })
1483}
1484
1485#[allow(clippy::too_many_arguments)]
1487async fn handle_notification(
1488 session_id: &SessionId,
1489 client: &Client,
1490 handlers: &SessionHandlers,
1491 command_handlers: &Arc<CommandHandlerMap>,
1492 notification: SessionEventNotification,
1493 idle_waiter: &Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1494 capabilities: &Arc<parking_lot::RwLock<SessionCapabilities>>,
1495 open_canvases: &Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
1496 event_tx: &tokio::sync::broadcast::Sender<SessionEvent>,
1497) {
1498 let dispatch_start = Instant::now();
1499 let event = notification.event.clone();
1500 let event_type = event.parsed_type();
1501 if event_type == SessionEventType::PermissionRequested {
1502 tracing::debug!(
1503 session_id = %session_id,
1504 event_type = %event.event_type,
1505 "Session::handle_notification permission request received"
1506 );
1507 }
1508
1509 match event_type {
1512 SessionEventType::AssistantMessage
1513 | SessionEventType::SessionIdle
1514 | SessionEventType::SessionError => {
1515 let mut guard = idle_waiter.lock();
1516 if let Some(waiter) = guard.as_mut() {
1517 match event_type {
1518 SessionEventType::AssistantMessage => {
1519 if !waiter.first_assistant_message_seen {
1520 waiter.first_assistant_message_seen = true;
1521 tracing::debug!(
1522 elapsed_ms = waiter.started_at.elapsed().as_millis(),
1523 session_id = %session_id,
1524 "Session::send_and_wait first assistant message"
1525 );
1526 }
1527 waiter.last_assistant_message = Some(event.clone());
1528 }
1529 SessionEventType::SessionIdle | SessionEventType::SessionError => {
1530 if let Some(waiter) = guard.take() {
1531 if event_type == SessionEventType::SessionIdle {
1532 tracing::debug!(
1533 elapsed_ms = waiter.started_at.elapsed().as_millis(),
1534 session_id = %session_id,
1535 "Session::send_and_wait idle received"
1536 );
1537 let _ = waiter.tx.send(Ok(waiter.last_assistant_message));
1538 } else {
1539 let error_msg = event
1540 .typed_data::<SessionErrorData>()
1541 .map(|d| d.message)
1542 .or_else(|| {
1543 event
1544 .data
1545 .get("message")
1546 .and_then(|v| v.as_str())
1547 .map(|s| s.to_string())
1548 })
1549 .unwrap_or_else(|| "session error".to_string());
1550 let _ = waiter.tx.send(Err(Error::with_message(
1551 ErrorKind::Session(SessionErrorKind::AgentError),
1552 error_msg,
1553 )));
1554 }
1555 }
1556 }
1557 _ => {}
1558 }
1559 }
1560 }
1561 _ => {}
1562 }
1563
1564 if event_type == SessionEventType::CapabilitiesChanged {
1568 match serde_json::from_value::<SessionCapabilities>(notification.event.data.clone()) {
1569 Ok(changed) => *capabilities.write() = changed,
1570 Err(e) => warn!(error = %e, "failed to deserialize capabilities.changed payload"),
1571 }
1572 }
1573 if event_type == SessionEventType::SessionCanvasOpened {
1574 match serde_json::from_value::<OpenCanvasInstance>(notification.event.data.clone()) {
1575 Ok(open_canvas) => {
1576 upsert_open_canvas_snapshot(&mut open_canvases.write(), open_canvas);
1577 }
1578 Err(e) => warn!(error = %e, "failed to deserialize session.canvas.opened payload"),
1579 }
1580 }
1581 if event_type == SessionEventType::SessionCanvasClosed {
1582 match serde_json::from_value::<SessionCanvasClosedData>(notification.event.data.clone()) {
1583 Ok(closed) => {
1584 if closed.instance_id.is_empty() {
1585 warn!("failed to deserialize session.canvas.closed payload");
1586 } else {
1587 remove_open_canvas_snapshot(&mut open_canvases.write(), &closed.instance_id);
1588 }
1589 }
1590 Err(e) => warn!(error = %e, "failed to deserialize session.canvas.closed payload"),
1591 }
1592 }
1593
1594 let _ = event_tx.send(event.clone());
1598
1599 tracing::debug!(
1600 elapsed_ms = dispatch_start.elapsed().as_millis(),
1601 session_id = %session_id,
1602 event_type = %notification.event.event_type,
1603 "Session::handle_notification dispatch"
1604 );
1605
1606 match event_type {
1609 SessionEventType::PermissionRequested => {
1610 let Some(request_id) = extract_request_id(¬ification.event.data) else {
1611 return;
1612 };
1613 if notification
1617 .event
1618 .data
1619 .get("resolvedByHook")
1620 .and_then(|v| v.as_bool())
1621 .unwrap_or(false)
1622 {
1623 return;
1624 }
1625 let Some(permission_handler) = handlers.permission.clone() else {
1629 return;
1630 };
1631 let client = client.clone();
1632 let sid = session_id.clone();
1633 let data: PermissionRequestData =
1634 serde_json::from_value(notification.event.data.clone()).unwrap_or_else(|_| {
1635 PermissionRequestData {
1636 kind: None,
1637 tool_call_id: None,
1638 extra: notification.event.data.clone(),
1639 }
1640 });
1641 let span = tracing::error_span!(
1642 "permission_request_handler",
1643 session_id = %sid,
1644 request_id = %request_id
1645 );
1646 tokio::spawn(
1647 async move {
1648 let handler_start = Instant::now();
1649 let result = permission_handler
1650 .handle(sid.clone(), request_id.clone(), data)
1651 .await;
1652 tracing::debug!(
1653 elapsed_ms = handler_start.elapsed().as_millis(),
1654 session_id = %sid,
1655 request_id = %request_id,
1656 "PermissionHandler::handle dispatch"
1657 );
1658 let Some(result_value) = notification_permission_payload(&result) else {
1659 return;
1663 };
1664 let rpc_start = Instant::now();
1665 let _ = client
1666 .call(
1667 "session.permissions.handlePendingPermissionRequest",
1668 Some(serde_json::json!({
1669 "sessionId": sid,
1670 "requestId": request_id,
1671 "result": result_value,
1672 })),
1673 )
1674 .await;
1675 tracing::debug!(
1676 elapsed_ms = rpc_start.elapsed().as_millis(),
1677 session_id = %sid,
1678 request_id = %request_id,
1679 "Session::handle_notification response sent successfully"
1680 );
1681 }
1682 .instrument(span),
1683 );
1684 }
1685 SessionEventType::ExternalToolRequested => {
1686 let Some(request_id) = extract_request_id(¬ification.event.data) else {
1687 return;
1688 };
1689 let data: ExternalToolRequestedData =
1690 match serde_json::from_value(notification.event.data.clone()) {
1691 Ok(d) => d,
1692 Err(e) => {
1693 warn!(error = %e, "failed to deserialize external_tool.requested");
1694 let client = client.clone();
1695 let sid = session_id.clone();
1696 let span = tracing::error_span!(
1697 "external_tool_deserialize_error",
1698 session_id = %sid,
1699 request_id = %request_id
1700 );
1701 tokio::spawn(
1702 async move {
1703 let rpc_start = Instant::now();
1704 let _ = client
1705 .call(
1706 "session.tools.handlePendingToolCall",
1707 Some(serde_json::json!({
1708 "sessionId": sid,
1709 "requestId": request_id,
1710 "error": format!("Failed to deserialize tool request: {e}"),
1711 })),
1712 )
1713 .await;
1714 tracing::debug!(
1715 elapsed_ms = rpc_start.elapsed().as_millis(),
1716 session_id = %sid,
1717 request_id = %request_id,
1718 "Session::handle_notification response sent successfully"
1719 );
1720 }
1721 .instrument(span),
1722 );
1723 return;
1724 }
1725 };
1726 let tool_handler = if data.tool_name.is_empty() {
1730 None
1731 } else {
1732 handlers.tools.get(&data.tool_name).cloned()
1733 };
1734 let Some(tool_handler) = tool_handler else {
1735 return;
1736 };
1737 let client = client.clone();
1738 let sid = session_id.clone();
1739 let span = tracing::error_span!(
1740 "external_tool_handler",
1741 session_id = %sid,
1742 request_id = %request_id
1743 );
1744 tokio::spawn(
1745 async move {
1746 if data.tool_call_id.is_empty() {
1751 let error_msg = "Missing toolCallId";
1752 let rpc_start = Instant::now();
1753 let _ = client
1754 .call(
1755 "session.tools.handlePendingToolCall",
1756 Some(serde_json::json!({
1757 "sessionId": sid,
1758 "requestId": request_id,
1759 "error": error_msg,
1760 })),
1761 )
1762 .await;
1763 tracing::debug!(
1764 elapsed_ms = rpc_start.elapsed().as_millis(),
1765 session_id = %sid,
1766 request_id = %request_id,
1767 "Session::handle_notification response sent successfully"
1768 );
1769 return;
1770 }
1771 let tool_call_id = data.tool_call_id.clone();
1772 let tool_name = data.tool_name.clone();
1773 let invocation = ToolInvocation {
1774 session_id: sid.clone(),
1775 tool_call_id: data.tool_call_id,
1776 tool_name: data.tool_name,
1777 arguments: data
1778 .arguments
1779 .unwrap_or(Value::Object(serde_json::Map::new())),
1780 traceparent: data.traceparent,
1781 tracestate: data.tracestate,
1782 };
1783 let handler_start = Instant::now();
1784 let tool_result = match tool_handler.call(invocation).await {
1785 Ok(r) => r,
1786 Err(e) => tool_failure_result(e.to_string()),
1787 };
1788 tracing::debug!(
1789 elapsed_ms = handler_start.elapsed().as_millis(),
1790 session_id = %sid,
1791 request_id = %request_id,
1792 tool_call_id = %tool_call_id,
1793 tool_name = %tool_name,
1794 "ToolHandler::call dispatch"
1795 );
1796 let result_value = serde_json::to_value(tool_result).unwrap_or(Value::Null);
1797 let rpc_start = Instant::now();
1798 let _ = client
1799 .call(
1800 "session.tools.handlePendingToolCall",
1801 Some(serde_json::json!({
1802 "sessionId": sid,
1803 "requestId": request_id,
1804 "result": result_value,
1805 })),
1806 )
1807 .await;
1808 tracing::debug!(
1809 elapsed_ms = rpc_start.elapsed().as_millis(),
1810 session_id = %sid,
1811 request_id = %request_id,
1812 tool_call_id = %tool_call_id,
1813 tool_name = %tool_name,
1814 "Session::handle_notification response sent successfully"
1815 );
1816 }
1817 .instrument(span),
1818 );
1819 }
1820 SessionEventType::UserInputRequested => {
1821 }
1828 SessionEventType::ElicitationRequested => {
1829 let Some(request_id) = extract_request_id(¬ification.event.data) else {
1830 return;
1831 };
1832 let Some(elicitation_handler) = handlers.elicitation.clone() else {
1836 return;
1837 };
1838 let elicitation_data: ElicitationRequestedData =
1839 match serde_json::from_value(notification.event.data.clone()) {
1840 Ok(d) => d,
1841 Err(e) => {
1842 warn!(error = %e, "failed to deserialize elicitation request");
1843 return;
1844 }
1845 };
1846 let request = ElicitationRequest {
1847 message: elicitation_data.message,
1848 requested_schema: elicitation_data
1849 .requested_schema
1850 .map(|s| serde_json::to_value(s).unwrap_or(Value::Null)),
1851 mode: elicitation_data.mode.map(|m| match m {
1852 crate::generated::session_events::ElicitationRequestedMode::Form => {
1853 crate::types::ElicitationMode::Form
1854 }
1855 crate::generated::session_events::ElicitationRequestedMode::Url => {
1856 crate::types::ElicitationMode::Url
1857 }
1858 _ => crate::types::ElicitationMode::Unknown,
1859 }),
1860 elicitation_source: elicitation_data.elicitation_source,
1861 url: elicitation_data.url,
1862 };
1863 let client = client.clone();
1864 let sid = session_id.clone();
1865 let span = tracing::error_span!(
1866 "elicitation_request_handler",
1867 session_id = %sid,
1868 request_id = %request_id
1869 );
1870 tokio::spawn(
1871 async move {
1872 let cancel = ElicitationResult {
1873 action: "cancel".to_string(),
1874 content: None,
1875 };
1876 let handler_task = tokio::spawn({
1878 let sid = sid.clone();
1879 let request_id = request_id.clone();
1880 let span = tracing::error_span!(
1881 "elicitation_callback",
1882 session_id = %sid,
1883 request_id = %request_id
1884 );
1885 async move {
1886 let handler_start = Instant::now();
1887 let response = elicitation_handler
1888 .handle(sid.clone(), request_id.clone(), request)
1889 .await;
1890 tracing::debug!(
1891 elapsed_ms = handler_start.elapsed().as_millis(),
1892 session_id = %sid,
1893 request_id = %request_id,
1894 "ElicitationHandler::handle dispatch"
1895 );
1896 response
1897 }
1898 .instrument(span)
1899 });
1900 let result = match handler_task.await {
1901 Ok(r) => r,
1902 Err(_) => cancel.clone(),
1903 };
1904 let rpc_start = Instant::now();
1905 if let Err(e) = client
1906 .call(
1907 "session.ui.handlePendingElicitation",
1908 Some(serde_json::json!({
1909 "sessionId": sid,
1910 "requestId": request_id,
1911 "result": result,
1912 })),
1913 )
1914 .await
1915 {
1916 warn!(error = %e, "handlePendingElicitation failed, sending cancel");
1918 let _ = client
1919 .call(
1920 "session.ui.handlePendingElicitation",
1921 Some(serde_json::json!({
1922 "sessionId": sid,
1923 "requestId": request_id,
1924 "result": cancel,
1925 })),
1926 )
1927 .await;
1928 } else {
1929 tracing::debug!(
1930 elapsed_ms = rpc_start.elapsed().as_millis(),
1931 session_id = %sid,
1932 request_id = %request_id,
1933 "Session::handle_notification response sent successfully"
1934 );
1935 }
1936 }
1937 .instrument(span),
1938 );
1939 }
1940 SessionEventType::CommandExecute => {
1941 let data: CommandExecuteData =
1942 match serde_json::from_value(notification.event.data.clone()) {
1943 Ok(d) => d,
1944 Err(e) => {
1945 warn!(error = %e, "failed to deserialize command.execute");
1946 return;
1947 }
1948 };
1949 let client = client.clone();
1950 let command_handlers = command_handlers.clone();
1951 let sid = session_id.clone();
1952 let span = tracing::error_span!("command_handler", session_id = %sid);
1953 tokio::spawn(
1954 async move {
1955 let request_id = data.request_id;
1956 let ack_error = match command_handlers.get(&data.command_name).cloned() {
1957 None => Some(format!("Unknown command: {}", data.command_name)),
1958 Some(handler) => {
1959 let command_name = data.command_name.clone();
1960 let ctx = CommandContext {
1961 session_id: sid.clone(),
1962 command: data.command,
1963 command_name: data.command_name,
1964 args: data.args,
1965 };
1966 let handler_start = Instant::now();
1967 let result = handler.on_command(ctx).await;
1968 tracing::debug!(
1969 elapsed_ms = handler_start.elapsed().as_millis(),
1970 session_id = %sid,
1971 request_id = %request_id,
1972 command_name = %command_name,
1973 "CommandHandler::call dispatch"
1974 );
1975 match result {
1976 Ok(()) => None,
1977 Err(e) => Some(e.to_string()),
1978 }
1979 }
1980 };
1981 let mut params = serde_json::json!({
1982 "sessionId": sid,
1983 "requestId": request_id,
1984 });
1985 if let Some(error_msg) = ack_error {
1986 params["error"] = serde_json::Value::String(error_msg);
1987 }
1988 let rpc_start = Instant::now();
1989 let _ = client
1990 .call("session.commands.handlePendingCommand", Some(params))
1991 .await;
1992 tracing::debug!(
1993 elapsed_ms = rpc_start.elapsed().as_millis(),
1994 session_id = %sid,
1995 request_id = %request_id,
1996 "Session::handle_notification response sent successfully"
1997 );
1998 }
1999 .instrument(span),
2000 );
2001 }
2002 _ => {}
2003 }
2004}
2005
2006struct RequestDispatchContext<'a> {
2007 client: &'a Client,
2008 handlers: &'a SessionHandlers,
2009 hooks: Option<&'a dyn SessionHooks>,
2010 transforms: Option<&'a dyn SystemMessageTransform>,
2011 canvas_handler: Option<&'a Arc<dyn CanvasHandler>>,
2012 session_fs_provider: Option<&'a Arc<dyn SessionFsProvider>>,
2013}
2014
2015async fn handle_request(
2017 session_id: &SessionId,
2018 ctx: RequestDispatchContext<'_>,
2019 request: crate::JsonRpcRequest,
2020) {
2021 let sid = session_id.clone();
2022 let client = ctx.client;
2023 let handlers = ctx.handlers;
2024 let hooks = ctx.hooks;
2025 let transforms = ctx.transforms;
2026 let canvas_handler = ctx.canvas_handler;
2027 let session_fs_provider = ctx.session_fs_provider;
2028
2029 if request.method.starts_with("sessionFs.") {
2030 crate::session_fs_dispatch::dispatch(client, session_fs_provider, request).await;
2031 return;
2032 }
2033
2034 if request.method.starts_with("canvas.") {
2035 crate::canvas_dispatch::dispatch(client, canvas_handler, request).await;
2036 return;
2037 }
2038
2039 match request.method.as_str() {
2040 "hooks.invoke" => {
2041 let params = request.params.as_ref();
2042 let hook_type = params
2043 .and_then(|p| p.get("hookType"))
2044 .and_then(|v| v.as_str())
2045 .unwrap_or("");
2046 let input = params
2047 .and_then(|p| p.get("input"))
2048 .cloned()
2049 .unwrap_or(Value::Object(Default::default()));
2050
2051 let rpc_result = if let Some(hooks) = hooks {
2052 match crate::hooks::dispatch_hook(hooks, &sid, hook_type, input).await {
2053 Ok(output) => output,
2054 Err(e) => {
2055 warn!(error = %e, hook_type = hook_type, "hook dispatch failed");
2056 serde_json::json!({ "output": {} })
2057 }
2058 }
2059 } else {
2060 serde_json::json!({ "output": {} })
2061 };
2062
2063 let rpc_response = JsonRpcResponse {
2064 jsonrpc: "2.0".to_string(),
2065 id: request.id,
2066 result: Some(rpc_result),
2067 error: None,
2068 };
2069 let _ = client.send_response(&rpc_response).await;
2070 }
2071
2072 "userInput.request" => {
2073 let params = request.params.as_ref();
2074 let Some(question) = params
2075 .and_then(|p| p.get("question"))
2076 .and_then(|v| v.as_str())
2077 else {
2078 warn!("userInput.request missing 'question' field");
2079 let rpc_response = JsonRpcResponse {
2080 jsonrpc: "2.0".to_string(),
2081 id: request.id,
2082 result: None,
2083 error: Some(crate::JsonRpcError {
2084 code: error_codes::INVALID_PARAMS,
2085 message: "missing required field: question".to_string(),
2086 data: None,
2087 }),
2088 };
2089 let _ = client.send_response(&rpc_response).await;
2090 return;
2091 };
2092 let question = question.to_string();
2093 let choices = params
2094 .and_then(|p| p.get("choices"))
2095 .and_then(|v| v.as_array())
2096 .map(|arr| {
2097 arr.iter()
2098 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2099 .collect()
2100 });
2101 let allow_freeform = params
2102 .and_then(|p| p.get("allowFreeform"))
2103 .and_then(|v| v.as_bool());
2104
2105 let handler_start = Instant::now();
2106 let response = if let Some(user_input_handler) = handlers.user_input.as_ref() {
2107 user_input_handler
2108 .handle(sid.clone(), question, choices, allow_freeform)
2109 .await
2110 } else {
2111 None
2112 };
2113 tracing::debug!(
2114 elapsed_ms = handler_start.elapsed().as_millis(),
2115 session_id = %sid,
2116 "UserInputHandler::handle dispatch"
2117 );
2118
2119 let rpc_result = match response {
2120 Some(UserInputResponse {
2121 answer,
2122 was_freeform,
2123 }) => serde_json::json!({
2124 "answer": answer,
2125 "wasFreeform": was_freeform,
2126 }),
2127 None => serde_json::json!({ "noResponse": true }),
2128 };
2129 let rpc_response = JsonRpcResponse {
2130 jsonrpc: "2.0".to_string(),
2131 id: request.id,
2132 result: Some(rpc_result),
2133 error: None,
2134 };
2135 let _ = client.send_response(&rpc_response).await;
2136 }
2137
2138 "exitPlanMode.request" => {
2139 let params = request
2140 .params
2141 .as_ref()
2142 .cloned()
2143 .unwrap_or(Value::Object(serde_json::Map::new()));
2144 let data: ExitPlanModeData = match serde_json::from_value(params) {
2145 Ok(d) => d,
2146 Err(e) => {
2147 warn!(error = %e, "failed to deserialize exitPlanMode.request params, using defaults");
2148 ExitPlanModeData::default()
2149 }
2150 };
2151
2152 let rpc_result = if let Some(exit_plan_handler) = handlers.exit_plan_mode.as_ref() {
2153 let result = exit_plan_handler.handle(sid, data).await;
2154 serde_json::to_value(result).expect("ExitPlanModeResult serialization cannot fail")
2155 } else {
2156 serde_json::json!({ "approved": true })
2157 };
2158 let rpc_response = JsonRpcResponse {
2159 jsonrpc: "2.0".to_string(),
2160 id: request.id,
2161 result: Some(rpc_result),
2162 error: None,
2163 };
2164 let _ = client.send_response(&rpc_response).await;
2165 }
2166
2167 "autoModeSwitch.request" => {
2168 let error_code = request
2169 .params
2170 .as_ref()
2171 .and_then(|p| p.get("errorCode"))
2172 .and_then(|v| v.as_str())
2173 .map(|s| s.to_string());
2174 let retry_after_seconds = request
2175 .params
2176 .as_ref()
2177 .and_then(|p| p.get("retryAfterSeconds"))
2178 .and_then(|v| v.as_f64());
2179
2180 let answer = if let Some(auto_mode_handler) = handlers.auto_mode_switch.as_ref() {
2181 auto_mode_handler
2182 .handle(sid, error_code, retry_after_seconds)
2183 .await
2184 } else {
2185 AutoModeSwitchResponse::No
2186 };
2187 let rpc_response = JsonRpcResponse {
2188 jsonrpc: "2.0".to_string(),
2189 id: request.id,
2190 result: Some(serde_json::json!({ "response": answer })),
2191 error: None,
2192 };
2193 let _ = client.send_response(&rpc_response).await;
2194 }
2195
2196 "systemMessage.transform" => {
2197 let params = request.params.as_ref();
2198 let sections: HashMap<String, crate::transforms::TransformSection> =
2199 match params.and_then(|p| p.get("sections")) {
2200 Some(v) => match serde_json::from_value(v.clone()) {
2201 Ok(s) => s,
2202 Err(e) => {
2203 let _ = send_error_response(
2204 client,
2205 request.id,
2206 error_codes::INVALID_PARAMS,
2207 &format!("invalid sections: {e}"),
2208 )
2209 .await;
2210 return;
2211 }
2212 },
2213 None => {
2214 let _ = send_error_response(
2215 client,
2216 request.id,
2217 error_codes::INVALID_PARAMS,
2218 "missing sections parameter",
2219 )
2220 .await;
2221 return;
2222 }
2223 };
2224
2225 let rpc_result = if let Some(transforms) = transforms {
2226 let transform_start = Instant::now();
2227 let response =
2228 crate::transforms::dispatch_transform(transforms, &sid, sections).await;
2229 tracing::debug!(
2230 elapsed_ms = transform_start.elapsed().as_millis(),
2231 session_id = %sid,
2232 "SystemMessageTransform::transform_section dispatch"
2233 );
2234 match serde_json::to_value(response) {
2235 Ok(v) => v,
2236 Err(e) => {
2237 warn!(error = %e, "failed to serialize transform response");
2238 serde_json::json!({ "sections": {} })
2239 }
2240 }
2241 } else {
2242 let passthrough: HashMap<String, crate::transforms::TransformSection> = sections;
2244 serde_json::json!({ "sections": passthrough })
2245 };
2246
2247 let rpc_response = JsonRpcResponse {
2248 jsonrpc: "2.0".to_string(),
2249 id: request.id,
2250 result: Some(rpc_result),
2251 error: None,
2252 };
2253 let _ = client.send_response(&rpc_response).await;
2254 }
2255
2256 method => {
2257 warn!(
2258 method = method,
2259 "unhandled request method in session event loop"
2260 );
2261 let _ = send_error_response(
2262 client,
2263 request.id,
2264 error_codes::METHOD_NOT_FOUND,
2265 &format!("unknown method: {method}"),
2266 )
2267 .await;
2268 }
2269 }
2270}
2271
2272async fn send_error_response(
2273 client: &Client,
2274 id: u64,
2275 code: i32,
2276 message: &str,
2277) -> Result<(), Error> {
2278 let response = JsonRpcResponse {
2279 jsonrpc: "2.0".to_string(),
2280 id,
2281 result: None,
2282 error: Some(crate::JsonRpcError {
2283 code,
2284 message: message.to_string(),
2285 data: None,
2286 }),
2287 };
2288 client.send_response(&response).await
2289}
2290
2291fn apply_transform_sections(
2295 sys_msg: &mut SystemMessageConfig,
2296 transforms: &dyn SystemMessageTransform,
2297) {
2298 sys_msg.mode = Some("customize".to_string());
2299 let sections = sys_msg.sections.get_or_insert_with(HashMap::new);
2300 for id in transforms.section_ids() {
2301 sections.entry(id).or_insert_with(|| SectionOverride {
2302 action: Some("transform".to_string()),
2303 content: None,
2304 });
2305 }
2306}
2307
2308fn inject_transform_sections(config: &mut SessionConfig, transforms: &dyn SystemMessageTransform) {
2309 let sys_msg = config.system_message.get_or_insert_with(Default::default);
2310 apply_transform_sections(sys_msg, transforms);
2311}
2312
2313fn inject_transform_sections_resume(
2314 config: &mut ResumeSessionConfig,
2315 transforms: &dyn SystemMessageTransform,
2316) {
2317 let sys_msg = config.system_message.get_or_insert_with(Default::default);
2318 apply_transform_sections(sys_msg, transforms);
2319}
2320
2321#[cfg(test)]
2322mod tests {
2323 use serde_json::json;
2324
2325 use super::notification_permission_payload;
2326 use crate::handler::PermissionResult;
2327
2328 #[test]
2329 fn notification_payload_suppresses_no_result() {
2330 assert!(notification_permission_payload(&PermissionResult::NoResult).is_none());
2331 }
2332
2333 #[test]
2334 fn notification_payload_serializes_decisions() {
2335 assert_eq!(
2336 notification_permission_payload(&PermissionResult::approve_once()),
2337 Some(json!({ "kind": "approve-once" }))
2338 );
2339 assert_eq!(
2340 notification_permission_payload(&PermissionResult::reject(None)),
2341 Some(json!({ "kind": "reject" }))
2342 );
2343 assert_eq!(
2344 notification_permission_payload(&PermissionResult::reject(Some("bad".to_string()))),
2345 Some(json!({ "kind": "reject", "feedback": "bad" }))
2346 );
2347 assert_eq!(
2348 notification_permission_payload(&PermissionResult::user_not_available()),
2349 Some(json!({ "kind": "user-not-available" }))
2350 );
2351 }
2352}