Skip to main content

unified_agent_sdk/providers/claude_code/
executor.rs

1//! Claude Code adapter implementation.
2
3use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use async_trait::async_trait;
9use chrono::Utc;
10use claude_code::{
11    ClaudeAgentOptions, ClaudeSdkClient, Error as ClaudeError, InputPrompt, Message, Prompt,
12    SubprocessCliTransport,
13};
14use tokio::sync::Mutex;
15
16use crate::error::{ExecutorError, Result};
17use crate::executor::{AgentCapabilities, AgentExecutor, AvailabilityStatus, SpawnConfig};
18use crate::session::{AgentSession, SessionMetadata};
19use crate::types::{ExecutorType, PermissionPolicy};
20
/// Session label sent with the first prompt of a freshly spawned client; also
/// the prefix of locally generated fallback session ids.
const DEFAULT_SESSION_ID: &str = "default";
/// Upper bound on the number of live clients kept in the executor's session
/// map; storing one more than this causes another entry to be evicted.
const MAX_TRACKED_SESSIONS: usize = 64;
/// Process-wide sequence number mixed into fallback session ids so that two
/// ids generated within the same clock tick still differ.
static FALLBACK_SESSION_COUNTER: AtomicU64 = AtomicU64::new(1);
24
25/// Executor adapter backed by `claude_code::ClaudeSdkClient`.
26///
27/// # Examples
28///
29/// ```rust,no_run
30/// use unified_agent_sdk::{AgentExecutor, ClaudeCodeExecutor, executor::SpawnConfig};
31///
32/// # async fn run() -> unified_agent_sdk::Result<()> {
33/// let executor = ClaudeCodeExecutor::new();
34/// let cwd = std::env::current_dir()?;
35/// let _session = executor
36///     .spawn(&cwd, "Review this project architecture.", &SpawnConfig::default())
37///     .await?;
38/// # Ok(())
39/// # }
40/// ```
41#[derive(Clone)]
42pub struct ClaudeCodeExecutor {
43    base_options: ClaudeAgentOptions,
44    sessions: Arc<Mutex<HashMap<String, ClaudeSdkClient>>>,
45}
46
47impl ClaudeCodeExecutor {
48    /// Creates an executor with default Claude SDK options.
49    pub fn new() -> Self {
50        Self::with_options(ClaudeAgentOptions::default())
51    }
52
53    /// Creates an executor with pre-configured Claude SDK options.
54    ///
55    /// Per-session settings from [`SpawnConfig`] still take precedence at runtime.
56    pub fn with_options(options: ClaudeAgentOptions) -> Self {
57        Self {
58            base_options: options,
59            sessions: Arc::new(Mutex::new(HashMap::new())),
60        }
61    }
62
63    fn build_options(
64        &self,
65        working_dir: &Path,
66        config: &SpawnConfig,
67        resume_session: Option<&str>,
68        continue_conversation: bool,
69        fork_session: bool,
70    ) -> ClaudeAgentOptions {
71        let mut options = self.base_options.clone();
72        options.cwd = Some(working_dir.to_path_buf());
73
74        if config.model.is_some() {
75            options.model = config.model.clone();
76        }
77        if config.reasoning.is_some() {
78            options.effort = config.reasoning.clone();
79        }
80        if let Some(mode) = map_permission_policy(config.permission_policy) {
81            options.permission_mode = Some(mode);
82        }
83
84        options.env.extend(config.env.iter().cloned());
85        options.continue_conversation = continue_conversation;
86        options.resume = resume_session.map(str::to_owned);
87        options.fork_session = fork_session;
88        options
89    }
90
91    async fn store_client(&self, session_id: String, client: ClaudeSdkClient) -> Result<()> {
92        let old_client = {
93            let mut sessions = self.sessions.lock().await;
94            sessions.remove(&session_id)
95        };
96
97        if let Some(mut old_client) = old_client
98            && let Err(error) = old_client.disconnect().await
99        {
100            let mut sessions = self.sessions.lock().await;
101            sessions.entry(session_id).or_insert(old_client);
102            return Err(map_claude_error(
103                "failed to disconnect replaced claude session",
104                error,
105            ));
106        }
107
108        let mut sessions = self.sessions.lock().await;
109        let current_session_id = session_id.clone();
110        sessions.insert(current_session_id.clone(), client);
111        let evicted_client = if sessions.len() > MAX_TRACKED_SESSIONS {
112            let evicted_session_id = sessions
113                .keys()
114                .find(|session_id| *session_id != &current_session_id)
115                .cloned()
116                .unwrap_or_else(|| DEFAULT_SESSION_ID.to_string());
117            sessions.remove(&evicted_session_id)
118        } else {
119            None
120        };
121        drop(sessions);
122
123        if let Some(mut evicted_client) = evicted_client {
124            let _ = evicted_client.disconnect().await;
125        }
126
127        Ok(())
128    }
129
130    fn to_agent_session(
131        &self,
132        session_id: String,
133        working_dir: &Path,
134        context_window_override_tokens: Option<u32>,
135    ) -> AgentSession {
136        AgentSession::from_metadata_with_exit_status(
137            SessionMetadata {
138                session_id,
139                executor_type: ExecutorType::ClaudeCode,
140                working_dir: working_dir.to_path_buf(),
141                created_at: Utc::now(),
142                last_message_id: None,
143                context_window_override_tokens,
144            },
145            crate::types::ExitStatus {
146                code: None,
147                success: true,
148            },
149        )
150    }
151}
152
153impl Default for ClaudeCodeExecutor {
154    fn default() -> Self {
155        Self::new()
156    }
157}
158
159#[async_trait]
160impl AgentExecutor for ClaudeCodeExecutor {
161    fn executor_type(&self) -> ExecutorType {
162        ExecutorType::ClaudeCode
163    }
164
165    async fn spawn(
166        &self,
167        working_dir: &Path,
168        prompt: &str,
169        config: &SpawnConfig,
170    ) -> Result<AgentSession> {
171        let options = self.build_options(working_dir, config, None, false, false);
172        let mut client = ClaudeSdkClient::new(Some(options), None);
173
174        client
175            .connect(None)
176            .await
177            .map_err(|error| map_claude_error("failed to connect claude sdk client", error))?;
178        client
179            .query(InputPrompt::Text(prompt.to_owned()), DEFAULT_SESSION_ID)
180            .await
181            .map_err(|error| {
182                map_claude_error("failed to send prompt to claude sdk client", error)
183            })?;
184
185        let messages = client.receive_response().await.map_err(|error| {
186            map_claude_error("failed to receive response from claude sdk client", error)
187        })?;
188        ensure_query_succeeded(&messages, "failed to execute prompt in claude sdk session")?;
189        let session_id = extract_session_id(&messages).unwrap_or_else(unique_fallback_session_id);
190
191        self.store_client(session_id.clone(), client).await?;
192        Ok(self.to_agent_session(
193            session_id,
194            working_dir,
195            config.context_window_override_tokens,
196        ))
197    }
198
199    async fn resume(
200        &self,
201        working_dir: &Path,
202        prompt: &str,
203        session_id: &str,
204        reset_to: Option<&str>,
205        config: &SpawnConfig,
206    ) -> Result<AgentSession> {
207        let options = self.build_options(working_dir, config, Some(session_id), true, false);
208        let mut client = ClaudeSdkClient::new(Some(options), None);
209
210        client
211            .connect(None)
212            .await
213            .map_err(|error| map_claude_error("failed to connect claude sdk client", error))?;
214
215        if let Some(reset_to) = reset_to {
216            client.rewind_files(reset_to).await.map_err(|error| {
217                map_claude_error("failed to rewind claude session files", error)
218            })?;
219        }
220
221        client
222            .query(InputPrompt::Text(prompt.to_owned()), session_id)
223            .await
224            .map_err(|error| {
225                map_claude_error("failed to send prompt to resumed claude session", error)
226            })?;
227
228        let messages = client.receive_response().await.map_err(|error| {
229            map_claude_error(
230                "failed to receive response from resumed claude session",
231                error,
232            )
233        })?;
234        ensure_query_succeeded(
235            &messages,
236            "failed to execute prompt in resumed claude session",
237        )?;
238        let resumed_session_id =
239            extract_session_id(&messages).unwrap_or_else(|| session_id.to_string());
240
241        self.store_client(resumed_session_id.clone(), client)
242            .await?;
243        Ok(self.to_agent_session(
244            resumed_session_id,
245            working_dir,
246            config.context_window_override_tokens,
247        ))
248    }
249
250    fn capabilities(&self) -> AgentCapabilities {
251        AgentCapabilities {
252            session_fork: true,
253            context_usage: true,
254            mcp_support: true,
255            structured_output: true,
256        }
257    }
258
259    fn availability(&self) -> AvailabilityStatus {
260        match resolve_cli_path(&self.base_options) {
261            Ok(path) => AvailabilityStatus {
262                available: true,
263                reason: Some(format!("Claude CLI found at {}", path.display())),
264            },
265            Err(reason) => AvailabilityStatus {
266                available: false,
267                reason: Some(reason),
268            },
269        }
270    }
271}
272
273fn map_permission_policy(policy: Option<PermissionPolicy>) -> Option<claude_code::PermissionMode> {
274    match policy {
275        Some(PermissionPolicy::Bypass) => Some(claude_code::PermissionMode::BypassPermissions),
276        Some(PermissionPolicy::Prompt) => Some(claude_code::PermissionMode::Default),
277        Some(PermissionPolicy::Deny) => Some(claude_code::PermissionMode::Plan),
278        None => None,
279    }
280}
281
282fn extract_session_id(messages: &[Message]) -> Option<String> {
283    messages.iter().rev().find_map(|message| match message {
284        Message::Result(result) => Some(result.session_id.clone()),
285        Message::StreamEvent(event) => Some(event.session_id.clone()),
286        _ => None,
287    })
288}
289
290fn unique_fallback_session_id() -> String {
291    let sequence = FALLBACK_SESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
292    let timestamp_nanos = Utc::now().timestamp_nanos_opt().unwrap_or_default();
293    format!("{DEFAULT_SESSION_ID}-{timestamp_nanos}-{sequence}")
294}
295
296fn ensure_query_succeeded(messages: &[Message], context: &str) -> Result<()> {
297    let Some(result) = messages.iter().rev().find_map(|message| match message {
298        Message::Result(result) => Some(result),
299        _ => None,
300    }) else {
301        return Err(ExecutorError::execution_failed(
302            context,
303            "claude query completed without a terminal result message",
304        ));
305    };
306
307    if result.is_error {
308        let detail = result
309            .result
310            .as_deref()
311            .unwrap_or("claude query returned an error result");
312        return Err(ExecutorError::execution_failed(context, detail));
313    }
314
315    Ok(())
316}
317
318fn map_claude_error(context: &str, error: ClaudeError) -> ExecutorError {
319    match error {
320        ClaudeError::CLINotFound(err) => ExecutorError::unavailable(context, err),
321        ClaudeError::CLIConnection(err) => ExecutorError::spawn_failed(context, err),
322        ClaudeError::Io(err) => {
323            ExecutorError::Io(std::io::Error::new(err.kind(), format!("{context}: {err}")))
324        }
325        ClaudeError::Process(err) => ExecutorError::execution_failed(context, err),
326        ClaudeError::CLIJSONDecode(err) => ExecutorError::execution_failed(context, err),
327        ClaudeError::MessageParse(err) => ExecutorError::execution_failed(context, err),
328        ClaudeError::Json(err) => ExecutorError::Serialization(err),
329        ClaudeError::ClaudeSDK(err) => ExecutorError::invalid_config(context, err),
330        ClaudeError::Other(msg) => ExecutorError::other(context, msg),
331    }
332}
333
334fn resolve_cli_path(options: &ClaudeAgentOptions) -> std::result::Result<PathBuf, String> {
335    if let Some(cli_path) = &options.cli_path {
336        return validate_cli_path(cli_path, "configured Claude CLI path");
337    }
338
339    let transport = SubprocessCliTransport::new(Prompt::Messages, options.clone())
340        .map_err(|error| error.to_string())?;
341    let resolved = PathBuf::from(&transport.cli_path);
342    validate_cli_path(&resolved, "Claude CLI path resolved by SDK")
343}
344
345fn validate_cli_path(path: &Path, label: &str) -> std::result::Result<PathBuf, String> {
346    if !path.exists() {
347        return Err(format!("{label} does not exist: {}", path.display()));
348    }
349    if !path.is_file() {
350        return Err(format!(
351            "{label} must point to an executable file: {}",
352            path.display()
353        ));
354    }
355    if !is_executable_file(path) {
356        return Err(format!("{label} is not executable: {}", path.display()));
357    }
358    Ok(path.to_path_buf())
359}
360
/// Reports whether `path` can be treated as an executable.
///
/// On Unix this is true when the file's metadata can be read and any of the
/// owner/group/other execute bits is set. On other platforms it is true for
/// any regular file.
fn is_executable_file(path: &Path) -> bool {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;

        // 0o111 selects the three execute bits; unreadable metadata counts as
        // "not executable".
        path.metadata()
            .is_ok_and(|metadata| metadata.permissions().mode() & 0o111 != 0)
    }
    #[cfg(not(unix))]
    {
        path.is_file()
    }
}
374
#[cfg(test)]
mod tests {
    use super::*;
    use claude_code::ResultMessage;
    use std::fs::{self, File};
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Builds a terminal result message for session `session-1` with the given
    /// error flag and optional result text.
    fn result_message(is_error: bool, result: Option<&str>) -> Message {
        let subtype = String::from(if is_error { "error" } else { "success" });
        Message::Result(ResultMessage {
            subtype,
            duration_ms: 1,
            duration_api_ms: 1,
            is_error,
            num_turns: 1,
            session_id: "session-1".to_string(),
            stop_reason: None,
            total_cost_usd: None,
            usage: None,
            result: result.map(str::to_string),
            structured_output: None,
        })
    }

    /// Returns a path inside the system temp directory that embeds the process
    /// id and the current time in nanoseconds. Nothing is created on disk.
    fn new_temp_path(prefix: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time should move forward")
            .as_nanos();
        let pid = std::process::id();
        std::env::temp_dir().join(format!("{prefix}-{pid}-{nanos}"))
    }

    #[test]
    fn fallback_session_ids_are_unique() {
        let ids = [unique_fallback_session_id(), unique_fallback_session_id()];

        assert_ne!(ids[0], ids[1]);
        for id in &ids {
            assert!(id.starts_with(DEFAULT_SESSION_ID));
        }
    }

    #[test]
    fn query_success_check_rejects_error_result_messages() {
        let messages = [result_message(true, Some("permission denied"))];

        let error = ensure_query_succeeded(&messages, "spawn failed")
            .expect_err("error result message should fail");
        let rendered = error.to_string();
        assert!(rendered.contains("permission denied"));
    }

    #[test]
    fn query_success_check_accepts_success_result_messages() {
        let messages = [result_message(false, Some("ok"))];
        let outcome = ensure_query_succeeded(&messages, "spawn failed");
        assert!(outcome.is_ok());
    }

    #[test]
    fn query_success_check_rejects_missing_result_messages() {
        let messages = Vec::<Message>::new();

        let error = ensure_query_succeeded(&messages, "spawn failed")
            .expect_err("missing terminal result message should fail");
        let rendered = error.to_string();
        assert!(rendered.contains("without a terminal result message"));
    }

    #[test]
    fn resolve_cli_path_rejects_directory_override() {
        let temp_dir = new_temp_path("claude-cli-dir");
        fs::create_dir_all(&temp_dir).expect("directory should be created");

        let mut options = ClaudeAgentOptions::default();
        options.cli_path = Some(temp_dir.clone());

        let result = resolve_cli_path(&options);
        assert!(result.is_err());
        let reason = result.expect_err("directory should be rejected");
        assert!(reason.contains("must point to an executable file"));

        let _ = fs::remove_dir_all(temp_dir);
    }

    #[test]
    fn resolve_cli_path_accepts_executable_file_override() {
        let temp_file = new_temp_path("claude-cli-file");
        File::create(&temp_file).expect("file should be created");
        // The executable bit only exists on Unix; elsewhere any regular file
        // is accepted.
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let mut permissions = fs::metadata(&temp_file)
                .expect("metadata should exist")
                .permissions();
            permissions.set_mode(0o755);
            fs::set_permissions(&temp_file, permissions).expect("permissions should be set");
        }

        let mut options = ClaudeAgentOptions::default();
        options.cli_path = Some(temp_file.clone());

        let resolved = resolve_cli_path(&options).expect("file should be accepted");
        assert_eq!(resolved, temp_file);

        let _ = fs::remove_file(temp_file);
    }
}