1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::Mutex as ParkingLotMutex;
7use serde_json::Value;
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tracing::{Instrument, warn};
12
13use crate::generated::api_types::{
14 LogRequest, ModelSwitchToRequest, PermissionDecision, PermissionDecisionApproveOnce,
15 PermissionDecisionApproveOnceKind, PermissionDecisionReject, PermissionDecisionRejectKind,
16};
17use crate::generated::session_events::{
18 CommandExecuteData, ElicitationRequestedData, ExternalToolRequestedData, SessionErrorData,
19 SessionEventType,
20};
21use crate::handler::{
22 AutoModeSwitchResponse, HandlerEvent, HandlerResponse, PermissionResult, SessionHandler,
23 UserInputResponse,
24};
25use crate::hooks::SessionHooks;
26use crate::session_fs::SessionFsProvider;
27use crate::trace_context::inject_trace_context;
28use crate::transforms::SystemMessageTransform;
29use crate::types::{
30 CommandContext, CommandDefinition, CommandHandler, CreateSessionResult, ElicitationRequest,
31 ElicitationResult, ExitPlanModeData, GetMessagesResponse, InputOptions, MessageOptions,
32 PermissionRequestData, RequestId, ResumeSessionConfig, SectionOverride, SessionCapabilities,
33 SessionConfig, SessionEvent, SessionId, SetModelOptions, SystemMessageConfig, ToolInvocation,
34 ToolResult, ToolResultResponse, TraceContext, ensure_attachment_display_names,
35};
36use crate::{Client, Error, JsonRpcResponse, SessionError, SessionEventNotification, error_codes};
37
/// State parked by `Session::send_and_wait` while a wait is in progress.
///
/// The event loop inspects and updates this from `handle_notification`.
struct IdleWaiter {
    /// Channel used to deliver the final outcome of the wait.
    tx: oneshot::Sender<Result<Option<SessionEvent>, Error>>,
    /// Most recent assistant message seen during the wait, if any.
    last_assistant_message: Option<SessionEvent>,
    /// When the wait began; used for elapsed-time logging.
    started_at: Instant,
    /// Set when the first assistant message of the wait is observed.
    first_assistant_message_seen: bool,
}
45
/// Guard that empties the shared idle-waiter slot when it is dropped.
struct WaiterGuard {
    /// The slot to clear on drop.
    slot: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
}
60
61impl Drop for WaiterGuard {
62 fn drop(&mut self) {
63 self.slot.lock().take();
64 }
65}
66
/// Tracks a session that has been registered with the client but whose
/// create/resume request has not yet succeeded.
///
/// Unless disarmed, dropping this value cancels the shutdown token and
/// unregisters the session (see the `Drop` impl).
struct PendingSessionRegistration {
    /// Client the session was registered with.
    client: Client,
    /// Identifier to unregister on failure.
    session_id: SessionId,
    /// Token that stops the session's event loop when cancelled.
    shutdown: CancellationToken,
    /// When `true`, `Drop` does nothing.
    disarmed: bool,
}
73
74impl PendingSessionRegistration {
75 fn new(client: Client, session_id: SessionId, shutdown: CancellationToken) -> Self {
76 Self {
77 client,
78 session_id,
79 shutdown,
80 disarmed: false,
81 }
82 }
83
84 async fn cleanup(mut self, event_loop: JoinHandle<()>) {
85 self.shutdown.cancel();
86 let _ = event_loop.await;
87 self.client.unregister_session(&self.session_id);
88 self.disarmed = true;
89 }
90
91 fn disarm(&mut self) {
92 self.disarmed = true;
93 }
94}
95
96impl Drop for PendingSessionRegistration {
97 fn drop(&mut self) {
98 if !self.disarmed {
99 self.shutdown.cancel();
100 self.client.unregister_session(&self.session_id);
101 }
102 }
103}
104
/// Handle to a single session, built by `Client::create_session` or
/// `Client::resume_session`.
///
/// Dropping the handle cancels the event loop and unregisters the session
/// from the client (see the `Drop` impl).
pub struct Session {
    /// Identifier of this session.
    id: SessionId,
    /// Working directory, cloned from the client when the session is built.
    cwd: PathBuf,
    /// Workspace path from the create response; `None` for resumed sessions.
    workspace_path: Option<PathBuf>,
    /// Remote URL reported by the create/resume response, if any.
    remote_url: Option<String>,
    /// Client used for all RPC calls made on behalf of this session.
    client: Client,
    /// Join handle of the spawned event loop; taken by `stop_event_loop`.
    event_loop: ParkingLotMutex<Option<JoinHandle<()>>>,
    /// Cancelling this token makes the event loop exit.
    shutdown: CancellationToken,
    /// Slot shared with the event loop for an in-flight `send_and_wait`.
    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
    /// Capabilities shared with the event loop, which may overwrite them.
    capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
    /// Broadcast sender on which the event loop publishes every session event.
    event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
}
153
154impl Session {
    /// Returns the identifier of this session.
    pub fn id(&self) -> &SessionId {
        &self.id
    }
159
    /// Returns the working directory stored on this session.
    pub fn cwd(&self) -> &PathBuf {
        &self.cwd
    }
164
165 pub fn workspace_path(&self) -> Option<&Path> {
167 self.workspace_path.as_deref()
168 }
169
170 pub fn remote_url(&self) -> Option<&str> {
172 self.remote_url.as_deref()
173 }
174
175 pub fn capabilities(&self) -> SessionCapabilities {
180 self.capabilities.read().clone()
181 }
182
    /// Returns a child of the session's shutdown token.
    pub fn cancellation_token(&self) -> CancellationToken {
        self.shutdown.child_token()
    }
212
    /// Creates a new subscription backed by a fresh receiver on the
    /// session's broadcast channel.
    pub fn subscribe(&self) -> crate::subscription::EventSubscription {
        crate::subscription::EventSubscription::new(self.event_tx.subscribe())
    }
254
    /// Returns the client this session issues its RPC calls through.
    pub fn client(&self) -> &Client {
        &self.client
    }
259
    /// Returns the generated RPC facade bound to this session.
    pub fn rpc(&self) -> crate::generated::rpc::SessionRpc<'_> {
        crate::generated::rpc::SessionRpc { session: self }
    }
273
274 pub async fn stop_event_loop(&self) {
282 self.shutdown.cancel();
283 let handle = self.event_loop.lock().take();
284 if let Some(handle) = handle {
285 let _ = handle.await;
286 }
287 if let Some(waiter) = self.idle_waiter.lock().take() {
289 let _ = waiter
290 .tx
291 .send(Err(Error::Session(SessionError::EventLoopClosed)));
292 }
293 }
294
295 pub async fn send(&self, opts: impl Into<MessageOptions>) -> Result<String, Error> {
320 if self.idle_waiter.lock().is_some() {
321 return Err(Error::Session(SessionError::SendWhileWaiting));
322 }
323 self.send_inner(opts.into()).await
324 }
325
326 async fn send_inner(&self, opts: MessageOptions) -> Result<String, Error> {
327 let mut params = serde_json::json!({
328 "sessionId": self.id,
329 "prompt": opts.prompt,
330 });
331 if let Some(m) = opts.mode {
332 params["mode"] = serde_json::to_value(m)?;
333 }
334 if let Some(mut a) = opts.attachments {
335 ensure_attachment_display_names(&mut a);
336 params["attachments"] = serde_json::to_value(a)?;
337 }
338 if let Some(headers) = opts.request_headers
339 && !headers.is_empty()
340 {
341 params["requestHeaders"] = serde_json::to_value(headers)?;
342 }
343 let trace_ctx = if opts.traceparent.is_some() || opts.tracestate.is_some() {
344 TraceContext {
345 traceparent: opts.traceparent,
346 tracestate: opts.tracestate,
347 }
348 } else {
349 self.client.resolve_trace_context().await
350 };
351 inject_trace_context(&mut params, &trace_ctx);
352 let rpc_start = Instant::now();
353 let result = self.client.call("session.send", Some(params)).await?;
354 let message_id = result
355 .get("messageId")
356 .and_then(|v| v.as_str())
357 .map(|s| s.to_string())
358 .unwrap_or_default();
359 tracing::debug!(
360 elapsed_ms = rpc_start.elapsed().as_millis(),
361 session_id = %self.id,
362 message_id = %message_id,
363 "Session::send completed successfully"
364 );
365 Ok(message_id)
366 }
367
    /// Sends a message and waits for the session to finish processing it.
    ///
    /// The wait ends when the event loop delivers an outcome through the
    /// parked waiter, or when `opts.wait_timeout` (60 seconds when unset)
    /// elapses. The timeout covers both the send request and the wait.
    ///
    /// # Returns
    /// The outcome delivered by the event loop: on idle this is the last
    /// assistant message seen during the wait, or `None` if there was none.
    ///
    /// # Errors
    /// - `SessionError::SendWhileWaiting` if another wait is already parked.
    /// - Any error from sending the message.
    /// - `SessionError::EventLoopClosed` if the waiter is dropped without a result.
    /// - `SessionError::Timeout` if the deadline elapses first.
    /// - Any error the event loop delivers through the waiter.
    pub async fn send_and_wait(
        &self,
        opts: impl Into<MessageOptions>,
    ) -> Result<Option<SessionEvent>, Error> {
        let total_start = Instant::now();
        let opts = opts.into();
        let timeout_duration = opts.wait_timeout.unwrap_or(Duration::from_secs(60));
        let (tx, rx) = oneshot::channel();

        // Check-and-install under a single lock so two waits cannot both park.
        {
            let mut guard = self.idle_waiter.lock();
            if guard.is_some() {
                return Err(Error::Session(SessionError::SendWhileWaiting));
            }
            *guard = Some(IdleWaiter {
                tx,
                last_assistant_message: None,
                started_at: total_start,
                first_assistant_message_seen: false,
            });
        }

        // Clears the slot on every exit path below, including timeout and
        // this future being dropped.
        let _waiter_guard = WaiterGuard {
            slot: self.idle_waiter.clone(),
        };

        let result = tokio::time::timeout(timeout_duration, async {
            self.send_inner(opts).await?;
            match rx.await {
                Ok(result) => result,
                // The sender was dropped without sending a result.
                Err(_) => Err(Error::Session(SessionError::EventLoopClosed)),
            }
        })
        .await;

        match result {
            Ok(inner) => {
                tracing::debug!(
                    elapsed_ms = total_start.elapsed().as_millis(),
                    session_id = %self.id,
                    completed_by = if inner.is_ok() { "idle" } else { "error" },
                    "Session::send_and_wait complete"
                );
                inner
            }
            Err(_) => {
                tracing::warn!(
                    elapsed_ms = total_start.elapsed().as_millis(),
                    session_id = %self.id,
                    completed_by = "timeout",
                    "Session::send_and_wait failed"
                );
                Err(Error::Session(SessionError::Timeout(timeout_duration)))
            }
        }
    }
447
448 pub async fn get_messages(&self) -> Result<Vec<SessionEvent>, Error> {
450 let result = self
451 .client
452 .call(
453 "session.getMessages",
454 Some(serde_json::json!({ "sessionId": self.id })),
455 )
456 .await?;
457 let response: GetMessagesResponse = serde_json::from_value(result)?;
458 Ok(response.events)
459 }
460
461 pub async fn abort(&self) -> Result<(), Error> {
469 self.client
470 .call(
471 "session.abort",
472 Some(serde_json::json!({ "sessionId": self.id })),
473 )
474 .await?;
475 Ok(())
476 }
477
478 pub async fn set_model(&self, model: &str, opts: Option<SetModelOptions>) -> Result<(), Error> {
482 let opts = opts.unwrap_or_default();
483 let request = ModelSwitchToRequest {
484 model_id: model.to_string(),
485 reasoning_effort: opts.reasoning_effort,
486 model_capabilities: opts.model_capabilities,
487 };
488 self.rpc().model().switch_to(request).await?;
489 Ok(())
490 }
491
492 pub async fn disconnect(&self) -> Result<(), Error> {
509 self.client
510 .call(
511 "session.destroy",
512 Some(serde_json::json!({ "sessionId": self.id })),
513 )
514 .await?;
515 self.stop_event_loop().await;
516 self.client.unregister_session(&self.id);
517 Ok(())
518 }
519
    /// Equivalent to [`Session::disconnect`]; delegates to it directly.
    pub async fn destroy(&self) -> Result<(), Error> {
        self.disconnect().await
    }
528
529 pub async fn log(
533 &self,
534 message: &str,
535 opts: Option<crate::types::LogOptions>,
536 ) -> Result<(), Error> {
537 let opts = opts.unwrap_or_default();
538 let level = match opts.level {
539 Some(level) => Some(serde_json::from_value(serde_json::to_value(level)?)?),
540 None => None,
541 };
542 let request = LogRequest {
543 message: message.to_string(),
544 level,
545 ephemeral: opts.ephemeral,
546 url: None,
547 };
548 self.rpc().log(request).await?;
549 Ok(())
550 }
551
    /// Returns the UI helper bound to this session.
    pub fn ui(&self) -> SessionUi<'_> {
        SessionUi { session: self }
    }
560
561 fn assert_elicitation(&self) -> Result<(), Error> {
563 if self
564 .capabilities
565 .read()
566 .ui
567 .as_ref()
568 .and_then(|u| u.elicitation)
569 != Some(true)
570 {
571 return Err(Error::Session(SessionError::ElicitationNotSupported));
572 }
573 Ok(())
574 }
575}
576
impl Drop for Session {
    /// Cancels the shutdown token and unregisters the session from the
    /// client. The event loop task is not awaited here.
    fn drop(&mut self) {
        self.shutdown.cancel();
        self.client.unregister_session(&self.id);
    }
}
594
/// Borrowed helper for the elicitation-based UI requests of a session;
/// obtained from `Session::ui`.
pub struct SessionUi<'a> {
    /// Session the requests are issued for.
    session: &'a Session,
}
604
605impl<'a> SessionUi<'a> {
606 pub async fn elicitation(
614 &self,
615 message: &str,
616 schema: Value,
617 ) -> Result<ElicitationResult, Error> {
618 self.session.assert_elicitation()?;
619 let result = self
620 .session
621 .client
622 .call(
623 "session.ui.elicitation",
624 Some(serde_json::json!({
625 "sessionId": self.session.id,
626 "message": message,
627 "requestedSchema": schema,
628 })),
629 )
630 .await?;
631 let elicitation: ElicitationResult = serde_json::from_value(result)?;
632 Ok(elicitation)
633 }
634
635 pub async fn confirm(&self, message: &str) -> Result<bool, Error> {
639 self.session.assert_elicitation()?;
640 let schema = serde_json::json!({
641 "type": "object",
642 "properties": {
643 "confirmed": {
644 "type": "boolean",
645 "default": true,
646 }
647 },
648 "required": ["confirmed"]
649 });
650 let result = self.elicitation(message, schema).await?;
651 Ok(result.action == "accept"
652 && result
653 .content
654 .and_then(|c| c.get("confirmed").and_then(|v| v.as_bool()))
655 == Some(true))
656 }
657
658 pub async fn select(&self, message: &str, options: &[&str]) -> Result<Option<String>, Error> {
662 self.session.assert_elicitation()?;
663 let schema = serde_json::json!({
664 "type": "object",
665 "properties": {
666 "selection": {
667 "type": "string",
668 "enum": options,
669 }
670 },
671 "required": ["selection"]
672 });
673 let result = self.elicitation(message, schema).await?;
674 if result.action != "accept" {
675 return Ok(None);
676 }
677 let selection = result.content.and_then(|c| {
678 c.get("selection")
679 .and_then(|v| v.as_str())
680 .map(String::from)
681 });
682 Ok(selection)
683 }
684
685 pub async fn input(
690 &self,
691 message: &str,
692 options: Option<&InputOptions<'_>>,
693 ) -> Result<Option<String>, Error> {
694 self.session.assert_elicitation()?;
695 let mut field = serde_json::json!({ "type": "string" });
696 if let Some(opts) = options {
697 if let Some(title) = opts.title {
698 field["title"] = Value::String(title.to_string());
699 }
700 if let Some(desc) = opts.description {
701 field["description"] = Value::String(desc.to_string());
702 }
703 if let Some(min) = opts.min_length {
704 field["minLength"] = Value::Number(min.into());
705 }
706 if let Some(max) = opts.max_length {
707 field["maxLength"] = Value::Number(max.into());
708 }
709 if let Some(fmt) = &opts.format {
710 field["format"] = Value::String(fmt.as_str().to_string());
711 }
712 if let Some(default) = opts.default {
713 field["default"] = Value::String(default.to_string());
714 }
715 }
716 let schema = serde_json::json!({
717 "type": "object",
718 "properties": { "value": field },
719 "required": ["value"]
720 });
721 let result = self.elicitation(message, schema).await?;
722 if result.action != "accept" {
723 return Ok(None);
724 }
725 let value = result
726 .content
727 .and_then(|c| c.get("value").and_then(|v| v.as_str()).map(String::from));
728 Ok(value)
729 }
730}
731
732impl Client {
    /// Creates a new session and returns a [`Session`] handle for it.
    ///
    /// Steps, in order:
    /// 1. Pull the handler (defaulting to `DenyAllHandler`), hooks,
    ///    transforms, command handlers and session-fs provider out of `config`.
    /// 2. Ensure `config` carries a session id, generating a UUID v4 if needed.
    /// 3. Register the session locally and spawn its event loop *before*
    ///    issuing the `session.create` request.
    /// 4. Send `session.create`, check that the returned id matches, and
    ///    store the returned capabilities.
    ///
    /// On any failure after step 3 the registration is cleaned up (event loop
    /// stopped, session unregistered) before the error is returned.
    ///
    /// # Errors
    /// - `SessionError::SessionFsProviderRequired` if the client has session
    ///   fs configured but `config` supplies no provider.
    /// - `SessionError::SessionIdMismatch` if the response id differs from
    ///   the requested one.
    /// - Serialization, RPC and deserialization errors.
    pub async fn create_session(&self, mut config: SessionConfig) -> Result<Session, Error> {
        let total_start = Instant::now();
        let handler = config
            .handler
            .take()
            .unwrap_or_else(|| Arc::new(crate::handler::DenyAllHandler));
        let hooks = config.hooks_handler.take();
        let transforms = config.transform.take();
        let tools_count = config.tools.as_ref().map_or(0, Vec::len);
        let commands_count = config.commands.as_ref().map_or(0, Vec::len);
        let has_hooks = hooks.is_some();
        let command_handlers = build_command_handler_map(config.commands.as_deref());
        let session_fs_provider = config.session_fs_provider.take();
        if self.inner.session_fs_configured && session_fs_provider.is_none() {
            return Err(Error::Session(SessionError::SessionFsProviderRequired));
        }

        // A hooks handler implies hooks are enabled unless the caller set the flag explicitly.
        if hooks.is_some() && config.hooks.is_none() {
            config.hooks = Some(true);
        }
        if let Some(ref transforms) = transforms {
            inject_transform_sections(&mut config, transforms.as_ref());
        }
        let session_id = config
            .session_id
            .clone()
            .unwrap_or_else(|| SessionId::from(uuid::Uuid::new_v4().to_string()));
        config.session_id = Some(session_id.clone());
        let mut params = serde_json::to_value(&config)?;
        let trace_ctx = self.resolve_trace_context().await;
        inject_trace_context(&mut params, &trace_ctx);

        let setup_start = Instant::now();
        let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
        let channels = self.register_session(&session_id);
        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
        let shutdown = CancellationToken::new();
        let (event_tx, _) = tokio::sync::broadcast::channel(512);
        let event_loop = spawn_event_loop(
            session_id.clone(),
            self.clone(),
            handler,
            hooks,
            transforms,
            command_handlers,
            session_fs_provider,
            channels,
            idle_waiter.clone(),
            capabilities.clone(),
            event_tx.clone(),
            shutdown.clone(),
        );
        // From here on, an early exit must undo the registration above.
        let mut registration =
            PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
        tracing::debug!(
            elapsed_ms = setup_start.elapsed().as_millis(),
            session_id = %session_id,
            tools_count,
            commands_count,
            has_hooks,
            "Client::create_session local setup complete"
        );

        let rpc_start = Instant::now();
        let result = match self.call("session.create", Some(params)).await {
            Ok(result) => result,
            Err(error) => {
                registration.cleanup(event_loop).await;
                return Err(error);
            }
        };
        tracing::debug!(
            elapsed_ms = rpc_start.elapsed().as_millis(),
            "Client::create_session session creation request completed successfully"
        );
        let create_result: CreateSessionResult = match serde_json::from_value(result) {
            Ok(result) => result,
            Err(error) => {
                registration.cleanup(event_loop).await;
                return Err(error.into());
            }
        };
        if create_result.session_id != session_id {
            registration.cleanup(event_loop).await;
            return Err(Error::Session(SessionError::SessionIdMismatch {
                requested: session_id,
                returned: create_result.session_id,
            }));
        }
        *capabilities.write() = create_result.capabilities.unwrap_or_default();

        tracing::debug!(
            elapsed_ms = total_start.elapsed().as_millis(),
            session_id = %session_id,
            "Client::create_session complete"
        );
        // Ownership of the registration passes to the returned `Session`.
        registration.disarm();
        Ok(Session {
            id: session_id,
            cwd: self.cwd().clone(),
            workspace_path: create_result.workspace_path,
            remote_url: create_result.remote_url,
            client: self.clone(),
            event_loop: ParkingLotMutex::new(Some(event_loop)),
            shutdown,
            idle_waiter,
            capabilities,
            event_tx,
        })
    }
864
    /// Resumes an existing session and returns a [`Session`] handle for it.
    ///
    /// Mirrors `create_session`: the session is registered locally and its
    /// event loop spawned before `session.resume` is sent, and the
    /// registration is cleaned up if the request fails or returns a
    /// different session id. After a successful resume a best-effort
    /// `session.skills.reload` is issued; its failure is logged, not returned.
    ///
    /// The returned session always has `workspace_path` set to `None`.
    ///
    /// # Errors
    /// - `SessionError::SessionFsProviderRequired` if the client has session
    ///   fs configured but `config` supplies no provider.
    /// - `SessionError::SessionIdMismatch` if the response carries a
    ///   different `sessionId`.
    /// - Serialization and RPC errors.
    pub async fn resume_session(&self, mut config: ResumeSessionConfig) -> Result<Session, Error> {
        let total_start = Instant::now();
        let handler = config
            .handler
            .take()
            .unwrap_or_else(|| Arc::new(crate::handler::DenyAllHandler));
        let hooks = config.hooks_handler.take();
        let transforms = config.transform.take();
        let tools_count = config.tools.as_ref().map_or(0, Vec::len);
        let commands_count = config.commands.as_ref().map_or(0, Vec::len);
        let has_hooks = hooks.is_some();
        let command_handlers = build_command_handler_map(config.commands.as_deref());
        let session_fs_provider = config.session_fs_provider.take();
        if self.inner.session_fs_configured && session_fs_provider.is_none() {
            return Err(Error::Session(SessionError::SessionFsProviderRequired));
        }

        // A hooks handler implies hooks are enabled unless the caller set the flag explicitly.
        if hooks.is_some() && config.hooks.is_none() {
            config.hooks = Some(true);
        }
        if let Some(ref transforms) = transforms {
            inject_transform_sections_resume(&mut config, transforms.as_ref());
        }
        let session_id = config.session_id.clone();
        let mut params = serde_json::to_value(&config)?;
        let trace_ctx = self.resolve_trace_context().await;
        inject_trace_context(&mut params, &trace_ctx);

        let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
        let setup_start = Instant::now();
        let channels = self.register_session(&session_id);
        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
        let shutdown = CancellationToken::new();
        let (event_tx, _) = tokio::sync::broadcast::channel(512);
        let event_loop = spawn_event_loop(
            session_id.clone(),
            self.clone(),
            handler,
            hooks,
            transforms,
            command_handlers,
            session_fs_provider,
            channels,
            idle_waiter.clone(),
            capabilities.clone(),
            event_tx.clone(),
            shutdown.clone(),
        );
        // From here on, an early exit must undo the registration above.
        let mut registration =
            PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
        tracing::debug!(
            elapsed_ms = setup_start.elapsed().as_millis(),
            session_id = %session_id,
            tools_count,
            commands_count,
            has_hooks,
            "Client::resume_session local setup complete"
        );

        let rpc_start = Instant::now();
        let result = match self.call("session.resume", Some(params)).await {
            Ok(result) => result,
            Err(error) => {
                registration.cleanup(event_loop).await;
                return Err(error);
            }
        };
        tracing::debug!(
            elapsed_ms = rpc_start.elapsed().as_millis(),
            session_id = %session_id,
            "Client::resume_session session resume request completed successfully"
        );

        // A response without a string `sessionId` is treated as echoing the requested id.
        let cli_session_id: SessionId = result
            .get("sessionId")
            .and_then(|v| v.as_str())
            .unwrap_or(&session_id)
            .into();
        if cli_session_id != session_id {
            registration.cleanup(event_loop).await;
            return Err(Error::Session(SessionError::SessionIdMismatch {
                requested: session_id,
                returned: cli_session_id,
            }));
        }

        // Malformed capabilities are logged and treated as absent rather than failing the resume.
        let resume_capabilities: Option<SessionCapabilities> = result
            .get("capabilities")
            .and_then(|v| {
                serde_json::from_value(v.clone())
                    .map_err(|e| warn!(error = %e, "failed to deserialize capabilities from resume response"))
                    .ok()
            });
        // Accepts either the camelCase or the snake_case key.
        let remote_url = result
            .get("remoteUrl")
            .or_else(|| result.get("remote_url"))
            .and_then(|value| value.as_str())
            .map(ToString::to_string);

        let skills_reload_start = Instant::now();
        // Best effort: a failed reload is only logged.
        if let Err(e) = self
            .call(
                "session.skills.reload",
                Some(serde_json::json!({ "sessionId": session_id })),
            )
            .await
        {
            warn!(
                elapsed_ms = skills_reload_start.elapsed().as_millis(),
                session_id = %session_id,
                error = %e,
                "Client::resume_session skills reload request failed"
            );
        } else {
            tracing::debug!(
                elapsed_ms = skills_reload_start.elapsed().as_millis(),
                session_id = %session_id,
                "Client::resume_session skills reload request completed successfully"
            );
        }

        *capabilities.write() = resume_capabilities.unwrap_or_default();

        tracing::debug!(
            elapsed_ms = total_start.elapsed().as_millis(),
            session_id = %session_id,
            "Client::resume_session complete"
        );
        // Ownership of the registration passes to the returned `Session`.
        registration.disarm();
        Ok(Session {
            id: session_id,
            cwd: self.cwd().clone(),
            workspace_path: None,
            remote_url,
            client: self.clone(),
            event_loop: ParkingLotMutex::new(Some(event_loop)),
            shutdown,
            idle_waiter,
            capabilities,
            event_tx,
        })
    }
1019}
1020
/// Command handlers keyed by command name.
type CommandHandlerMap = HashMap<String, Arc<dyn CommandHandler>>;
1022
1023fn build_command_handler_map(commands: Option<&[CommandDefinition]>) -> Arc<CommandHandlerMap> {
1024 let map = match commands {
1025 Some(commands) => commands
1026 .iter()
1027 .filter(|cmd| !cmd.name.is_empty())
1028 .map(|cmd| (cmd.name.clone(), cmd.handler.clone()))
1029 .collect(),
1030 None => HashMap::new(),
1031 };
1032 Arc::new(map)
1033}
1034
/// Spawns the per-session event loop task and returns its join handle.
///
/// The task repeatedly waits on three sources and handles whichever is ready:
/// - `shutdown` being cancelled, which ends the loop;
/// - an incoming notification, dispatched to `handle_notification`;
/// - an incoming request, dispatched to `handle_request`.
///
/// The loop also ends when both channels are closed. On exit, a waiter still
/// parked in `idle_waiter` is completed with `SessionError::EventLoopClosed`.
#[allow(clippy::too_many_arguments)]
fn spawn_event_loop(
    session_id: SessionId,
    client: Client,
    handler: Arc<dyn SessionHandler>,
    hooks: Option<Arc<dyn SessionHooks>>,
    transforms: Option<Arc<dyn SystemMessageTransform>>,
    command_handlers: Arc<CommandHandlerMap>,
    session_fs_provider: Option<Arc<dyn SessionFsProvider>>,
    channels: crate::router::SessionChannels,
    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
    capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
    event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
    shutdown: CancellationToken,
) -> JoinHandle<()> {
    let crate::router::SessionChannels {
        mut notifications,
        mut requests,
    } = channels;

    let span = tracing::error_span!("session_event_loop", session_id = %session_id);
    tokio::spawn(
        async move {
            loop {
                tokio::select! {
                    _ = shutdown.cancelled() => break,
                    Some(notification) = notifications.recv() => {
                        handle_notification(
                            &session_id, &client, &handler, &command_handlers, notification, &idle_waiter, &capabilities, &event_tx,
                        ).await;
                    }
                    Some(request) = requests.recv() => {
                        handle_request(
                            &session_id, &client, &handler, hooks.as_deref(), transforms.as_deref(), session_fs_provider.as_ref(), request,
                        ).await;
                    }
                    // Reached when every remaining branch is disabled, i.e. both channels returned `None`.
                    else => break,
                }
            }
            // Do not leave a `send_and_wait` caller hanging once the loop is gone.
            if let Some(waiter) = idle_waiter.lock().take() {
                let _ = waiter
                    .tx
                    .send(Err(Error::Session(SessionError::EventLoopClosed)));
            }
        }
        .instrument(span),
    )
}
1095
1096fn extract_request_id(data: &Value) -> Option<RequestId> {
1097 data.get("requestId")
1098 .and_then(|v| v.as_str())
1099 .filter(|s| !s.is_empty())
1100 .map(RequestId::new)
1101}
1102
1103fn pending_permission_result_kind(response: &HandlerResponse) -> &'static str {
1104 match response {
1105 HandlerResponse::Permission(PermissionResult::Approved) => "approve-once",
1106 HandlerResponse::Permission(PermissionResult::Denied) => "reject",
1107 HandlerResponse::Permission(PermissionResult::NoResult) => "no-result",
1108 _ => "user-not-available",
1112 }
1113}
1114
1115fn permission_request_response(response: &HandlerResponse) -> PermissionDecision {
1116 match response {
1117 HandlerResponse::Permission(PermissionResult::Approved) => {
1118 PermissionDecision::ApproveOnce(PermissionDecisionApproveOnce {
1119 kind: PermissionDecisionApproveOnceKind::ApproveOnce,
1120 })
1121 }
1122 _ => PermissionDecision::Reject(PermissionDecisionReject {
1123 kind: PermissionDecisionRejectKind::Reject,
1124 feedback: None,
1125 }),
1126 }
1127}
1128
1129fn notification_permission_payload(response: &HandlerResponse) -> Option<Value> {
1136 match response {
1137 HandlerResponse::Permission(PermissionResult::Deferred) => None,
1138 HandlerResponse::Permission(PermissionResult::Custom(value)) => Some(value.clone()),
1139 _ => Some(serde_json::json!({
1140 "kind": pending_permission_result_kind(response),
1141 })),
1142 }
1143}
1144
1145fn direct_permission_payload(response: &HandlerResponse) -> Value {
1152 match response {
1153 HandlerResponse::Permission(PermissionResult::Custom(value)) => value.clone(),
1154 HandlerResponse::Permission(PermissionResult::Deferred) => serde_json::to_value(
1155 permission_request_response(&HandlerResponse::Permission(PermissionResult::Approved)),
1156 )
1157 .expect("serializing direct permission response should succeed"),
1158 HandlerResponse::Permission(PermissionResult::NoResult)
1159 | HandlerResponse::Permission(PermissionResult::UserNotAvailable) => serde_json::json!({
1160 "kind": pending_permission_result_kind(response),
1161 }),
1162 _ => serde_json::to_value(permission_request_response(response))
1163 .expect("serializing direct permission response should succeed"),
1164 }
1165}
1166
1167#[allow(clippy::too_many_arguments)]
1169async fn handle_notification(
1170 session_id: &SessionId,
1171 client: &Client,
1172 handler: &Arc<dyn SessionHandler>,
1173 command_handlers: &Arc<CommandHandlerMap>,
1174 notification: SessionEventNotification,
1175 idle_waiter: &Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1176 capabilities: &Arc<parking_lot::RwLock<SessionCapabilities>>,
1177 event_tx: &tokio::sync::broadcast::Sender<SessionEvent>,
1178) {
1179 let dispatch_start = Instant::now();
1180 let event = notification.event.clone();
1181 let event_type = event.parsed_type();
1182 if event_type == SessionEventType::PermissionRequested {
1183 tracing::debug!(
1184 session_id = %session_id,
1185 event_type = %event.event_type,
1186 "Session::handle_notification permission request received"
1187 );
1188 }
1189
1190 match event_type {
1193 SessionEventType::AssistantMessage
1194 | SessionEventType::SessionIdle
1195 | SessionEventType::SessionError => {
1196 let mut guard = idle_waiter.lock();
1197 if let Some(waiter) = guard.as_mut() {
1198 match event_type {
1199 SessionEventType::AssistantMessage => {
1200 if !waiter.first_assistant_message_seen {
1201 waiter.first_assistant_message_seen = true;
1202 tracing::debug!(
1203 elapsed_ms = waiter.started_at.elapsed().as_millis(),
1204 session_id = %session_id,
1205 "Session::send_and_wait first assistant message"
1206 );
1207 }
1208 waiter.last_assistant_message = Some(event.clone());
1209 }
1210 SessionEventType::SessionIdle | SessionEventType::SessionError => {
1211 if let Some(waiter) = guard.take() {
1212 if event_type == SessionEventType::SessionIdle {
1213 tracing::debug!(
1214 elapsed_ms = waiter.started_at.elapsed().as_millis(),
1215 session_id = %session_id,
1216 "Session::send_and_wait idle received"
1217 );
1218 let _ = waiter.tx.send(Ok(waiter.last_assistant_message));
1219 } else {
1220 let error_msg = event
1221 .typed_data::<SessionErrorData>()
1222 .map(|d| d.message)
1223 .or_else(|| {
1224 event
1225 .data
1226 .get("message")
1227 .and_then(|v| v.as_str())
1228 .map(|s| s.to_string())
1229 })
1230 .unwrap_or_else(|| "session error".to_string());
1231 let _ = waiter
1232 .tx
1233 .send(Err(Error::Session(SessionError::AgentError(error_msg))));
1234 }
1235 }
1236 }
1237 _ => {}
1238 }
1239 }
1240 }
1241 _ => {}
1242 }
1243
1244 let _ = event_tx.send(event.clone());
1248
1249 handler
1251 .on_event(HandlerEvent::SessionEvent {
1252 session_id: session_id.clone(),
1253 event,
1254 })
1255 .await;
1256
1257 if event_type == SessionEventType::CapabilitiesChanged {
1261 match serde_json::from_value::<SessionCapabilities>(notification.event.data.clone()) {
1262 Ok(changed) => *capabilities.write() = changed,
1263 Err(e) => warn!(error = %e, "failed to deserialize capabilities.changed payload"),
1264 }
1265 }
1266
1267 tracing::debug!(
1268 elapsed_ms = dispatch_start.elapsed().as_millis(),
1269 session_id = %session_id,
1270 event_type = %notification.event.event_type,
1271 "Session::handle_notification dispatch"
1272 );
1273
1274 match event_type {
1277 SessionEventType::PermissionRequested => {
1278 let Some(request_id) = extract_request_id(¬ification.event.data) else {
1279 return;
1280 };
1281 let client = client.clone();
1282 let handler = handler.clone();
1283 let sid = session_id.clone();
1284 let data: PermissionRequestData =
1285 serde_json::from_value(notification.event.data.clone()).unwrap_or_else(|_| {
1286 PermissionRequestData {
1287 kind: None,
1288 tool_call_id: None,
1289 extra: notification.event.data.clone(),
1290 }
1291 });
1292 let span = tracing::error_span!(
1293 "permission_request_handler",
1294 session_id = %sid,
1295 request_id = %request_id
1296 );
1297 tokio::spawn(
1298 async move {
1299 let handler_start = Instant::now();
1300 let response = handler
1301 .on_event(HandlerEvent::PermissionRequest {
1302 session_id: sid.clone(),
1303 request_id: request_id.clone(),
1304 data,
1305 })
1306 .await;
1307 tracing::debug!(
1308 elapsed_ms = handler_start.elapsed().as_millis(),
1309 session_id = %sid,
1310 request_id = %request_id,
1311 "SessionHandler::on_permission_request dispatch"
1312 );
1313 let Some(result_value) = notification_permission_payload(&response) else {
1314 return;
1317 };
1318 let rpc_start = Instant::now();
1319 let _ = client
1320 .call(
1321 "session.permissions.handlePendingPermissionRequest",
1322 Some(serde_json::json!({
1323 "sessionId": sid,
1324 "requestId": request_id,
1325 "result": result_value,
1326 })),
1327 )
1328 .await;
1329 tracing::debug!(
1330 elapsed_ms = rpc_start.elapsed().as_millis(),
1331 session_id = %sid,
1332 request_id = %request_id,
1333 "Session::handle_notification response sent successfully"
1334 );
1335 }
1336 .instrument(span),
1337 );
1338 }
1339 SessionEventType::ExternalToolRequested => {
1340 let Some(request_id) = extract_request_id(¬ification.event.data) else {
1341 return;
1342 };
1343 let data: ExternalToolRequestedData =
1344 match serde_json::from_value(notification.event.data.clone()) {
1345 Ok(d) => d,
1346 Err(e) => {
1347 warn!(error = %e, "failed to deserialize external_tool.requested");
1348 let client = client.clone();
1349 let sid = session_id.clone();
1350 let span = tracing::error_span!(
1351 "external_tool_deserialize_error",
1352 session_id = %sid,
1353 request_id = %request_id
1354 );
1355 tokio::spawn(
1356 async move {
1357 let rpc_start = Instant::now();
1358 let _ = client
1359 .call(
1360 "session.tools.handlePendingToolCall",
1361 Some(serde_json::json!({
1362 "sessionId": sid,
1363 "requestId": request_id,
1364 "error": format!("Failed to deserialize tool request: {e}"),
1365 })),
1366 )
1367 .await;
1368 tracing::debug!(
1369 elapsed_ms = rpc_start.elapsed().as_millis(),
1370 session_id = %sid,
1371 request_id = %request_id,
1372 "Session::handle_notification response sent successfully"
1373 );
1374 }
1375 .instrument(span),
1376 );
1377 return;
1378 }
1379 };
1380 let client = client.clone();
1381 let handler = handler.clone();
1382 let sid = session_id.clone();
1383 let span = tracing::error_span!(
1384 "external_tool_handler",
1385 session_id = %sid,
1386 request_id = %request_id
1387 );
1388 tokio::spawn(
1389 async move {
1390 if data.tool_call_id.is_empty() || data.tool_name.is_empty() {
1391 let error_msg = if data.tool_call_id.is_empty() {
1392 "Missing toolCallId"
1393 } else {
1394 "Missing toolName"
1395 };
1396 let rpc_start = Instant::now();
1397 let _ = client
1398 .call(
1399 "session.tools.handlePendingToolCall",
1400 Some(serde_json::json!({
1401 "sessionId": sid,
1402 "requestId": request_id,
1403 "error": error_msg,
1404 })),
1405 )
1406 .await;
1407 tracing::debug!(
1408 elapsed_ms = rpc_start.elapsed().as_millis(),
1409 session_id = %sid,
1410 request_id = %request_id,
1411 "Session::handle_notification response sent successfully"
1412 );
1413 return;
1414 }
1415 let tool_call_id = data.tool_call_id.clone();
1416 let tool_name = data.tool_name.clone();
1417 let invocation = ToolInvocation {
1418 session_id: sid.clone(),
1419 tool_call_id: data.tool_call_id,
1420 tool_name: data.tool_name,
1421 arguments: data
1422 .arguments
1423 .unwrap_or(Value::Object(serde_json::Map::new())),
1424 traceparent: data.traceparent,
1425 tracestate: data.tracestate,
1426 };
1427 let handler_start = Instant::now();
1428 let response = handler
1429 .on_event(HandlerEvent::ExternalTool { invocation })
1430 .await;
1431 tracing::debug!(
1432 elapsed_ms = handler_start.elapsed().as_millis(),
1433 session_id = %sid,
1434 request_id = %request_id,
1435 tool_call_id = %tool_call_id,
1436 tool_name = %tool_name,
1437 "ToolHandler::call dispatch"
1438 );
1439 let tool_result = match response {
1440 HandlerResponse::ToolResult(r) => r,
1441 _ => ToolResult::Text("Unexpected handler response".to_string()),
1442 };
1443 let result_value = serde_json::to_value(&tool_result).unwrap_or(Value::Null);
1444 let rpc_start = Instant::now();
1445 let _ = client
1446 .call(
1447 "session.tools.handlePendingToolCall",
1448 Some(serde_json::json!({
1449 "sessionId": sid,
1450 "requestId": request_id,
1451 "result": result_value,
1452 })),
1453 )
1454 .await;
1455 tracing::debug!(
1456 elapsed_ms = rpc_start.elapsed().as_millis(),
1457 session_id = %sid,
1458 request_id = %request_id,
1459 tool_call_id = %tool_call_id,
1460 tool_name = %tool_name,
1461 "Session::handle_notification response sent successfully"
1462 );
1463 }
1464 .instrument(span),
1465 );
1466 }
1467 SessionEventType::UserInputRequested => {
1468 }
1475 SessionEventType::ElicitationRequested => {
1476 let Some(request_id) = extract_request_id(¬ification.event.data) else {
1477 return;
1478 };
1479 let elicitation_data: ElicitationRequestedData =
1480 match serde_json::from_value(notification.event.data.clone()) {
1481 Ok(d) => d,
1482 Err(e) => {
1483 warn!(error = %e, "failed to deserialize elicitation request");
1484 return;
1485 }
1486 };
1487 let request = ElicitationRequest {
1488 message: elicitation_data.message,
1489 requested_schema: elicitation_data
1490 .requested_schema
1491 .map(|s| serde_json::to_value(s).unwrap_or(Value::Null)),
1492 mode: elicitation_data.mode.map(|m| match m {
1493 crate::generated::session_events::ElicitationRequestedMode::Form => {
1494 crate::types::ElicitationMode::Form
1495 }
1496 crate::generated::session_events::ElicitationRequestedMode::Url => {
1497 crate::types::ElicitationMode::Url
1498 }
1499 _ => crate::types::ElicitationMode::Unknown,
1500 }),
1501 elicitation_source: elicitation_data.elicitation_source,
1502 url: elicitation_data.url,
1503 };
1504 let client = client.clone();
1505 let handler = handler.clone();
1506 let sid = session_id.clone();
1507 let span = tracing::error_span!(
1508 "elicitation_request_handler",
1509 session_id = %sid,
1510 request_id = %request_id
1511 );
1512 tokio::spawn(
1513 async move {
1514 let cancel = ElicitationResult {
1515 action: "cancel".to_string(),
1516 content: None,
1517 };
1518 let handler_task = tokio::spawn({
1520 let sid = sid.clone();
1521 let request_id = request_id.clone();
1522 let span = tracing::error_span!(
1523 "elicitation_callback",
1524 session_id = %sid,
1525 request_id = %request_id
1526 );
1527 async move {
1528 let handler_start = Instant::now();
1529 let response = handler
1530 .on_event(HandlerEvent::ElicitationRequest {
1531 session_id: sid.clone(),
1532 request_id: request_id.clone(),
1533 request,
1534 })
1535 .await;
1536 tracing::debug!(
1537 elapsed_ms = handler_start.elapsed().as_millis(),
1538 session_id = %sid,
1539 request_id = %request_id,
1540 "SessionHandler::on_elicitation dispatch"
1541 );
1542 response
1543 }
1544 .instrument(span)
1545 });
1546 let result = match handler_task.await {
1547 Ok(HandlerResponse::Elicitation(r)) => r,
1548 _ => cancel.clone(),
1549 };
1550 let rpc_start = Instant::now();
1551 if let Err(e) = client
1552 .call(
1553 "session.ui.handlePendingElicitation",
1554 Some(serde_json::json!({
1555 "sessionId": sid,
1556 "requestId": request_id,
1557 "result": result,
1558 })),
1559 )
1560 .await
1561 {
1562 warn!(error = %e, "handlePendingElicitation failed, sending cancel");
1564 let _ = client
1565 .call(
1566 "session.ui.handlePendingElicitation",
1567 Some(serde_json::json!({
1568 "sessionId": sid,
1569 "requestId": request_id,
1570 "result": cancel,
1571 })),
1572 )
1573 .await;
1574 } else {
1575 tracing::debug!(
1576 elapsed_ms = rpc_start.elapsed().as_millis(),
1577 session_id = %sid,
1578 request_id = %request_id,
1579 "Session::handle_notification response sent successfully"
1580 );
1581 }
1582 }
1583 .instrument(span),
1584 );
1585 }
1586 SessionEventType::CommandExecute => {
1587 let data: CommandExecuteData =
1588 match serde_json::from_value(notification.event.data.clone()) {
1589 Ok(d) => d,
1590 Err(e) => {
1591 warn!(error = %e, "failed to deserialize command.execute");
1592 return;
1593 }
1594 };
1595 let client = client.clone();
1596 let command_handlers = command_handlers.clone();
1597 let sid = session_id.clone();
1598 let span = tracing::error_span!("command_handler", session_id = %sid);
1599 tokio::spawn(
1600 async move {
1601 let request_id = data.request_id;
1602 let ack_error = match command_handlers.get(&data.command_name).cloned() {
1603 None => Some(format!("Unknown command: {}", data.command_name)),
1604 Some(handler) => {
1605 let command_name = data.command_name.clone();
1606 let ctx = CommandContext {
1607 session_id: sid.clone(),
1608 command: data.command,
1609 command_name: data.command_name,
1610 args: data.args,
1611 };
1612 let handler_start = Instant::now();
1613 let result = handler.on_command(ctx).await;
1614 tracing::debug!(
1615 elapsed_ms = handler_start.elapsed().as_millis(),
1616 session_id = %sid,
1617 request_id = %request_id,
1618 command_name = %command_name,
1619 "CommandHandler::call dispatch"
1620 );
1621 match result {
1622 Ok(()) => None,
1623 Err(e) => Some(e.to_string()),
1624 }
1625 }
1626 };
1627 let mut params = serde_json::json!({
1628 "sessionId": sid,
1629 "requestId": request_id,
1630 });
1631 if let Some(error_msg) = ack_error {
1632 params["error"] = serde_json::Value::String(error_msg);
1633 }
1634 let rpc_start = Instant::now();
1635 let _ = client
1636 .call("session.commands.handlePendingCommand", Some(params))
1637 .await;
1638 tracing::debug!(
1639 elapsed_ms = rpc_start.elapsed().as_millis(),
1640 session_id = %sid,
1641 request_id = %request_id,
1642 "Session::handle_notification response sent successfully"
1643 );
1644 }
1645 .instrument(span),
1646 );
1647 }
1648 _ => {}
1649 }
1650}
1651
1652async fn handle_request(
1654 session_id: &SessionId,
1655 client: &Client,
1656 handler: &Arc<dyn SessionHandler>,
1657 hooks: Option<&dyn SessionHooks>,
1658 transforms: Option<&dyn SystemMessageTransform>,
1659 session_fs_provider: Option<&Arc<dyn SessionFsProvider>>,
1660 request: crate::JsonRpcRequest,
1661) {
1662 let sid = session_id.clone();
1663
1664 if request.method.starts_with("sessionFs.") {
1665 crate::session_fs_dispatch::dispatch(client, session_fs_provider, request).await;
1666 return;
1667 }
1668
1669 match request.method.as_str() {
1670 "hooks.invoke" => {
1671 let params = request.params.as_ref();
1672 let hook_type = params
1673 .and_then(|p| p.get("hookType"))
1674 .and_then(|v| v.as_str())
1675 .unwrap_or("");
1676 let input = params
1677 .and_then(|p| p.get("input"))
1678 .cloned()
1679 .unwrap_or(Value::Object(Default::default()));
1680
1681 let rpc_result = if let Some(hooks) = hooks {
1682 match crate::hooks::dispatch_hook(hooks, &sid, hook_type, input).await {
1683 Ok(output) => output,
1684 Err(e) => {
1685 warn!(error = %e, hook_type = hook_type, "hook dispatch failed");
1686 serde_json::json!({ "output": {} })
1687 }
1688 }
1689 } else {
1690 serde_json::json!({ "output": {} })
1691 };
1692
1693 let rpc_response = JsonRpcResponse {
1694 jsonrpc: "2.0".to_string(),
1695 id: request.id,
1696 result: Some(rpc_result),
1697 error: None,
1698 };
1699 let _ = client.send_response(&rpc_response).await;
1700 }
1701
1702 "tool.call" => {
1703 let invocation: ToolInvocation = match request
1704 .params
1705 .as_ref()
1706 .and_then(|p| serde_json::from_value::<ToolInvocation>(p.clone()).ok())
1707 {
1708 Some(inv) => inv,
1709 None => {
1710 let _ = send_error_response(
1711 client,
1712 request.id,
1713 error_codes::INVALID_PARAMS,
1714 "invalid tool.call params",
1715 )
1716 .await;
1717 return;
1718 }
1719 };
1720 let tool_call_id = invocation.tool_call_id.clone();
1721 let tool_name = invocation.tool_name.clone();
1722 let handler_start = Instant::now();
1723 let response = handler
1724 .on_event(HandlerEvent::ExternalTool { invocation })
1725 .await;
1726 tracing::debug!(
1727 elapsed_ms = handler_start.elapsed().as_millis(),
1728 session_id = %sid,
1729 tool_call_id = %tool_call_id,
1730 tool_name = %tool_name,
1731 "ToolHandler::call dispatch"
1732 );
1733 let tool_result = match response {
1734 HandlerResponse::ToolResult(r) => r,
1735 _ => ToolResult::Text("Unexpected handler response".to_string()),
1736 };
1737 let rpc_response = JsonRpcResponse {
1738 jsonrpc: "2.0".to_string(),
1739 id: request.id,
1740 result: Some(serde_json::json!(ToolResultResponse {
1741 result: tool_result
1742 })),
1743 error: None,
1744 };
1745 let _ = client.send_response(&rpc_response).await;
1746 }
1747
1748 "userInput.request" => {
1749 let params = request.params.as_ref();
1750 let Some(question) = params
1751 .and_then(|p| p.get("question"))
1752 .and_then(|v| v.as_str())
1753 else {
1754 warn!("userInput.request missing 'question' field");
1755 let rpc_response = JsonRpcResponse {
1756 jsonrpc: "2.0".to_string(),
1757 id: request.id,
1758 result: None,
1759 error: Some(crate::JsonRpcError {
1760 code: error_codes::INVALID_PARAMS,
1761 message: "missing required field: question".to_string(),
1762 data: None,
1763 }),
1764 };
1765 let _ = client.send_response(&rpc_response).await;
1766 return;
1767 };
1768 let question = question.to_string();
1769 let choices = params
1770 .and_then(|p| p.get("choices"))
1771 .and_then(|v| v.as_array())
1772 .map(|arr| {
1773 arr.iter()
1774 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1775 .collect()
1776 });
1777 let allow_freeform = params
1778 .and_then(|p| p.get("allowFreeform"))
1779 .and_then(|v| v.as_bool());
1780
1781 let handler_start = Instant::now();
1782 let response = handler
1783 .on_event(HandlerEvent::UserInput {
1784 session_id: sid.clone(),
1785 question,
1786 choices,
1787 allow_freeform,
1788 })
1789 .await;
1790 tracing::debug!(
1791 elapsed_ms = handler_start.elapsed().as_millis(),
1792 session_id = %sid,
1793 "SessionHandler::on_user_input dispatch"
1794 );
1795
1796 let rpc_result = match response {
1797 HandlerResponse::UserInput(Some(UserInputResponse {
1798 answer,
1799 was_freeform,
1800 })) => serde_json::json!({
1801 "answer": answer,
1802 "wasFreeform": was_freeform,
1803 }),
1804 _ => serde_json::json!({ "noResponse": true }),
1805 };
1806 let rpc_response = JsonRpcResponse {
1807 jsonrpc: "2.0".to_string(),
1808 id: request.id,
1809 result: Some(rpc_result),
1810 error: None,
1811 };
1812 let _ = client.send_response(&rpc_response).await;
1813 }
1814
1815 "exitPlanMode.request" => {
1816 let params = request
1817 .params
1818 .as_ref()
1819 .cloned()
1820 .unwrap_or(Value::Object(serde_json::Map::new()));
1821 let data: ExitPlanModeData = match serde_json::from_value(params) {
1822 Ok(d) => d,
1823 Err(e) => {
1824 warn!(error = %e, "failed to deserialize exitPlanMode.request params, using defaults");
1825 ExitPlanModeData::default()
1826 }
1827 };
1828
1829 let response = handler
1830 .on_event(HandlerEvent::ExitPlanMode {
1831 session_id: sid,
1832 data,
1833 })
1834 .await;
1835
1836 let rpc_result = match response {
1837 HandlerResponse::ExitPlanMode(result) => serde_json::to_value(result)
1838 .expect("ExitPlanModeResult serialization cannot fail"),
1839 _ => serde_json::json!({ "approved": true }),
1840 };
1841 let rpc_response = JsonRpcResponse {
1842 jsonrpc: "2.0".to_string(),
1843 id: request.id,
1844 result: Some(rpc_result),
1845 error: None,
1846 };
1847 let _ = client.send_response(&rpc_response).await;
1848 }
1849
1850 "autoModeSwitch.request" => {
1851 let error_code = request
1852 .params
1853 .as_ref()
1854 .and_then(|p| p.get("errorCode"))
1855 .and_then(|v| v.as_str())
1856 .map(|s| s.to_string());
1857 let retry_after_seconds = request
1858 .params
1859 .as_ref()
1860 .and_then(|p| p.get("retryAfterSeconds"))
1861 .and_then(|v| v.as_f64());
1862
1863 let response = handler
1864 .on_event(HandlerEvent::AutoModeSwitch {
1865 session_id: sid,
1866 error_code,
1867 retry_after_seconds,
1868 })
1869 .await;
1870
1871 let answer = match response {
1872 HandlerResponse::AutoModeSwitch(answer) => answer,
1873 _ => AutoModeSwitchResponse::No,
1874 };
1875 let rpc_response = JsonRpcResponse {
1876 jsonrpc: "2.0".to_string(),
1877 id: request.id,
1878 result: Some(serde_json::json!({ "response": answer })),
1879 error: None,
1880 };
1881 let _ = client.send_response(&rpc_response).await;
1882 }
1883
1884 "permission.request" => {
1885 let Some(request_id) = request
1886 .params
1887 .as_ref()
1888 .and_then(|p| p.get("requestId"))
1889 .and_then(|v| v.as_str())
1890 .filter(|s| !s.is_empty())
1891 else {
1892 warn!("permission.request missing 'requestId' field");
1893 let rpc_response = JsonRpcResponse {
1894 jsonrpc: "2.0".to_string(),
1895 id: request.id,
1896 result: None,
1897 error: Some(crate::JsonRpcError {
1898 code: error_codes::INVALID_PARAMS,
1899 message: "missing required field: requestId".to_string(),
1900 data: None,
1901 }),
1902 };
1903 let _ = client.send_response(&rpc_response).await;
1904 return;
1905 };
1906 let request_id = RequestId::new(request_id);
1907 let raw_params = request
1908 .params
1909 .as_ref()
1910 .cloned()
1911 .unwrap_or(Value::Object(serde_json::Map::new()));
1912 let data: PermissionRequestData =
1913 serde_json::from_value(raw_params.clone()).unwrap_or(PermissionRequestData {
1914 kind: None,
1915 tool_call_id: None,
1916 extra: raw_params,
1917 });
1918
1919 let handler_start = Instant::now();
1920 let response = handler
1921 .on_event(HandlerEvent::PermissionRequest {
1922 session_id: sid.clone(),
1923 request_id: request_id.clone(),
1924 data,
1925 })
1926 .await;
1927 tracing::debug!(
1928 elapsed_ms = handler_start.elapsed().as_millis(),
1929 session_id = %sid,
1930 request_id = %request_id,
1931 "SessionHandler::on_permission_request dispatch"
1932 );
1933 let rpc_response = JsonRpcResponse {
1934 jsonrpc: "2.0".to_string(),
1935 id: request.id,
1936 result: Some(direct_permission_payload(&response)),
1937 error: None,
1938 };
1939 let _ = client.send_response(&rpc_response).await;
1940 }
1941
1942 "systemMessage.transform" => {
1943 let params = request.params.as_ref();
1944 let sections: HashMap<String, crate::transforms::TransformSection> =
1945 match params.and_then(|p| p.get("sections")) {
1946 Some(v) => match serde_json::from_value(v.clone()) {
1947 Ok(s) => s,
1948 Err(e) => {
1949 let _ = send_error_response(
1950 client,
1951 request.id,
1952 error_codes::INVALID_PARAMS,
1953 &format!("invalid sections: {e}"),
1954 )
1955 .await;
1956 return;
1957 }
1958 },
1959 None => {
1960 let _ = send_error_response(
1961 client,
1962 request.id,
1963 error_codes::INVALID_PARAMS,
1964 "missing sections parameter",
1965 )
1966 .await;
1967 return;
1968 }
1969 };
1970
1971 let rpc_result = if let Some(transforms) = transforms {
1972 let transform_start = Instant::now();
1973 let response =
1974 crate::transforms::dispatch_transform(transforms, &sid, sections).await;
1975 tracing::debug!(
1976 elapsed_ms = transform_start.elapsed().as_millis(),
1977 session_id = %sid,
1978 "SystemMessageTransform::transform_section dispatch"
1979 );
1980 match serde_json::to_value(response) {
1981 Ok(v) => v,
1982 Err(e) => {
1983 warn!(error = %e, "failed to serialize transform response");
1984 serde_json::json!({ "sections": {} })
1985 }
1986 }
1987 } else {
1988 let passthrough: HashMap<String, crate::transforms::TransformSection> = sections;
1990 serde_json::json!({ "sections": passthrough })
1991 };
1992
1993 let rpc_response = JsonRpcResponse {
1994 jsonrpc: "2.0".to_string(),
1995 id: request.id,
1996 result: Some(rpc_result),
1997 error: None,
1998 };
1999 let _ = client.send_response(&rpc_response).await;
2000 }
2001
2002 method => {
2003 warn!(
2004 method = method,
2005 "unhandled request method in session event loop"
2006 );
2007 let _ = send_error_response(
2008 client,
2009 request.id,
2010 error_codes::METHOD_NOT_FOUND,
2011 &format!("unknown method: {method}"),
2012 )
2013 .await;
2014 }
2015 }
2016}
2017
2018async fn send_error_response(
2019 client: &Client,
2020 id: u64,
2021 code: i32,
2022 message: &str,
2023) -> Result<(), Error> {
2024 let response = JsonRpcResponse {
2025 jsonrpc: "2.0".to_string(),
2026 id,
2027 result: None,
2028 error: Some(crate::JsonRpcError {
2029 code,
2030 message: message.to_string(),
2031 data: None,
2032 }),
2033 };
2034 client.send_response(&response).await
2035}
2036
2037fn apply_transform_sections(
2041 sys_msg: &mut SystemMessageConfig,
2042 transforms: &dyn SystemMessageTransform,
2043) {
2044 sys_msg.mode = Some("customize".to_string());
2045 let sections = sys_msg.sections.get_or_insert_with(HashMap::new);
2046 for id in transforms.section_ids() {
2047 sections.entry(id).or_insert_with(|| SectionOverride {
2048 action: Some("transform".to_string()),
2049 content: None,
2050 });
2051 }
2052}
2053
2054fn inject_transform_sections(config: &mut SessionConfig, transforms: &dyn SystemMessageTransform) {
2055 let sys_msg = config.system_message.get_or_insert_with(Default::default);
2056 apply_transform_sections(sys_msg, transforms);
2057}
2058
2059fn inject_transform_sections_resume(
2060 config: &mut ResumeSessionConfig,
2061 transforms: &dyn SystemMessageTransform,
2062) {
2063 let sys_msg = config.system_message.get_or_insert_with(Default::default);
2064 apply_transform_sections(sys_msg, transforms);
2065}
2066
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::{
        direct_permission_payload, notification_permission_payload, pending_permission_result_kind,
        permission_request_response,
    };
    use crate::handler::{HandlerResponse, PermissionResult};

    /// Wraps a permission result in the handler response variant the payload helpers inspect.
    fn permission(result: PermissionResult) -> HandlerResponse {
        HandlerResponse::Permission(result)
    }

    #[test]
    fn pending_permission_requests_use_decision_kinds() {
        let cases = [
            (permission(PermissionResult::Approved), "approve-once"),
            (permission(PermissionResult::Denied), "reject"),
            // A non-permission response maps to the "user-not-available" kind.
            (HandlerResponse::Ok, "user-not-available"),
        ];

        for (response, expected_kind) in &cases {
            assert_eq!(pending_permission_result_kind(response), *expected_kind);
        }
    }

    #[test]
    fn direct_permission_requests_use_decision_response_kinds() {
        let cases = [
            (
                permission(PermissionResult::Approved),
                "serializing approved permission response should succeed",
                "approve-once",
            ),
            (
                permission(PermissionResult::Denied),
                "serializing denied permission response should succeed",
                "reject",
            ),
            // A non-permission response serializes as a rejection.
            (
                HandlerResponse::Ok,
                "serializing fallback permission response should succeed",
                "reject",
            ),
        ];

        for (response, failure_message, expected_kind) in &cases {
            let serialized = serde_json::to_value(permission_request_response(response))
                .expect(failure_message);
            assert_eq!(serialized, json!({ "kind": *expected_kind }));
        }
    }

    #[test]
    fn notification_payload_handles_deferred_and_custom() {
        // A deferred result produces no notification payload at all.
        let deferred = permission(PermissionResult::Deferred);
        assert!(notification_permission_payload(&deferred).is_none());

        // A custom result is passed through verbatim.
        let custom = json!({
            "kind": "approve-and-remember",
            "allowlist": ["ls", "grep"],
        });
        let custom_response = permission(PermissionResult::Custom(custom.clone()));
        assert_eq!(
            notification_permission_payload(&custom_response),
            Some(custom)
        );

        // The plain decisions map to their decision kinds.
        let plain_cases = [
            (permission(PermissionResult::Approved), "approve-once"),
            (permission(PermissionResult::Denied), "reject"),
        ];
        for (response, expected_kind) in &plain_cases {
            assert_eq!(
                notification_permission_payload(response),
                Some(json!({ "kind": *expected_kind }))
            );
        }
    }

    #[test]
    fn direct_payload_handles_deferred_and_custom() {
        // A custom result is passed through verbatim.
        let custom = json!({
            "kind": "approve-and-remember",
            "allowlist": ["ls", "grep"],
        });
        let custom_response = permission(PermissionResult::Custom(custom.clone()));
        assert_eq!(direct_permission_payload(&custom_response), custom);

        // On the direct path a deferred result yields "approve-once", as do plain approvals;
        // denials yield "reject".
        let kind_cases = [
            (permission(PermissionResult::Deferred), "approve-once"),
            (permission(PermissionResult::Approved), "approve-once"),
            (permission(PermissionResult::Denied), "reject"),
        ];
        for (response, expected_kind) in &kind_cases {
            assert_eq!(
                direct_permission_payload(response),
                json!({ "kind": *expected_kind })
            );
        }
    }
}