1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::Mutex as ParkingLotMutex;
7use serde_json::Value;
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tracing::{Instrument, warn};
12
13use crate::generated::api_types::{
14 LogRequest, ModelSwitchToRequest, PermissionDecision, PermissionDecisionApproveOnce,
15 PermissionDecisionApproveOnceKind, PermissionDecisionReject, PermissionDecisionRejectKind,
16};
17use crate::generated::session_events::{
18 CommandExecuteData, ElicitationRequestedData, ExternalToolRequestedData, SessionErrorData,
19 SessionEventType,
20};
21use crate::handler::{
22 AutoModeSwitchResponse, HandlerEvent, HandlerResponse, PermissionResult, SessionHandler,
23 UserInputResponse,
24};
25use crate::hooks::SessionHooks;
26use crate::session_fs::SessionFsProvider;
27use crate::trace_context::inject_trace_context;
28use crate::transforms::SystemMessageTransform;
29use crate::types::{
30 CommandContext, CommandDefinition, CommandHandler, CreateSessionResult, ElicitationRequest,
31 ElicitationResult, ExitPlanModeData, GetMessagesResponse, InputOptions, MessageOptions,
32 PermissionRequestData, RequestId, ResumeSessionConfig, SectionOverride, SessionCapabilities,
33 SessionConfig, SessionEvent, SessionId, SetModelOptions, SystemMessageConfig, ToolInvocation,
34 ToolResult, ToolResultExpanded, ToolResultResponse, TraceContext,
35 ensure_attachment_display_names,
36};
37use crate::{Client, Error, JsonRpcResponse, SessionError, SessionEventNotification, error_codes};
38
39struct IdleWaiter {
41 tx: oneshot::Sender<Result<Option<SessionEvent>, Error>>,
42 last_assistant_message: Option<SessionEvent>,
43 started_at: Instant,
44 first_assistant_message_seen: bool,
45}
46
47struct WaiterGuard {
59 slot: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
60}
61
62impl Drop for WaiterGuard {
63 fn drop(&mut self) {
64 self.slot.lock().take();
65 }
66}
67
68struct PendingSessionRegistration {
69 client: Client,
70 session_id: SessionId,
71 shutdown: CancellationToken,
72 disarmed: bool,
73}
74
75impl PendingSessionRegistration {
76 fn new(client: Client, session_id: SessionId, shutdown: CancellationToken) -> Self {
77 Self {
78 client,
79 session_id,
80 shutdown,
81 disarmed: false,
82 }
83 }
84
85 async fn cleanup(mut self, event_loop: JoinHandle<()>) {
86 self.shutdown.cancel();
87 let _ = event_loop.await;
88 self.client.unregister_session(&self.session_id);
89 self.disarmed = true;
90 }
91
92 fn disarm(&mut self) {
93 self.disarmed = true;
94 }
95}
96
97impl Drop for PendingSessionRegistration {
98 fn drop(&mut self) {
99 if !self.disarmed {
100 self.shutdown.cancel();
101 self.client.unregister_session(&self.session_id);
102 }
103 }
104}
105
106pub struct Session {
118 id: SessionId,
119 cwd: PathBuf,
120 workspace_path: Option<PathBuf>,
121 remote_url: Option<String>,
122 client: Client,
123 event_loop: ParkingLotMutex<Option<JoinHandle<()>>>,
128 shutdown: CancellationToken,
142 idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
149 capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
151 event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
153}
154
155impl Session {
156 pub fn id(&self) -> &SessionId {
158 &self.id
159 }
160
161 pub fn cwd(&self) -> &PathBuf {
163 &self.cwd
164 }
165
166 pub fn workspace_path(&self) -> Option<&Path> {
168 self.workspace_path.as_deref()
169 }
170
171 pub fn remote_url(&self) -> Option<&str> {
173 self.remote_url.as_deref()
174 }
175
176 pub fn capabilities(&self) -> SessionCapabilities {
181 self.capabilities.read().clone()
182 }
183
184 pub fn cancellation_token(&self) -> CancellationToken {
211 self.shutdown.child_token()
212 }
213
214 pub fn subscribe(&self) -> crate::subscription::EventSubscription {
253 crate::subscription::EventSubscription::new(self.event_tx.subscribe())
254 }
255
256 pub fn client(&self) -> &Client {
258 &self.client
259 }
260
261 pub fn rpc(&self) -> crate::generated::rpc::SessionRpc<'_> {
272 crate::generated::rpc::SessionRpc { session: self }
273 }
274
275 pub async fn stop_event_loop(&self) {
283 self.shutdown.cancel();
284 let handle = self.event_loop.lock().take();
285 if let Some(handle) = handle {
286 let _ = handle.await;
287 }
288 if let Some(waiter) = self.idle_waiter.lock().take() {
290 let _ = waiter
291 .tx
292 .send(Err(Error::Session(SessionError::EventLoopClosed)));
293 }
294 }
295
296 pub async fn send(&self, opts: impl Into<MessageOptions>) -> Result<String, Error> {
321 if self.idle_waiter.lock().is_some() {
322 return Err(Error::Session(SessionError::SendWhileWaiting));
323 }
324 self.send_inner(opts.into()).await
325 }
326
327 async fn send_inner(&self, opts: MessageOptions) -> Result<String, Error> {
328 let mut params = serde_json::json!({
329 "sessionId": self.id,
330 "prompt": opts.prompt,
331 });
332 if let Some(m) = opts.mode {
333 params["mode"] = serde_json::to_value(m)?;
334 }
335 if let Some(mut a) = opts.attachments {
336 ensure_attachment_display_names(&mut a);
337 params["attachments"] = serde_json::to_value(a)?;
338 }
339 if let Some(headers) = opts.request_headers
340 && !headers.is_empty()
341 {
342 params["requestHeaders"] = serde_json::to_value(headers)?;
343 }
344 let trace_ctx = if opts.traceparent.is_some() || opts.tracestate.is_some() {
345 TraceContext {
346 traceparent: opts.traceparent,
347 tracestate: opts.tracestate,
348 }
349 } else {
350 self.client.resolve_trace_context().await
351 };
352 inject_trace_context(&mut params, &trace_ctx);
353 let rpc_start = Instant::now();
354 let result = self.client.call("session.send", Some(params)).await?;
355 let message_id = result
356 .get("messageId")
357 .and_then(|v| v.as_str())
358 .map(|s| s.to_string())
359 .unwrap_or_default();
360 tracing::debug!(
361 elapsed_ms = rpc_start.elapsed().as_millis(),
362 session_id = %self.id,
363 message_id = %message_id,
364 "Session::send completed successfully"
365 );
366 Ok(message_id)
367 }
368
369 pub async fn send_and_wait(
389 &self,
390 opts: impl Into<MessageOptions>,
391 ) -> Result<Option<SessionEvent>, Error> {
392 let total_start = Instant::now();
393 let opts = opts.into();
394 let timeout_duration = opts.wait_timeout.unwrap_or(Duration::from_secs(60));
395 let (tx, rx) = oneshot::channel();
396
397 {
398 let mut guard = self.idle_waiter.lock();
399 if guard.is_some() {
400 return Err(Error::Session(SessionError::SendWhileWaiting));
401 }
402 *guard = Some(IdleWaiter {
403 tx,
404 last_assistant_message: None,
405 started_at: total_start,
406 first_assistant_message_seen: false,
407 });
408 }
409
410 let _waiter_guard = WaiterGuard {
415 slot: self.idle_waiter.clone(),
416 };
417
418 let result = tokio::time::timeout(timeout_duration, async {
419 self.send_inner(opts).await?;
420 match rx.await {
421 Ok(result) => result,
422 Err(_) => Err(Error::Session(SessionError::EventLoopClosed)),
423 }
424 })
425 .await;
426
427 match result {
428 Ok(inner) => {
429 tracing::debug!(
430 elapsed_ms = total_start.elapsed().as_millis(),
431 session_id = %self.id,
432 completed_by = if inner.is_ok() { "idle" } else { "error" },
433 "Session::send_and_wait complete"
434 );
435 inner
436 }
437 Err(_) => {
438 tracing::warn!(
439 elapsed_ms = total_start.elapsed().as_millis(),
440 session_id = %self.id,
441 completed_by = "timeout",
442 "Session::send_and_wait failed"
443 );
444 Err(Error::Session(SessionError::Timeout(timeout_duration)))
445 }
446 }
447 }
448
449 pub async fn get_messages(&self) -> Result<Vec<SessionEvent>, Error> {
451 let result = self
452 .client
453 .call(
454 "session.getMessages",
455 Some(serde_json::json!({ "sessionId": self.id })),
456 )
457 .await?;
458 let response: GetMessagesResponse = serde_json::from_value(result)?;
459 Ok(response.events)
460 }
461
462 pub async fn abort(&self) -> Result<(), Error> {
470 self.client
471 .call(
472 "session.abort",
473 Some(serde_json::json!({ "sessionId": self.id })),
474 )
475 .await?;
476 Ok(())
477 }
478
479 pub async fn set_model(&self, model: &str, opts: Option<SetModelOptions>) -> Result<(), Error> {
483 let opts = opts.unwrap_or_default();
484 let request = ModelSwitchToRequest {
485 model_id: model.to_string(),
486 reasoning_effort: opts.reasoning_effort,
487 reasoning_summary: None,
488 model_capabilities: opts.model_capabilities,
489 };
490 self.rpc().model().switch_to(request).await?;
491 Ok(())
492 }
493
494 pub async fn disconnect(&self) -> Result<(), Error> {
511 self.client
512 .call(
513 "session.destroy",
514 Some(serde_json::json!({ "sessionId": self.id })),
515 )
516 .await?;
517 self.stop_event_loop().await;
518 self.client.unregister_session(&self.id);
519 Ok(())
520 }
521
522 pub async fn destroy(&self) -> Result<(), Error> {
528 self.disconnect().await
529 }
530
531 pub async fn log(
535 &self,
536 message: &str,
537 opts: Option<crate::types::LogOptions>,
538 ) -> Result<(), Error> {
539 let opts = opts.unwrap_or_default();
540 let level = match opts.level {
541 Some(level) => Some(serde_json::from_value(serde_json::to_value(level)?)?),
542 None => None,
543 };
544 let request = LogRequest {
545 message: message.to_string(),
546 level,
547 ephemeral: opts.ephemeral,
548 r#type: None,
549 tip: None,
550 url: None,
551 };
552 self.rpc().log(request).await?;
553 Ok(())
554 }
555
556 pub fn ui(&self) -> SessionUi<'_> {
562 SessionUi { session: self }
563 }
564
565 fn assert_elicitation(&self) -> Result<(), Error> {
567 if self
568 .capabilities
569 .read()
570 .ui
571 .as_ref()
572 .and_then(|u| u.elicitation)
573 != Some(true)
574 {
575 return Err(Error::Session(SessionError::ElicitationNotSupported));
576 }
577 Ok(())
578 }
579}
580
581impl Drop for Session {
582 fn drop(&mut self) {
583 self.shutdown.cancel();
595 self.client.unregister_session(&self.id);
596 }
597}
598
599pub struct SessionUi<'a> {
606 session: &'a Session,
607}
608
609impl<'a> SessionUi<'a> {
610 pub async fn elicitation(
618 &self,
619 message: &str,
620 schema: Value,
621 ) -> Result<ElicitationResult, Error> {
622 self.session.assert_elicitation()?;
623 let result = self
624 .session
625 .client
626 .call(
627 "session.ui.elicitation",
628 Some(serde_json::json!({
629 "sessionId": self.session.id,
630 "message": message,
631 "requestedSchema": schema,
632 })),
633 )
634 .await?;
635 let elicitation: ElicitationResult = serde_json::from_value(result)?;
636 Ok(elicitation)
637 }
638
639 pub async fn confirm(&self, message: &str) -> Result<bool, Error> {
643 self.session.assert_elicitation()?;
644 let schema = serde_json::json!({
645 "type": "object",
646 "properties": {
647 "confirmed": {
648 "type": "boolean",
649 "default": true,
650 }
651 },
652 "required": ["confirmed"]
653 });
654 let result = self.elicitation(message, schema).await?;
655 Ok(result.action == "accept"
656 && result
657 .content
658 .and_then(|c| c.get("confirmed").and_then(|v| v.as_bool()))
659 == Some(true))
660 }
661
662 pub async fn select(&self, message: &str, options: &[&str]) -> Result<Option<String>, Error> {
666 self.session.assert_elicitation()?;
667 let schema = serde_json::json!({
668 "type": "object",
669 "properties": {
670 "selection": {
671 "type": "string",
672 "enum": options,
673 }
674 },
675 "required": ["selection"]
676 });
677 let result = self.elicitation(message, schema).await?;
678 if result.action != "accept" {
679 return Ok(None);
680 }
681 let selection = result.content.and_then(|c| {
682 c.get("selection")
683 .and_then(|v| v.as_str())
684 .map(String::from)
685 });
686 Ok(selection)
687 }
688
689 pub async fn input(
694 &self,
695 message: &str,
696 options: Option<&InputOptions<'_>>,
697 ) -> Result<Option<String>, Error> {
698 self.session.assert_elicitation()?;
699 let mut field = serde_json::json!({ "type": "string" });
700 if let Some(opts) = options {
701 if let Some(title) = opts.title {
702 field["title"] = Value::String(title.to_string());
703 }
704 if let Some(desc) = opts.description {
705 field["description"] = Value::String(desc.to_string());
706 }
707 if let Some(min) = opts.min_length {
708 field["minLength"] = Value::Number(min.into());
709 }
710 if let Some(max) = opts.max_length {
711 field["maxLength"] = Value::Number(max.into());
712 }
713 if let Some(fmt) = &opts.format {
714 field["format"] = Value::String(fmt.as_str().to_string());
715 }
716 if let Some(default) = opts.default {
717 field["default"] = Value::String(default.to_string());
718 }
719 }
720 let schema = serde_json::json!({
721 "type": "object",
722 "properties": { "value": field },
723 "required": ["value"]
724 });
725 let result = self.elicitation(message, schema).await?;
726 if result.action != "accept" {
727 return Ok(None);
728 }
729 let value = result
730 .content
731 .and_then(|c| c.get("value").and_then(|v| v.as_str()).map(String::from));
732 Ok(value)
733 }
734}
735
736impl Client {
737 pub async fn create_session(&self, mut config: SessionConfig) -> Result<Session, Error> {
759 let total_start = Instant::now();
760 let handler = config
761 .handler
762 .take()
763 .unwrap_or_else(|| Arc::new(crate::handler::NoopHandler));
764 let hooks = config.hooks_handler.take();
765 let transforms = config.transform.take();
766 let tools_count = config.tools.as_ref().map_or(0, Vec::len);
767 let commands_count = config.commands.as_ref().map_or(0, Vec::len);
768 let has_hooks = hooks.is_some();
769 let command_handlers = build_command_handler_map(config.commands.as_deref());
770 let session_fs_provider = config.session_fs_provider.take();
771 if self.inner.session_fs_configured && session_fs_provider.is_none() {
772 return Err(Error::Session(SessionError::SessionFsProviderRequired));
773 }
774 if self.inner.session_fs_sqlite_declared
775 && let Some(ref provider) = session_fs_provider
776 && provider.sqlite().is_none()
777 {
778 return Err(Error::InvalidConfig(
779 "SessionFs capabilities declare SQLite support but the provider \
780 does not implement SessionFsSqliteProvider"
781 .to_string(),
782 ));
783 }
784
785 if hooks.is_some() && config.hooks.is_none() {
786 config.hooks = Some(true);
787 }
788 if let Some(ref transforms) = transforms {
789 inject_transform_sections(&mut config, transforms.as_ref());
790 }
791 let session_id = config
792 .session_id
793 .clone()
794 .unwrap_or_else(|| SessionId::from(uuid::Uuid::new_v4().to_string()));
795 config.session_id = Some(session_id.clone());
796 let mut params = serde_json::to_value(&config)?;
797 let trace_ctx = self.resolve_trace_context().await;
798 inject_trace_context(&mut params, &trace_ctx);
799
800 let setup_start = Instant::now();
801 let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
802 let channels = self.register_session(&session_id);
803 let idle_waiter = Arc::new(ParkingLotMutex::new(None));
804 let shutdown = CancellationToken::new();
805 let (event_tx, _) = tokio::sync::broadcast::channel(512);
806 let event_loop = spawn_event_loop(
807 session_id.clone(),
808 self.clone(),
809 handler,
810 hooks,
811 transforms,
812 command_handlers,
813 session_fs_provider,
814 channels,
815 idle_waiter.clone(),
816 capabilities.clone(),
817 event_tx.clone(),
818 shutdown.clone(),
819 );
820 let mut registration =
821 PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
822 tracing::debug!(
823 elapsed_ms = setup_start.elapsed().as_millis(),
824 session_id = %session_id,
825 tools_count,
826 commands_count,
827 has_hooks,
828 "Client::create_session local setup complete"
829 );
830
831 let rpc_start = Instant::now();
832 let result = match self.call("session.create", Some(params)).await {
833 Ok(result) => result,
834 Err(error) => {
835 registration.cleanup(event_loop).await;
836 return Err(error);
837 }
838 };
839 tracing::debug!(
840 elapsed_ms = rpc_start.elapsed().as_millis(),
841 "Client::create_session session creation request completed successfully"
842 );
843 let create_result: CreateSessionResult = match serde_json::from_value(result) {
844 Ok(result) => result,
845 Err(error) => {
846 registration.cleanup(event_loop).await;
847 return Err(error.into());
848 }
849 };
850 if create_result.session_id != session_id {
851 registration.cleanup(event_loop).await;
852 return Err(Error::Session(SessionError::SessionIdMismatch {
853 requested: session_id,
854 returned: create_result.session_id,
855 }));
856 }
857 *capabilities.write() = create_result.capabilities.unwrap_or_default();
858
859 tracing::debug!(
860 elapsed_ms = total_start.elapsed().as_millis(),
861 session_id = %session_id,
862 "Client::create_session complete"
863 );
864 registration.disarm();
865 Ok(Session {
866 id: session_id,
867 cwd: self.cwd().clone(),
868 workspace_path: create_result.workspace_path,
869 remote_url: create_result.remote_url,
870 client: self.clone(),
871 event_loop: ParkingLotMutex::new(Some(event_loop)),
872 shutdown,
873 idle_waiter,
874 capabilities,
875 event_tx,
876 })
877 }
878
879 pub async fn resume_session(&self, mut config: ResumeSessionConfig) -> Result<Session, Error> {
890 let total_start = Instant::now();
891 let handler = config
892 .handler
893 .take()
894 .unwrap_or_else(|| Arc::new(crate::handler::NoopHandler));
895 let hooks = config.hooks_handler.take();
896 let transforms = config.transform.take();
897 let tools_count = config.tools.as_ref().map_or(0, Vec::len);
898 let commands_count = config.commands.as_ref().map_or(0, Vec::len);
899 let has_hooks = hooks.is_some();
900 let command_handlers = build_command_handler_map(config.commands.as_deref());
901 let session_fs_provider = config.session_fs_provider.take();
902 if self.inner.session_fs_configured && session_fs_provider.is_none() {
903 return Err(Error::Session(SessionError::SessionFsProviderRequired));
904 }
905 if self.inner.session_fs_sqlite_declared
906 && let Some(ref provider) = session_fs_provider
907 && provider.sqlite().is_none()
908 {
909 return Err(Error::InvalidConfig(
910 "SessionFs capabilities declare SQLite support but the provider \
911 does not implement SessionFsSqliteProvider"
912 .to_string(),
913 ));
914 }
915
916 if hooks.is_some() && config.hooks.is_none() {
917 config.hooks = Some(true);
918 }
919 if let Some(ref transforms) = transforms {
920 inject_transform_sections_resume(&mut config, transforms.as_ref());
921 }
922 let session_id = config.session_id.clone();
923 let mut params = serde_json::to_value(&config)?;
924 let trace_ctx = self.resolve_trace_context().await;
925 inject_trace_context(&mut params, &trace_ctx);
926
927 let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
928 let setup_start = Instant::now();
929 let channels = self.register_session(&session_id);
930 let idle_waiter = Arc::new(ParkingLotMutex::new(None));
931 let shutdown = CancellationToken::new();
932 let (event_tx, _) = tokio::sync::broadcast::channel(512);
933 let event_loop = spawn_event_loop(
934 session_id.clone(),
935 self.clone(),
936 handler,
937 hooks,
938 transforms,
939 command_handlers,
940 session_fs_provider,
941 channels,
942 idle_waiter.clone(),
943 capabilities.clone(),
944 event_tx.clone(),
945 shutdown.clone(),
946 );
947 let mut registration =
948 PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
949 tracing::debug!(
950 elapsed_ms = setup_start.elapsed().as_millis(),
951 session_id = %session_id,
952 tools_count,
953 commands_count,
954 has_hooks,
955 "Client::resume_session local setup complete"
956 );
957
958 let rpc_start = Instant::now();
959 let result = match self.call("session.resume", Some(params)).await {
960 Ok(result) => result,
961 Err(error) => {
962 registration.cleanup(event_loop).await;
963 return Err(error);
964 }
965 };
966 tracing::debug!(
967 elapsed_ms = rpc_start.elapsed().as_millis(),
968 session_id = %session_id,
969 "Client::resume_session session resume request completed successfully"
970 );
971
972 let cli_session_id: SessionId = result
974 .get("sessionId")
975 .and_then(|v| v.as_str())
976 .unwrap_or(&session_id)
977 .into();
978 if cli_session_id != session_id {
979 registration.cleanup(event_loop).await;
980 return Err(Error::Session(SessionError::SessionIdMismatch {
981 requested: session_id,
982 returned: cli_session_id,
983 }));
984 }
985
986 let resume_capabilities: Option<SessionCapabilities> = result
987 .get("capabilities")
988 .and_then(|v| {
989 serde_json::from_value(v.clone())
990 .map_err(|e| warn!(error = %e, "failed to deserialize capabilities from resume response"))
991 .ok()
992 });
993 let remote_url = result
994 .get("remoteUrl")
995 .or_else(|| result.get("remote_url"))
996 .and_then(|value| value.as_str())
997 .map(ToString::to_string);
998
999 let skills_reload_start = Instant::now();
1001 if let Err(e) = self
1002 .call(
1003 "session.skills.reload",
1004 Some(serde_json::json!({ "sessionId": session_id })),
1005 )
1006 .await
1007 {
1008 warn!(
1009 elapsed_ms = skills_reload_start.elapsed().as_millis(),
1010 session_id = %session_id,
1011 error = %e,
1012 "Client::resume_session skills reload request failed"
1013 );
1014 } else {
1015 tracing::debug!(
1016 elapsed_ms = skills_reload_start.elapsed().as_millis(),
1017 session_id = %session_id,
1018 "Client::resume_session skills reload request completed successfully"
1019 );
1020 }
1021
1022 *capabilities.write() = resume_capabilities.unwrap_or_default();
1023
1024 tracing::debug!(
1025 elapsed_ms = total_start.elapsed().as_millis(),
1026 session_id = %session_id,
1027 "Client::resume_session complete"
1028 );
1029 registration.disarm();
1030 Ok(Session {
1031 id: session_id,
1032 cwd: self.cwd().clone(),
1033 workspace_path: None,
1034 remote_url,
1035 client: self.clone(),
1036 event_loop: ParkingLotMutex::new(Some(event_loop)),
1037 shutdown,
1038 idle_waiter,
1039 capabilities,
1040 event_tx,
1041 })
1042 }
1043}
1044
1045type CommandHandlerMap = HashMap<String, Arc<dyn CommandHandler>>;
1046
1047fn build_command_handler_map(commands: Option<&[CommandDefinition]>) -> Arc<CommandHandlerMap> {
1048 let map = match commands {
1049 Some(commands) => commands
1050 .iter()
1051 .filter(|cmd| !cmd.name.is_empty())
1052 .map(|cmd| (cmd.name.clone(), cmd.handler.clone()))
1053 .collect(),
1054 None => HashMap::new(),
1055 };
1056 Arc::new(map)
1057}
1058
1059#[allow(clippy::too_many_arguments)]
1060fn spawn_event_loop(
1061 session_id: SessionId,
1062 client: Client,
1063 handler: Arc<dyn SessionHandler>,
1064 hooks: Option<Arc<dyn SessionHooks>>,
1065 transforms: Option<Arc<dyn SystemMessageTransform>>,
1066 command_handlers: Arc<CommandHandlerMap>,
1067 session_fs_provider: Option<Arc<dyn SessionFsProvider>>,
1068 channels: crate::router::SessionChannels,
1069 idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1070 capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
1071 event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
1072 shutdown: CancellationToken,
1073) -> JoinHandle<()> {
1074 let crate::router::SessionChannels {
1075 mut notifications,
1076 mut requests,
1077 } = channels;
1078
1079 let span = tracing::error_span!("session_event_loop", session_id = %session_id);
1080 tokio::spawn(
1081 async move {
1082 loop {
1083 tokio::select! {
1094 _ = shutdown.cancelled() => break,
1095 Some(notification) = notifications.recv() => {
1096 handle_notification(
1097 &session_id, &client, &handler, &command_handlers, notification, &idle_waiter, &capabilities, &event_tx,
1098 ).await;
1099 }
1100 Some(request) = requests.recv() => {
1101 handle_request(
1102 &session_id, &client, &handler, hooks.as_deref(), transforms.as_deref(), session_fs_provider.as_ref(), request,
1103 ).await;
1104 }
1105 else => break,
1106 }
1107 }
1108 if let Some(waiter) = idle_waiter.lock().take() {
1111 let _ = waiter
1112 .tx
1113 .send(Err(Error::Session(SessionError::EventLoopClosed)));
1114 }
1115 }
1116 .instrument(span),
1117 )
1118}
1119
1120fn extract_request_id(data: &Value) -> Option<RequestId> {
1121 data.get("requestId")
1122 .and_then(|v| v.as_str())
1123 .filter(|s| !s.is_empty())
1124 .map(RequestId::new)
1125}
1126
1127fn pending_permission_result_kind(response: &HandlerResponse) -> &'static str {
1128 match response {
1129 HandlerResponse::Permission(PermissionResult::Approved) => "approve-once",
1130 HandlerResponse::Permission(PermissionResult::Denied) => "reject",
1131 _ => "user-not-available",
1135 }
1136}
1137
1138fn permission_request_response(response: &HandlerResponse) -> PermissionDecision {
1139 match response {
1140 HandlerResponse::Permission(PermissionResult::Approved) => {
1141 PermissionDecision::ApproveOnce(PermissionDecisionApproveOnce {
1142 kind: PermissionDecisionApproveOnceKind::ApproveOnce,
1143 })
1144 }
1145 _ => PermissionDecision::Reject(PermissionDecisionReject {
1146 kind: PermissionDecisionRejectKind::Reject,
1147 feedback: None,
1148 }),
1149 }
1150}
1151
1152fn notification_permission_payload(response: &HandlerResponse) -> Option<Value> {
1157 match response {
1158 HandlerResponse::Permission(PermissionResult::Deferred | PermissionResult::NoResult) => {
1159 None
1160 }
1161 HandlerResponse::Permission(PermissionResult::Custom(value)) => Some(value.clone()),
1162 _ => Some(serde_json::json!({
1163 "kind": pending_permission_result_kind(response),
1164 })),
1165 }
1166}
1167
1168fn direct_permission_payload(response: &HandlerResponse) -> Value {
1175 match response {
1176 HandlerResponse::Permission(PermissionResult::Custom(value)) => value.clone(),
1177 HandlerResponse::Permission(PermissionResult::Deferred) => serde_json::to_value(
1178 permission_request_response(&HandlerResponse::Permission(PermissionResult::Approved)),
1179 )
1180 .expect("serializing direct permission response should succeed"),
1181 HandlerResponse::Permission(
1182 PermissionResult::NoResult | PermissionResult::UserNotAvailable,
1183 ) => serde_json::json!({
1184 "kind": pending_permission_result_kind(response),
1185 }),
1186 _ => serde_json::to_value(permission_request_response(response))
1187 .expect("serializing direct permission response should succeed"),
1188 }
1189}
1190
1191fn tool_failure_result(message: impl Into<String>) -> ToolResult {
1192 let message = message.into();
1193 ToolResult::Expanded(ToolResultExpanded {
1194 text_result_for_llm: message.clone(),
1195 result_type: "failure".to_string(),
1196 binary_results_for_llm: None,
1197 session_log: None,
1198 error: Some(message),
1199 tool_telemetry: None,
1200 })
1201}
1202
1203fn notification_tool_payload(response: HandlerResponse) -> Option<Value> {
1204 match response {
1205 HandlerResponse::ToolResult(result) => {
1206 Some(serde_json::to_value(result).unwrap_or(Value::Null))
1207 }
1208 HandlerResponse::NoResult => None,
1209 _ => Some(
1210 serde_json::to_value(tool_failure_result("Unexpected handler response"))
1211 .unwrap_or(Value::Null),
1212 ),
1213 }
1214}
1215
1216fn direct_tool_result(response: HandlerResponse) -> ToolResult {
1217 match response {
1218 HandlerResponse::ToolResult(result) => result,
1219 HandlerResponse::NoResult => tool_failure_result("No tool handler available"),
1220 _ => tool_failure_result("Unexpected handler response"),
1221 }
1222}
1223
1224#[allow(clippy::too_many_arguments)]
1226async fn handle_notification(
1227 session_id: &SessionId,
1228 client: &Client,
1229 handler: &Arc<dyn SessionHandler>,
1230 command_handlers: &Arc<CommandHandlerMap>,
1231 notification: SessionEventNotification,
1232 idle_waiter: &Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1233 capabilities: &Arc<parking_lot::RwLock<SessionCapabilities>>,
1234 event_tx: &tokio::sync::broadcast::Sender<SessionEvent>,
1235) {
1236 let dispatch_start = Instant::now();
1237 let event = notification.event.clone();
1238 let event_type = event.parsed_type();
1239 if event_type == SessionEventType::PermissionRequested {
1240 tracing::debug!(
1241 session_id = %session_id,
1242 event_type = %event.event_type,
1243 "Session::handle_notification permission request received"
1244 );
1245 }
1246
1247 match event_type {
1250 SessionEventType::AssistantMessage
1251 | SessionEventType::SessionIdle
1252 | SessionEventType::SessionError => {
1253 let mut guard = idle_waiter.lock();
1254 if let Some(waiter) = guard.as_mut() {
1255 match event_type {
1256 SessionEventType::AssistantMessage => {
1257 if !waiter.first_assistant_message_seen {
1258 waiter.first_assistant_message_seen = true;
1259 tracing::debug!(
1260 elapsed_ms = waiter.started_at.elapsed().as_millis(),
1261 session_id = %session_id,
1262 "Session::send_and_wait first assistant message"
1263 );
1264 }
1265 waiter.last_assistant_message = Some(event.clone());
1266 }
1267 SessionEventType::SessionIdle | SessionEventType::SessionError => {
1268 if let Some(waiter) = guard.take() {
1269 if event_type == SessionEventType::SessionIdle {
1270 tracing::debug!(
1271 elapsed_ms = waiter.started_at.elapsed().as_millis(),
1272 session_id = %session_id,
1273 "Session::send_and_wait idle received"
1274 );
1275 let _ = waiter.tx.send(Ok(waiter.last_assistant_message));
1276 } else {
1277 let error_msg = event
1278 .typed_data::<SessionErrorData>()
1279 .map(|d| d.message)
1280 .or_else(|| {
1281 event
1282 .data
1283 .get("message")
1284 .and_then(|v| v.as_str())
1285 .map(|s| s.to_string())
1286 })
1287 .unwrap_or_else(|| "session error".to_string());
1288 let _ = waiter
1289 .tx
1290 .send(Err(Error::Session(SessionError::AgentError(error_msg))));
1291 }
1292 }
1293 }
1294 _ => {}
1295 }
1296 }
1297 }
1298 _ => {}
1299 }
1300
1301 let _ = event_tx.send(event.clone());
1305
1306 handler
1308 .on_event(HandlerEvent::SessionEvent {
1309 session_id: session_id.clone(),
1310 event,
1311 })
1312 .await;
1313
1314 if event_type == SessionEventType::CapabilitiesChanged {
1318 match serde_json::from_value::<SessionCapabilities>(notification.event.data.clone()) {
1319 Ok(changed) => *capabilities.write() = changed,
1320 Err(e) => warn!(error = %e, "failed to deserialize capabilities.changed payload"),
1321 }
1322 }
1323
1324 tracing::debug!(
1325 elapsed_ms = dispatch_start.elapsed().as_millis(),
1326 session_id = %session_id,
1327 event_type = %notification.event.event_type,
1328 "Session::handle_notification dispatch"
1329 );
1330
1331 match event_type {
1334 SessionEventType::PermissionRequested => {
1335 let Some(request_id) = extract_request_id(¬ification.event.data) else {
1336 return;
1337 };
1338 let client = client.clone();
1339 let handler = handler.clone();
1340 let sid = session_id.clone();
1341 let data: PermissionRequestData =
1342 serde_json::from_value(notification.event.data.clone()).unwrap_or_else(|_| {
1343 PermissionRequestData {
1344 kind: None,
1345 tool_call_id: None,
1346 extra: notification.event.data.clone(),
1347 }
1348 });
1349 let span = tracing::error_span!(
1350 "permission_request_handler",
1351 session_id = %sid,
1352 request_id = %request_id
1353 );
1354 tokio::spawn(
1355 async move {
1356 let handler_start = Instant::now();
1357 let response = handler
1358 .on_event(HandlerEvent::PermissionRequest {
1359 session_id: sid.clone(),
1360 request_id: request_id.clone(),
1361 data,
1362 })
1363 .await;
1364 tracing::debug!(
1365 elapsed_ms = handler_start.elapsed().as_millis(),
1366 session_id = %sid,
1367 request_id = %request_id,
1368 "SessionHandler::on_permission_request dispatch"
1369 );
1370 let Some(result_value) = notification_permission_payload(&response) else {
1371 return;
1374 };
1375 let rpc_start = Instant::now();
1376 let _ = client
1377 .call(
1378 "session.permissions.handlePendingPermissionRequest",
1379 Some(serde_json::json!({
1380 "sessionId": sid,
1381 "requestId": request_id,
1382 "result": result_value,
1383 })),
1384 )
1385 .await;
1386 tracing::debug!(
1387 elapsed_ms = rpc_start.elapsed().as_millis(),
1388 session_id = %sid,
1389 request_id = %request_id,
1390 "Session::handle_notification response sent successfully"
1391 );
1392 }
1393 .instrument(span),
1394 );
1395 }
1396 SessionEventType::ExternalToolRequested => {
1397 let Some(request_id) = extract_request_id(¬ification.event.data) else {
1398 return;
1399 };
1400 let data: ExternalToolRequestedData =
1401 match serde_json::from_value(notification.event.data.clone()) {
1402 Ok(d) => d,
1403 Err(e) => {
1404 warn!(error = %e, "failed to deserialize external_tool.requested");
1405 let client = client.clone();
1406 let sid = session_id.clone();
1407 let span = tracing::error_span!(
1408 "external_tool_deserialize_error",
1409 session_id = %sid,
1410 request_id = %request_id
1411 );
1412 tokio::spawn(
1413 async move {
1414 let rpc_start = Instant::now();
1415 let _ = client
1416 .call(
1417 "session.tools.handlePendingToolCall",
1418 Some(serde_json::json!({
1419 "sessionId": sid,
1420 "requestId": request_id,
1421 "error": format!("Failed to deserialize tool request: {e}"),
1422 })),
1423 )
1424 .await;
1425 tracing::debug!(
1426 elapsed_ms = rpc_start.elapsed().as_millis(),
1427 session_id = %sid,
1428 request_id = %request_id,
1429 "Session::handle_notification response sent successfully"
1430 );
1431 }
1432 .instrument(span),
1433 );
1434 return;
1435 }
1436 };
1437 let client = client.clone();
1438 let handler = handler.clone();
1439 let sid = session_id.clone();
1440 let span = tracing::error_span!(
1441 "external_tool_handler",
1442 session_id = %sid,
1443 request_id = %request_id
1444 );
1445 tokio::spawn(
1446 async move {
1447 if data.tool_call_id.is_empty() || data.tool_name.is_empty() {
1448 let error_msg = if data.tool_call_id.is_empty() {
1449 "Missing toolCallId"
1450 } else {
1451 "Missing toolName"
1452 };
1453 let rpc_start = Instant::now();
1454 let _ = client
1455 .call(
1456 "session.tools.handlePendingToolCall",
1457 Some(serde_json::json!({
1458 "sessionId": sid,
1459 "requestId": request_id,
1460 "error": error_msg,
1461 })),
1462 )
1463 .await;
1464 tracing::debug!(
1465 elapsed_ms = rpc_start.elapsed().as_millis(),
1466 session_id = %sid,
1467 request_id = %request_id,
1468 "Session::handle_notification response sent successfully"
1469 );
1470 return;
1471 }
1472 let tool_call_id = data.tool_call_id.clone();
1473 let tool_name = data.tool_name.clone();
1474 let invocation = ToolInvocation {
1475 session_id: sid.clone(),
1476 tool_call_id: data.tool_call_id,
1477 tool_name: data.tool_name,
1478 arguments: data
1479 .arguments
1480 .unwrap_or(Value::Object(serde_json::Map::new())),
1481 traceparent: data.traceparent,
1482 tracestate: data.tracestate,
1483 };
1484 let handler_start = Instant::now();
1485 let response = handler
1486 .on_event(HandlerEvent::ExternalTool { invocation })
1487 .await;
1488 tracing::debug!(
1489 elapsed_ms = handler_start.elapsed().as_millis(),
1490 session_id = %sid,
1491 request_id = %request_id,
1492 tool_call_id = %tool_call_id,
1493 tool_name = %tool_name,
1494 "ToolHandler::call dispatch"
1495 );
1496 let Some(result_value) = notification_tool_payload(response) else {
1497 return;
1498 };
1499 let rpc_start = Instant::now();
1500 let _ = client
1501 .call(
1502 "session.tools.handlePendingToolCall",
1503 Some(serde_json::json!({
1504 "sessionId": sid,
1505 "requestId": request_id,
1506 "result": result_value,
1507 })),
1508 )
1509 .await;
1510 tracing::debug!(
1511 elapsed_ms = rpc_start.elapsed().as_millis(),
1512 session_id = %sid,
1513 request_id = %request_id,
1514 tool_call_id = %tool_call_id,
1515 tool_name = %tool_name,
1516 "Session::handle_notification response sent successfully"
1517 );
1518 }
1519 .instrument(span),
1520 );
1521 }
1522 SessionEventType::UserInputRequested => {
1523 }
1530 SessionEventType::ElicitationRequested => {
1531 let Some(request_id) = extract_request_id(¬ification.event.data) else {
1532 return;
1533 };
1534 let elicitation_data: ElicitationRequestedData =
1535 match serde_json::from_value(notification.event.data.clone()) {
1536 Ok(d) => d,
1537 Err(e) => {
1538 warn!(error = %e, "failed to deserialize elicitation request");
1539 return;
1540 }
1541 };
1542 let request = ElicitationRequest {
1543 message: elicitation_data.message,
1544 requested_schema: elicitation_data
1545 .requested_schema
1546 .map(|s| serde_json::to_value(s).unwrap_or(Value::Null)),
1547 mode: elicitation_data.mode.map(|m| match m {
1548 crate::generated::session_events::ElicitationRequestedMode::Form => {
1549 crate::types::ElicitationMode::Form
1550 }
1551 crate::generated::session_events::ElicitationRequestedMode::Url => {
1552 crate::types::ElicitationMode::Url
1553 }
1554 _ => crate::types::ElicitationMode::Unknown,
1555 }),
1556 elicitation_source: elicitation_data.elicitation_source,
1557 url: elicitation_data.url,
1558 };
1559 let client = client.clone();
1560 let handler = handler.clone();
1561 let sid = session_id.clone();
1562 let span = tracing::error_span!(
1563 "elicitation_request_handler",
1564 session_id = %sid,
1565 request_id = %request_id
1566 );
1567 tokio::spawn(
1568 async move {
1569 let cancel = ElicitationResult {
1570 action: "cancel".to_string(),
1571 content: None,
1572 };
1573 let handler_task = tokio::spawn({
1575 let sid = sid.clone();
1576 let request_id = request_id.clone();
1577 let span = tracing::error_span!(
1578 "elicitation_callback",
1579 session_id = %sid,
1580 request_id = %request_id
1581 );
1582 async move {
1583 let handler_start = Instant::now();
1584 let response = handler
1585 .on_event(HandlerEvent::ElicitationRequest {
1586 session_id: sid.clone(),
1587 request_id: request_id.clone(),
1588 request,
1589 })
1590 .await;
1591 tracing::debug!(
1592 elapsed_ms = handler_start.elapsed().as_millis(),
1593 session_id = %sid,
1594 request_id = %request_id,
1595 "SessionHandler::on_elicitation dispatch"
1596 );
1597 response
1598 }
1599 .instrument(span)
1600 });
1601 let result = match handler_task.await {
1602 Ok(HandlerResponse::Elicitation(r)) => r,
1603 _ => cancel.clone(),
1604 };
1605 let rpc_start = Instant::now();
1606 if let Err(e) = client
1607 .call(
1608 "session.ui.handlePendingElicitation",
1609 Some(serde_json::json!({
1610 "sessionId": sid,
1611 "requestId": request_id,
1612 "result": result,
1613 })),
1614 )
1615 .await
1616 {
1617 warn!(error = %e, "handlePendingElicitation failed, sending cancel");
1619 let _ = client
1620 .call(
1621 "session.ui.handlePendingElicitation",
1622 Some(serde_json::json!({
1623 "sessionId": sid,
1624 "requestId": request_id,
1625 "result": cancel,
1626 })),
1627 )
1628 .await;
1629 } else {
1630 tracing::debug!(
1631 elapsed_ms = rpc_start.elapsed().as_millis(),
1632 session_id = %sid,
1633 request_id = %request_id,
1634 "Session::handle_notification response sent successfully"
1635 );
1636 }
1637 }
1638 .instrument(span),
1639 );
1640 }
1641 SessionEventType::CommandExecute => {
1642 let data: CommandExecuteData =
1643 match serde_json::from_value(notification.event.data.clone()) {
1644 Ok(d) => d,
1645 Err(e) => {
1646 warn!(error = %e, "failed to deserialize command.execute");
1647 return;
1648 }
1649 };
1650 let client = client.clone();
1651 let command_handlers = command_handlers.clone();
1652 let sid = session_id.clone();
1653 let span = tracing::error_span!("command_handler", session_id = %sid);
1654 tokio::spawn(
1655 async move {
1656 let request_id = data.request_id;
1657 let ack_error = match command_handlers.get(&data.command_name).cloned() {
1658 None => Some(format!("Unknown command: {}", data.command_name)),
1659 Some(handler) => {
1660 let command_name = data.command_name.clone();
1661 let ctx = CommandContext {
1662 session_id: sid.clone(),
1663 command: data.command,
1664 command_name: data.command_name,
1665 args: data.args,
1666 };
1667 let handler_start = Instant::now();
1668 let result = handler.on_command(ctx).await;
1669 tracing::debug!(
1670 elapsed_ms = handler_start.elapsed().as_millis(),
1671 session_id = %sid,
1672 request_id = %request_id,
1673 command_name = %command_name,
1674 "CommandHandler::call dispatch"
1675 );
1676 match result {
1677 Ok(()) => None,
1678 Err(e) => Some(e.to_string()),
1679 }
1680 }
1681 };
1682 let mut params = serde_json::json!({
1683 "sessionId": sid,
1684 "requestId": request_id,
1685 });
1686 if let Some(error_msg) = ack_error {
1687 params["error"] = serde_json::Value::String(error_msg);
1688 }
1689 let rpc_start = Instant::now();
1690 let _ = client
1691 .call("session.commands.handlePendingCommand", Some(params))
1692 .await;
1693 tracing::debug!(
1694 elapsed_ms = rpc_start.elapsed().as_millis(),
1695 session_id = %sid,
1696 request_id = %request_id,
1697 "Session::handle_notification response sent successfully"
1698 );
1699 }
1700 .instrument(span),
1701 );
1702 }
1703 _ => {}
1704 }
1705}
1706
1707async fn handle_request(
1709 session_id: &SessionId,
1710 client: &Client,
1711 handler: &Arc<dyn SessionHandler>,
1712 hooks: Option<&dyn SessionHooks>,
1713 transforms: Option<&dyn SystemMessageTransform>,
1714 session_fs_provider: Option<&Arc<dyn SessionFsProvider>>,
1715 request: crate::JsonRpcRequest,
1716) {
1717 let sid = session_id.clone();
1718
1719 if request.method.starts_with("sessionFs.") {
1720 crate::session_fs_dispatch::dispatch(client, session_fs_provider, request).await;
1721 return;
1722 }
1723
1724 match request.method.as_str() {
1725 "hooks.invoke" => {
1726 let params = request.params.as_ref();
1727 let hook_type = params
1728 .and_then(|p| p.get("hookType"))
1729 .and_then(|v| v.as_str())
1730 .unwrap_or("");
1731 let input = params
1732 .and_then(|p| p.get("input"))
1733 .cloned()
1734 .unwrap_or(Value::Object(Default::default()));
1735
1736 let rpc_result = if let Some(hooks) = hooks {
1737 match crate::hooks::dispatch_hook(hooks, &sid, hook_type, input).await {
1738 Ok(output) => output,
1739 Err(e) => {
1740 warn!(error = %e, hook_type = hook_type, "hook dispatch failed");
1741 serde_json::json!({ "output": {} })
1742 }
1743 }
1744 } else {
1745 serde_json::json!({ "output": {} })
1746 };
1747
1748 let rpc_response = JsonRpcResponse {
1749 jsonrpc: "2.0".to_string(),
1750 id: request.id,
1751 result: Some(rpc_result),
1752 error: None,
1753 };
1754 let _ = client.send_response(&rpc_response).await;
1755 }
1756
1757 "tool.call" => {
1758 let invocation: ToolInvocation = match request
1759 .params
1760 .as_ref()
1761 .and_then(|p| serde_json::from_value::<ToolInvocation>(p.clone()).ok())
1762 {
1763 Some(inv) => inv,
1764 None => {
1765 let _ = send_error_response(
1766 client,
1767 request.id,
1768 error_codes::INVALID_PARAMS,
1769 "invalid tool.call params",
1770 )
1771 .await;
1772 return;
1773 }
1774 };
1775 let tool_call_id = invocation.tool_call_id.clone();
1776 let tool_name = invocation.tool_name.clone();
1777 let handler_start = Instant::now();
1778 let response = handler
1779 .on_event(HandlerEvent::ExternalTool { invocation })
1780 .await;
1781 tracing::debug!(
1782 elapsed_ms = handler_start.elapsed().as_millis(),
1783 session_id = %sid,
1784 tool_call_id = %tool_call_id,
1785 tool_name = %tool_name,
1786 "ToolHandler::call dispatch"
1787 );
1788 let tool_result = direct_tool_result(response);
1789 let rpc_response = JsonRpcResponse {
1790 jsonrpc: "2.0".to_string(),
1791 id: request.id,
1792 result: Some(serde_json::json!(ToolResultResponse {
1793 result: tool_result
1794 })),
1795 error: None,
1796 };
1797 let _ = client.send_response(&rpc_response).await;
1798 }
1799
1800 "userInput.request" => {
1801 let params = request.params.as_ref();
1802 let Some(question) = params
1803 .and_then(|p| p.get("question"))
1804 .and_then(|v| v.as_str())
1805 else {
1806 warn!("userInput.request missing 'question' field");
1807 let rpc_response = JsonRpcResponse {
1808 jsonrpc: "2.0".to_string(),
1809 id: request.id,
1810 result: None,
1811 error: Some(crate::JsonRpcError {
1812 code: error_codes::INVALID_PARAMS,
1813 message: "missing required field: question".to_string(),
1814 data: None,
1815 }),
1816 };
1817 let _ = client.send_response(&rpc_response).await;
1818 return;
1819 };
1820 let question = question.to_string();
1821 let choices = params
1822 .and_then(|p| p.get("choices"))
1823 .and_then(|v| v.as_array())
1824 .map(|arr| {
1825 arr.iter()
1826 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1827 .collect()
1828 });
1829 let allow_freeform = params
1830 .and_then(|p| p.get("allowFreeform"))
1831 .and_then(|v| v.as_bool());
1832
1833 let handler_start = Instant::now();
1834 let response = handler
1835 .on_event(HandlerEvent::UserInput {
1836 session_id: sid.clone(),
1837 question,
1838 choices,
1839 allow_freeform,
1840 })
1841 .await;
1842 tracing::debug!(
1843 elapsed_ms = handler_start.elapsed().as_millis(),
1844 session_id = %sid,
1845 "SessionHandler::on_user_input dispatch"
1846 );
1847
1848 let rpc_result = match response {
1849 HandlerResponse::UserInput(Some(UserInputResponse {
1850 answer,
1851 was_freeform,
1852 })) => serde_json::json!({
1853 "answer": answer,
1854 "wasFreeform": was_freeform,
1855 }),
1856 _ => serde_json::json!({ "noResponse": true }),
1857 };
1858 let rpc_response = JsonRpcResponse {
1859 jsonrpc: "2.0".to_string(),
1860 id: request.id,
1861 result: Some(rpc_result),
1862 error: None,
1863 };
1864 let _ = client.send_response(&rpc_response).await;
1865 }
1866
1867 "exitPlanMode.request" => {
1868 let params = request
1869 .params
1870 .as_ref()
1871 .cloned()
1872 .unwrap_or(Value::Object(serde_json::Map::new()));
1873 let data: ExitPlanModeData = match serde_json::from_value(params) {
1874 Ok(d) => d,
1875 Err(e) => {
1876 warn!(error = %e, "failed to deserialize exitPlanMode.request params, using defaults");
1877 ExitPlanModeData::default()
1878 }
1879 };
1880
1881 let response = handler
1882 .on_event(HandlerEvent::ExitPlanMode {
1883 session_id: sid,
1884 data,
1885 })
1886 .await;
1887
1888 let rpc_result = match response {
1889 HandlerResponse::ExitPlanMode(result) => serde_json::to_value(result)
1890 .expect("ExitPlanModeResult serialization cannot fail"),
1891 _ => serde_json::json!({ "approved": true }),
1892 };
1893 let rpc_response = JsonRpcResponse {
1894 jsonrpc: "2.0".to_string(),
1895 id: request.id,
1896 result: Some(rpc_result),
1897 error: None,
1898 };
1899 let _ = client.send_response(&rpc_response).await;
1900 }
1901
1902 "autoModeSwitch.request" => {
1903 let error_code = request
1904 .params
1905 .as_ref()
1906 .and_then(|p| p.get("errorCode"))
1907 .and_then(|v| v.as_str())
1908 .map(|s| s.to_string());
1909 let retry_after_seconds = request
1910 .params
1911 .as_ref()
1912 .and_then(|p| p.get("retryAfterSeconds"))
1913 .and_then(|v| v.as_f64());
1914
1915 let response = handler
1916 .on_event(HandlerEvent::AutoModeSwitch {
1917 session_id: sid,
1918 error_code,
1919 retry_after_seconds,
1920 })
1921 .await;
1922
1923 let answer = match response {
1924 HandlerResponse::AutoModeSwitch(answer) => answer,
1925 _ => AutoModeSwitchResponse::No,
1926 };
1927 let rpc_response = JsonRpcResponse {
1928 jsonrpc: "2.0".to_string(),
1929 id: request.id,
1930 result: Some(serde_json::json!({ "response": answer })),
1931 error: None,
1932 };
1933 let _ = client.send_response(&rpc_response).await;
1934 }
1935
1936 "permission.request" => {
1937 let Some(request_id) = request
1938 .params
1939 .as_ref()
1940 .and_then(|p| p.get("requestId"))
1941 .and_then(|v| v.as_str())
1942 .filter(|s| !s.is_empty())
1943 else {
1944 warn!("permission.request missing 'requestId' field");
1945 let rpc_response = JsonRpcResponse {
1946 jsonrpc: "2.0".to_string(),
1947 id: request.id,
1948 result: None,
1949 error: Some(crate::JsonRpcError {
1950 code: error_codes::INVALID_PARAMS,
1951 message: "missing required field: requestId".to_string(),
1952 data: None,
1953 }),
1954 };
1955 let _ = client.send_response(&rpc_response).await;
1956 return;
1957 };
1958 let request_id = RequestId::new(request_id);
1959 let raw_params = request
1960 .params
1961 .as_ref()
1962 .cloned()
1963 .unwrap_or(Value::Object(serde_json::Map::new()));
1964 let data: PermissionRequestData =
1965 serde_json::from_value(raw_params.clone()).unwrap_or(PermissionRequestData {
1966 kind: None,
1967 tool_call_id: None,
1968 extra: raw_params,
1969 });
1970
1971 let handler_start = Instant::now();
1972 let response = handler
1973 .on_event(HandlerEvent::PermissionRequest {
1974 session_id: sid.clone(),
1975 request_id: request_id.clone(),
1976 data,
1977 })
1978 .await;
1979 tracing::debug!(
1980 elapsed_ms = handler_start.elapsed().as_millis(),
1981 session_id = %sid,
1982 request_id = %request_id,
1983 "SessionHandler::on_permission_request dispatch"
1984 );
1985 let rpc_response = JsonRpcResponse {
1986 jsonrpc: "2.0".to_string(),
1987 id: request.id,
1988 result: Some(direct_permission_payload(&response)),
1989 error: None,
1990 };
1991 let _ = client.send_response(&rpc_response).await;
1992 }
1993
1994 "systemMessage.transform" => {
1995 let params = request.params.as_ref();
1996 let sections: HashMap<String, crate::transforms::TransformSection> =
1997 match params.and_then(|p| p.get("sections")) {
1998 Some(v) => match serde_json::from_value(v.clone()) {
1999 Ok(s) => s,
2000 Err(e) => {
2001 let _ = send_error_response(
2002 client,
2003 request.id,
2004 error_codes::INVALID_PARAMS,
2005 &format!("invalid sections: {e}"),
2006 )
2007 .await;
2008 return;
2009 }
2010 },
2011 None => {
2012 let _ = send_error_response(
2013 client,
2014 request.id,
2015 error_codes::INVALID_PARAMS,
2016 "missing sections parameter",
2017 )
2018 .await;
2019 return;
2020 }
2021 };
2022
2023 let rpc_result = if let Some(transforms) = transforms {
2024 let transform_start = Instant::now();
2025 let response =
2026 crate::transforms::dispatch_transform(transforms, &sid, sections).await;
2027 tracing::debug!(
2028 elapsed_ms = transform_start.elapsed().as_millis(),
2029 session_id = %sid,
2030 "SystemMessageTransform::transform_section dispatch"
2031 );
2032 match serde_json::to_value(response) {
2033 Ok(v) => v,
2034 Err(e) => {
2035 warn!(error = %e, "failed to serialize transform response");
2036 serde_json::json!({ "sections": {} })
2037 }
2038 }
2039 } else {
2040 let passthrough: HashMap<String, crate::transforms::TransformSection> = sections;
2042 serde_json::json!({ "sections": passthrough })
2043 };
2044
2045 let rpc_response = JsonRpcResponse {
2046 jsonrpc: "2.0".to_string(),
2047 id: request.id,
2048 result: Some(rpc_result),
2049 error: None,
2050 };
2051 let _ = client.send_response(&rpc_response).await;
2052 }
2053
2054 method => {
2055 warn!(
2056 method = method,
2057 "unhandled request method in session event loop"
2058 );
2059 let _ = send_error_response(
2060 client,
2061 request.id,
2062 error_codes::METHOD_NOT_FOUND,
2063 &format!("unknown method: {method}"),
2064 )
2065 .await;
2066 }
2067 }
2068}
2069
2070async fn send_error_response(
2071 client: &Client,
2072 id: u64,
2073 code: i32,
2074 message: &str,
2075) -> Result<(), Error> {
2076 let response = JsonRpcResponse {
2077 jsonrpc: "2.0".to_string(),
2078 id,
2079 result: None,
2080 error: Some(crate::JsonRpcError {
2081 code,
2082 message: message.to_string(),
2083 data: None,
2084 }),
2085 };
2086 client.send_response(&response).await
2087}
2088
2089fn apply_transform_sections(
2093 sys_msg: &mut SystemMessageConfig,
2094 transforms: &dyn SystemMessageTransform,
2095) {
2096 sys_msg.mode = Some("customize".to_string());
2097 let sections = sys_msg.sections.get_or_insert_with(HashMap::new);
2098 for id in transforms.section_ids() {
2099 sections.entry(id).or_insert_with(|| SectionOverride {
2100 action: Some("transform".to_string()),
2101 content: None,
2102 });
2103 }
2104}
2105
2106fn inject_transform_sections(config: &mut SessionConfig, transforms: &dyn SystemMessageTransform) {
2107 let sys_msg = config.system_message.get_or_insert_with(Default::default);
2108 apply_transform_sections(sys_msg, transforms);
2109}
2110
2111fn inject_transform_sections_resume(
2112 config: &mut ResumeSessionConfig,
2113 transforms: &dyn SystemMessageTransform,
2114) {
2115 let sys_msg = config.system_message.get_or_insert_with(Default::default);
2116 apply_transform_sections(sys_msg, transforms);
2117}
2118
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::{
        direct_permission_payload, notification_permission_payload, pending_permission_result_kind,
        permission_request_response,
    };
    use crate::handler::{HandlerResponse, PermissionResult};

    /// Wraps a permission result in the handler response variant under test.
    fn permission(result: PermissionResult) -> HandlerResponse {
        HandlerResponse::Permission(result)
    }

    #[test]
    fn pending_permission_requests_use_decision_kinds() {
        let cases = [
            (permission(PermissionResult::Approved), "approve-once"),
            (permission(PermissionResult::Denied), "reject"),
            // A response that is not a permission result at all.
            (HandlerResponse::Ok, "user-not-available"),
        ];

        for (response, expected_kind) in cases {
            assert_eq!(pending_permission_result_kind(&response), expected_kind);
        }
    }

    #[test]
    fn direct_permission_requests_use_decision_response_kinds() {
        let approved = serde_json::to_value(permission_request_response(&permission(
            PermissionResult::Approved,
        )))
        .expect("serializing approved permission response should succeed");
        assert_eq!(approved, json!({ "kind": "approve-once" }));

        let denied = serde_json::to_value(permission_request_response(&permission(
            PermissionResult::Denied,
        )))
        .expect("serializing denied permission response should succeed");
        assert_eq!(denied, json!({ "kind": "reject" }));

        // A non-permission response is expected to serialize as a rejection.
        let fallback = serde_json::to_value(permission_request_response(&HandlerResponse::Ok))
            .expect("serializing fallback permission response should succeed");
        assert_eq!(fallback, json!({ "kind": "reject" }));
    }

    #[test]
    fn notification_payload_handles_non_responses_and_custom() {
        // Deferred and NoResult are expected to yield no payload.
        for silent in [PermissionResult::Deferred, PermissionResult::NoResult] {
            assert!(notification_permission_payload(&permission(silent)).is_none());
        }

        // A custom payload is expected to come back unchanged.
        let custom = json!({
            "kind": "approve-and-remember",
            "allowlist": ["ls", "grep"],
        });
        let custom_payload =
            notification_permission_payload(&permission(PermissionResult::Custom(custom.clone())));
        assert_eq!(custom_payload, Some(custom));

        // Plain decisions map onto their decision kinds.
        let decisions = [
            (PermissionResult::Approved, json!({ "kind": "approve-once" })),
            (PermissionResult::Denied, json!({ "kind": "reject" })),
        ];
        for (decision, expected) in decisions {
            assert_eq!(
                notification_permission_payload(&permission(decision)),
                Some(expected)
            );
        }
    }

    #[test]
    fn direct_payload_handles_deferred_and_custom() {
        // A custom payload is expected to come back unchanged.
        let custom = json!({
            "kind": "approve-and-remember",
            "allowlist": ["ls", "grep"],
        });
        let custom_payload =
            direct_permission_payload(&permission(PermissionResult::Custom(custom.clone())));
        assert_eq!(custom_payload, custom);

        // Every remaining result kind and the payload expected for it.
        let cases = [
            (PermissionResult::Deferred, json!({ "kind": "approve-once" })),
            (
                PermissionResult::NoResult,
                json!({ "kind": "user-not-available" }),
            ),
            (PermissionResult::Approved, json!({ "kind": "approve-once" })),
            (PermissionResult::Denied, json!({ "kind": "reject" })),
        ];
        for (result, expected) in cases {
            assert_eq!(direct_permission_payload(&permission(result)), expected);
        }
    }
}